diff options
Diffstat (limited to 'makima/src/daemon')
| -rw-r--r-- | makima/src/daemon/api/supervisor.rs | 4 | ||||
| -rw-r--r-- | makima/src/daemon/cli/supervisor.rs | 4 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 208 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 37 |
4 files changed, 251 insertions, 2 deletions
diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs index e79a9bb..6b99de0 100644 --- a/makima/src/daemon/api/supervisor.rs +++ b/makima/src/daemon/api/supervisor.rs @@ -17,6 +17,10 @@ pub struct SpawnTaskRequest { pub parent_task_id: Option<Uuid>, #[serde(skip_serializing_if = "Option::is_none")] pub checkpoint_sha: Option<String>, + /// If true, create a separate worktree for the task (requires merge after). + /// If false (default), the task shares the supervisor's worktree. + #[serde(default)] + pub use_own_worktree: bool, } #[derive(Serialize)] diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index 4f36fd8..09f61db 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -48,6 +48,10 @@ pub struct SpawnArgs { /// Repository URL (local path or remote URL). If not provided, will try to detect from current directory. #[arg(long)] pub repo: Option<String>, + + /// Create a separate worktree for the task (requires merge after). By default, tasks share the supervisor's worktree. + #[arg(long)] + pub own_worktree: bool, } /// Arguments for wait command. diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 9dd4506..075234f 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1696,6 +1696,7 @@ impl TaskManager { patch_data, patch_base_sha, local_only, + supervisor_worktree_task_id, } => { tracing::info!( task_id = %task_id, @@ -1714,6 +1715,7 @@ impl TaskManager { continue_from_task_id = ?continue_from_task_id, copy_files = ?copy_files, contract_id = ?contract_id, + supervisor_worktree_task_id = ?supervisor_worktree_task_id, plan_len = plan.len(), "Spawning new task" ); @@ -1723,6 +1725,7 @@ impl TaskManager { target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, conversation_history, patch_data, patch_base_sha, local_only, + supervisor_worktree_task_id, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1824,6 +1827,7 @@ impl TaskManager { None, // patch_data - not available for respawn None, // patch_base_sha - not available for respawn local_only, + None, // supervisor_worktree_task_id - supervisors use their own worktree ).await { tracing::error!( task_id = %task_id, @@ -1993,6 +1997,12 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } + DaemonCommand::GetWorktreeInfo { + task_id, + } => { + tracing::info!(task_id = %task_id, "Getting worktree info"); + self.handle_get_worktree_info(task_id).await?; + } DaemonCommand::CreateCheckpoint { task_id, message, @@ -2057,6 +2067,7 @@ impl TaskManager { patch_data: Option<String>, patch_base_sha: Option<String>, local_only: bool, + supervisor_worktree_task_id: Option<Uuid>, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); @@ -2135,6 +2146,7 @@ impl TaskManager { is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, conversation_history, patch_data, patch_base_sha, local_only, + supervisor_worktree_task_id, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -3245,6 +3257,160 @@ impl TaskManager { Ok(()) } + /// Handle GetWorktreeInfo command - get worktree files, stats, branch info. + async fn handle_get_worktree_info( + &self, + task_id: Uuid, + ) -> Result<(), DaemonError> { + // Get task's worktree path and branch + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| ( + t.worktree.as_ref().map(|w| w.path.clone()), + t.worktree.as_ref().map(|w| w.branch.clone()), + )) + }; + + let (worktree_path, branch) = match task_info { + Some((Some(path), branch)) => (Some(path), branch), + Some((None, _)) => (None, None), + None => (None, None), + }; + + if worktree_path.is_none() { + let msg = DaemonMessage::WorktreeInfoResult { + task_id, + success: true, + worktree_path: None, + exists: false, + files_changed: 0, + insertions: 0, + deletions: 0, + files: None, + branch: None, + head_sha: None, + error: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + + let path = worktree_path.unwrap(); + let path_str = path.to_string_lossy().to_string(); + + // Check if worktree exists + if !path.exists() { + let msg = DaemonMessage::WorktreeInfoResult { + task_id, + success: true, + worktree_path: Some(path_str), + exists: false, + files_changed: 0, + insertions: 0, + deletions: 0, + files: None, + branch, + head_sha: None, + error: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + + // Get HEAD SHA + let head_sha = match tokio::process::Command::new("git") + .current_dir(&path) + .args(["rev-parse", "HEAD"]) + .output() + .await + { + Ok(output) if output.status.success() => { + Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } + _ => None, + }; + + // Get changed files with status using git status --porcelain + let status_output = tokio::process::Command::new("git") + .current_dir(&path) + .args(["status", "--porcelain"]) + .output() + .await; + + let status_lines: Vec<(String, String)> = match status_output { + Ok(output) if output.status.success() => { + String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|line| { + if line.len() < 3 { + return None; + } + let status = line[0..2].trim().to_string(); + let file_path = line[3..].to_string(); + Some((file_path, status)) + }) + .collect() + } + _ => vec![], + }; + + // Get numstat for line counts (staged + unstaged) + let numstat_output = tokio::process::Command::new("git") + .current_dir(&path) + .args(["diff", "HEAD", "--numstat"]) + .output() + .await; + + let mut file_stats: std::collections::HashMap<String, (i32, i32)> = std::collections::HashMap::new(); + if let Ok(output) = numstat_output { + if output.status.success() { + for line in String::from_utf8_lossy(&output.stdout).lines() { + let parts: Vec<&str> = line.split('\t').collect(); + if parts.len() >= 3 { + let added = parts[0].parse::<i32>().unwrap_or(0); + let removed = parts[1].parse::<i32>().unwrap_or(0); + let file = parts[2].to_string(); + file_stats.insert(file, (added, removed)); + } + } + } + } + + // Build file list with stats + let mut files_json = Vec::new(); + let mut total_insertions = 0; + let mut total_deletions = 0; + + for (file_path, status) in &status_lines { + let (lines_added, lines_removed) = file_stats.get(file_path).copied().unwrap_or((0, 0)); + total_insertions += lines_added; + total_deletions += lines_removed; + + files_json.push(serde_json::json!({ + "path": file_path, + "status": status, + "linesAdded": lines_added, + "linesRemoved": lines_removed, + })); + } + + let msg = DaemonMessage::WorktreeInfoResult { + task_id, + success: true, + worktree_path: Some(path_str), + exists: true, + files_changed: status_lines.len() as i32, + insertions: total_insertions, + deletions: total_deletions, + files: Some(serde_json::Value::Array(files_json)), + branch, + head_sha, + error: None, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + /// Handle CreateCheckpoint command - stage all changes, commit, and get stats. async fn handle_create_checkpoint( &self, @@ -3685,6 +3851,7 @@ impl TaskManagerInner { patch_data: Option<String>, patch_base_sha: Option<String>, local_only: bool, + supervisor_worktree_task_id: Option<Uuid>, ) -> Result<(), DaemonError> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ==="); @@ -3780,8 +3947,45 @@ impl TaskManagerInner { }; // Determine working directory - let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some(); - let working_dir = if let Some(existing) = existing_worktree { + // First check if we should share a supervisor's worktree + let shared_supervisor_worktree = if let Some(supervisor_task_id) = supervisor_worktree_task_id { + match self.find_worktree_for_task(supervisor_task_id).await { + Ok(path) => { + tracing::info!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + path = %path.display(), + "Using shared worktree from supervisor" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("Using shared worktree from supervisor: {}\n", path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + Some(path) + } + Err(e) => { + tracing::error!( + task_id = %task_id, + supervisor_task_id = %supervisor_task_id, + error = %e, + "Supervisor worktree not found" + ); + return Err(DaemonError::Task(TaskError::SetupFailed( + format!("Supervisor worktree not found for task {}: {}", supervisor_task_id, e) + ))); + } + } + } else { + None + }; + + let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some() || shared_supervisor_worktree.is_some(); + let working_dir = if let Some(shared_path) = shared_supervisor_worktree { + // Use supervisor's worktree directly (no copy, no new branch) + shared_path + } else if let Some(existing) = existing_worktree { // Reuse existing worktree for session resume let msg = DaemonMessage::task_output( task_id, diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 018dc7b..6e4f5cf 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -270,6 +270,34 @@ pub enum DaemonMessage { error: Option<String>, }, + /// Response to GetWorktreeInfo command. + WorktreeInfoResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + /// Path to the worktree directory + #[serde(rename = "worktreePath")] + worktree_path: Option<String>, + /// Whether the worktree exists + exists: bool, + /// Number of files changed + #[serde(rename = "filesChanged")] + files_changed: i32, + /// Total lines inserted + insertions: i32, + /// Total lines deleted + deletions: i32, + /// Changed files list: [{path, status, linesAdded, linesRemoved}] + files: Option<serde_json::Value>, + /// Current branch name + branch: Option<String>, + /// Current HEAD commit SHA + #[serde(rename = "headSha")] + head_sha: Option<String>, + /// Error message if failed + error: Option<String>, + }, + /// Response to CreateCheckpoint command. CheckpointCreated { #[serde(rename = "taskId")] @@ -449,6 +477,9 @@ pub enum DaemonCommand { /// Whether the contract is in local-only mode (skips automatic completion actions). #[serde(rename = "localOnly", default)] local_only: bool, + /// Task ID to share worktree with (supervisor's task ID). If Some, use that task's worktree instead of creating a new one. + #[serde(rename = "supervisorWorktreeTaskId", default, skip_serializing_if = "Option::is_none")] + supervisor_worktree_task_id: Option<Uuid>, }, /// Pause a running task. @@ -656,6 +687,12 @@ pub enum DaemonCommand { task_id: Uuid, }, + /// Get worktree information (files, stats, branch) for a task. + GetWorktreeInfo { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + /// Create a checkpoint (stage changes, commit, get stats). CreateCheckpoint { #[serde(rename = "taskId")] |
