diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 419 |
1 files changed, 4 insertions, 415 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 19d2166..9900385 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -262,27 +262,6 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, - /// Enhanced supervisor heartbeat with detailed state - SupervisorHeartbeat { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "contractId")] - contract_id: Uuid, - /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted - state: String, - /// Current contract phase - phase: String, - /// Description of current activity - #[serde(rename = "currentActivity")] - current_activity: Option<String>, - /// Progress percentage (0-100) - progress: u8, - /// Task IDs the supervisor is waiting on - #[serde(rename = "pendingTaskIds")] - pending_task_ids: Vec<Uuid>, - /// Timestamp of this heartbeat - timestamp: DateTime<Utc>, - }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -618,96 +597,6 @@ struct DaemonAuthResult { /// Automatically create a PR when all non-supervisor tasks for a contract are done. /// Only applies to remote-repo contracts in the "execute" phase. /// Fires as a best-effort operation — errors are logged but not propagated. -async fn auto_create_pr_if_ready( - pool: &sqlx::PgPool, - state: &SharedState, - contract_id: Uuid, - owner_id: Uuid, -) { - // 1. Load contract — must be remote (not local_only) and in execute phase - let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(c)) => c, - _ => return, - }; - if contract.local_only || contract.phase != "execute" { - return; - } - - // 2. Load non-supervisor tasks — all must be done - let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(t) => t, - _ => return, - }; - let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect(); - if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") { - return; - } - - // 3. Check pull-request deliverable not already complete - let completed_deliverables = contract.get_completed_deliverables(&contract.phase); - if completed_deliverables.contains(&"pull-request".to_string()) { - return; - } - - // 4. Check at least one repository has a remote URL - let repos = match repository::list_contract_repositories(pool, contract_id).await { - Ok(r) => r, - _ => return, - }; - if !repos.iter().any(|r| r.repository_url.is_some()) { - return; - } - - // 5. Load supervisor task - let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await { - Ok(Some(s)) => s, - _ => return, - }; - - // Need supervisor's daemon_id to send command - let daemon_id = match supervisor.daemon_id { - Some(id) => id, - None => return, - }; - - // 6. Construct branch name - let sanitized_name: String = supervisor - .name - .chars() - .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' }) - .collect::<String>() - .to_lowercase(); - let short_id = &supervisor.id.to_string()[..8]; - let branch = format!("makima/{}-{}", sanitized_name, short_id); - - // 7. Send CreatePR command to supervisor's daemon - let command = DaemonCommand::CreatePR { - task_id: supervisor.id, - title: contract.name.clone(), - body: contract.description.clone(), - base_branch: supervisor.base_branch.clone(), - branch, - }; - - match state.send_daemon_command(daemon_id, command).await { - Ok(()) => { - tracing::info!( - contract_id = %contract_id, - supervisor_id = %supervisor.id, - "Auto-PR: sent CreatePR command to supervisor daemon" - ); - } - Err(e) => { - tracing::warn!( - contract_id = %contract_id, - error = %e, - "Auto-PR: failed to send CreatePR command" - ); - } - } -} - -/// Validate an API key and return (user_id, owner_id). async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> { let key_hash = hash_api_key(key); @@ -983,83 +872,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } - Ok(DaemonMessage::SupervisorHeartbeat { - task_id, - contract_id, - state: supervisor_state, - phase, - current_activity, - progress, - pending_task_ids, - timestamp: _, - }) => { - tracing::debug!( - task_id = %task_id, - contract_id = %contract_id, - state = %supervisor_state, - phase = %phase, - progress = progress, - "Supervisor heartbeat received" - ); - - // Store heartbeat in database and update supervisor state (Task 3.3) - if let Some(ref pool) = state.db_pool { - let pool = pool.clone(); - let pending_ids = pending_task_ids.clone(); - let activity = current_activity.clone(); - let state_str = supervisor_state.clone(); - let phase_str = phase.clone(); - tokio::spawn(async move { - // Store the heartbeat record - if let Err(e) = repository::create_supervisor_heartbeat( - &pool, - task_id, - contract_id, - &state_str, - &phase_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to store supervisor heartbeat" - ); - } - - // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) - if let Err(e) = repository::update_supervisor_heartbeat_state( - &pool, - contract_id, - &state_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::debug!( - contract_id = %contract_id, - error = %e, - "Failed to update supervisor state from heartbeat (may not exist yet)" - ); - } - - // Also update the daemon heartbeat - if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { - if let Some(daemon_id) = task.daemon_id { - if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { - tracing::warn!( - daemon_id = %daemon_id, - error = %e, - "Failed to update daemon heartbeat from supervisor" - ); - } - } - } - }); - } - } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { @@ -1120,136 +932,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) - if updated_task.is_supervisor && new_status_owned == "running" { - if let Some(contract_id) = updated_task.contract_id { - // Check if supervisor state already exists (restoration scenario) - match repository::get_supervisor_state(&pool, contract_id).await { - Ok(Some(existing_state)) => { - // State exists - this is a restoration - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - existing_state = %existing_state.state, - restoration_count = existing_state.restoration_count, - "Supervisor starting with existing state - restoration in progress" - ); - - // Mark as restored (increments restoration_count) - match repository::mark_supervisor_restored( - &pool, - contract_id, - "daemon_restart", - ).await { - Ok(restored_state) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - restoration_count = restored_state.restoration_count, - "Supervisor restoration marked" - ); - - // Check for pending questions to re-deliver - if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>( - restored_state.pending_questions.clone() - ) { - if !questions.is_empty() { - tracing::info!( - contract_id = %contract_id, - question_count = questions.len(), - "Pending questions found for re-delivery" - ); - // Questions will be re-delivered by the supervisor when it restores - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to mark supervisor as restored" - ); - } - } - } - Ok(None) => { - // No existing state - fresh start - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( - &pool, - contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, - ).await { - Ok(_) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - phase = %contract.phase, - "Initialized fresh supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to initialize supervisor state" - ); - } - } - } - Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to get contract for supervisor state" - ); - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to check existing supervisor state" - ); - } - } - } - } - // Record history event when task starts running if new_status_owned == "running" { let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some("started"), - None, serde_json::json!({ "name": &updated_task.name, - "isSupervisor": updated_task.is_supervisor, }), ).await; } @@ -1329,51 +1021,19 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Notify supervisor if this task belongs to a contract - if let Some(contract_id) = updated_task.contract_id { - // Don't notify for supervisor tasks (they don't report to themselves) - if !updated_task.is_supervisor { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - // action_directive used to come from - // compute_action_directive (now removed alongside the - // LLM module). Passing None preserves the existing - // supervisor protocol; the auto-PR path below still - // fires when every task is done. - state.notify_supervisor_of_task_completion( - supervisor.id, - supervisor.daemon_id, - updated_task.id, - &updated_task.name, - &updated_task.status, - updated_task.progress_summary.as_deref(), - updated_task.error_message.as_deref(), - None, - ).await; - } - } - } - - // Auto-create PR if all tasks are done and repo is remote - if updated_task.status == "done" { - if let Some(contract_id) = updated_task.contract_id { - let pool_c = pool.clone(); - let state_c = state.clone(); - tokio::spawn(async move { - auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await; - }); - } - } + // Supervisor notification + auto-PR removed alongside + // legacy contracts. Directive completion is handled + // by the directive reconciler. + let _ = owner_id; // Record history event for task completion let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some(subtype), - None, serde_json::json!({ "name": &updated_task.name, "status": &updated_task.status, @@ -1962,16 +1622,13 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); // Record history event for checkpoint - // Get task to get contract_id if let Ok(Some(task)) = repository::get_task(pool, task_id).await { let _ = repository::record_history_event( pool, task.owner_id, - task.contract_id, Some(task_id), "checkpoint", Some("created"), - None, serde_json::json!({ "checkpointNumber": checkpoint.checkpoint_number, "commitSha": &sha, @@ -2103,28 +1760,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re conflicts: conflicts.clone(), }); - // On successful merge, notify supervisor to check if all merges complete - if success { - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = format!( - "[INFO] Merge completed: {}\n\ - Check if all tasks are merged with `makima supervisor tasks`.\n\ - If ready, create PR with `makima supervisor pr`.", - message - ); - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } - } } Ok(DaemonMessage::PRCreated { task_id, @@ -2150,52 +1785,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re pr_number, }); - // Notify supervisor of PR result (both success and failure) - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = if success { - // Get contract to determine next action - let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { - match (contract.contract_type.as_str(), contract.phase.as_str()) { - ("simple", "execute") => { - "Mark contract complete with `makima supervisor complete`".to_string() - } - ("specification", "execute") => { - "Advance to review phase with `makima supervisor advance-phase review`".to_string() - } - _ => "Check contract status with `makima supervisor status`".to_string() - } - } else { - "Check contract status with `makima supervisor status`".to_string() - }; - - format!( - "[ACTION REQUIRED] PR created successfully!\n\ - PR: {}\n\n\ - Next step: {}", - pr_url.as_deref().unwrap_or(&message), - next_action - ) - } else { - format!( - "[ERROR] PR creation failed for task {}:\n\ - {}\n\n\ - Please fix the issue and retry with `makima supervisor pr`.", - task_id, - message - ) - }; - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } } Ok(DaemonMessage::GitConfigInherited { success, |
