diff options
| author | soryu <soryu@soryu.co> | 2026-01-27 11:03:45 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-27 11:03:58 +0000 |
| commit | b6f239c19f0d3130515f3745f842e17a69212295 (patch) | |
| tree | 25b10a4fa2b7f1b38086c0067ba2f102cfb27b43 | |
| parent | ee45906b07d1032afaf8a56cce48826bea0c3f8b (diff) | |
| download | soryu-b6f239c19f0d3130515f3745f842e17a69212295.tar.gz soryu-b6f239c19f0d3130515f3745f842e17a69212295.zip | |
Add patch merging and fix task healthcheck failing due to worktrees
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 369 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 34 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 146 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 17 |
4 files changed, 530 insertions, 36 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 075234f..f0da860 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -997,6 +997,10 @@ pub struct ManagedTask { pub autonomous_loop: bool, /// Whether the contract is in local-only mode (skips automatic completion actions). pub local_only: bool, + /// If set, merge this task's changes to the supervisor's worktree on completion (cross-daemon case). + pub merge_to_supervisor_task_id: Option<Uuid>, + /// If set, this task shares the worktree of the specified supervisor task. + pub supervisor_worktree_task_id: Option<Uuid>, /// Time task was created. pub created_at: Instant, /// Time task started running. @@ -1278,13 +1282,13 @@ impl TaskManager { pub async fn check_worktree_health(&self) -> Vec<Uuid> { let mut affected_task_ids = Vec::new(); - // Get all running tasks - let tasks_snapshot: Vec<(Uuid, Option<PathBuf>)> = { + // Get all running tasks with their worktree info and supervisor worktree task ID + let tasks_snapshot: Vec<(Uuid, Option<PathBuf>, Option<Uuid>)> = { let tasks = self.tasks.read().await; tasks .iter() .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting)) - .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()))) + .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()), t.supervisor_worktree_task_id)) .collect() }; @@ -1292,9 +1296,39 @@ impl TaskManager { return affected_task_ids; } - for (task_id, worktree_path) in tasks_snapshot { + for (task_id, worktree_path, supervisor_worktree_task_id) in tasks_snapshot { let worktree_exists = if let Some(ref path) = worktree_path { path.exists() && path.join(".git").exists() + } else if let Some(supervisor_task_id) = supervisor_worktree_task_id { + // Task uses shared supervisor worktree - check the supervisor's worktree + // First try to get from in-memory tasks + let supervisor_worktree_path: Option<PathBuf> = { + let tasks = self.tasks.read().await; + tasks.get(&supervisor_task_id) + .and_then(|t| t.worktree.as_ref().map(|w| w.path.clone())) + }; + if let Some(path) = supervisor_worktree_path { + path.exists() && path.join(".git").exists() + } else { + // Supervisor not in memory - scan worktrees directory + let short_id = &supervisor_task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + let mut found = false; + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + if path.join(".git").exists() { + found = true; + break; + } + } + } + } + found + } } else { // No worktree set - scan by task ID let short_id = &task_id.to_string()[..8]; @@ -2038,6 +2072,20 @@ impl TaskManager { tracing::info!("Daemon restart: exiting process with code 42 (restart requested)"); std::process::exit(42); } + DaemonCommand::ApplyPatchToWorktree { + target_task_id, + source_task_id, + patch_data, + base_sha, + } => { + tracing::info!( + target_task_id = %target_task_id, + source_task_id = %source_task_id, + base_sha = %base_sha, + "Applying patch from cross-daemon task to worktree" + ); + self.handle_apply_patch_to_worktree(target_task_id, source_task_id, patch_data, base_sha).await?; + } } Ok(()) } @@ -2119,6 +2167,8 @@ impl TaskManager { concurrency_key, autonomous_loop, local_only, + merge_to_supervisor_task_id: None, // Set later if cross-daemon + supervisor_worktree_task_id, created_at: Instant::now(), started_at: None, completed_at: None, @@ -3662,6 +3712,210 @@ impl TaskManager { (total_added, total_removed, serde_json::json!(files)) } + /// Find worktree path for a task ID. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn find_worktree_for_task_tm(&self, task_id: Uuid) -> Result<PathBuf, String> { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(format!( + "No worktree found for task {}. The worktree may have been cleaned up.", + task_id + )) + } + + /// Handle ApplyPatchToWorktree command - apply a patch from a cross-daemon task to a supervisor's worktree. + async fn handle_apply_patch_to_worktree( + &self, + target_task_id: Uuid, + source_task_id: Uuid, + patch_data: String, + base_sha: String, + ) -> Result<(), DaemonError> { + // Find the target task's worktree + let worktree_path = match self.find_worktree_for_task_tm(target_task_id).await { + Ok(path) => path, + Err(e) => { + tracing::error!( + target_task_id = %target_task_id, + error = %e, + "Failed to find worktree for patch application" + ); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: worktree not found - {}\n", source_task_id, e), + true, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + tracing::info!( + target_task_id = %target_task_id, + source_task_id = %source_task_id, + worktree = %worktree_path.display(), + "Applying cross-daemon patch to worktree" + ); + + // Decode the base64-gzipped patch data + let patch_bytes = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &patch_data) { + Ok(bytes) => bytes, + Err(e) => { + tracing::error!(error = %e, "Failed to decode patch base64"); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: base64 decode error - {}\n", source_task_id, e), + true, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Decompress the gzipped patch + let patch_content = { + use std::io::Read; + let mut decoder = flate2::read::GzDecoder::new(&patch_bytes[..]); + let mut content = String::new(); + match decoder.read_to_string(&mut content) { + Ok(_) => content, + Err(e) => { + tracing::error!(error = %e, "Failed to decompress patch"); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: decompress error - {}\n", source_task_id, e), + true, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + } + }; + + // Check if patch is empty + if patch_content.trim().is_empty() { + tracing::info!( + target_task_id = %target_task_id, + source_task_id = %source_task_id, + "Cross-daemon task had no changes to merge" + ); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Cross-daemon task {} completed with no changes to merge\n", source_task_id), + false, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + + // Apply the patch using git apply + let mut child = match tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["apply", "--3way", "-"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + { + Ok(child) => child, + Err(e) => { + tracing::error!(error = %e, "Failed to spawn git apply"); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: spawn error - {}\n", source_task_id, e), + true, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Write patch to stdin + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + if let Err(e) = stdin.write_all(patch_content.as_bytes()).await { + tracing::error!(error = %e, "Failed to write patch to git apply stdin"); + } + } + + // Wait for completion + let output = match child.wait_with_output().await { + Ok(output) => output, + Err(e) => { + tracing::error!(error = %e, "Failed to wait for git apply"); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: wait error - {}\n", source_task_id, e), + true, + ); + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + if output.status.success() { + tracing::info!( + target_task_id = %target_task_id, + source_task_id = %source_task_id, + base_sha = %base_sha, + "Successfully applied cross-daemon patch" + ); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Successfully merged changes from cross-daemon task {} (base: {})\n", source_task_id, &base_sha[..8]), + false, + ); + let _ = self.ws_tx.send(msg).await; + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!( + target_task_id = %target_task_id, + source_task_id = %source_task_id, + stderr = %stderr, + "Failed to apply cross-daemon patch" + ); + let msg = DaemonMessage::task_output( + target_task_id, + format!("Failed to apply patch from task {}: {}\n", source_task_id, stderr), + true, + ); + let _ = self.ws_tx.send(msg).await; + } + + Ok(()) + } + /// Handle InheritGitConfig command - read git config from a directory and store it. async fn handle_inherit_git_config( &self, @@ -3948,6 +4202,9 @@ impl TaskManagerInner { // Determine working directory // First check if we should share a supervisor's worktree + // Track if we need to merge to supervisor on completion (cross-daemon case) + let mut merge_to_supervisor_on_completion: Option<Uuid> = None; + let shared_supervisor_worktree = if let Some(supervisor_task_id) = supervisor_worktree_task_id { match self.find_worktree_for_task(supervisor_task_id).await { Ok(path) => { @@ -3965,16 +4222,23 @@ impl TaskManagerInner { let _ = self.ws_tx.send(msg).await; Some(path) } - Err(e) => { - tracing::error!( + Err(_) => { + // Supervisor worktree not on this daemon (cross-daemon case) + // Will create own worktree and merge to supervisor on completion + tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, - error = %e, - "Supervisor worktree not found" + "Supervisor worktree not on this daemon, will create own and merge on completion" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("Supervisor on different daemon, will merge changes on completion\n"), + false, ); - return Err(DaemonError::Task(TaskError::SetupFailed( - format!("Supervisor worktree not found for task {}: {}", supervisor_task_id, e) - ))); + let _ = self.ws_tx.send(msg).await; + // Mark for merge on completion + merge_to_supervisor_on_completion = Some(supervisor_task_id); + None } } } else { @@ -4207,6 +4471,19 @@ impl TaskManagerInner { temp_dir }; + // Store merge target if cross-daemon (will merge to supervisor on completion) + if let Some(supervisor_task_id) = merge_to_supervisor_on_completion { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.merge_to_supervisor_task_id = Some(supervisor_task_id); + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Task marked for merge to supervisor on completion" + ); + } + } + // Copy files from parent task's worktree if specified if let Some(ref files) = copy_files { if !files.is_empty() { @@ -5062,6 +5339,76 @@ impl TaskManagerInner { } } + // If this task needs to merge to supervisor (cross-daemon case), generate and send patch + let merge_to_supervisor = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).and_then(|t| t.merge_to_supervisor_task_id) + }; + + if let Some(supervisor_task_id) = merge_to_supervisor { + if success { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Task completed on cross-daemon, generating patch to merge to supervisor" + ); + + // Get base SHA from the worktree's initial commit or parent + match crate::daemon::storage::get_parent_sha(&working_dir).await { + Ok(base_sha) => { + // Generate patch + match crate::daemon::storage::create_patch(&working_dir, &base_sha).await { + Ok((patch_bytes, files_count)) => { + // Base64 encode the patch + let patch_data = base64::Engine::encode( + &base64::engine::general_purpose::STANDARD, + &patch_bytes, + ); + + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + files_count = files_count, + patch_size = patch_bytes.len(), + "Sending patch to supervisor" + ); + + // Send MergePatchToSupervisor message to server + let msg = DaemonMessage::MergePatchToSupervisor { + task_id, + supervisor_task_id, + patch_data, + base_sha, + }; + let _ = self.ws_tx.send(msg).await; + + let output_msg = DaemonMessage::task_output( + task_id, + format!("Sent {} file(s) to supervisor for merge\n", files_count), + false, + ); + let _ = self.ws_tx.send(output_msg).await; + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to create patch for supervisor merge" + ); + } + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to get base SHA for supervisor merge" + ); + } + } + } + } + // Notify server - but NOT for supervisors which should never complete if is_supervisor { tracing::info!( diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 6e4f5cf..bd13975 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -380,6 +380,23 @@ pub enum DaemonMessage { /// Error message if failed error: Option<String>, }, + + /// Request to merge a task's patch to supervisor's worktree (cross-daemon case). + /// Sent when a task completes on a different daemon than its supervisor. + MergePatchToSupervisor { + /// The task that completed. + #[serde(rename = "taskId")] + task_id: Uuid, + /// The supervisor task to merge into. + #[serde(rename = "supervisorTaskId")] + supervisor_task_id: Uuid, + /// Base64-gzipped patch data. + #[serde(rename = "patchData")] + patch_data: String, + /// Base commit SHA for the patch. + #[serde(rename = "baseSha")] + base_sha: String, + }, } /// Information about a branch (used in BranchList message). @@ -737,6 +754,23 @@ pub enum DaemonCommand { /// Restart the daemon process. RestartDaemon, + + /// Apply a patch to a task's worktree (for cross-daemon merge). + /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon. + ApplyPatchToWorktree { + /// Target task whose worktree should be patched. + #[serde(rename = "targetTaskId")] + target_task_id: Uuid, + /// Source task that generated the patch (for logging). + #[serde(rename = "sourceTaskId")] + source_task_id: Uuid, + /// Base64-gzipped patch data. + #[serde(rename = "patchData")] + patch_data: String, + /// Base commit SHA for the patch. + #[serde(rename = "baseSha")] + base_sha: String, + }, } impl DaemonMessage { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 433c787..1152502 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -520,6 +520,22 @@ pub enum DaemonMessage { /// Error message if failed error: Option<String>, }, + /// Request to merge a task's patch to supervisor's worktree (cross-daemon case). + /// Sent when a task completes on a different daemon than its supervisor. + MergePatchToSupervisor { + /// The task that completed. + #[serde(rename = "taskId")] + task_id: Uuid, + /// The supervisor task to merge into. + #[serde(rename = "supervisorTaskId")] + supervisor_task_id: Uuid, + /// Base64-gzipped patch data. + #[serde(rename = "patchData")] + patch_data: String, + /// Base commit SHA for the patch. + #[serde(rename = "baseSha")] + base_sha: String, + }, } /// Validated daemon authentication result. @@ -1780,15 +1796,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re pr_number, }); - // On successful PR creation, notify supervisor of next steps - if success { - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - // Get contract to determine next action - if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let next_action = match (contract.contract_type.as_str(), contract.phase.as_str()) { + // Notify supervisor of PR result (both success and failure) + if let Some(pool) = state.db_pool.as_ref() { + if let Ok(Some(task)) = repository::get_task(pool, task_id).await { + if let Some(contract_id) = task.contract_id { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { + let prompt = if success { + // Get contract to determine next action + let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { + match (contract.contract_type.as_str(), contract.phase.as_str()) { ("simple", "execute") => { "Mark contract complete with `makima supervisor complete`".to_string() } @@ -1796,22 +1812,32 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Advance to review phase with `makima supervisor advance-phase review`".to_string() } _ => "Check contract status with `makima supervisor status`".to_string() - }; - - let prompt = format!( - "[ACTION REQUIRED] PR created successfully!\n\ - PR: {}\n\n\ - Next step: {}", - pr_url.as_deref().unwrap_or(&message), - next_action - ); - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } + } + } else { + "Check contract status with `makima supervisor status`".to_string() + }; + + format!( + "[ACTION REQUIRED] PR created successfully!\n\ + PR: {}\n\n\ + Next step: {}", + pr_url.as_deref().unwrap_or(&message), + next_action + ) + } else { + format!( + "[ERROR] PR creation failed for task {}:\n\ + {}\n\n\ + Please fix the issue and retry with `makima supervisor pr`.", + task_id, + message + ) + }; + let _ = state.notify_supervisor( + supervisor.id, + supervisor.daemon_id, + &prompt, + ).await; } } } @@ -1947,6 +1973,76 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re let _ = tx.send(response); } } + Ok(DaemonMessage::MergePatchToSupervisor { + task_id, + supervisor_task_id, + patch_data, + base_sha, + }) => { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + base_sha = %base_sha, + "Received cross-daemon merge patch request" + ); + + // Look up the supervisor task to find which daemon it's on + if let Some(ref pool) = state.db_pool { + match repository::get_task(pool, supervisor_task_id).await { + Ok(Some(supervisor_task)) => { + if let Some(target_daemon_id) = supervisor_task.daemon_id { + // Send ApplyPatchToWorktree command to the supervisor's daemon + let command = crate::server::state::DaemonCommand::ApplyPatchToWorktree { + target_task_id: supervisor_task_id, + source_task_id: task_id, + patch_data, + base_sha, + }; + match state.send_daemon_command(target_daemon_id, command).await { + Ok(()) => { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + target_daemon_id = %target_daemon_id, + "Routed patch merge to supervisor's daemon" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + target_daemon_id = %target_daemon_id, + error = %e, + "Failed to route patch merge to supervisor's daemon" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Supervisor task has no daemon assigned, cannot route patch merge" + ); + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + "Supervisor task not found, cannot route patch merge" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + error = %e, + "Failed to look up supervisor task for patch merge" + ); + } + } + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 6872d5e..1bc7d7e 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -506,6 +506,23 @@ pub enum DaemonCommand { /// Restart the daemon process RestartDaemon, + + /// Apply a patch to a task's worktree (for cross-daemon merge). + /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon. + ApplyPatchToWorktree { + /// Target task whose worktree should be patched. + #[serde(rename = "targetTaskId")] + target_task_id: Uuid, + /// Source task that generated the patch (for logging). + #[serde(rename = "sourceTaskId")] + source_task_id: Uuid, + /// Base64-gzipped patch data. + #[serde(rename = "patchData")] + patch_data: String, + /// Base commit SHA for the patch. + #[serde(rename = "baseSha")] + base_sha: String, + }, } /// Active daemon connection info stored in state. |
