From b6f239c19f0d3130515f3745f842e17a69212295 Mon Sep 17 00:00:00 2001 From: soryu Date: Tue, 27 Jan 2026 11:03:45 +0000 Subject: Add patch merging and fix task healthcheck failing due to worktrees --- makima/src/server/handlers/mesh_daemon.rs | 146 +++++++++++++++++++++++++----- 1 file changed, 121 insertions(+), 25 deletions(-) (limited to 'makima/src/server/handlers/mesh_daemon.rs') diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 433c787..1152502 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -520,6 +520,22 @@ pub enum DaemonMessage { /// Error message if failed error: Option, }, + /// Request to merge a task's patch to supervisor's worktree (cross-daemon case). + /// Sent when a task completes on a different daemon than its supervisor. + MergePatchToSupervisor { + /// The task that completed. + #[serde(rename = "taskId")] + task_id: Uuid, + /// The supervisor task to merge into. + #[serde(rename = "supervisorTaskId")] + supervisor_task_id: Uuid, + /// Base64-gzipped patch data. + #[serde(rename = "patchData")] + patch_data: String, + /// Base commit SHA for the patch. + #[serde(rename = "baseSha")] + base_sha: String, + }, } /// Validated daemon authentication result. @@ -1780,15 +1796,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re pr_number, }); - // On successful PR creation, notify supervisor of next steps - if success { - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - // Get contract to determine next action - if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let next_action = match (contract.contract_type.as_str(), contract.phase.as_str()) { + // Notify supervisor of PR result (both success and failure) + if let Some(pool) = state.db_pool.as_ref() { + if let Ok(Some(task)) = repository::get_task(pool, task_id).await { + if let Some(contract_id) = task.contract_id { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { + let prompt = if success { + // Get contract to determine next action + let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { + match (contract.contract_type.as_str(), contract.phase.as_str()) { ("simple", "execute") => { "Mark contract complete with `makima supervisor complete`".to_string() } @@ -1796,22 +1812,32 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Advance to review phase with `makima supervisor advance-phase review`".to_string() } _ => "Check contract status with `makima supervisor status`".to_string() - }; - - let prompt = format!( - "[ACTION REQUIRED] PR created successfully!\n\ - PR: {}\n\n\ - Next step: {}", - pr_url.as_deref().unwrap_or(&message), - next_action - ); - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } + } + } else { + "Check contract status with `makima supervisor status`".to_string() + }; + + format!( + "[ACTION REQUIRED] PR created successfully!\n\ + PR: {}\n\n\ + Next step: {}", + pr_url.as_deref().unwrap_or(&message), + next_action + ) + } else { + format!( + "[ERROR] PR creation failed for task {}:\n\ + {}\n\n\ + Please fix the issue and retry with `makima supervisor pr`.", + task_id, + message + ) + }; + let _ = state.notify_supervisor( + supervisor.id, + supervisor.daemon_id, + &prompt, + ).await; } } } @@ -1947,6 +1973,76 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re let _ = tx.send(response); } } + Ok(DaemonMessage::MergePatchToSupervisor { + task_id, + supervisor_task_id, + patch_data, + base_sha, + }) => { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + base_sha = %base_sha, + "Received cross-daemon merge patch request" + ); + + // Look up the supervisor task to find which daemon it's on + if let Some(ref pool) = state.db_pool { + match repository::get_task(pool, supervisor_task_id).await { + Ok(Some(supervisor_task)) => { + if let Some(target_daemon_id) = supervisor_task.daemon_id { + // Send ApplyPatchToWorktree command to the supervisor's daemon + let command = crate::server::state::DaemonCommand::ApplyPatchToWorktree { + target_task_id: supervisor_task_id, + source_task_id: task_id, + patch_data, + base_sha, + }; + match state.send_daemon_command(target_daemon_id, command).await { + Ok(()) => { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + target_daemon_id = %target_daemon_id, + "Routed patch merge to supervisor's daemon" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + target_daemon_id = %target_daemon_id, + error = %e, + "Failed to route patch merge to supervisor's daemon" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Supervisor task has no daemon assigned, cannot route patch merge" + ); + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Supervisor task not found, cannot route patch merge" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + error = %e, + "Failed to look up supervisor task for patch merge" + ); + } + } + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } -- cgit v1.2.3