//! Task lifecycle manager using git worktrees and Claude Code subprocesses. use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; use base64::Engine; use rand::Rng; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; use std::collections::HashSet; use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; use crate::daemon::config::CheckpointPatchConfig; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeError, WorktreeInfo, WorktreeManager}; use crate::daemon::db::local::LocalDb; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; /// Generate a secure random API key for orchestrator tool access. fn generate_tool_key() -> String { let mut rng = rand::thread_rng(); let bytes: [u8; 32] = rng.r#gen(); hex::encode(bytes) } /// Check if output contains an OAuth authentication error. /// Only checks system/error messages, not assistant responses (which could mention auth errors conversationally). fn is_oauth_auth_error(output: &str, json_type: Option<&str>, is_stdout: bool) -> bool { // Only check system messages or stderr output - not assistant messages // which could mention auth errors in conversation match json_type { Some("assistant") | Some("user") | Some("result") => return false, _ => {} } // For stdout JSON messages, only check system/error types if is_stdout && json_type.is_none() { // Non-JSON stdout output - could be startup messages, check carefully // Only match very specific patterns that wouldn't appear in conversation } // Match various authentication error patterns from Claude Code // These patterns are specific enough that they shouldn't appear in normal conversation if output.contains("Please run /login") && output.contains("authenticate") { return true; } if output.contains("Invalid API key") && output.contains("ANTHROPIC_API_KEY") { return true; } if output.contains("authentication_error") && (output.contains("OAuth token has expired") || output.contains("Please obtain a new token")) { return true; } // Check for Claude Code's specific error format if output.contains("\"type\":\"error\"") && output.contains("authentication") { return true; } false } /// Extract OAuth URL from text (looks for claude.ai OAuth URLs). fn extract_url(text: &str) -> Option { // Look for claude.ai OAuth URLs - try multiple patterns let patterns = [ "https://claude.ai/oauth", "https://console.anthropic.com/oauth", ]; for pattern in patterns { if let Some(start) = text.find(pattern) { let remaining = &text[start..]; // Find the end of the URL - stop at: // - Whitespace, common URL terminators, escape sequences let end = remaining .find(|c: char| { c.is_whitespace() || c == '"' || c == '\'' || c == '>' || c == ')' || c == ']' || c == '\x07' || c == '\x1b' }) .unwrap_or(remaining.len()); let url = &remaining[..end]; // Also check if there's another https:// within the match (hyperlink duplication) // Skip first 20 chars to avoid matching the start let url = if url.len() > 30 { if let Some(second_https) = url[20..].find("https://") { &url[..second_https + 20] // Keep only first URL } else { url } } else { url }; if url.len() > 20 { return Some(url.to_string()); } } } None } /// Global storage for pending OAuth flow (only one can be active at a time per daemon) static PENDING_AUTH_FLOW: std::sync::OnceLock>>> = std::sync::OnceLock::new(); fn get_auth_flow_storage() -> &'static std::sync::Mutex>> { PENDING_AUTH_FLOW.get_or_init(|| std::sync::Mutex::new(None)) } /// Send an auth code to the pending OAuth flow. pub fn send_auth_code(code: &str) -> bool { let storage = get_auth_flow_storage(); if let Ok(mut guard) = storage.lock() { if let Some(sender) = guard.take() { if sender.send(code.to_string()).is_ok() { tracing::info!("Auth code sent to setup-token process"); return true; } } } tracing::warn!("No pending auth flow to send code to"); false } /// Extract an OAuth token from a line of setup-token output. /// Looks for tokens matching the `sk-ant-oat01-` prefix format. fn extract_oauth_token(line: &str) -> Option { let trimmed = line.trim(); if trimmed.starts_with("sk-ant-oat01-") { Some(trimmed.to_string()) } else { None } } /// Save an OAuth token to the ~/.makima directory for later use by spawned Claude processes. fn save_oauth_token(token: &str) -> std::io::Result<()> { let makima_dir = dirs::home_dir() .unwrap_or_default() .join(".makima"); std::fs::create_dir_all(&makima_dir)?; let token_path = makima_dir.join("claude_oauth_token"); std::fs::write(&token_path, token)?; // Set restrictive permissions on Unix #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; std::fs::set_permissions(&token_path, std::fs::Permissions::from_mode(0o600))?; } tracing::info!(path = %token_path.display(), "Saved OAuth token to disk"); Ok(()) } /// Load a previously saved OAuth token from ~/.makima/claude_oauth_token. /// Returns None if no token file exists or is empty. pub fn load_oauth_token() -> Option { let token_path = dirs::home_dir()? .join(".makima") .join("claude_oauth_token"); std::fs::read_to_string(&token_path).ok() .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) } /// Result of the OAuth login flow initiated by `get_oauth_login_url`. /// Contains the URL for the user to visit, plus a receiver for when the token is saved. struct OAuthFlowResult { /// The OAuth login URL the user should visit. login_url: String, /// Receiver that will yield the saved token once authentication completes. token_rx: tokio::sync::oneshot::Receiver, } /// Spawn `claude setup-token` to initiate OAuth flow and capture the login URL. /// This spawns the process in a PTY (required by Ink) and reads output until we find a URL. /// /// The new `claude setup-token` flow outputs a token directly (sk-ant-oat01-...) after /// the user completes browser authentication, so no code submission is needed. /// The token is automatically detected, saved to disk, and reported via the token_rx channel. async fn get_oauth_login_url(claude_command: &str) -> Option { use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use std::io::{Read, Write}; tracing::info!("Spawning claude setup-token in PTY to get OAuth login URL"); // Create a PTY - Ink requires a real terminal let pty_system = native_pty_system(); let pair = match pty_system.openpty(PtySize { rows: 24, cols: 200, // Wide enough to avoid line wrapping for long URLs/codes pixel_width: 0, pixel_height: 0, }) { Ok(pair) => pair, Err(e) => { tracing::error!(error = %e, "Failed to open PTY"); return None; } }; // Build the command let mut cmd = CommandBuilder::new(claude_command); cmd.arg("setup-token"); // Set environment variables to prevent browser from opening and disable fancy output // Use "false" so the browser command fails, forcing setup-token to show URL and wait for manual input cmd.env("BROWSER", "false"); cmd.env("TERM", "dumb"); // Disable hyperlinks and fancy terminal features cmd.env("NO_COLOR", "1"); // Disable colors // Spawn the process in the PTY let mut child = match pair.slave.spawn_command(cmd) { Ok(child) => child, Err(e) => { tracing::error!(error = %e, "Failed to spawn claude setup-token in PTY"); return None; } }; // Get the reader from the master side let mut reader = match pair.master.try_clone_reader() { Ok(reader) => reader, Err(e) => { tracing::error!(error = %e, "Failed to clone PTY reader"); return None; } }; let mut writer = match pair.master.take_writer() { Ok(writer) => writer, Err(e) => { tracing::error!(error = %e, "Failed to take PTY writer"); return None; } }; // Create channels for communication let (code_tx, code_rx) = std::sync::mpsc::channel::(); let (url_tx, url_rx) = std::sync::mpsc::channel::(); let (token_tx, token_rx) = tokio::sync::oneshot::channel::(); // Store the code sender globally so it can be used when AUTH_CODE message arrives { let storage = get_auth_flow_storage(); if let Ok(mut guard) = storage.lock() { *guard = Some(code_tx); } } // Spawn reader thread - reads PTY output, sends URL when found, and watches for token let reader_handle = std::thread::spawn(move || { let mut buffer = [0u8; 4096]; let mut accumulated = String::new(); let mut url_sent = false; let mut token_saved = false; let mut token_tx = Some(token_tx); let mut read_count = 0; tracing::info!("setup-token reader thread started"); loop { match reader.read(&mut buffer) { Ok(0) => { tracing::info!("setup-token PTY EOF reached after {} reads", read_count); break; } Ok(n) => { read_count += 1; let chunk = String::from_utf8_lossy(&buffer[..n]); accumulated.push_str(&chunk); // Process complete lines while let Some(newline_pos) = accumulated.find('\n') { let line = accumulated[..newline_pos].to_string(); accumulated = accumulated[newline_pos + 1..].to_string(); let clean_line = strip_ansi_codes(&line); if !clean_line.trim().is_empty() { tracing::info!(line = %clean_line, "setup-token output"); } // Look for OAuth URL if not found yet if !url_sent { if let Some(url) = extract_url(&line) { tracing::info!(url = %url, "Found OAuth login URL"); let _ = url_tx.send(url); url_sent = true; } } // Look for OAuth token in output (new setup-token format) if !token_saved { if let Some(token) = extract_oauth_token(&clean_line) { tracing::info!("Found OAuth token in setup-token output"); if let Err(e) = save_oauth_token(&token) { tracing::error!(error = %e, "Failed to save OAuth token"); } else { tracing::info!("OAuth token saved successfully"); } if let Some(tx) = token_tx.take() { let _ = tx.send(token); } token_saved = true; } } // Check for success/failure messages if clean_line.contains("successfully") || clean_line.contains("authenticated") || clean_line.contains("Success") { tracing::info!("Authentication appears successful!"); } if clean_line.contains("error") || clean_line.contains("failed") || clean_line.contains("invalid") { tracing::warn!(line = %clean_line, "setup-token may have encountered an error"); } } } Err(e) => { tracing::warn!(error = %e, "PTY read error after {} reads", read_count); break; } } } tracing::info!("setup-token reader thread ended (token_saved={})", token_saved); }); // Spawn writer thread - waits for auth code and writes it to PTY std::thread::spawn(move || { tracing::info!("setup-token writer thread started, waiting for auth code (10 min timeout)"); // Wait for auth code from frontend (with long timeout - user needs time to authenticate) match code_rx.recv_timeout(std::time::Duration::from_secs(600)) { Ok(code) => { tracing::info!(code_len = code.len(), "Received auth code from frontend, writing to PTY"); // Write code followed by carriage return (Enter key in raw terminal mode) let code_with_enter = format!("{}\r", code); if let Err(e) = writer.write_all(code_with_enter.as_bytes()) { tracing::error!(error = %e, "Failed to write auth code to PTY"); } else if let Err(e) = writer.flush() { tracing::error!(error = %e, "Failed to flush PTY writer"); } else { tracing::info!("Auth code written to setup-token PTY successfully"); // Give Ink a moment to process, then send another Enter in case first was buffered std::thread::sleep(std::time::Duration::from_millis(100)); let _ = writer.write_all(b"\r"); let _ = writer.flush(); tracing::info!("Sent additional Enter keypress"); } } Err(e) => { tracing::info!(error = %e, "Auth code receive ended (timeout or channel closed)"); } } // Wait for reader thread to finish tracing::debug!("Waiting for reader thread to finish..."); let _ = reader_handle.join(); // Wait for child to fully exit tracing::debug!("Waiting for setup-token child process to exit..."); match child.wait() { Ok(status) => { tracing::info!(exit_status = ?status, "setup-token process exited"); } Err(e) => { tracing::error!(error = %e, "Failed to wait for setup-token process"); } } }); // Wait for URL with timeout match url_rx.recv_timeout(std::time::Duration::from_secs(30)) { Ok(login_url) => Some(OAuthFlowResult { login_url, token_rx, }), Err(e) => { tracing::error!(error = %e, "Timed out waiting for OAuth login URL"); None } } } /// Strip ANSI escape codes from a string for cleaner logging. fn strip_ansi_codes(s: &str) -> String { let mut result = String::with_capacity(s.len()); let mut chars = s.chars().peekable(); while let Some(c) = chars.next() { if c == '\x1b' { // Check what type of escape sequence match chars.peek() { Some(&'[') => { // CSI sequence: ESC [ ... letter chars.next(); // consume '[' while let Some(&next) = chars.peek() { chars.next(); if next.is_ascii_alphabetic() { break; } } } Some(&']') => { // OSC sequence: ESC ] ... ST (where ST is BEL or ESC \) chars.next(); // consume ']' while let Some(&next) = chars.peek() { if next == '\x07' { chars.next(); // consume BEL (string terminator) break; } if next == '\x1b' { chars.next(); // consume ESC if chars.peek() == Some(&'\\') { chars.next(); // consume \ (string terminator) } break; } chars.next(); } } _ => { // Unknown escape, skip just the ESC } } } else if !c.is_control() || c == '\n' { result.push(c); } } result } /// System prompt for regular (non-orchestrator) subtasks. /// This tells subtasks they share a worktree with the supervisor and other tasks. const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in a shared worktree directory with other tasks in this contract. ## IMPORTANT: Shared Worktree **You share this worktree with the supervisor and other tasks in the contract.** - Work within your assigned area (files/modules specified in your task plan) - Be aware other tasks may be modifying other parts of the codebase - Your changes will be auto-committed when your task completes - DO NOT make commits yourself - the system handles this ## Directory Restrictions - DO NOT use `cd` to navigate outside your worktree - DO NOT use absolute paths pointing outside the worktree - All file operations should be relative to the current directory ## Your Role 1. Complete the specific task assigned to you 2. Stay focused on your task plan 3. The system will commit and integrate your changes automatically --- "#; /// The orchestrator system prompt that tells Claude how to use the helper script. const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. ## FIRST STEP Start by checking if you have existing subtasks: ```bash # List all subtasks to see what work needs to be done ./.makima/orchestrate.sh list ``` If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. --- ## Creating Subtasks You can create new subtasks to break down work: ```bash # Create a new subtask with a name and plan ./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." # The command returns the new subtask ID - use it to start the subtask ./.makima/orchestrate.sh start ``` Create subtasks when you need to: - Break down complex work into smaller pieces - Run multiple tasks in parallel on different parts of the codebase - Delegate specific implementation work ## Task Continuation (Sequential Dependencies) When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: ```bash # Create Task B that continues from Task A's worktree ./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from ``` This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. **When to use continuation:** - Sequential work: Task B needs Task A's output files - Staged implementation: Building features incrementally - Fix-and-extend: One task fixes issues, another adds features on top **When NOT to use continuation:** - Parallel tasks working on different files - Independent subtasks that can be merged separately **Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. ## Sharing Files with Subtasks Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: ```bash # Create subtask with specific files copied from orchestrator ./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" # Copy multiple files (comma-separated) ./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" # Combine with --continue-from to share files AND continue from another task ./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from --files "requirements.md" ``` **Use cases for --files:** - Share a PLAN.md with detailed implementation steps - Distribute configuration or spec files - Pass generated data or intermediate results ## How Subtasks Work Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: - Their work remains in the worktree files (NOT committed to git) - **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree - You can view and copy files from subtask worktrees using their paths - The worktree path is returned when you get subtask status **IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. ## Subtask Commands ```bash # List all subtasks and their current status ./.makima/orchestrate.sh list # Create a new subtask (returns the subtask ID) ./.makima/orchestrate.sh create "Name" "Plan/description" # Create a subtask that continues from another task's worktree ./.makima/orchestrate.sh create "Name" "Plan" --continue-from # Create a subtask with specific files copied from orchestrator worktree ./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" # Start a specific subtask (it will run in its own Claude instance) ./.makima/orchestrate.sh start # Stop a running subtask ./.makima/orchestrate.sh stop # Get detailed status of a subtask (includes worktree_path when available) ./.makima/orchestrate.sh status # Get the output/logs of a subtask ./.makima/orchestrate.sh output # Get the worktree path for a subtask ./.makima/orchestrate.sh worktree ``` ## Integrating Subtask Work When subtasks complete, their changes exist as files in their worktree directories: - Files are NOT committed to git branches - You must copy/integrate files from subtask worktrees into your worktree - Use standard file operations (cp, cat, etc.) to review and integrate changes ### Handling Continuation Chains **CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. Example chain: Task A → Task B (continues from A) → Task C (continues from B) - Task C's worktree contains ALL changes from A, B, and C - You should ONLY integrate Task C's worktree - DO NOT integrate Task A or Task B separately (their changes are already in C) **How to track continuation chains:** 1. When you create tasks with `--continue-from`, note which task continues from which 2. Build a mental model: Independent tasks (no continuation) + Continuation chains 3. For each chain, only integrate the LAST task in the chain **Example with mixed independent and chained tasks:** ``` Independent tasks (integrate all): - Task X: API endpoints - Task Y: Database models Continuation chain (integrate ONLY the last one): - Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) Only integrate Task C! ``` ### Integration Examples For independent subtasks (no continuation): ```bash # Get the worktree path for a completed subtask SUBTASK_PATH=$(./.makima/orchestrate.sh worktree ) # View what files were changed ls -la "$SUBTASK_PATH" diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima # Copy specific files from subtask cp "$SUBTASK_PATH/src/new_file.rs" ./src/ cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ # Or use diff/patch for more control diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch patch -p0 < changes.patch ``` For continuation chains (only integrate the final task): ```bash # If you have: Task A → Task B → Task C (each continues from previous) # ONLY get and integrate Task C's worktree - it has everything! FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree ) # Copy all changes from the final task rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ ``` ## Completion ```bash # Mark yourself as complete after integrating all subtask work ./.makima/orchestrate.sh done "Summary of what was accomplished" ``` ## Workflow 1. **List existing subtasks**: Run `list` to see current subtasks 2. **Create subtasks if needed**: Use `create` to add new subtasks for the work - For independent parallel work: create without `--continue-from` - For sequential dependencies: use `--continue-from ` - Track which tasks continue from which (continuation chains) 3. **Start subtasks**: Run `start` for each subtask 4. **Monitor progress**: Check status and output as subtasks run 5. **Integrate work**: When subtasks complete: - For independent tasks: integrate each one's worktree - For continuation chains: ONLY integrate the FINAL task (it has all changes) - Get worktree path with `worktree ` - Copy or merge files into your worktree 6. **Complete**: Call `done` once all work is integrated ## Important Notes - Subtask files are in worktrees, NOT committed git branches - **Subtasks do NOT auto-merge or create PRs** - you must integrate their work - You can read files from subtask worktrees using their paths - Use standard file tools (cp, diff, cat, rsync) to integrate changes - You should NOT edit files directly - that's what subtasks are for - DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. - When you call `done`, YOUR worktree may be used for the final PR/merge "#; /// System prompt for supervisor tasks (contract orchestrators). /// Supervisors coordinate work by spawning tasks and responding to user questions. /// Git operations and phase advancement are handled automatically by the system. const SUPERVISOR_SYSTEM_PROMPT: &str = r###"You are the SUPERVISOR for this contract. Your job is to coordinate work by spawning tasks and responding to user questions. ## WHAT YOU DO 1. Break down the contract goal into actionable tasks 2. Spawn tasks using `makima supervisor spawn "Task Name" "Detailed plan..."` 3. Wait for tasks to complete using `makima supervisor wait ` 4. Respond to user questions when asked ## WHAT THE SYSTEM HANDLES AUTOMATICALLY - **Phase advancement** - When deliverables are complete, the system advances the phase - **Git commits** - Tasks auto-commit their changes on completion - **Pull requests** - System auto-creates PR when execute phase completes - **You will be notified** when phases advance so you know to continue ## CRITICAL RULES 1. **NEVER write code or edit files yourself** - you are a coordinator ONLY 2. **ALWAYS spawn tasks** for ANY work that involves writing or editing code 3. **ALWAYS wait for tasks to complete** - you MUST use `wait` after spawning ## AVAILABLE COMMANDS ### Task Management ```bash makima supervisor spawn "Task Name" "Detailed plan..." # Create and start a task makima supervisor wait [timeout_seconds] # Wait for task completion makima supervisor tasks # List all tasks makima supervisor tree # View task tree makima supervisor diff # View task changes makima supervisor read-file # Read file from task ``` ### User Interaction ```bash makima supervisor ask "Your question" [--choices "A,B,C"] # Ask user makima supervisor status # Contract status (read-only) ``` ## WORKFLOW PATTERN ```bash # 1. Spawn a task RESULT=$(makima supervisor spawn "Implement feature X" "Details...") TASK_ID=$(echo "$RESULT" | jq -r '.taskId') # 2. Wait for it makima supervisor wait "$TASK_ID" # 3. Check result makima supervisor diff "$TASK_ID" # 4. Repeat for more tasks # System handles commits, merging, and PR creation automatically ``` ## MULTI-PHASE PLANS When the plan document contains multiple implementation phases (Phase 1, Phase 2, etc.): 1. **Read the plan** to identify ALL phases 2. **Execute phases SEQUENTIALLY** - complete Phase 1 before Phase 2 3. **Track your progress** - keep track of which phases are done 4. **Confirm between phases** - use `ask` to confirm before proceeding 5. The system will auto-create PR when ALL phases are complete ## IMPORTANT NOTES - DO NOT call advance-phase - the system does this automatically - DO NOT manage git operations (branch, merge, pr) - the system handles this - Focus ONLY on spawning tasks and responding to users - You share a worktree with all tasks - changes are visible immediately - If you need user input, use `makima supervisor ask` - When all work is complete, use `makima supervisor complete` to finish ## WHEN TASKS COMPLETE When a task completes: 1. Check the result with `makima supervisor diff ` 2. If more work needed, spawn another task 3. The system automatically commits changes When ALL work is complete: - Use `makima supervisor complete` to mark the contract done - The system will auto-create PR (for remote repos) "###; /// System prompt for tasks that are part of a contract. /// This tells the task about contract.sh and how to use it to interact with the contract. const CONTRACT_INTEGRATION_PROMPT: &str = r##" ## Contract Integration This task is part of a contract. You have access to contract tools via the `makima contract` CLI. ### Contract Commands ```bash # Get contract context (name, phase, goals) makima contract status # Get phase checklist and deliverables makima contract checklist # List contract files makima contract files # Read a specific file content makima contract file # Report progress to the contract makima contract report "Completed X, working on Y..." # Create a new contract file (content via stdin) echo "# New Documentation" | makima contract create-file "New Document" # Update an existing contract file (content via stdin) cat updated_content.md | makima contract update-file # Get suggested next action when done makima contract suggest-action # Report completion with metrics makima contract completion-action --files "file1.rs,file2.rs" --code ``` ### What You Should Do **Before starting:** 1. Run `makima contract status` to understand the contract context 2. Run `makima contract checklist` to see phase deliverables 3. Run `makima contract files` to see existing documentation **While working:** - Report significant progress with `makima contract report "..."` **When completing:** 1. If your work should be documented, create or update contract files 2. Run `makima contract completion-action` to see recommended next steps 3. Consider what contract files or phases might need updating **Important:** Your work should contribute to the contract's goals. Check the contract status to understand what's expected. --- "##; /// Tracks merge state for an orchestrator task. #[derive(Default)] struct MergeTracker { /// Subtask branches that have been successfully merged. merged_subtasks: HashSet, /// Subtask branches that were explicitly skipped (with reason). skipped_subtasks: HashMap, } /// Managed task information. #[derive(Clone)] pub struct ManagedTask { /// Task ID. pub id: Uuid, /// Human-readable task name. pub task_name: String, /// Current state. pub state: TaskState, /// Worktree info if created. pub worktree: Option, /// Task plan. pub plan: String, /// Repository URL or path. pub repo_source: Option, /// Base branch. pub base_branch: Option, /// Target branch to merge into. pub target_branch: Option, /// Parent task ID if this is a subtask. pub parent_task_id: Option, /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). pub depth: i32, /// Whether this task runs as an orchestrator (coordinates subtasks). pub is_orchestrator: bool, /// Whether this task is a supervisor (long-running contract orchestrator). pub is_supervisor: bool, /// Path to target repository for completion actions. pub target_repo_path: Option, /// Completion action: "none", "branch", "merge", "pr". pub completion_action: Option, /// Task ID to continue from (copy worktree from this task). pub continue_from_task_id: Option, /// Files to copy from parent task's worktree. pub copy_files: Option>, /// Contract ID if this task is associated with a contract. pub contract_id: Option, /// Key used for concurrency tracking (contract_id or task_id for standalone). pub concurrency_key: Uuid, /// Whether to run in autonomous loop mode. pub autonomous_loop: bool, /// Whether the contract is in local-only mode (skips automatic completion actions). pub local_only: bool, /// Whether to auto-merge to target branch locally when local_only mode is enabled. pub auto_merge_local: bool, /// If set, merge this task's changes to the supervisor's worktree on completion (cross-daemon case). pub merge_to_supervisor_task_id: Option, /// If set, this task shares the worktree of the specified supervisor task. pub supervisor_worktree_task_id: Option, /// Time task was created. pub created_at: Instant, /// Time task started running. pub started_at: Option, /// Time task completed. pub completed_at: Option, /// Error message if failed. pub error: Option, } /// Configuration for task execution. #[derive(Clone)] pub struct TaskConfig { /// Maximum concurrent tasks (global cap). pub max_concurrent_tasks: u32, /// Maximum concurrent tasks per contract/supervisor. pub max_tasks_per_contract: u32, /// Base directory for worktrees. pub worktree_base_dir: PathBuf, /// Environment variables to pass to Claude. pub env_vars: HashMap, /// Claude command path. pub claude_command: String, /// Additional arguments to pass to Claude Code. pub claude_args: Vec, /// Arguments to pass before defaults. pub claude_pre_args: Vec, /// Enable Claude's permission system. pub enable_permissions: bool, /// Disable verbose output. pub disable_verbose: bool, /// Bubblewrap sandbox configuration. pub bubblewrap: Option, /// API URL for spawned tasks (HTTP endpoint for makima CLI). pub api_url: String, /// API key for making authenticated API calls. pub api_key: String, /// Interval in seconds between heartbeat commits (WIP checkpoints). /// Set to 0 to disable. Default: 300 (5 minutes). pub heartbeat_commit_interval_secs: u64, /// Checkpoint patch storage configuration. pub checkpoint_patches: CheckpointPatchConfig, } impl Default for TaskConfig { fn default() -> Self { Self { max_concurrent_tasks: 10, max_tasks_per_contract: 10, worktree_base_dir: WorktreeManager::default_base_dir(), env_vars: HashMap::new(), claude_command: "claude".to_string(), claude_args: Vec::new(), claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, bubblewrap: None, api_url: "https://api.makima.jp".to_string(), api_key: String::new(), heartbeat_commit_interval_secs: 300, // 5 minutes checkpoint_patches: CheckpointPatchConfig::default(), } } } /// Task manager for handling task lifecycle. pub struct TaskManager { /// Worktree manager. worktree_manager: Arc, /// Process manager. process_manager: Arc, /// Temp directory manager. temp_manager: Arc, /// Task configuration. config: TaskConfig, /// Active tasks. tasks: Arc>>, /// Channel to send messages to server. ws_tx: mpsc::Sender, /// Tracks running task count per contract (or per standalone task). /// Key is contract_id for contract tasks, or task_id for standalone tasks. contract_task_counts: Arc>>, /// Channels for sending input to running tasks. /// Each sender allows sending messages to the stdin of a running Claude process. task_inputs: Arc>>>, /// Tracks merge state per orchestrator task (for completion gate). merge_trackers: Arc>>, /// Active process PIDs for graceful shutdown. active_pids: Arc>>, /// Inherited git user.email for worktrees. git_user_email: Arc>>, /// Inherited git user.name for worktrees. git_user_name: Arc>>, /// Local SQLite database for crash recovery. local_db: Arc>, } impl TaskManager { /// Create a new task manager with local database for crash recovery. pub fn new( config: TaskConfig, ws_tx: mpsc::Sender, local_db: Arc>, ) -> Self { let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); let process_manager = Arc::new( ProcessManager::with_command(config.claude_command.clone()) .with_args(config.claude_args.clone()) .with_pre_args(config.claude_pre_args.clone()) .with_permissions_enabled(config.enable_permissions) .with_verbose_disabled(config.disable_verbose) .with_env(config.env_vars.clone()) .with_bubblewrap(config.bubblewrap.clone()), ); let temp_manager = Arc::new(TempManager::new()); Self { worktree_manager, process_manager, temp_manager, config, tasks: Arc::new(RwLock::new(HashMap::new())), ws_tx, contract_task_counts: Arc::new(RwLock::new(HashMap::new())), task_inputs: Arc::new(RwLock::new(HashMap::new())), merge_trackers: Arc::new(RwLock::new(HashMap::new())), active_pids: Arc::new(RwLock::new(HashMap::new())), git_user_email: Arc::new(RwLock::new(None)), git_user_name: Arc::new(RwLock::new(None)), local_db, } } /// Persist task state to local SQLite database for crash recovery. fn persist_task_to_local_db(&self, task: &ManagedTask) { use crate::daemon::db::local::LocalTask; let local_task = LocalTask { id: task.id, server_task_id: task.id, // Same as task id state: task.state.clone(), container_id: None, overlay_path: task.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), repo_url: task.repo_source.clone(), base_branch: task.base_branch.clone(), plan: task.plan.clone(), created_at: chrono::Utc::now(), started_at: task.started_at.map(|_| chrono::Utc::now()), completed_at: task.completed_at.map(|_| chrono::Utc::now()), error_message: task.error.clone(), }; if let Ok(db) = self.local_db.lock() { if let Err(e) = db.save_task(&local_task) { tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database"); } else { tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database"); } } } /// Remove completed/failed task from local database. fn remove_task_from_local_db(&self, task_id: Uuid) { if let Ok(db) = self.local_db.lock() { if let Err(e) = db.delete_task(task_id) { tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); } else { tracing::debug!(task_id = %task_id, "Removed task from local database"); } } } /// Recover orphaned tasks from local database after daemon restart. /// Returns list of task IDs that have worktrees and can potentially be recovered. pub async fn recover_orphaned_tasks(&self) -> Vec { tracing::info!("=== STARTING ORPHANED TASK RECOVERY ==="); let active_tasks = { let db = match self.local_db.lock() { Ok(db) => db, Err(e) => { tracing::error!(error = %e, "Failed to lock local database for recovery"); return Vec::new(); } }; match db.get_active_tasks() { Ok(tasks) => tasks, Err(e) => { tracing::error!(error = %e, "Failed to load active tasks from local database"); return Vec::new(); } } }; if active_tasks.is_empty() { tracing::info!("No orphaned tasks found in local database"); return Vec::new(); } tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database"); let mut recoverable_task_ids = Vec::new(); for local_task in active_tasks { tracing::info!( task_id = %local_task.id, state = ?local_task.state, overlay_path = ?local_task.overlay_path, "Checking orphaned task" ); // Check if worktree exists on filesystem let worktree_exists = if let Some(ref path) = local_task.overlay_path { let path = std::path::PathBuf::from(path); path.exists() && path.join(".git").exists() } else { // Try to find worktree by task ID pattern (scan worktrees directory) let short_id = &local_task.id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); let mut found = false; if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); if path.join(".git").exists() { found = true; break; } } } } found }; if worktree_exists { tracing::info!( task_id = %local_task.id, "Found worktree for orphaned task - can be recovered" ); recoverable_task_ids.push(local_task.id); // Send structured recovery notification to server let msg = DaemonMessage::task_recovery_detected( local_task.id, local_task.state.as_str(), true, // worktree intact local_task.overlay_path.clone(), false, // doesn't need patch since worktree is intact ); let _ = self.ws_tx.send(msg).await; } else { tracing::warn!( task_id = %local_task.id, "Worktree missing for orphaned task - marking as lost" ); // Update local db to mark as failed if let Ok(db) = self.local_db.lock() { let _ = db.update_task_state(local_task.id, TaskState::Failed); } } } tracing::info!( recoverable = recoverable_task_ids.len(), "=== ORPHANED TASK RECOVERY COMPLETE ===" ); recoverable_task_ids } /// Check worktree health for all running tasks. /// If a worktree is missing, marks the task as interrupted and notifies the server. /// This allows the retry orchestrator to pick up the task and restore it from checkpoint. pub async fn check_worktree_health(&self) -> Vec { let mut affected_task_ids = Vec::new(); // Get all running tasks with their worktree info and supervisor worktree task ID let tasks_snapshot: Vec<(Uuid, Option, Option)> = { let tasks = self.tasks.read().await; tasks .iter() .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting)) .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()), t.supervisor_worktree_task_id)) .collect() }; if tasks_snapshot.is_empty() { return affected_task_ids; } for (task_id, worktree_path, supervisor_worktree_task_id) in tasks_snapshot { let worktree_exists = if let Some(ref path) = worktree_path { path.exists() && path.join(".git").exists() } else if let Some(supervisor_task_id) = supervisor_worktree_task_id { // Task uses shared supervisor worktree - check the supervisor's worktree // First try to get from in-memory tasks let supervisor_worktree_path: Option = { let tasks = self.tasks.read().await; tasks.get(&supervisor_task_id) .and_then(|t| t.worktree.as_ref().map(|w| w.path.clone())) }; if let Some(path) = supervisor_worktree_path { path.exists() && path.join(".git").exists() } else { // Supervisor not in memory - scan worktrees directory let short_id = &supervisor_task_id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); let mut found = false; if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); if path.join(".git").exists() { found = true; break; } } } } found } } else { // No worktree set - scan by task ID let short_id = &task_id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); let mut found = false; if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); if path.join(".git").exists() { found = true; break; } } } } found }; if !worktree_exists { tracing::warn!( task_id = %task_id, worktree_path = ?worktree_path, "Worktree missing for running task - marking as interrupted for retry" ); affected_task_ids.push(task_id); // Update task state to interrupted { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Interrupted; task.error = Some("Worktree directory was deleted".to_string()); task.completed_at = Some(Instant::now()); } } // Notify server - task needs recovery/retry let msg = DaemonMessage::task_complete( task_id, false, Some("Worktree deleted - task interrupted for recovery".to_string()), ); let _ = self.ws_tx.send(msg).await; // Remove from local db since server will handle retry self.remove_task_from_local_db(task_id); } } if !affected_task_ids.is_empty() { tracing::info!( count = affected_task_ids.len(), "Worktree health check found missing worktrees" ); } affected_task_ids } /// Check if a task can be spawned given contract-based concurrency limits. /// Returns the concurrency key to use (contract_id or task_id for standalone). async fn try_acquire_concurrency_slot( &self, contract_id: Option, task_id: Uuid, ) -> TaskResult { let mut counts = self.contract_task_counts.write().await; // Determine the concurrency key: // - For contract tasks: use contract_id // - For standalone tasks: use task_id (each standalone task is its own "contract") let concurrency_key = contract_id.unwrap_or(task_id); // Check global cap let total: usize = counts.values().sum(); if total >= self.config.max_concurrent_tasks as usize { tracing::warn!( task_id = %task_id, total_running = total, max = self.config.max_concurrent_tasks, "Global concurrency limit reached, cannot spawn task" ); return Err(TaskError::ConcurrencyLimit); } // Check per-contract cap let contract_count = counts.get(&concurrency_key).copied().unwrap_or(0); if contract_count >= self.config.max_tasks_per_contract as usize { tracing::warn!( task_id = %task_id, contract_id = ?contract_id, concurrency_key = %concurrency_key, contract_running = contract_count, max_per_contract = self.config.max_tasks_per_contract, "Contract concurrency limit reached, cannot spawn task" ); return Err(TaskError::ContractConcurrencyLimit); } // Increment count for this contract *counts.entry(concurrency_key).or_insert(0) += 1; tracing::debug!( task_id = %task_id, concurrency_key = %concurrency_key, new_count = counts.get(&concurrency_key).copied().unwrap_or(0), total = total + 1, "Acquired concurrency slot" ); Ok(concurrency_key) } /// Gracefully shutdown all running Claude processes and their children. /// /// This sends SIGTERM to all active process groups, waits for them to exit gracefully, /// and then sends SIGKILL to any that don't exit within the timeout. /// Uses process groups to ensure all child processes (bash commands, etc.) are also killed. #[cfg(unix)] pub async fn shutdown_all_processes(&self, timeout: std::time::Duration) { use nix::sys::signal::{killpg, Signal}; use nix::unistd::Pid; let pids: Vec<(Uuid, u32)> = { let guard = self.active_pids.read().await; guard.iter().map(|(k, v)| (*k, *v)).collect() }; if pids.is_empty() { tracing::info!("No active Claude processes to shutdown"); return; } tracing::info!(count = pids.len(), "Sending SIGTERM to all Claude process groups"); // Send SIGTERM to all process groups (each Claude process is its own group leader) for (task_id, pid) in &pids { match killpg(Pid::from_raw(*pid as i32), Signal::SIGTERM) { Ok(()) => { tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGTERM to process group"); } Err(nix::errno::Errno::ESRCH) => { tracing::debug!(task_id = %task_id, pid = pid, "Process group already exited"); } Err(e) => { tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGTERM to process group"); } } } // Wait for processes to exit with timeout let start = std::time::Instant::now(); let check_interval = std::time::Duration::from_millis(100); while start.elapsed() < timeout { let remaining: Vec = { let guard = self.active_pids.read().await; guard.values().copied().collect() }; if remaining.is_empty() { tracing::info!("All Claude processes exited gracefully"); return; } tokio::time::sleep(check_interval).await; } // Send SIGKILL to any remaining process groups let remaining: Vec<(Uuid, u32)> = { let guard = self.active_pids.read().await; guard.iter().map(|(k, v)| (*k, *v)).collect() }; if !remaining.is_empty() { tracing::warn!( count = remaining.len(), "Some process groups did not exit gracefully, sending SIGKILL" ); for (task_id, pid) in &remaining { match killpg(Pid::from_raw(*pid as i32), Signal::SIGKILL) { Ok(()) => { tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGKILL to process group"); } Err(e) => { tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGKILL to process group"); } } } } } /// Gracefully shutdown all running Claude processes (no-op on non-Unix). #[cfg(not(unix))] pub async fn shutdown_all_processes(&self, _timeout: std::time::Duration) { tracing::warn!("Graceful shutdown not supported on this platform"); } /// Pause a running task by sending SIGSTOP to its process. #[cfg(unix)] pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; // Check if task exists and is running let current_state = { let tasks = self.tasks.read().await; tasks.get(&task_id).map(|t| t.state) }; match current_state { Some(TaskState::Running) => {} Some(TaskState::Paused) => { tracing::debug!(task_id = %task_id, "Task already paused"); return Ok(()); } Some(state) => { tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state"); return Err(TaskError::InvalidStateTransition { from: format!("{:?}", state), to: "Paused".to_string(), }); } None => { tracing::warn!(task_id = %task_id, "Task not found"); return Err(TaskError::NotFound(task_id)); } } // Get the process PID let pid = { let pids = self.active_pids.read().await; pids.get(&task_id).copied() }; let Some(pid) = pid else { tracing::warn!(task_id = %task_id, "No PID found for task"); return Err(TaskError::ExecutionFailed( "No active process for task".to_string(), )); }; // Send SIGSTOP to pause the process match kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) { Ok(()) => { tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process"); } Err(nix::errno::Errno::ESRCH) => { tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); return Err(TaskError::ExecutionFailed("Process not found".to_string())); } Err(e) => { tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP"); return Err(TaskError::ExecutionFailed(format!( "Failed to pause: {}", e ))); } } // Update task state to Paused { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Paused; } } // Notify server of state change let msg = DaemonMessage::task_status_change(task_id, "running", "paused"); let _ = self.ws_tx.send(msg).await; Ok(()) } /// Pause a task (no-op on non-Unix). #[cfg(not(unix))] pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { tracing::warn!(task_id = %task_id, "Pause not supported on this platform"); Err(TaskError::ExecutionFailed( "Pause not supported on this platform".to_string(), )) } /// Resume a paused task by sending SIGCONT to its process. #[cfg(unix)] pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; // Check if task exists and is paused let current_state = { let tasks = self.tasks.read().await; tasks.get(&task_id).map(|t| t.state) }; match current_state { Some(TaskState::Paused) => {} Some(TaskState::Running) => { tracing::debug!(task_id = %task_id, "Task already running"); return Ok(()); } Some(state) => { tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state"); return Err(TaskError::InvalidStateTransition { from: format!("{:?}", state), to: "Running".to_string(), }); } None => { tracing::warn!(task_id = %task_id, "Task not found"); return Err(TaskError::NotFound(task_id)); } } // Get the process PID let pid = { let pids = self.active_pids.read().await; pids.get(&task_id).copied() }; let Some(pid) = pid else { tracing::warn!(task_id = %task_id, "No PID found for task"); return Err(TaskError::ExecutionFailed( "No active process for task".to_string(), )); }; // Send SIGCONT to resume the process match kill(Pid::from_raw(pid as i32), Signal::SIGCONT) { Ok(()) => { tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process"); } Err(nix::errno::Errno::ESRCH) => { tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); return Err(TaskError::ExecutionFailed("Process not found".to_string())); } Err(e) => { tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT"); return Err(TaskError::ExecutionFailed(format!( "Failed to resume: {}", e ))); } } // Update task state to Running { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Running; } } // Notify server of state change let msg = DaemonMessage::task_status_change(task_id, "paused", "running"); let _ = self.ws_tx.send(msg).await; Ok(()) } /// Resume a task (no-op on non-Unix). #[cfg(not(unix))] pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { tracing::warn!(task_id = %task_id, "Resume not supported on this platform"); Err(TaskError::ExecutionFailed( "Resume not supported on this platform".to_string(), )) } /// Handle a command from the server. pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { tracing::info!("Received command from server: {:?}", command); match command { DaemonCommand::SpawnTask { task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, is_supervisor, autonomous_loop, resume_session, conversation_history, patch_data, patch_base_sha, local_only, auto_merge_local, supervisor_worktree_task_id, directive_id, } => { tracing::info!( task_id = %task_id, task_name = %task_name, repo_url = ?repo_url, base_branch = ?base_branch, target_branch = ?target_branch, parent_task_id = ?parent_task_id, depth = depth, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, autonomous_loop = autonomous_loop, resume_session = resume_session, target_repo_path = ?target_repo_path, completion_action = ?completion_action, continue_from_task_id = ?continue_from_task_id, copy_files = ?copy_files, contract_id = ?contract_id, directive_id = ?directive_id, supervisor_worktree_task_id = ?supervisor_worktree_task_id, plan_len = plan.len(), "Spawning new task" ); self.spawn_task( task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, conversation_history, patch_data, patch_base_sha, local_only, auto_merge_local, supervisor_worktree_task_id, directive_id, ).await?; } DaemonCommand::PauseTask { task_id } => { tracing::info!(task_id = %task_id, "Pausing task"); if let Err(e) = self.pause_task(task_id).await { tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task"); } } DaemonCommand::ResumeTask { task_id } => { tracing::info!(task_id = %task_id, "Resuming task"); if let Err(e) = self.resume_task(task_id).await { tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task"); } } DaemonCommand::InterruptTask { task_id, graceful: _ } => { tracing::info!(task_id = %task_id, "Interrupting task"); self.interrupt_task(task_id).await?; } DaemonCommand::SendMessage { task_id, message } => { // Check if this is an auth code message if message.starts_with("AUTH_CODE:") { let code = message.strip_prefix("AUTH_CODE:").unwrap_or("").trim(); tracing::info!(task_id = %task_id, "Received auth code from frontend"); if send_auth_code(code) { tracing::info!(task_id = %task_id, "Auth code forwarded to setup-token"); } else { tracing::warn!(task_id = %task_id, "No pending auth flow to receive code"); } } else { // Check if task is paused - auto-resume before sending message let task_state = { let tasks = self.tasks.read().await; tasks.get(&task_id).map(|t| t.state) }; if task_state == Some(TaskState::Paused) { tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message"); if let Err(e) = self.resume_task(task_id).await { tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task"); } } // Regular message - send to task's stdin tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task stdin"); // Send message to the task's stdin via the input channel let inputs = self.task_inputs.read().await; if let Some(sender) = inputs.get(&task_id) { if let Err(e) = sender.send(message).await { tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel (channel may be closed, stdin forwarder may have exited)"); } else { tracing::info!(task_id = %task_id, "Message sent to task input channel successfully, will be forwarded to Claude stdin"); } } else { drop(inputs); // Release read lock before checking if we need to respawn // Check if this is a supervisor that needs to be respawned let task_info = { let tasks = self.tasks.read().await; tasks.get(&task_id).cloned() }; if let Some(task) = task_info { if task.is_supervisor { tracing::info!( task_id = %task_id, "Supervisor has no active Claude process, respawning with message" ); // Respawn the supervisor with the new message as the plan // Claude Code will use --continue to maintain conversation history let inner = self.clone_inner(); let task_name = task.task_name.clone(); let repo_source = task.repo_source.clone(); let base_branch = task.base_branch.clone(); let target_branch = task.target_branch.clone(); let target_repo_path = task.target_repo_path.clone(); let completion_action = task.completion_action.clone(); let contract_id = task.contract_id; let local_only = task.local_only; let auto_merge_local = task.auto_merge_local; // Spawn in background to not block the command handler tokio::spawn(async move { if let Err(e) = inner.run_task( task_id, task_name, message, // Use the message as the new prompt repo_source, base_branch, target_branch, false, // is_orchestrator true, // is_supervisor target_repo_path, completion_action, None, // continue_from_task_id None, // copy_files contract_id, false, // autonomous_loop - supervisors don't use this false, // resume_session - respawning from scratch None, // conversation_history - not needed for fresh respawn None, // patch_data - not available for respawn None, // patch_base_sha - not available for respawn local_only, auto_merge_local, None, // supervisor_worktree_task_id - supervisors use their own worktree None, // directive_id ).await { tracing::error!( task_id = %task_id, error = %e, "Failed to respawn supervisor" ); } }); } else { tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); } } else { tracing::warn!(task_id = %task_id, "Task not found"); } } } } DaemonCommand::InjectSiblingContext { task_id, .. } => { tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); } DaemonCommand::Authenticated { daemon_id } => { tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); } DaemonCommand::Error { code, message } => { tracing::warn!(code = %code, message = %message, "Error command from server"); } // ========================================================================= // Merge Commands // ========================================================================= DaemonCommand::ListBranches { task_id } => { tracing::info!(task_id = %task_id, "Listing task branches"); self.handle_list_branches(task_id).await?; } DaemonCommand::MergeStart { task_id, source_branch } => { tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); self.handle_merge_start(task_id, source_branch).await?; } DaemonCommand::MergeStatus { task_id } => { tracing::info!(task_id = %task_id, "Getting merge status"); self.handle_merge_status(task_id).await?; } DaemonCommand::MergeResolve { task_id, file, strategy } => { tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); self.handle_merge_resolve(task_id, file, strategy).await?; } DaemonCommand::MergeCommit { task_id, message } => { tracing::info!(task_id = %task_id, "Committing merge"); self.handle_merge_commit(task_id, message).await?; } DaemonCommand::MergeAbort { task_id } => { tracing::info!(task_id = %task_id, "Aborting merge"); self.handle_merge_abort(task_id).await?; } DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); self.handle_merge_skip(task_id, subtask_id, reason).await?; } DaemonCommand::CheckMergeComplete { task_id } => { tracing::info!(task_id = %task_id, "Checking merge completion"); self.handle_check_merge_complete(task_id).await?; } // ========================================================================= // Completion Action Commands // ========================================================================= DaemonCommand::RetryCompletionAction { task_id, task_name, action, target_repo_path, target_branch, } => { tracing::info!( task_id = %task_id, task_name = %task_name, action = %action, target_repo_path = %target_repo_path, target_branch = ?target_branch, "Retrying completion action" ); self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; } DaemonCommand::CloneWorktree { task_id, target_dir } => { tracing::info!( task_id = %task_id, target_dir = %target_dir, "Cloning worktree to target directory" ); self.handle_clone_worktree(task_id, target_dir).await?; } DaemonCommand::CheckTargetExists { task_id, target_dir } => { tracing::debug!( task_id = %task_id, target_dir = %target_dir, "Checking if target directory exists" ); self.handle_check_target_exists(task_id, target_dir).await?; } // ========================================================================= // Contract File Commands // ========================================================================= DaemonCommand::ReadRepoFile { request_id, contract_id, file_path, repo_path, } => { tracing::info!( request_id = %request_id, contract_id = %contract_id, file_path = %file_path, repo_path = %repo_path, "Reading file from repository" ); self.handle_read_repo_file(request_id, file_path, repo_path).await?; } DaemonCommand::CreateBranch { task_id, branch_name, from_ref, } => { tracing::info!( task_id = %task_id, branch_name = %branch_name, from_ref = ?from_ref, "Creating branch" ); self.handle_create_branch(task_id, branch_name, from_ref).await?; } DaemonCommand::MergeTaskToTarget { task_id, target_branch, squash, } => { tracing::info!( task_id = %task_id, target_branch = ?target_branch, squash = squash, "Merging task to target branch" ); self.handle_merge_task_to_target(task_id, target_branch, squash).await?; } DaemonCommand::CreatePR { task_id, title, body, base_branch, branch, } => { tracing::info!( task_id = %task_id, title = %title, base_branch = ?base_branch, branch = %branch, "Creating pull request" ); self.handle_create_pr(task_id, title, body, base_branch, branch).await?; } DaemonCommand::GetTaskDiff { task_id, } => { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } DaemonCommand::GetWorktreeInfo { task_id, } => { tracing::info!(task_id = %task_id, "Getting worktree info"); self.handle_get_worktree_info(task_id).await?; } DaemonCommand::CommitWorktree { task_id, message } => { tracing::info!(task_id = %task_id, "Committing worktree changes"); self.handle_commit_worktree(task_id, message).await?; } DaemonCommand::CreateCheckpoint { task_id, message, } => { tracing::info!(task_id = %task_id, "Creating checkpoint"); self.handle_create_checkpoint(task_id, message).await?; } DaemonCommand::CleanupWorktree { task_id, delete_branch, } => { tracing::info!( task_id = %task_id, delete_branch = delete_branch, "Cleaning up worktree" ); self.handle_cleanup_worktree(task_id, delete_branch).await?; } DaemonCommand::InheritGitConfig { source_dir } => { tracing::info!(source_dir = ?source_dir, "Inheriting git config"); self.handle_inherit_git_config(source_dir).await?; } DaemonCommand::CreateExportPatch { task_id, base_sha } => { tracing::info!(task_id = %task_id, base_sha = ?base_sha, "Creating export patch"); self.handle_create_export_patch(task_id, base_sha).await?; } DaemonCommand::RestartDaemon => { tracing::info!("Received restart command from server, initiating daemon restart..."); // Shutdown all running tasks gracefully self.shutdown_all_processes(std::time::Duration::from_secs(5)).await; // Exit the process - the daemon should be restarted by a process manager // or the user can restart it manually tracing::info!("Daemon restart: exiting process with code 42 (restart requested)"); std::process::exit(42); } DaemonCommand::TriggerReauth { request_id } => { tracing::info!(request_id = %request_id, "Received reauth trigger command from server"); let claude_command = self.process_manager.claude_command().to_string(); let ws_tx = self.ws_tx.clone(); // Spawn in a task so it doesn't block command handling tokio::spawn(async move { match get_oauth_login_url(&claude_command).await { Some(flow_result) => { tracing::info!(request_id = %request_id, login_url = %flow_result.login_url, "Got OAuth login URL for reauth"); // Send url_ready status immediately let msg = DaemonMessage::ReauthStatus { request_id, status: "url_ready".to_string(), login_url: Some(flow_result.login_url), error: None, token_saved: false, }; let _ = ws_tx.send(msg).await; // Now wait for the token to be detected and saved (up to 10 minutes) let ws_tx_token = ws_tx.clone(); tokio::spawn(async move { match tokio::time::timeout( std::time::Duration::from_secs(600), flow_result.token_rx, ).await { Ok(Ok(_token)) => { tracing::info!(request_id = %request_id, "OAuth token received and saved, reporting completion"); let msg = DaemonMessage::ReauthStatus { request_id, status: "completed".to_string(), login_url: None, error: None, token_saved: true, }; let _ = ws_tx_token.send(msg).await; } Ok(Err(_)) => { tracing::warn!(request_id = %request_id, "Token channel closed without receiving token"); let msg = DaemonMessage::ReauthStatus { request_id, status: "failed".to_string(), login_url: None, error: Some("setup-token process ended without producing a token".to_string()), token_saved: false, }; let _ = ws_tx_token.send(msg).await; } Err(_) => { tracing::warn!(request_id = %request_id, "Timed out waiting for OAuth token (10 min)"); let msg = DaemonMessage::ReauthStatus { request_id, status: "failed".to_string(), login_url: None, error: Some("Timed out waiting for authentication to complete".to_string()), token_saved: false, }; let _ = ws_tx_token.send(msg).await; } } }); } None => { tracing::error!(request_id = %request_id, "Failed to get OAuth login URL for reauth"); let msg = DaemonMessage::ReauthStatus { request_id, status: "failed".to_string(), login_url: None, error: Some("Failed to get OAuth login URL from setup-token".to_string()), token_saved: false, }; let _ = ws_tx.send(msg).await; } } }); } DaemonCommand::SubmitAuthCode { request_id, code } => { tracing::info!(request_id = %request_id, "Received auth code submission from server"); let ws_tx = self.ws_tx.clone(); if send_auth_code(&code) { tracing::info!(request_id = %request_id, "Auth code forwarded to setup-token for reauth"); // Wait a short time then report completion // (the setup-token process takes a moment to complete) tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(3)).await; let msg = DaemonMessage::ReauthStatus { request_id, status: "completed".to_string(), login_url: None, error: None, token_saved: false, }; let _ = ws_tx.send(msg).await; }); } else { tracing::warn!(request_id = %request_id, "No pending auth flow to receive code for reauth"); let msg = DaemonMessage::ReauthStatus { request_id, status: "failed".to_string(), login_url: None, error: Some("No pending auth flow to receive the code. Try triggering reauth again.".to_string()), token_saved: false, }; let _ = self.ws_tx.send(msg).await; } } DaemonCommand::ApplyPatchToWorktree { target_task_id, source_task_id, patch_data, base_sha, } => { tracing::info!( target_task_id = %target_task_id, source_task_id = %source_task_id, base_sha = %base_sha, "Applying patch from cross-daemon task to worktree" ); self.handle_apply_patch_to_worktree(target_task_id, source_task_id, patch_data, base_sha).await?; } } Ok(()) } /// Spawn a new task. #[allow(clippy::too_many_arguments)] pub async fn spawn_task( &self, task_id: Uuid, task_name: String, plan: String, repo_url: Option, base_branch: Option, target_branch: Option, parent_task_id: Option, depth: i32, is_orchestrator: bool, is_supervisor: bool, target_repo_path: Option, completion_action: Option, continue_from_task_id: Option, copy_files: Option>, contract_id: Option, autonomous_loop: bool, resume_session: bool, conversation_history: Option, patch_data: Option, patch_base_sha: Option, local_only: bool, auto_merge_local: bool, supervisor_worktree_task_id: Option, directive_id: Option, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); // Check if task already exists - allow re-spawning if in terminal state // or if resuming a supervisor (supervisors stay in Running state after Claude exits) { let mut tasks = self.tasks.write().await; if let Some(existing) = tasks.get(&task_id) { let can_respawn = existing.state.is_terminal() || (resume_session && existing.is_supervisor); if can_respawn { // Task exists but can be re-spawned (terminal state or supervisor resume) tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn"); tasks.remove(&task_id); } else { // Task is still active, reject tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); return Err(TaskError::AlreadyExists(task_id)); } } } // Acquire concurrency slot (contract-based concurrency) tracing::info!(task_id = %task_id, contract_id = ?contract_id, "Acquiring concurrency slot..."); let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?; tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired"); // Create task entry tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); let task = ManagedTask { id: task_id, task_name: task_name.clone(), state: TaskState::Initializing, worktree: None, plan: plan.clone(), repo_source: repo_url.clone(), base_branch: base_branch.clone(), target_branch: target_branch.clone(), parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path: target_repo_path.clone(), completion_action: completion_action.clone(), continue_from_task_id, copy_files: copy_files.clone(), contract_id, concurrency_key, autonomous_loop, local_only, auto_merge_local, merge_to_supervisor_task_id: None, // Set later if cross-daemon supervisor_worktree_task_id, created_at: Instant::now(), started_at: None, completed_at: None, error: None, }; // Persist task to local database for crash recovery self.persist_task_to_local_db(&task); self.tasks.write().await.insert(task_id, task); tracing::info!(task_id = %task_id, "Task entry created and stored"); // Notify server of status change tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); self.send_status_change(task_id, "pending", "initializing").await; // Spawn task in background tracing::info!(task_id = %task_id, "Spawning background task runner"); let inner = self.clone_inner(); tokio::spawn(async move { tracing::info!(task_id = %task_id, "Background task runner started"); if let Err(e) = inner.run_task( task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, conversation_history, patch_data, patch_base_sha, local_only, auto_merge_local, supervisor_worktree_task_id, directive_id, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; } // Release concurrency slot inner.release_concurrency_slot(concurrency_key).await; tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Background task runner completed, concurrency slot released"); }); tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); Ok(()) } /// Clone inner state for spawned tasks. fn clone_inner(&self) -> TaskManagerInner { TaskManagerInner { worktree_manager: self.worktree_manager.clone(), process_manager: self.process_manager.clone(), temp_manager: self.temp_manager.clone(), tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), active_pids: self.active_pids.clone(), git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.config.api_url.clone(), api_key: self.config.api_key.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.config.checkpoint_patches.clone(), local_db: self.local_db.clone(), } } /// Interrupt a task. pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { let mut tasks = self.tasks.write().await; let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; if task.state.is_terminal() { return Ok(()); // Already done } let old_state = task.state; task.state = TaskState::Interrupted; task.completed_at = Some(Instant::now()); // Notify server drop(tasks); self.send_status_change(task_id, old_state.as_str(), "interrupted").await; // Note: The process will be killed when the ClaudeProcess is dropped // Worktrees are kept until explicitly deleted Ok(()) } /// Get list of active task IDs. pub async fn active_task_ids(&self) -> Vec { self.tasks .read() .await .iter() .filter(|(_, t)| t.state.is_active()) .map(|(id, _)| *id) .collect() } /// Get task state. pub async fn get_task_state(&self, task_id: Uuid) -> Option { self.tasks.read().await.get(&task_id).map(|t| t.state) } /// Send status change notification to server. async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); let _ = self.ws_tx.send(msg).await; } // ========================================================================= // Merge Handler Methods // ========================================================================= /// Get worktree path for a task, or return error if not found. /// First checks in-memory tasks, then scans the worktrees directory. async fn get_task_worktree_path(&self, task_id: Uuid) -> Result { // First try to get from in-memory tasks { let tasks = self.tasks.read().await; if let Some(task) = tasks.get(&task_id) { if let Some(ref worktree) = task.worktree { return Ok(worktree.path.clone()); } } } // Task not in memory - scan worktrees directory for matching task ID let short_id = &task_id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); // Verify it's a valid git directory if path.join(".git").exists() { tracing::info!( task_id = %task_id, worktree_path = %path.display(), "Found worktree by scanning directory" ); return Ok(path); } } } } Err(DaemonError::Task(TaskError::SetupFailed( format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) ))) } /// Handle ListBranches command. async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; match self.worktree_manager.list_task_branches(&worktree_path).await { Ok(branches) => { let branch_infos: Vec = branches .into_iter() .map(|b| BranchInfo { name: b.name, task_id: b.task_id, is_merged: b.is_merged, last_commit: b.last_commit, last_commit_message: b.last_commit_message, }) .collect(); let msg = DaemonMessage::BranchList { task_id, branches: branch_infos, }; let _ = self.ws_tx.send(msg).await; } Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); let msg = DaemonMessage::MergeResult { task_id, success: false, message: e.to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeStart command. async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { Ok(None) => { // Merge succeeded without conflicts let msg = DaemonMessage::MergeResult { task_id, success: true, message: "Merge completed without conflicts".to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } Ok(Some(conflicts)) => { // Merge has conflicts let msg = DaemonMessage::MergeResult { task_id, success: false, message: format!("Merge has {} conflicts", conflicts.len()), commit_sha: None, conflicts: Some(conflicts), }; let _ = self.ws_tx.send(msg).await; } Err(e) => { let msg = DaemonMessage::MergeResult { task_id, success: false, message: e.to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeStatus command. async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; match self.worktree_manager.get_merge_state(&worktree_path).await { Ok(state) => { let msg = DaemonMessage::MergeStatusResponse { task_id, in_progress: state.in_progress, source_branch: if state.in_progress { Some(state.source_branch) } else { None }, conflicted_files: state.conflicted_files, }; let _ = self.ws_tx.send(msg).await; } Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); let msg = DaemonMessage::MergeStatusResponse { task_id, in_progress: false, source_branch: None, conflicted_files: vec![], }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeResolve command. async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; let resolution = match strategy.to_lowercase().as_str() { "ours" => ConflictResolution::Ours, "theirs" => ConflictResolution::Theirs, _ => { let msg = DaemonMessage::MergeResult { task_id, success: false, message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { Ok(()) => { let msg = DaemonMessage::MergeResult { task_id, success: true, message: format!("Resolved conflict in {}", file), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } Err(e) => { let msg = DaemonMessage::MergeResult { task_id, success: false, message: e.to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeCommit command. async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; match self.worktree_manager.commit_merge(&worktree_path, &message).await { Ok(commit_sha) => { // Track this merge as completed (extract subtask ID from branch if possible) // For now, we'll track it when MergeSkip is called or based on branch names let msg = DaemonMessage::MergeResult { task_id, success: true, message: "Merge committed successfully".to_string(), commit_sha: Some(commit_sha), conflicts: None, }; let _ = self.ws_tx.send(msg).await; } Err(e) => { let msg = DaemonMessage::MergeResult { task_id, success: false, message: e.to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeAbort command. async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; match self.worktree_manager.abort_merge(&worktree_path).await { Ok(()) => { let msg = DaemonMessage::MergeResult { task_id, success: true, message: "Merge aborted".to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } Err(e) => { let msg = DaemonMessage::MergeResult { task_id, success: false, message: e.to_string(), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; } } Ok(()) } /// Handle MergeSkip command. async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { // Record that this subtask was skipped { let mut trackers = self.merge_trackers.write().await; let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); tracker.skipped_subtasks.insert(subtask_id, reason.clone()); } let msg = DaemonMessage::MergeResult { task_id, success: true, message: format!("Subtask {} skipped: {}", subtask_id, reason), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CheckMergeComplete command. async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { let worktree_path = self.get_task_worktree_path(task_id).await?; // Get all task branches let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { Ok(b) => b, Err(e) => { let msg = DaemonMessage::MergeCompleteCheck { task_id, can_complete: false, unmerged_branches: vec![format!("Error listing branches: {}", e)], merged_count: 0, skipped_count: 0, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Get tracker state let trackers = self.merge_trackers.read().await; let empty_merged: HashSet = HashSet::new(); let empty_skipped: HashMap = HashMap::new(); let tracker = trackers.get(&task_id); let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); let mut merged_count = 0u32; let mut skipped_count = 0u32; let mut unmerged_branches = Vec::new(); for branch in &branches { if branch.is_merged { merged_count += 1; } else if let Some(subtask_id) = branch.task_id { if merged_set.contains(&subtask_id) { merged_count += 1; } else if skipped_set.contains_key(&subtask_id) { skipped_count += 1; } else { unmerged_branches.push(branch.name.clone()); } } else { // Branch without task ID - check if it's merged unmerged_branches.push(branch.name.clone()); } } let can_complete = unmerged_branches.is_empty(); let msg = DaemonMessage::MergeCompleteCheck { task_id, can_complete, unmerged_branches, merged_count, skipped_count, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Mark a subtask as merged in the tracker. #[allow(dead_code)] pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { let mut trackers = self.merge_trackers.write().await; let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); tracker.merged_subtasks.insert(subtask_id); } // ========================================================================= // Completion Action Handler Methods // ========================================================================= /// Handle RetryCompletionAction command. async fn handle_retry_completion_action( &self, task_id: Uuid, task_name: String, action: String, target_repo_path: String, target_branch: Option, ) -> Result<(), DaemonError> { // Get the task's worktree path let worktree_path = self.get_task_worktree_path(task_id).await?; // Execute the completion action let inner = self.clone_inner(); let result = inner.execute_completion_action( task_id, &task_name, &worktree_path, &action, Some(target_repo_path.as_str()), target_branch.as_deref(), ).await; // Send result back to server let msg = match result { Ok(pr_url) => DaemonMessage::CompletionActionResult { task_id, success: true, message: match action.as_str() { "branch" => format!("Branch pushed to {}", target_repo_path), "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), "pr" => format!("Pull request created"), _ => format!("Completion action '{}' executed", action), }, pr_url, }, Err(e) => DaemonMessage::CompletionActionResult { task_id, success: false, message: e, pr_url: None, }, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CloneWorktree command. async fn handle_clone_worktree( &self, task_id: Uuid, target_dir: String, ) -> Result<(), DaemonError> { // Get the task's worktree path let worktree_path = self.get_task_worktree_path(task_id).await?; // Expand tilde in target path let target_path = crate::daemon::worktree::expand_tilde(&target_dir); // Clone the worktree to target directory let result = self.worktree_manager.clone_worktree_to_directory( &worktree_path, &target_path, ).await; // Send result back to server let msg = match result { Ok(message) => DaemonMessage::CloneWorktreeResult { task_id, success: true, message, target_dir: Some(target_path.to_string_lossy().to_string()), }, Err(e) => DaemonMessage::CloneWorktreeResult { task_id, success: false, message: e.to_string(), target_dir: None, }, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CheckTargetExists command. async fn handle_check_target_exists( &self, task_id: Uuid, target_dir: String, ) -> Result<(), DaemonError> { // Expand tilde in target path let target_path = crate::daemon::worktree::expand_tilde(&target_dir); // Check if target exists let exists = self.worktree_manager.target_directory_exists(&target_path).await; // Send result back to server let msg = DaemonMessage::CheckTargetExistsResult { task_id, exists, target_dir: target_path.to_string_lossy().to_string(), }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CleanupWorktree command. /// /// Removes a task's worktree and optionally its branch. /// Used when a contract is completed or deleted to clean up associated task worktrees. async fn handle_cleanup_worktree( &self, task_id: Uuid, delete_branch: bool, ) -> Result<(), DaemonError> { // Try to get the worktree path, but don't fail if not found let worktree_result = self.get_task_worktree_path(task_id).await; let (success, message) = match worktree_result { Ok(worktree_path) => { // Remove the worktree match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await { Ok(()) => { tracing::info!( task_id = %task_id, worktree_path = %worktree_path.display(), delete_branch = delete_branch, "Worktree cleaned up successfully" ); // Also remove task from in-memory tracking self.tasks.write().await.remove(&task_id); self.task_inputs.write().await.remove(&task_id); self.merge_trackers.write().await.remove(&task_id); self.active_pids.write().await.remove(&task_id); (true, format!("Worktree cleaned up: {}", worktree_path.display())) } Err(e) => { tracing::warn!( task_id = %task_id, worktree_path = %worktree_path.display(), error = %e, "Failed to remove worktree" ); (false, format!("Failed to remove worktree: {}", e)) } } } Err(_) => { // Worktree not found - this is OK, it may have already been cleaned up tracing::debug!( task_id = %task_id, "No worktree found for task, may have already been cleaned up" ); // Still remove from in-memory tracking self.tasks.write().await.remove(&task_id); self.task_inputs.write().await.remove(&task_id); self.merge_trackers.write().await.remove(&task_id); self.active_pids.write().await.remove(&task_id); (true, "No worktree found, task tracking cleaned up".to_string()) } }; // Send result back to server let msg = DaemonMessage::CleanupWorktreeResult { task_id, success, message, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CreateExportPatch command. /// /// Creates an uncompressed, human-readable git patch for export. async fn handle_create_export_patch( &self, task_id: Uuid, base_sha: Option, ) -> Result<(), DaemonError> { // Get task's worktree path let worktree_result = self.get_task_worktree_path(task_id).await; let msg = match worktree_result { Ok(worktree_path) => { // Create the export patch match storage::create_export_patch(&worktree_path, base_sha.as_deref()).await { Ok(result) => { tracing::info!( task_id = %task_id, files_count = result.files_count, lines_added = result.lines_added, lines_removed = result.lines_removed, base_commit_sha = %result.base_commit_sha, "Export patch created successfully" ); DaemonMessage::ExportPatchCreated { task_id, success: true, patch_content: Some(result.patch_content), files_count: Some(result.files_count), lines_added: Some(result.lines_added), lines_removed: Some(result.lines_removed), base_commit_sha: Some(result.base_commit_sha), error: None, } } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to create export patch" ); DaemonMessage::ExportPatchCreated { task_id, success: false, patch_content: None, files_count: None, lines_added: None, lines_removed: None, base_commit_sha: None, error: Some(e.to_string()), } } } } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to get worktree path for export patch" ); DaemonMessage::ExportPatchCreated { task_id, success: false, patch_content: None, files_count: None, lines_added: None, lines_removed: None, base_commit_sha: None, error: Some(format!("Task not found or has no worktree: {}", e)), } } }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle ReadRepoFile command. /// /// Reads a file from a repository on the daemon's filesystem and sends /// the content back to the server for syncing contract files. async fn handle_read_repo_file( &self, request_id: Uuid, file_path: String, repo_path: String, ) -> Result<(), DaemonError> { // Expand tilde in repo path let repo_path_expanded = crate::daemon::worktree::expand_tilde(&repo_path); // Construct full file path let full_path = repo_path_expanded.join(&file_path); // Try to read the file let (content, success, error) = match tokio::fs::read_to_string(&full_path).await { Ok(content) => (Some(content), true, None), Err(e) => { tracing::warn!( request_id = %request_id, file_path = %file_path, repo_path = %repo_path, full_path = %full_path.display(), error = %e, "Failed to read repo file" ); (None, false, Some(e.to_string())) } }; // Send result back to server let msg = DaemonMessage::RepoFileContent { request_id, file_path, content, success, error, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CreateBranch command - create a new branch in a task's worktree. async fn handle_create_branch( &self, task_id: Uuid, branch_name: String, from_ref: Option, ) -> Result<(), DaemonError> { // Get task's worktree path let worktree_path = { let tasks = self.tasks.read().await; tasks.get(&task_id) .and_then(|t| t.worktree.as_ref()) .map(|w| w.path.clone()) }; let (success, message) = if let Some(path) = worktree_path { // Build git checkout command let mut cmd = tokio::process::Command::new("git"); cmd.current_dir(&path); cmd.arg("checkout").arg("-b").arg(&branch_name); if let Some(ref from) = from_ref { cmd.arg(from); } match cmd.output().await { Ok(output) => { if output.status.success() { (true, format!("Branch '{}' created successfully", branch_name)) } else { let stderr = String::from_utf8_lossy(&output.stderr); (false, format!("Failed to create branch: {}", stderr)) } } Err(e) => (false, format!("Failed to execute git: {}", e)), } } else { (false, format!("Task {} not found or has no worktree", task_id)) }; let msg = DaemonMessage::BranchCreated { task_id, success, branch_name, message, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle MergeTaskToTarget command - merge a task's changes to a target branch. async fn handle_merge_task_to_target( &self, task_id: Uuid, target_branch: Option, squash: bool, ) -> Result<(), DaemonError> { // Get worktree path - this works even for completed tasks by scanning worktrees directory let worktree_path = match self.get_task_worktree_path(task_id).await { Ok(path) => path, Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to find worktree for merge"); let msg = DaemonMessage::MergeToTargetResult { task_id, success: false, message: format!("Task {} not found or has no worktree: {}", task_id, e), commit_sha: None, conflicts: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Get base_branch from in-memory tasks if available (for fallback target branch) let base_branch = { let tasks = self.tasks.read().await; tasks.get(&task_id).and_then(|t| t.base_branch.clone()) }; let target = target_branch.unwrap_or_else(|| base_branch.unwrap_or_else(|| "main".to_string())); tracing::info!( task_id = %task_id, worktree_path = %worktree_path.display(), target_branch = %target, squash = squash, "Starting merge operation" ); // First, stage and commit any uncommitted changes let add_result = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["add", "-A"]) .output() .await; let (success, message, commit_sha, conflicts) = if let Err(e) = add_result { (false, format!("Failed to stage changes: {}", e), None, None) } else { // Commit if there are staged changes let commit_result = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"]) .output() .await; if let Err(e) = commit_result { tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)"); } // Get current branch name let branch_output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["rev-parse", "--abbrev-ref", "HEAD"]) .output() .await; let source_branch = branch_output .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) .unwrap_or_else(|_| "unknown".to_string()); // Checkout target branch let checkout = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["checkout", &target]) .output() .await; match checkout { Ok(output) if output.status.success() => { // Merge the source branch let mut merge_cmd = tokio::process::Command::new("git"); merge_cmd.current_dir(&worktree_path); merge_cmd.arg("merge"); if squash { merge_cmd.arg("--squash"); } merge_cmd.arg(&source_branch); merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target)); match merge_cmd.output().await { Ok(output) if output.status.success() => { // Get the commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["rev-parse", "HEAD"]) .output() .await; let sha = sha_output .ok() .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); if squash { // For squash merge, we need to commit let _ = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)]) .output() .await; } (true, format!("Merged {} into {}", source_branch, target), sha, None) } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); // Check for merge conflicts if stderr.contains("CONFLICT") { let conflict_files = stderr .lines() .filter(|l| l.contains("CONFLICT")) .map(|l| l.to_string()) .collect::>(); (false, "Merge conflicts detected".to_string(), None, Some(conflict_files)) } else { (false, format!("Merge failed: {}", stderr), None, None) } } Err(e) => (false, format!("Failed to merge: {}", e), None, None), } } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); (false, format!("Failed to checkout target branch: {}", stderr), None, None) } Err(e) => (false, format!("Failed to checkout: {}", e), None, None), } }; let msg = DaemonMessage::MergeToTargetResult { task_id, success, message, commit_sha, conflicts, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CreatePR command - create a pull request for a task's changes. async fn handle_create_pr( &self, task_id: Uuid, title: String, body: Option, base_branch: Option, branch: String, ) -> Result<(), DaemonError> { // Get worktree path - this works even for completed tasks by scanning worktrees directory let worktree_path = match self.get_task_worktree_path(task_id).await { Ok(path) => path, Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to find worktree for PR creation"); let msg = DaemonMessage::PRCreated { task_id, success: false, message: format!("Task {} not found or has no worktree: {}", task_id, e), pr_url: None, pr_number: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Detect base branch if not provided let base_branch = match base_branch { Some(b) => b, None => { match self.worktree_manager.detect_default_branch(&worktree_path).await { Ok(detected) => { tracing::info!(task_id = %task_id, detected_branch = %detected, "Auto-detected base branch"); detected } Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to detect default branch"); let msg = DaemonMessage::PRCreated { task_id, success: false, message: format!("Failed to detect default branch: {}", e), pr_url: None, pr_number: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } } } }; tracing::info!( task_id = %task_id, base_branch = %base_branch, branch = %branch, worktree_path = %worktree_path.display(), "Creating PR" ); // Push the branch to origin let push_refspec = format!("HEAD:refs/heads/{}", branch); let push_result = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["push", "-u", "origin", &push_refspec]) .output() .await; let (success, message, pr_url, pr_number) = match push_result { Err(e) => { tracing::error!(error = %e, "Failed to execute git push"); (false, format!("Failed to push branch: {}", e), None, None) } Ok(output) if !output.status.success() => { let stderr = String::from_utf8_lossy(&output.stderr); tracing::error!(stderr = %stderr, "git push failed"); (false, format!("Failed to push branch: {}", stderr), None, None) } Ok(_) => { tracing::info!("Branch pushed successfully, creating PR"); // Create PR using gh CLI let mut pr_cmd = tokio::process::Command::new("gh"); pr_cmd.current_dir(&worktree_path); pr_cmd.args(["pr", "create", "--title", &title, "--base", &base_branch, "--head", &branch]); if let Some(ref body_text) = body { pr_cmd.args(["--body", body_text]); } else { pr_cmd.args(["--body", ""]); } match pr_cmd.output().await { Ok(output) if output.status.success() => { let stdout = String::from_utf8_lossy(&output.stdout); // gh pr create outputs the PR URL let url = stdout.lines().last().map(|s| s.trim().to_string()); // Extract PR number from URL let number = url.as_ref().and_then(|u| { u.split('/').last().and_then(|n| n.parse::().ok()) }); tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully"); (true, "Pull request created".to_string(), url, number) } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); tracing::error!(stderr = %stderr, "gh pr create failed"); (false, format!("Failed to create PR: {}", stderr), None, None) } Err(e) => { tracing::error!(error = %e, "Failed to execute gh command"); (false, format!("Failed to run gh: {}", e), None, None) } } } }; tracing::info!( task_id = %task_id, success = success, message = %message, pr_url = ?pr_url, "PR creation completed" ); let msg = DaemonMessage::PRCreated { task_id, success, message, pr_url, pr_number, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle GetTaskDiff command - get the diff for a task's changes. async fn handle_get_task_diff( &self, task_id: Uuid, ) -> Result<(), DaemonError> { // Get task's worktree path let worktree_path = { let tasks = self.tasks.read().await; tasks.get(&task_id) .and_then(|t| t.worktree.as_ref()) .map(|w| w.path.clone()) }; let (success, diff, error) = if let Some(path) = worktree_path { // Get diff of all changes (staged and unstaged) let diff_result = tokio::process::Command::new("git") .current_dir(&path) .args(["diff", "HEAD"]) .output() .await; match diff_result { Ok(output) if output.status.success() => { let diff_text = String::from_utf8_lossy(&output.stdout).to_string(); if diff_text.is_empty() { // No uncommitted changes, show diff from base let base_diff = tokio::process::Command::new("git") .current_dir(&path) .args(["log", "-p", "--reverse", "HEAD~10..HEAD", "--"]) .output() .await; match base_diff { Ok(o) => (true, Some(String::from_utf8_lossy(&o.stdout).to_string()), None), Err(e) => (false, None, Some(format!("Failed to get diff: {}", e))), } } else { (true, Some(diff_text), None) } } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); (false, None, Some(format!("Git diff failed: {}", stderr))) } Err(e) => (false, None, Some(format!("Failed to run git: {}", e))), } } else { (false, None, Some(format!("Task {} not found or has no worktree", task_id))) }; let msg = DaemonMessage::TaskDiff { task_id, success, diff, error, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CommitWorktree command - stage and commit changes in a task's worktree. async fn handle_commit_worktree( &self, task_id: Uuid, message: Option, ) -> Result<(), DaemonError> { // Get task's worktree path let worktree_path = { let tasks = self.tasks.read().await; tasks.get(&task_id) .and_then(|t| t.worktree.as_ref()) .map(|w| w.path.clone()) }; let (success, commit_sha, error) = if let Some(path) = worktree_path { // Step 1: Check if there are changes to commit let status_output = tokio::process::Command::new("git") .current_dir(&path) .args(["status", "--porcelain"]) .output() .await; let has_changes = match &status_output { Ok(output) => !output.stdout.is_empty(), Err(_) => false, }; if !has_changes { (true, None, Some("No changes to commit".to_string())) } else { // Step 2: Stage all changes let add_result = tokio::process::Command::new("git") .current_dir(&path) .args(["add", "-A"]) .output() .await; match add_result { Ok(output) if output.status.success() => { // Step 3: Commit let commit_msg = message.unwrap_or_else(|| "Worktree commit".to_string()); let commit_result = tokio::process::Command::new("git") .current_dir(&path) .args(["commit", "-m", &commit_msg]) .output() .await; match commit_result { Ok(output) if output.status.success() => { // Step 4: Get commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(&path) .args(["rev-parse", "HEAD"]) .output() .await; let sha = sha_output.ok() .filter(|o| o.status.success()) .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); (true, sha, None) } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr).to_string(); (false, None, Some(format!("Git commit failed: {}", stderr))) } Err(e) => (false, None, Some(format!("Failed to run git commit: {}", e))), } } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr).to_string(); (false, None, Some(format!("Failed to stage changes: {}", stderr))) } Err(e) => (false, None, Some(format!("Failed to run git add: {}", e))), } } } else { (false, None, Some(format!("Task {} not found or has no worktree", task_id))) }; let msg = DaemonMessage::WorktreeCommitResult { task_id, success, commit_sha, error, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle GetWorktreeInfo command - get worktree files, stats, branch info. async fn handle_get_worktree_info( &self, task_id: Uuid, ) -> Result<(), DaemonError> { // Get task's worktree path, branch, and base_branch // If the task shares a supervisor's worktree, use the supervisor's worktree info let task_info = { let tasks = self.tasks.read().await; if let Some(task) = tasks.get(&task_id) { // Check if this task shares a supervisor's worktree if let Some(supervisor_task_id) = task.supervisor_worktree_task_id { // Use the supervisor's worktree tasks.get(&supervisor_task_id).map(|supervisor| ( supervisor.worktree.as_ref().map(|w| w.path.clone()), supervisor.worktree.as_ref().map(|w| w.branch.clone()), supervisor.base_branch.clone(), )) } else { // Use the task's own worktree Some(( task.worktree.as_ref().map(|w| w.path.clone()), task.worktree.as_ref().map(|w| w.branch.clone()), task.base_branch.clone(), )) } } else { None } }; let (worktree_path, branch, base_branch) = match task_info { Some((Some(path), branch, base_branch)) => (Some(path), branch, base_branch), Some((None, _, _)) => (None, None, None), None => (None, None, None), }; if worktree_path.is_none() { let msg = DaemonMessage::WorktreeInfoResult { task_id, success: true, worktree_path: None, exists: false, files_changed: 0, insertions: 0, deletions: 0, files: None, branch: None, head_sha: None, error: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } let path = worktree_path.unwrap(); let path_str = path.to_string_lossy().to_string(); // Check if worktree exists if !path.exists() { let msg = DaemonMessage::WorktreeInfoResult { task_id, success: true, worktree_path: Some(path_str), exists: false, files_changed: 0, insertions: 0, deletions: 0, files: None, branch, head_sha: None, error: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } // Get HEAD SHA let head_sha = match tokio::process::Command::new("git") .current_dir(&path) .args(["rev-parse", "HEAD"]) .output() .await { Ok(output) if output.status.success() => { Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) } _ => None, }; // Get changed files with status using git status --porcelain let status_output = tokio::process::Command::new("git") .current_dir(&path) .args(["status", "--porcelain"]) .output() .await; let uncommitted_status_lines: Vec<(String, String)> = match status_output { Ok(output) if output.status.success() => { String::from_utf8_lossy(&output.stdout) .lines() .filter_map(|line| { if line.len() < 3 { return None; } let status = line[0..2].trim().to_string(); let file_path = line[3..].to_string(); Some((file_path, status)) }) .collect() } _ => vec![], }; // If there are uncommitted changes, use them. Otherwise, compare against base branch. // Track effective_base_branch for reuse in numstat query let (status_lines, effective_base_for_diff) = if !uncommitted_status_lines.is_empty() { (uncommitted_status_lines, None) } else { // No uncommitted changes - try to get committed changes vs base branch // First, try to detect the base branch if not provided let effective_base_branch = if let Some(ref base) = base_branch { Some(base.clone()) } else { // Auto-detect the default branch self.worktree_manager.detect_default_branch(&path).await.ok() }; if let Some(ref base) = effective_base_branch { // Resolve the best diff base reference, handling missing remote refs let resolved_diff_base = Self::resolve_diff_base(&path, base).await; if let Some(ref diff_base) = resolved_diff_base { // Get committed changes using git diff --name-status let name_status_output = tokio::process::Command::new("git") .current_dir(&path) .args(["diff", "--name-status", diff_base]) .output() .await; let committed_status_lines: Vec<(String, String)> = match name_status_output { Ok(output) if output.status.success() => { String::from_utf8_lossy(&output.stdout) .lines() .filter_map(|line| { let parts: Vec<&str> = line.splitn(2, '\t').collect(); if parts.len() >= 2 { let status = parts[0].trim().to_string(); let file_path = parts[1].to_string(); Some((file_path, status)) } else { None } }) .collect() } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); tracing::warn!( diff_base = %diff_base, stderr = %stderr, "git diff --name-status failed with resolved diff base", ); vec![] } Err(e) => { tracing::warn!( error = %e, diff_base = %diff_base, "Failed to execute git diff --name-status", ); vec![] } }; if !committed_status_lines.is_empty() { (committed_status_lines, resolved_diff_base) } else { (vec![], None) } } else { (vec![], None) } } else { (vec![], None) } }; // Get numstat for line counts // If we have effective_base_for_diff (a resolved diff base string), use it directly // Otherwise compare against HEAD for uncommitted changes let mut file_stats: std::collections::HashMap = std::collections::HashMap::new(); let numstat_output = if let Some(ref diff_base) = effective_base_for_diff { tokio::process::Command::new("git") .current_dir(&path) .args(["diff", "--numstat", diff_base]) .output() .await } else { tokio::process::Command::new("git") .current_dir(&path) .args(["diff", "HEAD", "--numstat"]) .output() .await }; if let Ok(output) = numstat_output { if output.status.success() { for line in String::from_utf8_lossy(&output.stdout).lines() { let parts: Vec<&str> = line.split('\t').collect(); if parts.len() >= 3 { let added = parts[0].parse::().unwrap_or(0); let removed = parts[1].parse::().unwrap_or(0); let file = parts[2].to_string(); file_stats.insert(file, (added, removed)); } } } } // Build file list with stats let mut files_json = Vec::new(); let mut total_insertions = 0; let mut total_deletions = 0; for (file_path, status) in &status_lines { let (lines_added, lines_removed) = file_stats.get(file_path).copied().unwrap_or((0, 0)); total_insertions += lines_added; total_deletions += lines_removed; files_json.push(serde_json::json!({ "path": file_path, "status": status, "linesAdded": lines_added, "linesRemoved": lines_removed, })); } let msg = DaemonMessage::WorktreeInfoResult { task_id, success: true, worktree_path: Some(path_str), exists: true, files_changed: status_lines.len() as i32, insertions: total_insertions, deletions: total_deletions, files: Some(serde_json::Value::Array(files_json)), branch, head_sha, error: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle GetWorktreeDiff command - get git diff for a task's worktree. async fn handle_get_worktree_diff( &self, task_id: Uuid, file_path: Option, ) -> Result<(), DaemonError> { // Get task's worktree path, branch, and base_branch // If the task shares a supervisor's worktree, use the supervisor's worktree info let task_info = { let tasks = self.tasks.read().await; if let Some(task) = tasks.get(&task_id) { if let Some(supervisor_task_id) = task.supervisor_worktree_task_id { tasks.get(&supervisor_task_id).map(|supervisor| ( supervisor.worktree.as_ref().map(|w| w.path.clone()), supervisor.base_branch.clone(), )) } else { Some(( task.worktree.as_ref().map(|w| w.path.clone()), task.base_branch.clone(), )) } } else { None } }; let (worktree_path, base_branch) = match task_info { Some((Some(path), base_branch)) => (path, base_branch), _ => { let msg = DaemonMessage::WorktreeDiffResult { task_id, success: true, diff: Some(String::new()), error: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; if !worktree_path.exists() { let msg = DaemonMessage::WorktreeDiffResult { task_id, success: false, diff: None, error: Some("Worktree path does not exist".to_string()), }; let _ = self.ws_tx.send(msg).await; return Ok(()); } // Check for uncommitted changes first let status_output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["status", "--porcelain"]) .output() .await; let has_uncommitted = match &status_output { Ok(output) if output.status.success() => { !String::from_utf8_lossy(&output.stdout).trim().is_empty() } _ => false, }; let diff_result = if has_uncommitted { // Get diff for uncommitted changes (both staged and unstaged) let mut args = vec!["diff".to_string(), "HEAD".to_string()]; if let Some(ref fp) = file_path { args.push("--".to_string()); args.push(fp.clone()); } let output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(&args) .output() .await; match output { Ok(out) if out.status.success() => { let diff = String::from_utf8_lossy(&out.stdout).to_string(); // If diff is empty (e.g., for new untracked files), try git diff (no HEAD) // and also try to show untracked file content if diff.is_empty() { // Try to show untracked files as diffs let mut args2 = vec!["diff".to_string()]; if let Some(ref fp) = file_path { args2.push("--".to_string()); args2.push(fp.clone()); } let output2 = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(&args2) .output() .await; match output2 { Ok(out2) if out2.status.success() => { Ok(String::from_utf8_lossy(&out2.stdout).to_string()) } _ => Ok(diff), } } else { Ok(diff) } } Ok(out) => Err(String::from_utf8_lossy(&out.stderr).to_string()), Err(e) => Err(format!("Failed to run git diff: {}", e)), } } else { // No uncommitted changes - compare against base branch let effective_base_branch = if let Some(ref base) = base_branch { Some(base.clone()) } else { self.worktree_manager.detect_default_branch(&worktree_path).await.ok() }; if let Some(ref base) = effective_base_branch { let diff_base = format!("origin/{}...HEAD", base); let mut args = vec!["diff".to_string(), diff_base]; if let Some(ref fp) = file_path { args.push("--".to_string()); args.push(fp.clone()); } let output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(&args) .output() .await; match output { Ok(out) if out.status.success() => { Ok(String::from_utf8_lossy(&out.stdout).to_string()) } Ok(out) => Err(String::from_utf8_lossy(&out.stderr).to_string()), Err(e) => Err(format!("Failed to run git diff: {}", e)), } } else { Ok(String::new()) } }; let msg = match diff_result { Ok(diff) => DaemonMessage::WorktreeDiffResult { task_id, success: true, diff: Some(diff), error: None, }, Err(e) => DaemonMessage::WorktreeDiffResult { task_id, success: false, diff: None, error: Some(e), }, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Handle CreateCheckpoint command - stage all changes, commit, and get stats. async fn handle_create_checkpoint( &self, task_id: Uuid, message: String, ) -> Result<(), DaemonError> { // Get task's worktree path and branch name let task_info = { let tasks = self.tasks.read().await; tasks.get(&task_id).map(|t| ( t.worktree.as_ref().map(|w| w.path.clone()), t.worktree.as_ref().map(|w| w.branch.clone()), )) }; let (worktree_path, branch_name) = match task_info { Some((Some(path), Some(branch))) => (path, branch), Some((Some(path), None)) => { // Try to get current branch from git let branch = self.get_current_branch(&path).await.unwrap_or_else(|| "unknown".to_string()); (path, branch) } _ => { let msg = DaemonMessage::CheckpointCreated { task_id, success: false, commit_sha: None, branch_name: None, checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: Some(format!("Task {} not found or has no worktree", task_id)), message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Step 1: Check if there are changes to commit let status_output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["status", "--porcelain"]) .output() .await; let has_changes = match &status_output { Ok(output) => !output.stdout.is_empty(), Err(_) => false, }; if !has_changes { let msg = DaemonMessage::CheckpointCreated { task_id, success: false, commit_sha: None, branch_name: Some(branch_name), checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: Some("No changes to checkpoint".to_string()), message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } // Step 2: Stage all changes let add_result = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["add", "-A"]) .output() .await; if let Err(e) = add_result { let msg = DaemonMessage::CheckpointCreated { task_id, success: false, commit_sha: None, branch_name: Some(branch_name), checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: Some(format!("Failed to stage changes: {}", e)), message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } // Step 3: Get diff stats before commit let (lines_added, lines_removed, files_changed) = self.get_staged_diff_stats(&worktree_path).await; // Step 4: Create commit let commit_result = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["commit", "-m", &message]) .output() .await; let commit_sha = match commit_result { Ok(output) if output.status.success() => { // Get the commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["rev-parse", "HEAD"]) .output() .await; match sha_output { Ok(o) => Some(String::from_utf8_lossy(&o.stdout).trim().to_string()), Err(_) => None, } } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); let msg = DaemonMessage::CheckpointCreated { task_id, success: false, commit_sha: None, branch_name: Some(branch_name), checkpoint_number: None, files_changed: Some(files_changed), lines_added: Some(lines_added), lines_removed: Some(lines_removed), error: Some(format!("Commit failed: {}", stderr)), message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } Err(e) => { let msg = DaemonMessage::CheckpointCreated { task_id, success: false, commit_sha: None, branch_name: Some(branch_name), checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: Some(format!("Failed to execute git commit: {}", e)), message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Success - send response (checkpoint_number will be assigned by server on DB insert) // Note: Manual checkpoints don't include patches (only heartbeat commits do) let msg = DaemonMessage::CheckpointCreated { task_id, success: true, commit_sha, branch_name: Some(branch_name), checkpoint_number: None, // Server will assign from DB files_changed: Some(files_changed), lines_added: Some(lines_added), lines_removed: Some(lines_removed), error: None, message, patch_data: None, patch_base_sha: None, patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Get the current branch name from a worktree. async fn get_current_branch(&self, worktree_path: &std::path::PathBuf) -> Option { let output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["branch", "--show-current"]) .output() .await .ok()?; if output.status.success() { Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) } else { None } } /// Get diff stats for staged changes. async fn get_staged_diff_stats(&self, worktree_path: &std::path::PathBuf) -> (i32, i32, serde_json::Value) { // Get numstat for lines added/removed let numstat = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["diff", "--cached", "--numstat"]) .output() .await; let (mut total_added, mut total_removed) = (0i32, 0i32); if let Ok(output) = numstat { for line in String::from_utf8_lossy(&output.stdout).lines() { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { if let Ok(added) = parts[0].parse::() { total_added += added; } if let Ok(removed) = parts[1].parse::() { total_removed += removed; } } } } // Get name-status for file changes let name_status = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["diff", "--cached", "--name-status"]) .output() .await; let mut files = Vec::new(); if let Ok(output) = name_status { for line in String::from_utf8_lossy(&output.stdout).lines() { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { files.push(serde_json::json!({ "action": parts[0], "path": parts[1] })); } } } (total_added, total_removed, serde_json::json!(files)) } /// Find worktree path for a task ID. /// First checks in-memory tasks, then scans the worktrees directory. async fn find_worktree_for_task_tm(&self, task_id: Uuid) -> Result { // First try to get from in-memory tasks { let tasks = self.tasks.read().await; if let Some(task) = tasks.get(&task_id) { if let Some(ref worktree) = task.worktree { return Ok(worktree.path.clone()); } } } // Task not in memory - scan worktrees directory for matching task ID let short_id = &task_id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); // Verify it's a valid git directory if path.join(".git").exists() { tracing::info!( task_id = %task_id, worktree_path = %path.display(), "Found worktree by scanning directory" ); return Ok(path); } } } } Err(format!( "No worktree found for task {}. The worktree may have been cleaned up.", task_id )) } /// Handle ApplyPatchToWorktree command - apply a patch from a cross-daemon task to a supervisor's worktree. async fn handle_apply_patch_to_worktree( &self, target_task_id: Uuid, source_task_id: Uuid, patch_data: String, base_sha: String, ) -> Result<(), DaemonError> { // Find the target task's worktree let worktree_path = match self.find_worktree_for_task_tm(target_task_id).await { Ok(path) => path, Err(e) => { tracing::error!( target_task_id = %target_task_id, error = %e, "Failed to find worktree for patch application" ); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: worktree not found - {}\n", source_task_id, e), true, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } }; tracing::info!( target_task_id = %target_task_id, source_task_id = %source_task_id, worktree = %worktree_path.display(), "Applying cross-daemon patch to worktree" ); // Decode the base64-gzipped patch data let patch_bytes = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &patch_data) { Ok(bytes) => bytes, Err(e) => { tracing::error!(error = %e, "Failed to decode patch base64"); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: base64 decode error - {}\n", source_task_id, e), true, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Decompress the gzipped patch let patch_content = { use std::io::Read; let mut decoder = flate2::read::GzDecoder::new(&patch_bytes[..]); let mut content = String::new(); match decoder.read_to_string(&mut content) { Ok(_) => content, Err(e) => { tracing::error!(error = %e, "Failed to decompress patch"); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: decompress error - {}\n", source_task_id, e), true, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } } }; // Check if patch is empty if patch_content.trim().is_empty() { tracing::info!( target_task_id = %target_task_id, source_task_id = %source_task_id, "Cross-daemon task had no changes to merge" ); let msg = DaemonMessage::task_output( target_task_id, format!("Cross-daemon task {} completed with no changes to merge\n", source_task_id), false, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } // Apply the patch using git apply let mut child = match tokio::process::Command::new("git") .current_dir(&worktree_path) .args(["apply", "--3way", "-"]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() { Ok(child) => child, Err(e) => { tracing::error!(error = %e, "Failed to spawn git apply"); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: spawn error - {}\n", source_task_id, e), true, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } }; // Write patch to stdin if let Some(mut stdin) = child.stdin.take() { use tokio::io::AsyncWriteExt; if let Err(e) = stdin.write_all(patch_content.as_bytes()).await { tracing::error!(error = %e, "Failed to write patch to git apply stdin"); } } // Wait for completion let output = match child.wait_with_output().await { Ok(output) => output, Err(e) => { tracing::error!(error = %e, "Failed to wait for git apply"); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: wait error - {}\n", source_task_id, e), true, ); let _ = self.ws_tx.send(msg).await; return Ok(()); } }; if output.status.success() { tracing::info!( target_task_id = %target_task_id, source_task_id = %source_task_id, base_sha = %base_sha, "Successfully applied cross-daemon patch" ); let msg = DaemonMessage::task_output( target_task_id, format!("Successfully merged changes from cross-daemon task {} (base: {})\n", source_task_id, &base_sha[..8]), false, ); let _ = self.ws_tx.send(msg).await; } else { let stderr = String::from_utf8_lossy(&output.stderr); tracing::error!( target_task_id = %target_task_id, source_task_id = %source_task_id, stderr = %stderr, "Failed to apply cross-daemon patch" ); let msg = DaemonMessage::task_output( target_task_id, format!("Failed to apply patch from task {}: {}\n", source_task_id, stderr), true, ); let _ = self.ws_tx.send(msg).await; } Ok(()) } /// Handle InheritGitConfig command - read git config from a directory and store it. async fn handle_inherit_git_config( &self, source_dir: Option, ) -> Result<(), DaemonError> { // Use provided directory or current working directory let dir = source_dir .map(std::path::PathBuf::from) .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))); tracing::info!(dir = ?dir, "Reading git config from directory"); // Read user.email let email_output = tokio::process::Command::new("git") .current_dir(&dir) .args(["config", "user.email"]) .output() .await; let user_email = match email_output { Ok(output) if output.status.success() => { let email = String::from_utf8_lossy(&output.stdout).trim().to_string(); if !email.is_empty() { Some(email) } else { None } } _ => None, }; // Read user.name let name_output = tokio::process::Command::new("git") .current_dir(&dir) .args(["config", "user.name"]) .output() .await; let user_name = match name_output { Ok(output) if output.status.success() => { let name = String::from_utf8_lossy(&output.stdout).trim().to_string(); if !name.is_empty() { Some(name) } else { None } } _ => None, }; // Check if we got at least one value if user_email.is_none() && user_name.is_none() { let msg = DaemonMessage::GitConfigInherited { success: false, user_email: None, user_name: None, error: Some("No git config found in the specified directory".to_string()), }; let _ = self.ws_tx.send(msg).await; return Ok(()); } // Store the config if let Some(ref email) = user_email { *self.git_user_email.write().await = Some(email.clone()); tracing::info!(email = %email, "Inherited git user.email"); } if let Some(ref name) = user_name { *self.git_user_name.write().await = Some(name.clone()); tracing::info!(name = %name, "Inherited git user.name"); } // Send success response let msg = DaemonMessage::GitConfigInherited { success: true, user_email, user_name, error: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) } /// Apply inherited git config to a worktree directory. pub async fn apply_git_config(&self, worktree_path: &std::path::Path) -> Result<(), DaemonError> { let email = self.git_user_email.read().await.clone(); let name = self.git_user_name.read().await.clone(); if let Some(email) = email { let result = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["config", "user.email", &email]) .output() .await; if let Err(e) = result { tracing::warn!(error = %e, "Failed to set git user.email in worktree"); } } if let Some(name) = name { let result = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["config", "user.name", &name]) .output() .await; if let Err(e) = result { tracing::warn!(error = %e, "Failed to set git user.name in worktree"); } } Ok(()) } } /// Inner state for spawned tasks (cloneable). struct TaskManagerInner { worktree_manager: Arc, process_manager: Arc, temp_manager: Arc, tasks: Arc>>, ws_tx: mpsc::Sender, task_inputs: Arc>>>, active_pids: Arc>>, git_user_email: Arc>>, git_user_name: Arc>>, api_url: String, api_key: String, heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc>>, /// Checkpoint patch storage configuration. checkpoint_patches: CheckpointPatchConfig, /// Local SQLite database for crash recovery. local_db: Arc>, } impl TaskManagerInner { /// Release a concurrency slot when a task completes. async fn release_concurrency_slot(&self, concurrency_key: Uuid) { let mut counts = self.contract_task_counts.write().await; if let Some(count) = counts.get_mut(&concurrency_key) { *count = count.saturating_sub(1); let new_count = *count; if new_count == 0 { counts.remove(&concurrency_key); } tracing::debug!( concurrency_key = %concurrency_key, new_count = new_count, "Released concurrency slot (from TaskManagerInner)" ); } } /// Remove completed/failed task from local database. fn remove_task_from_local_db(&self, task_id: Uuid) { if let Ok(db) = self.local_db.lock() { if let Err(e) = db.delete_task(task_id) { tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); } else { tracing::debug!(task_id = %task_id, "Removed task from local database"); } } } /// Fetch the latest checkpoint patch from the server and restore a worktree. async fn fetch_and_restore_patch( &self, task_id: Uuid, task_name: &str, repo_source: Option<&str>, ) -> Result, DaemonError> { use crate::daemon::api::client::ApiClient; if self.api_key.is_empty() { tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch"); return Ok(None); } let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) { Ok(c) => c, Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch"); return Ok(None); } }; let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id); #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] struct PatchDataResponse { patch_data: String, base_commit_sha: String, repository_url: Option, } let resp: PatchDataResponse = match client.get(&path).await { Ok(r) => r, Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => { tracing::debug!(task_id = %task_id, "No checkpoint patch found on server"); return Ok(None); } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server"); return Ok(None); } }; // Determine repo source: prefer the one from run_task args, fall back to server response let source = repo_source .map(|s| s.to_string()) .or(resp.repository_url); let source = match source { Some(s) => s, None => { tracing::warn!(task_id = %task_id, "No repository URL available to restore patch"); return Ok(None); } }; tracing::info!( task_id = %task_id, base_sha = %resp.base_commit_sha, "Fetched checkpoint patch from server, attempting restore" ); // Decode base64 patch data let patch_bytes = match base64::Engine::decode( &base64::engine::general_purpose::STANDARD, &resp.patch_data, ) { Ok(b) => b, Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data"); return Ok(None); } }; match self.worktree_manager.restore_from_patch( &source, task_id, task_name, &resp.base_commit_sha, &patch_bytes, ).await { Ok(worktree_info) => { tracing::info!( task_id = %task_id, path = %worktree_info.path.display(), "Successfully restored worktree from fetched patch" ); Ok(Some(worktree_info.path)) } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch"); Ok(None) } } } /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( &self, task_id: Uuid, task_name: String, plan: String, repo_source: Option, base_branch: Option, target_branch: Option, is_orchestrator: bool, is_supervisor: bool, target_repo_path: Option, completion_action: Option, continue_from_task_id: Option, copy_files: Option>, contract_id: Option, autonomous_loop: bool, resume_session: bool, conversation_history: Option, patch_data: Option, patch_base_sha: Option, local_only: bool, auto_merge_local: bool, supervisor_worktree_task_id: Option, directive_id: Option, ) -> Result<(), DaemonError> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ==="); // If resuming session, try to find existing worktree first let existing_worktree = if resume_session { match self.find_worktree_for_task(task_id).await { Ok(path) => { tracing::info!(task_id = %task_id, path = %path.display(), "Found existing worktree for session resume"); Some(path) } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "No existing worktree found for resume, will create new"); None } } } else { None }; // Try to restore from patch if worktree is missing but we have patch data let restored_from_patch = if existing_worktree.is_none() { if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) { tracing::info!( task_id = %task_id, base_sha = %base_sha, patch_len = patch_str.len(), "Attempting to restore worktree from patch" ); let msg = DaemonMessage::task_output( task_id, format!("Restoring worktree from checkpoint patch...\n"), false, ); let _ = self.ws_tx.send(msg).await; // Decode base64 patch data match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) { Ok(patch_bytes) => { match self.worktree_manager.restore_from_patch( source, task_id, &task_name, base_sha, &patch_bytes, ).await { Ok(worktree_info) => { tracing::info!( task_id = %task_id, path = %worktree_info.path.display(), "Successfully restored worktree from patch" ); // Store worktree info { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.worktree = Some(worktree_info.clone()); } } let msg = DaemonMessage::task_output( task_id, format!("Worktree restored at {}\n", worktree_info.path.display()), false, ); let _ = self.ws_tx.send(msg).await; Some(worktree_info.path) } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh"); let msg = DaemonMessage::task_output( task_id, format!("Warning: Failed to restore from patch ({}), starting fresh\n", e), false, ); let _ = self.ws_tx.send(msg).await; None } } } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data"); None } } } else { None } } else { None }; // If resuming but no local worktree and no inline patch, try fetching from server let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session { tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch"); let msg = DaemonMessage::task_output( task_id, "Fetching checkpoint patch from server...\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await { Ok(Some(path)) => { // Store worktree info in tasks map { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.worktree = Some(WorktreeInfo { path: path.clone(), branch: format!("task/{}", task_id), source_repo: repo_source.clone().unwrap_or_default().into(), }); } } let msg = DaemonMessage::task_output( task_id, format!("Worktree restored from server patch at {}\n", path.display()), false, ); let _ = self.ws_tx.send(msg).await; Some(path) } Ok(None) => { tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume"); let msg = DaemonMessage::task_output( task_id, "No checkpoint patch available on server, resuming with conversation history only\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; None } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server"); None } } } else { restored_from_patch }; // Determine working directory // First check if we should share a supervisor's worktree // Track if we need to merge to supervisor on completion (cross-daemon case) let mut merge_to_supervisor_on_completion: Option = None; let shared_supervisor_worktree = if let Some(supervisor_task_id) = supervisor_worktree_task_id { match self.find_worktree_for_task(supervisor_task_id).await { Ok(path) => { tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, path = %path.display(), "Using shared worktree from supervisor" ); let msg = DaemonMessage::task_output( task_id, format!("Using shared worktree from supervisor: {}\n", path.display()), false, ); let _ = self.ws_tx.send(msg).await; Some(path) } Err(_) => { // Supervisor worktree not on this daemon (cross-daemon case) // Will create own worktree and merge to supervisor on completion tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, "Supervisor worktree not on this daemon, will create own and merge on completion" ); let msg = DaemonMessage::task_output( task_id, format!("Supervisor on different daemon, will merge changes on completion\n"), false, ); let _ = self.ws_tx.send(msg).await; // Mark for merge on completion merge_to_supervisor_on_completion = Some(supervisor_task_id); None } } } else { None }; let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some() || shared_supervisor_worktree.is_some(); let working_dir = if let Some(shared_path) = shared_supervisor_worktree { // Use supervisor's worktree directly (no copy, no new branch) shared_path } else if let Some(existing) = existing_worktree { // Reuse existing worktree for session resume let msg = DaemonMessage::task_output( task_id, format!("Resuming session in existing worktree: {}\n", existing.display()), false, ); let _ = self.ws_tx.send(msg).await; existing } else if let Some(restored_path) = restored_from_patch { // Already restored from patch above restored_path } else if let Some(ref source) = repo_source { if is_new_repo_request(source) { // Explicit new repo request: new:// or new://project-name tracing::info!( task_id = %task_id, source = %source, "Creating new git repository" ); let msg = DaemonMessage::task_output( task_id, format!("Initializing new git repository...\n"), false, ); let _ = self.ws_tx.send(msg).await; let worktree_info = self.worktree_manager .init_new_repo(task_id, source) .await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; tracing::info!( task_id = %task_id, path = %worktree_info.path.display(), "New repository created" ); // Apply inherited git config to the new repo (overrides defaults) self.apply_git_config(&worktree_info.path).await; // Store worktree info { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.worktree = Some(worktree_info.clone()); } } let msg = DaemonMessage::task_output( task_id, format!("Repository ready at {}\n", worktree_info.path.display()), false, ); let _ = self.ws_tx.send(msg).await; worktree_info.path } else { // Send progress message let msg = DaemonMessage::task_output( task_id, format!("Setting up worktree from {}...\n", source), false, ); let _ = self.ws_tx.send(msg).await; // Ensure source repo exists (clone if URL, verify if path) let source_repo = self.worktree_manager.ensure_repo(source).await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; // Detect or use provided base branch let branch = if let Some(ref b) = base_branch { b.clone() } else { self.worktree_manager.detect_default_branch(&source_repo).await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? }; tracing::info!( task_id = %task_id, source = %source, branch = %branch, continue_from_task_id = ?continue_from_task_id, "Setting up worktree" ); // Create worktree - either from scratch or copying from another task let task_name = format!("task-{}", &task_id.to_string()[..8]); let worktree_info = if let Some(from_task_id) = continue_from_task_id { // Fallback chain for continuing from a previous task: // 1. Try copying from existing worktree (fastest, preserves uncommitted changes) // 2. Try creating from pushed branch (branch was pushed to remote) // 3. Try restoring from saved patch data // 4. Fail if none available // Step 1: Try copying from existing worktree let copy_result = match self.find_worktree_for_task(from_task_id).await { Ok(source_worktree) => { let msg = DaemonMessage::task_output( task_id, format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), false, ); let _ = self.ws_tx.send(msg).await; self.worktree_manager .create_worktree_from_task(&source_worktree, task_id, &task_name) .await } Err(e) => Err(crate::daemon::worktree::WorktreeError::RepoNotFound(e.to_string())), }; match copy_result { Ok(info) => info, Err(copy_err) => { tracing::warn!( task_id = %task_id, from_task_id = %from_task_id, error = %copy_err, "Failed to copy from source worktree, trying branch fallback" ); // Step 2: Try creating from pushed branch let msg = DaemonMessage::task_output( task_id, format!("Source worktree unavailable, checking for pushed branch...\n"), false, ); let _ = self.ws_tx.send(msg).await; match self.worktree_manager .create_worktree_from_task_branch(&source_repo, from_task_id, task_id, &task_name) .await { Ok(info) => { tracing::info!( task_id = %task_id, from_task_id = %from_task_id, branch = %info.branch, "Successfully created worktree from pushed branch" ); let msg = DaemonMessage::task_output( task_id, format!("Restored from pushed branch {}\n", info.branch), false, ); let _ = self.ws_tx.send(msg).await; info } Err(branch_err) => { tracing::warn!( task_id = %task_id, from_task_id = %from_task_id, error = %branch_err, "No pushed branch found, trying patch fallback" ); // Step 3: Try restoring from saved patch data if let (Some(patch_str), Some(base_sha)) = (&patch_data, &patch_base_sha) { tracing::info!( task_id = %task_id, from_task_id = %from_task_id, base_sha = %base_sha, patch_len = patch_str.len(), "Attempting to restore from checkpoint patch" ); let msg = DaemonMessage::task_output( task_id, format!("Restoring from checkpoint patch...\n"), false, ); let _ = self.ws_tx.send(msg).await; match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) { Ok(patch_bytes) => { match self.worktree_manager.restore_from_patch( source, task_id, &task_name, base_sha, &patch_bytes, ).await { Ok(worktree_info) => { tracing::info!( task_id = %task_id, path = %worktree_info.path.display(), "Successfully restored worktree from patch" ); worktree_info } Err(patch_err) => { tracing::warn!( task_id = %task_id, from_task_id = %from_task_id, error = %patch_err, "Patch restore failed — falling back to fresh worktree" ); let msg = DaemonMessage::task_output( task_id, format!("Patch restore failed, starting fresh from {}\n", branch), false, ); let _ = self.ws_tx.send(msg).await; self.worktree_manager .create_worktree(&source_repo, task_id, &task_name, &branch) .await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? } } } Err(decode_err) => { tracing::warn!( task_id = %task_id, from_task_id = %from_task_id, error = %decode_err, "Patch decode failed — falling back to fresh worktree" ); let msg = DaemonMessage::task_output( task_id, format!("Patch decode failed, starting fresh from {}\n", branch), false, ); let _ = self.ws_tx.send(msg).await; self.worktree_manager .create_worktree(&source_repo, task_id, &task_name, &branch) .await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? } } } else { // Step 4: Fall back to fresh worktree from base branch tracing::warn!( task_id = %task_id, from_task_id = %from_task_id, "All continue_from fallbacks failed — creating fresh worktree from base branch" ); let msg = DaemonMessage::task_output( task_id, format!("Source task worktree unavailable, starting fresh from {}\n", branch), false, ); let _ = self.ws_tx.send(msg).await; self.worktree_manager .create_worktree(&source_repo, task_id, &task_name, &branch) .await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? } } } } } } else { // Create fresh worktree from repo self.worktree_manager .create_worktree(&source_repo, task_id, &task_name, &branch) .await .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? }; tracing::info!( task_id = %task_id, worktree_path = %worktree_info.path.display(), branch = %worktree_info.branch, continued_from = ?continue_from_task_id, "Worktree created" ); // Apply inherited git config to the worktree self.apply_git_config(&worktree_info.path).await; // Store worktree info { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.worktree = Some(worktree_info.clone()); } } let msg = DaemonMessage::task_output( task_id, format!("Worktree ready at {}\n", worktree_info.path.display()), false, ); let _ = self.ws_tx.send(msg).await; worktree_info.path } } else { // No repo specified - use managed temp directory in ~/.makima/temp/ tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); let msg = DaemonMessage::task_output( task_id, "Creating temporary working directory...\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; let temp_dir = self.temp_manager.create_task_dir(task_id).await?; let msg = DaemonMessage::task_output( task_id, format!("Working directory ready at {}\n", temp_dir.display()), false, ); let _ = self.ws_tx.send(msg).await; temp_dir }; // Store merge target if cross-daemon (will merge to supervisor on completion) if let Some(supervisor_task_id) = merge_to_supervisor_on_completion { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.merge_to_supervisor_task_id = Some(supervisor_task_id); tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, "Task marked for merge to supervisor on completion" ); } } // Copy files from parent task's worktree if specified if let Some(ref files) = copy_files { if !files.is_empty() { // Get the parent task ID to find its worktree let parent_task_id = { let tasks = self.tasks.read().await; tasks.get(&task_id).and_then(|t| t.parent_task_id) }; if let Some(parent_id) = parent_task_id { match self.find_worktree_for_task(parent_id).await { Ok(parent_worktree) => { let msg = DaemonMessage::task_output( task_id, format!("Copying {} files from orchestrator...\n", files.len()), false, ); let _ = self.ws_tx.send(msg).await; for file_path in files { let source = parent_worktree.join(file_path); let dest = working_dir.join(file_path); // Create parent directories if needed if let Some(parent) = dest.parent() { if let Err(e) = tokio::fs::create_dir_all(parent).await { tracing::warn!( task_id = %task_id, file = %file_path, error = %e, "Failed to create parent directory for file" ); continue; } } // Copy the file match tokio::fs::copy(&source, &dest).await { Ok(_) => { tracing::info!( task_id = %task_id, source = %source.display(), dest = %dest.display(), "Copied file from orchestrator" ); } Err(e) => { tracing::warn!( task_id = %task_id, source = %source.display(), dest = %dest.display(), error = %e, "Failed to copy file from orchestrator" ); // Notify but don't fail - the file might be optional let msg = DaemonMessage::task_output( task_id, format!("Warning: Could not copy {}: {}\n", file_path, e), false, ); let _ = self.ws_tx.send(msg).await; } } } let msg = DaemonMessage::task_output( task_id, "Files copied from orchestrator.\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; } Err(e) => { tracing::warn!( task_id = %task_id, parent_id = %parent_id, error = %e, "Could not find parent task worktree for file copying" ); } } } else { tracing::warn!( task_id = %task_id, "copy_files specified but no parent_task_id" ); } } } // Update state to Starting tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); self.update_state(task_id, TaskState::Starting).await; self.send_status_change(task_id, "initializing", "starting").await; // Check Claude is available match self.process_manager.check_claude_available().await { Ok(version) => { tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); let msg = DaemonMessage::task_output( task_id, format!("Claude Code {} ready\n", version), false, ); let _ = self.ws_tx.send(msg).await; } Err(e) => { let err_msg = format!("Claude Code not available: {}", e); tracing::error!(task_id = %task_id, error = %err_msg); return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); } } // Set up supervisor, orchestrator, or subtask mode let (extra_env, full_plan, system_prompt) = if is_supervisor { // Supervisor mode: long-running contract orchestrator tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up supervisor mode"); let msg = DaemonMessage::task_output( task_id, "Setting up supervisor environment...\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; // Generate tool key for API access let tool_key = generate_tool_key(); tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for supervisor"); // Register tool key with server let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); if self.ws_tx.send(register_msg).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to register tool key"); } else { tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); } // Set up environment variables for makima CLI let mut env = HashMap::new(); env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone()); env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); // Supervisor needs contract ID for its tools if let Some(cid) = contract_id { env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); } tracing::info!( task_id = %task_id, api_url = %self.api_url, tool_key_preview = &tool_key[..8.min(tool_key.len())], "Set supervisor environment variables" ); // For supervisor, pass instructions as SYSTEM PROMPT (not user message) // This ensures Claude treats them as behavioral constraints let supervisor_user_plan = format!( "Contract goal:\n{}", plan ); let msg = DaemonMessage::task_output( task_id, "Supervisor environment ready (makima CLI available)\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; // Return system prompt separately - it will be passed via --system-prompt flag (Some(env), supervisor_user_plan, Some(SUPERVISOR_SYSTEM_PROMPT.to_string())) } else if is_orchestrator { tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); let msg = DaemonMessage::task_output( task_id, "Setting up orchestrator environment...\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; // Generate tool key for API access let tool_key = generate_tool_key(); tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); // Register tool key with server let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); if self.ws_tx.send(register_msg).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to register tool key"); } else { tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); } // Set up environment variables for makima CLI let mut env = HashMap::new(); env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone()); env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); tracing::info!( task_id = %task_id, api_url = %self.api_url, tool_key_preview = &tool_key[..8.min(tool_key.len())], "Set orchestrator environment variables" ); // For orchestrator, pass instructions as SYSTEM PROMPT let orchestrator_user_plan = format!( "Your task:\n{}", plan ); let msg = DaemonMessage::task_output( task_id, "Orchestrator environment ready (makima CLI available)\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; (Some(env), orchestrator_user_plan, Some(ORCHESTRATOR_SYSTEM_PROMPT.to_string())) } else { tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); // For subtasks, pass worktree isolation instructions as system prompt let subtask_user_plan = format!( "Your task:\n{}", plan ); (None, subtask_user_plan, Some(SUBTASK_SYSTEM_PROMPT.to_string())) }; // Add contract environment if task has contract_id (skip for supervisors - they already have it) let (extra_env, full_plan, system_prompt) = if let Some(cid) = contract_id { if is_supervisor { // Supervisors already have contract ID and API access set up tracing::info!(task_id = %task_id, contract_id = %cid, "Supervisor already has contract integration"); (extra_env, full_plan, system_prompt) } else { tracing::info!(task_id = %task_id, contract_id = %cid, "Setting up contract integration"); // Set up environment variables for makima CLI let mut env = extra_env.unwrap_or_default(); env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); // If not already an orchestrator, we need API access for makima CLI if !is_orchestrator { // Generate tool key for API access let tool_key = generate_tool_key(); tracing::info!(task_id = %task_id, "Generated tool key for contract access"); // Register tool key with server let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); if self.ws_tx.send(register_msg).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to register contract tool key"); } env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone()); env.insert("MAKIMA_API_KEY".to_string(), tool_key); env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); } let msg = DaemonMessage::task_output( task_id, "Contract integration ready (makima CLI available)\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; // Prepend contract integration prompt to the plan so the task knows to use makima CLI let contract_plan = format!( "{}{}", CONTRACT_INTEGRATION_PROMPT, full_plan ); (Some(env), contract_plan, system_prompt) } } else { (extra_env, full_plan, system_prompt) }; // Add directive environment if task has directive_id let (extra_env, full_plan, system_prompt) = if let Some(did) = directive_id { tracing::info!(task_id = %task_id, directive_id = %did, "Setting up directive integration"); let mut env = extra_env.unwrap_or_default(); env.insert("MAKIMA_DIRECTIVE_ID".to_string(), did.to_string()); // If not already an orchestrator/supervisor, we need API access for makima CLI if !is_orchestrator && !is_supervisor && !env.contains_key("MAKIMA_API_KEY") { let tool_key = generate_tool_key(); tracing::info!(task_id = %task_id, "Generated tool key for directive access"); let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); if self.ws_tx.send(register_msg).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to register directive tool key"); } env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone()); env.insert("MAKIMA_API_KEY".to_string(), tool_key); env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); } let msg = DaemonMessage::task_output( task_id, "Directive integration ready (makima CLI available)\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; (Some(env), full_plan, system_prompt) } else { (extra_env, full_plan, system_prompt) }; // Spawn Claude process let plan_bytes = full_plan.len(); let plan_chars = full_plan.chars().count(); // Rough token estimate: ~4 chars per token for English let estimated_tokens = plan_chars / 4; tracing::info!( task_id = %task_id, working_dir = %working_dir.display(), is_orchestrator = is_orchestrator, plan_bytes = plan_bytes, plan_chars = plan_chars, estimated_tokens = estimated_tokens, "Spawning Claude process" ); // Warn if plan is very large (Claude's context is typically 100k-200k tokens) if estimated_tokens > 50_000 { tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); let msg = DaemonMessage::task_output( task_id, format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), false, ); let _ = self.ws_tx.send(msg).await; } let msg = DaemonMessage::task_output( task_id, if is_orchestrator { format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) } else { format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) }, false, ); let _ = self.ws_tx.send(msg).await; // Clone extra_env for use in autonomous loop iterations let extra_env_for_loop = extra_env.clone(); tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), resume_session = resume_session, "Calling process_manager.spawn()..."); let mut process = if resume_session { // Use --continue flag to resume from previous session // Build continuation prompt based on whether worktree exists let continuation_prompt = if has_existing_worktree { // Worktree exists: Claude's session state should work format!( "Resuming previous session. Continue from where you left off.\n\n{}", full_plan ) } else if let Some(ref history) = conversation_history { // Worktree missing: inject conversation history as context let history_str = serde_json::to_string_pretty(history).unwrap_or_default(); format!( "Resuming previous session. Here is the conversation history from the previous session:\n\n\ \n{}\n\n\n\ Continue from where you left off with this task:\n\n{}", history_str, full_plan ) } else { // No history available: just the plan format!("Resuming with plan:\n\n{}", full_plan) }; let resume_msg = if has_existing_worktree { "Using --continue to resume previous conversation...\n" } else if conversation_history.is_some() { "Worktree not found. Resuming with injected conversation history...\n" } else { "Resuming without conversation history (worktree not found)...\n" }; let msg = DaemonMessage::task_output( task_id, resume_msg.to_string(), false, ); let _ = self.ws_tx.send(msg).await; self.process_manager .spawn_continue(&working_dir, &continuation_prompt, extra_env, system_prompt.as_deref()) .await .map_err(|e| { tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process with --continue"); DaemonError::Task(TaskError::SetupFailed(e.to_string())) })? } else { self.process_manager .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) .await .map_err(|e| { tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); DaemonError::Task(TaskError::SetupFailed(e.to_string())) })? }; // Register the process PID for graceful shutdown tracking if let Some(pid) = process.id() { self.active_pids.write().await.insert(task_id, pid); tracing::info!(task_id = %task_id, pid = pid, "Claude process spawned successfully, PID registered"); } else { tracing::info!(task_id = %task_id, "Claude process spawned successfully (no PID available)"); } // Set up input channel for this task so we can send messages to its stdin tracing::debug!(task_id = %task_id, "Setting up input channel..."); let (input_tx, mut input_rx) = mpsc::channel::(100); tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); self.task_inputs.write().await.insert(task_id, input_tx); tracing::debug!(task_id = %task_id, "Input channel registered"); // Get stdin handle for input forwarding and completion signaling let stdin_handle = process.stdin_handle(); let mut stdin_handle_for_completion = stdin_handle.clone(); tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); tokio::spawn(async move { tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); while let Some(msg) = input_rx.recv().await { tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); // Format as JSON user message for stream-json input protocol let json_msg = ClaudeInputMessage::user(&msg); let json_line = match json_msg.to_json_line() { Ok(line) => line, Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); continue; } }; tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); let mut stdin_guard = stdin_handle.lock().await; if let Some(ref mut stdin) = *stdin_guard { tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); if stdin.write_all(json_line.as_bytes()).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); break; } if stdin.flush().await.is_err() { tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); break; } tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); } else { tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); break; } } tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); }); // Update state to Running { tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Running; task.started_at = Some(Instant::now()); } tracing::debug!(task_id = %task_id, "Released tasks write lock"); } tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); self.send_status_change(task_id, "starting", "running").await; tracing::debug!(task_id = %task_id, "Sent status change notification"); // Stream output with startup timeout check tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); let ws_tx = self.ws_tx.clone(); // For auth error detection let claude_command = self.process_manager.claude_command().to_string(); let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); let mut auth_error_handled = false; // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection let mut accumulated_output = String::new(); let mut circuit_breaker = CircuitBreaker::new(); let mut iteration_count = 0u32; let mut final_exit_code: i64 = -1; // Track the final exit code across iterations // Autonomous loop: we may run multiple iterations 'autonomous_loop: loop { iteration_count += 1; if autonomous_loop && iteration_count > 1 { tracing::info!( task_id = %task_id, iteration = iteration_count, "Starting autonomous loop iteration" ); let msg = DaemonMessage::task_output( task_id, format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count), false, ); let _ = self.ws_tx.send(msg).await; // For subsequent iterations, spawn with --continue flag let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true."; process = self.process_manager .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref()) .await .map_err(|e| { tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation"); DaemonError::Task(TaskError::SetupFailed(e.to_string())) })?; // Register the new process PID if let Some(pid) = process.id() { self.active_pids.write().await.insert(task_id, pid); tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned"); } // Reset stdin handle for the new process stdin_handle_for_completion = process.stdin_handle(); } // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output) let mut iteration_output = String::new(); let mut output_count = 0u64; let mut output_bytes = 0usize; let startup_timeout = tokio::time::Duration::from_secs(30); let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let startup_deadline = tokio::time::Instant::now() + startup_timeout; // Heartbeat commit interval (only active if configured and we have a git repo) let heartbeat_enabled = self.heartbeat_commit_interval_secs > 0 && repo_source.is_some(); let mut heartbeat_interval = tokio::time::interval( tokio::time::Duration::from_secs( if heartbeat_enabled { self.heartbeat_commit_interval_secs } else { u64::MAX } ) ); heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); // Skip the first immediate tick heartbeat_interval.tick().await; loop { tokio::select! { maybe_line = process.next_output() => { match maybe_line { Some(line) => { output_count += 1; output_bytes += line.content.len(); // Accumulate output for COMPLETION_GATE detection in autonomous loop mode if autonomous_loop { iteration_output.push_str(&line.content); iteration_output.push('\n'); } if output_count == 1 { tracing::info!(task_id = %task_id, "Received first output line from Claude"); } if output_count % 100 == 0 { tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); } // Log output details for debugging tracing::trace!( task_id = %task_id, line_num = output_count, content_len = line.content.len(), is_stdout = line.is_stdout, json_type = ?line.json_type, "Forwarding output to WebSocket" ); // Check if this is a "result" message indicating task completion // With --input-format=stream-json, Claude waits for more input after completion if line.json_type.as_deref() == Some("result") { if autonomous_loop || directive_id.is_some() { // Autonomous loop: close stdin so we can spawn the next iteration // Directive tasks: close stdin so the process exits and the step completes tracing::info!(task_id = %task_id, directive_id = ?directive_id, "Received result message, closing stdin to signal completion"); let mut stdin_guard = stdin_handle_for_completion.lock().await; if let Some(mut stdin) = stdin_guard.take() { let _ = stdin.shutdown().await; } } else { // Interactive mode (mesh, contracts): keep stdin open so the user // can send follow-up messages. Claude will stay alive waiting for input. tracing::info!(task_id = %task_id, "Received result message, keeping stdin open for interactive input"); } } // Check for OAuth auth error before sending output let content_for_auth_check = line.content.clone(); let json_type_for_auth_check = line.json_type.clone(); let is_stdout_for_auth_check = line.is_stdout; let msg = DaemonMessage::task_output(task_id, line.content, false); if ws_tx.send(msg).await.is_err() { tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); break; } // Detect OAuth token expiration - log warning and let the task fail normally. // Users can reauthorize via the Daemons page instead. if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { auth_error_handled = true; tracing::warn!(task_id = %task_id, "OAuth authentication error detected - task will fail. Reauthorize via Daemons page."); let error_msg = DaemonMessage::task_output( task_id, format!("⚠ Authentication expired on daemon{}. Go to Daemons page to reauthorize, then retry this task.\n", daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), false, ); let _ = ws_tx.send(error_msg).await; } } None => { tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); break; } } } _ = startup_check.tick(), if output_count == 0 => { // Check if process is still alive match process.try_wait() { Ok(Some(exit_code)) => { tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); let msg = DaemonMessage::task_output( task_id, format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), false, ); let _ = ws_tx.send(msg).await; break; } Ok(None) => { // Still running but no output if tokio::time::Instant::now() > startup_deadline { tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); let msg = DaemonMessage::task_output( task_id, "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), false, ); let _ = ws_tx.send(msg).await; } else { tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); } } Err(e) => { tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } } } _ = heartbeat_interval.tick(), if heartbeat_enabled => { // Create periodic ephemeral patch to preserve work-in-progress match self.create_ephemeral_patch(task_id, &working_dir).await { Ok(files_count) => { let msg = DaemonMessage::task_output( task_id, format!("[Heartbeat] Patch saved ({} files)\n", files_count), false, ); let _ = ws_tx.send(msg).await; } Err(e) => { // No changes to patch or error - this is fine, just log at debug level tracing::debug!(task_id = %task_id, error = %e, "Heartbeat patch skipped"); } } } } } // Wait for process to exit let exit_code = process.wait().await.unwrap_or(-1); final_exit_code = exit_code; // Store for use after the loop // Unregister the process PID (process has exited) self.active_pids.write().await.remove(&task_id); tracing::debug!(task_id = %task_id, "Unregistered process PID"); // Clean up input channel for this task self.task_inputs.write().await.remove(&task_id); tracing::debug!(task_id = %task_id, "Removed task input channel"); // Accumulate this iteration's output accumulated_output.push_str(&iteration_output); // === AUTONOMOUS LOOP LOGIC === // Check if we should continue or complete if autonomous_loop && exit_code == 0 { // Check for COMPLETION_GATE in the output let completion_gate = CompletionGate::parse_last(&iteration_output); match completion_gate { Some(gate) if gate.ready => { tracing::info!( task_id = %task_id, iteration = iteration_count, reason = ?gate.reason, "COMPLETION_GATE ready=true detected, task complete" ); let msg = DaemonMessage::task_output( task_id, format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n", iteration_count, gate.reason.unwrap_or_else(|| "Task complete".to_string()) ), false, ); let _ = self.ws_tx.send(msg).await; break 'autonomous_loop; } Some(gate) => { // COMPLETION_GATE found but not ready tracing::info!( task_id = %task_id, iteration = iteration_count, reason = ?gate.reason, blockers = ?gate.blockers, "COMPLETION_GATE ready=false, will continue" ); // Check circuit breaker // For now, we consider output_bytes > 0 as "progress" let had_progress = output_bytes > 0; let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str()); if !circuit_breaker.record_iteration(had_progress, error) { // Circuit breaker tripped tracing::warn!( task_id = %task_id, reason = ?circuit_breaker.open_reason, "Circuit breaker tripped, stopping autonomous loop" ); let msg = DaemonMessage::task_output( task_id, format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") ), false, ); let _ = self.ws_tx.send(msg).await; break 'autonomous_loop; } let msg = DaemonMessage::task_output( task_id, format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n", gate.reason.unwrap_or_else(|| "Not complete".to_string()) ), false, ); let _ = self.ws_tx.send(msg).await; // Continue to next iteration continue 'autonomous_loop; } None => { // No COMPLETION_GATE found - check circuit breaker and continue tracing::info!( task_id = %task_id, iteration = iteration_count, "No COMPLETION_GATE found, will restart with continuation prompt" ); let had_progress = output_bytes > 0; if !circuit_breaker.record_iteration(had_progress, None) { tracing::warn!( task_id = %task_id, reason = ?circuit_breaker.open_reason, "Circuit breaker tripped (no COMPLETION_GATE), stopping" ); let msg = DaemonMessage::task_output( task_id, format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") ), false, ); let _ = self.ws_tx.send(msg).await; break 'autonomous_loop; } let msg = DaemonMessage::task_output( task_id, "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; continue 'autonomous_loop; } } } else { // Not in autonomous loop mode or process failed - exit normally break 'autonomous_loop; } } // end 'autonomous_loop // Update state based on exit code let success = final_exit_code == 0; let new_state = if success { TaskState::Completed } else { TaskState::Failed }; tracing::info!( task_id = %task_id, exit_code = final_exit_code, success = success, new_state = ?new_state, "Claude process exited, updating task state" ); { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = new_state; task.completed_at = Some(Instant::now()); if !success { task.error = Some(format!("Process exited with code {}", final_exit_code)); } } } // Execute completion action if task succeeded (skip in local_only mode unless auto_merge_local is enabled) let completion_result = if success { if local_only { if auto_merge_local { // In local_only mode with auto_merge_local enabled, merge locally tracing::info!( task_id = %task_id, "Local-only mode with auto_merge_local - executing local merge" ); self.execute_completion_action( task_id, &task_name, &working_dir, "merge", // Use merge action (not pr) target_repo_path.as_deref(), target_branch.as_deref(), ).await } else { tracing::info!( task_id = %task_id, "Skipping completion action - contract is in local_only mode" ); Ok(None) } } else if let Some(ref action) = completion_action { if action != "none" { self.execute_completion_action( task_id, &task_name, &working_dir, action, target_repo_path.as_deref(), target_branch.as_deref(), ).await } else { Ok(None) } } else { Ok(None) } } else { Ok(None) }; // Log completion action result match &completion_result { Ok(Some(pr_url)) => { tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); } Ok(None) => {} Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); } } // If this task needs to merge to supervisor (cross-daemon case), generate and send patch let merge_to_supervisor = { let tasks = self.tasks.read().await; tasks.get(&task_id).and_then(|t| t.merge_to_supervisor_task_id) }; if let Some(supervisor_task_id) = merge_to_supervisor { if success { tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, "Task completed on cross-daemon, generating patch to merge to supervisor" ); // Get base SHA from the worktree's initial commit or parent match crate::daemon::storage::get_parent_sha(&working_dir).await { Ok(base_sha) => { // Generate patch match crate::daemon::storage::create_patch(&working_dir, &base_sha).await { Ok((patch_bytes, files_count)) => { // Base64 encode the patch let patch_data = base64::Engine::encode( &base64::engine::general_purpose::STANDARD, &patch_bytes, ); tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, files_count = files_count, patch_size = patch_bytes.len(), "Sending patch to supervisor" ); // Send MergePatchToSupervisor message to server let msg = DaemonMessage::MergePatchToSupervisor { task_id, supervisor_task_id, patch_data, base_sha, }; let _ = self.ws_tx.send(msg).await; let output_msg = DaemonMessage::task_output( task_id, format!("Sent {} file(s) to supervisor for merge\n", files_count), false, ); let _ = self.ws_tx.send(output_msg).await; } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to create patch for supervisor merge" ); } } } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to get base SHA for supervisor merge" ); } } } } // Notify server - but NOT for supervisors which should never complete if is_supervisor { tracing::info!( task_id = %task_id, exit_code = final_exit_code, "Supervisor Claude process exited - NOT marking as complete" ); // Update local state to reflect it's paused/waiting for input { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Running; // Keep it as running, not completed task.completed_at = None; } } // Send a status message to let the frontend know supervisor is ready for more input let msg = DaemonMessage::task_output( task_id, "\n[Supervisor ready for next instruction]\n".to_string(), false, ); let _ = self.ws_tx.send(msg).await; } else { // Create completion patch before notifying server if let Err(e) = self.create_completion_patch(task_id, &working_dir).await { tracing::debug!(task_id = %task_id, error = %e, "No completion patch created"); } let error = if success { None } else { Some(format!("Exit code: {}", final_exit_code)) }; tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); let _ = self.ws_tx.send(msg).await; // Remove completed task from local database (no longer needs crash recovery) self.remove_task_from_local_db(task_id); } // Note: Worktrees are kept until explicitly deleted (per user preference) // This allows inspection, PR creation, etc. tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); Ok(()) } /// Execute the completion action for a task. async fn execute_completion_action( &self, task_id: Uuid, task_name: &str, worktree_path: &std::path::Path, action: &str, target_repo_path: Option<&str>, target_branch: Option<&str>, ) -> Result, String> { // For PR action, we can use the worktree's origin directly if target_repo_path is not set let target_repo = match target_repo_path { Some(path) => Some(crate::daemon::worktree::expand_tilde(path)), None => { if action == "pr" || action == "branch" { // For PR/branch action without target_repo, use origin directly None } else { tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); return Ok(None); } } }; // Validate target_repo exists if provided if let Some(ref repo) = target_repo { if !repo.exists() { return Err(format!("Target repo not found: {} (expanded from {:?})", repo.display(), target_repo_path)); } } // Get the branch name: makima/{task-name-with-dashes}-{short-id} let branch_name = format!( "makima/{}-{}", crate::daemon::worktree::sanitize_name(task_name), crate::daemon::worktree::short_uuid(task_id) ); // Determine target branch - use provided value or detect default branch let target_branch = match target_branch { Some(branch) => branch.to_string(), None => { // Detect default branch from target_repo if available, otherwise from worktree let detect_path = target_repo.as_ref().map(|p| p.as_path()).unwrap_or(worktree_path); self.worktree_manager .detect_default_branch(detect_path) .await .unwrap_or_else(|_| "master".to_string()) } }; let msg = DaemonMessage::task_output( task_id, format!("Executing completion action: {}...\n", action), false, ); let _ = self.ws_tx.send(msg).await; match action { "branch" => { match target_repo { Some(target_repo) => { // Push branch to local target repo self.worktree_manager .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) .await .map_err(|e| e.to_string())?; let msg = DaemonMessage::task_output( task_id, format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), false, ); let _ = self.ws_tx.send(msg).await; } None => { // Push branch to origin (GitHub) self.worktree_manager .push_branch_to_origin(worktree_path, &branch_name, task_name) .await .map_err(|e| e.to_string())?; let msg = DaemonMessage::task_output( task_id, format!("Branch '{}' pushed to origin\n", branch_name), false, ); let _ = self.ws_tx.send(msg).await; } } Ok(None) } "merge" => { let target_repo = target_repo.ok_or_else(|| "No target_repo_path configured for merge action".to_string())?; // Push and merge into target branch let commit_sha = self.worktree_manager .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) .await .map_err(|e| e.to_string())?; let msg = DaemonMessage::task_output( task_id, format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), false, ); let _ = self.ws_tx.send(msg).await; Ok(None) } "pr" => { // Push and create PR // For PR, we can use target_repo if provided, or create PR directly from worktree let title = task_name.to_string(); let body = format!( "Automated PR from makima task.\n\nTask ID: `{}`", task_id ); let pr_url = self.worktree_manager .create_pull_request( worktree_path, target_repo.as_deref(), &branch_name, &target_branch, &title, &body, ) .await .map_err(|e| e.to_string())?; let msg = DaemonMessage::task_output( task_id, format!("Pull request created: {}\n", pr_url), false, ); let _ = self.ws_tx.send(msg).await; Ok(Some(pr_url)) } _ => { tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); Ok(None) } } } /// Find worktree path for a task ID. /// First checks in-memory tasks, then scans the worktrees directory. async fn find_worktree_for_task(&self, task_id: Uuid) -> Result { // First try to get from in-memory tasks { let tasks = self.tasks.read().await; if let Some(task) = tasks.get(&task_id) { if let Some(ref worktree) = task.worktree { return Ok(worktree.path.clone()); } } } // Task not in memory - scan worktrees directory for matching task ID let short_id = &task_id.to_string()[..8]; let worktrees_dir = self.worktree_manager.base_dir(); if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let name = entry.file_name(); let name_str = name.to_string_lossy(); if name_str.starts_with(short_id) { let path = entry.path(); // Verify it's a valid git directory if path.join(".git").exists() { tracing::info!( task_id = %task_id, worktree_path = %path.display(), "Found worktree by scanning directory" ); return Ok(path); } } } } Err(format!( "No worktree found for task {}. The worktree may have been cleaned up.", task_id )) } async fn update_state(&self, task_id: Uuid, state: TaskState) { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = state; } } async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); let _ = self.ws_tx.send(msg).await; } /// Mark task as failed. async fn mark_failed(&self, task_id: Uuid, error: &str) { { let mut tasks = self.tasks.write().await; if let Some(task) = tasks.get_mut(&task_id) { task.state = TaskState::Failed; task.error = Some(error.to_string()); task.completed_at = Some(Instant::now()); } } // Notify server let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); let _ = self.ws_tx.send(msg).await; // Remove failed task from local database self.remove_task_from_local_db(task_id); } /// Apply inherited git config to a worktree directory. async fn apply_git_config(&self, worktree_path: &std::path::Path) { let email = self.git_user_email.read().await.clone(); let name = self.git_user_name.read().await.clone(); if email.is_none() && name.is_none() { return; // No inherited config to apply } if let Some(email) = email { let result = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["config", "user.email", &email]) .output() .await; match result { Ok(output) if output.status.success() => { tracing::debug!(email = %email, path = ?worktree_path, "Applied git user.email to worktree"); } Ok(output) => { tracing::warn!( path = ?worktree_path, stderr = %String::from_utf8_lossy(&output.stderr), "Failed to set git user.email in worktree" ); } Err(e) => { tracing::warn!(error = %e, "Failed to run git config user.email"); } } } if let Some(name) = name { let result = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["config", "user.name", &name]) .output() .await; match result { Ok(output) if output.status.success() => { tracing::debug!(name = %name, path = ?worktree_path, "Applied git user.name to worktree"); } Ok(output) => { tracing::warn!( path = ?worktree_path, stderr = %String::from_utf8_lossy(&output.stderr), "Failed to set git user.name in worktree" ); } Err(e) => { tracing::warn!(error = %e, "Failed to run git config user.name"); } } } } /// Create an ephemeral patch of all changes (committed + uncommitted) since the /// merge-base with main/master and send to the server. /// Stages and commits any uncommitted changes, then diffs against the merge-base. /// Returns the number of files changed on success, or an error message if nothing to patch. async fn create_ephemeral_patch( &self, task_id: Uuid, worktree_path: &std::path::Path, ) -> Result { if !self.checkpoint_patches.enabled { return Err("Checkpoint patches disabled".into()); } // 1. Find merge-base with main/master (the fork point) let base_sha = storage::get_merge_base_sha(worktree_path) .await .map_err(|e| format!("Failed to get merge-base: {}", e))?; // 2. Stage and commit any uncommitted changes so they're included in the diff let _ = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["add", "-A"]) .output() .await; // Check if there are staged changes to commit let staged_check = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["diff", "--cached", "--quiet"]) .output() .await; if let Ok(output) = staged_check { if !output.status.success() { // There are staged changes - commit them let _ = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["commit", "-m", "WIP: heartbeat checkpoint"]) .output() .await; } } // 3. Create patch (diff merge-base..HEAD captures all work) match storage::create_patch(worktree_path, &base_sha).await { Ok((compressed_patch, patch_files_count)) => { // Check size limit if compressed_patch.len() > self.checkpoint_patches.max_patch_size_bytes { tracing::warn!( task_id = %task_id, patch_size = compressed_patch.len(), max_size = self.checkpoint_patches.max_patch_size_bytes, "Patch exceeds size limit" ); return Err("Patch exceeds size limit".into()); } // Encode as base64 for JSON transport let patch_data = base64::engine::general_purpose::STANDARD.encode(&compressed_patch); tracing::debug!( task_id = %task_id, base_sha = %base_sha, patch_size = compressed_patch.len(), files_count = patch_files_count, "Created ephemeral patch" ); // Send CheckpointCreated message to server (patch-only, no commit) let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); let msg = DaemonMessage::CheckpointCreated { task_id, success: true, commit_sha: None, branch_name: None, checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: None, message: format!("Ephemeral patch - {}", timestamp), patch_data: Some(patch_data), patch_base_sha: Some(base_sha), patch_files_count: Some(patch_files_count as i32), }; let _ = self.ws_tx.send(msg).await; Ok(patch_files_count as i32) } Err(e) => { Err(format!("Failed to create patch: {}", e)) } } } /// Create a completion patch capturing all changes (committed + uncommitted) since /// the merge-base with main/master. Sent before TaskComplete so the server always /// has a recoverable patch. All errors are non-fatal (logged, not propagated). async fn create_completion_patch( &self, task_id: Uuid, worktree_path: &std::path::Path, ) -> Result<(), String> { // 1. Stage all changes let _ = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["add", "-A"]) .output() .await; // 2. Commit any staged changes so HEAD contains everything let staged_check = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["diff", "--cached", "--quiet"]) .output() .await; if let Ok(output) = staged_check { if !output.status.success() { let _ = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["commit", "-m", "WIP: task completion checkpoint"]) .output() .await; } } // 3. Find merge-base with main/master let base_sha = storage::get_merge_base_sha(worktree_path) .await .map_err(|e| format!("Failed to get merge-base: {}", e))?; // 4. Create patch (diff merge-base..HEAD) let (compressed_patch, patch_files_count) = storage::create_patch(worktree_path, &base_sha) .await .map_err(|e| format!("Failed to create patch: {}", e))?; // 5. Check size limit if compressed_patch.len() > self.checkpoint_patches.max_patch_size_bytes { return Err(format!( "Patch too large: {} bytes (max: {})", compressed_patch.len(), self.checkpoint_patches.max_patch_size_bytes )); } // 6. Send to server let patch_data = base64::engine::general_purpose::STANDARD.encode(&compressed_patch); let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); let msg = DaemonMessage::CheckpointCreated { task_id, success: true, commit_sha: None, branch_name: None, checkpoint_number: None, files_changed: None, lines_added: None, lines_removed: None, error: None, message: format!("Completion patch - {}", timestamp), patch_data: Some(patch_data), patch_base_sha: Some(base_sha), patch_files_count: Some(patch_files_count as i32), }; let _ = self.ws_tx.send(msg).await; tracing::info!( task_id = %task_id, files_count = patch_files_count, "Created completion patch" ); Ok(()) } } impl Clone for TaskManagerInner { fn clone(&self) -> Self { Self { worktree_manager: self.worktree_manager.clone(), process_manager: self.process_manager.clone(), temp_manager: self.temp_manager.clone(), tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), active_pids: self.active_pids.clone(), git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.api_url.clone(), api_key: self.api_key.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.checkpoint_patches.clone(), local_db: self.local_db.clone(), } } }