From 87044a747b47bd83249d61a45842c7f7b2eae56d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 11 Jan 2026 05:52:14 +0000 Subject: Contract system --- makima/src/daemon/task/manager.rs | 3215 +++++++++++++++++++++++++++++++++++++ 1 file changed, 3215 insertions(+) create mode 100644 makima/src/daemon/task/manager.rs (limited to 'makima/src/daemon/task/manager.rs') diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs new file mode 100644 index 0000000..8269083 --- /dev/null +++ b/makima/src/daemon/task/manager.rs @@ -0,0 +1,3215 @@ +//! Task lifecycle manager using git worktrees and Claude Code subprocesses. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use rand::Rng; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, RwLock, Semaphore}; +use uuid::Uuid; + +use std::collections::HashSet; + +use super::state::TaskState; +use crate::daemon::error::{DaemonError, TaskError, TaskResult}; +use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; +use crate::daemon::temp::TempManager; +use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; + +/// Generate a secure random API key for orchestrator tool access. +fn generate_tool_key() -> String { + let mut rng = rand::thread_rng(); + let bytes: [u8; 32] = rng.r#gen(); + hex::encode(bytes) +} + +/// Check if output contains an OAuth authentication error. +fn is_oauth_auth_error(output: &str) -> bool { + // Match various authentication error patterns from Claude Code + if output.contains("Please run /login") { + return true; + } + if output.contains("Invalid API key") { + return true; + } + if output.contains("authentication_error") + && (output.contains("OAuth token has expired") + || output.contains("Please obtain a new token")) + { + return true; + } + false +} + +/// Extract OAuth URL from text (looks for claude.ai OAuth URLs). +fn extract_url(text: &str) -> Option { + // Look for claude.ai OAuth URLs - try multiple patterns + let patterns = [ + "https://claude.ai/oauth", + "https://console.anthropic.com/oauth", + ]; + + for pattern in patterns { + if let Some(start) = text.find(pattern) { + let remaining = &text[start..]; + // Find the end of the URL - stop at: + // - Whitespace, common URL terminators, escape sequences + let end = remaining + .find(|c: char| { + c.is_whitespace() || c == '"' || c == '\'' || c == '>' || c == ')' || c == ']' || c == '\x07' || c == '\x1b' + }) + .unwrap_or(remaining.len()); + + let url = &remaining[..end]; + + // Also check if there's another https:// within the match (hyperlink duplication) + // Skip first 20 chars to avoid matching the start + let url = if url.len() > 30 { + if let Some(second_https) = url[20..].find("https://") { + &url[..second_https + 20] // Keep only first URL + } else { + url + } + } else { + url + }; + + if url.len() > 20 { + return Some(url.to_string()); + } + } + } + None +} + +/// Global storage for pending OAuth flow (only one can be active at a time per daemon) +static PENDING_AUTH_FLOW: std::sync::OnceLock>>> = std::sync::OnceLock::new(); + +fn get_auth_flow_storage() -> &'static std::sync::Mutex>> { + PENDING_AUTH_FLOW.get_or_init(|| std::sync::Mutex::new(None)) +} + +/// Send an auth code to the pending OAuth flow. +pub fn send_auth_code(code: &str) -> bool { + let storage = get_auth_flow_storage(); + if let Ok(mut guard) = storage.lock() { + if let Some(sender) = guard.take() { + if sender.send(code.to_string()).is_ok() { + tracing::info!("Auth code sent to setup-token process"); + return true; + } + } + } + tracing::warn!("No pending auth flow to send code to"); + false +} + +/// Spawn `claude setup-token` to initiate OAuth flow and capture the login URL. +/// This spawns the process in a PTY (required by Ink) and reads output until we find a URL. +/// The process continues running in the background waiting for auth completion. +async fn get_oauth_login_url(claude_command: &str) -> Option { + use portable_pty::{native_pty_system, CommandBuilder, PtySize}; + use std::io::{Read, Write}; + + tracing::info!("Spawning claude setup-token in PTY to get OAuth login URL"); + + // Create a PTY - Ink requires a real terminal + let pty_system = native_pty_system(); + let pair = match pty_system.openpty(PtySize { + rows: 24, + cols: 200, // Wide enough to avoid line wrapping for long URLs/codes + pixel_width: 0, + pixel_height: 0, + }) { + Ok(pair) => pair, + Err(e) => { + tracing::error!(error = %e, "Failed to open PTY"); + return None; + } + }; + + // Build the command + let mut cmd = CommandBuilder::new(claude_command); + cmd.arg("setup-token"); + // Set environment variables to prevent browser from opening and disable fancy output + // Use "false" so the browser command fails, forcing setup-token to show URL and wait for manual input + cmd.env("BROWSER", "false"); + cmd.env("TERM", "dumb"); // Disable hyperlinks and fancy terminal features + cmd.env("NO_COLOR", "1"); // Disable colors + + // Spawn the process in the PTY + let mut child = match pair.slave.spawn_command(cmd) { + Ok(child) => child, + Err(e) => { + tracing::error!(error = %e, "Failed to spawn claude setup-token in PTY"); + return None; + } + }; + + // Get the reader and writer from the master side + let mut reader = match pair.master.try_clone_reader() { + Ok(reader) => reader, + Err(e) => { + tracing::error!(error = %e, "Failed to clone PTY reader"); + return None; + } + }; + + let mut writer = match pair.master.take_writer() { + Ok(writer) => writer, + Err(e) => { + tracing::error!(error = %e, "Failed to take PTY writer"); + return None; + } + }; + + // Create channels for communication + let (code_tx, code_rx) = std::sync::mpsc::channel::(); + let (url_tx, url_rx) = std::sync::mpsc::channel::(); + + // Store the code sender globally so it can be used when AUTH_CODE message arrives + { + let storage = get_auth_flow_storage(); + if let Ok(mut guard) = storage.lock() { + *guard = Some(code_tx); + } + } + + // Spawn reader thread - reads PTY output and sends URL when found + let reader_handle = std::thread::spawn(move || { + let mut buffer = [0u8; 4096]; + let mut accumulated = String::new(); + let mut url_sent = false; + let mut read_count = 0; + + tracing::info!("setup-token reader thread started"); + + loop { + match reader.read(&mut buffer) { + Ok(0) => { + tracing::info!("setup-token PTY EOF reached after {} reads", read_count); + break; + } + Ok(n) => { + read_count += 1; + let chunk = String::from_utf8_lossy(&buffer[..n]); + accumulated.push_str(&chunk); + + // Process complete lines + while let Some(newline_pos) = accumulated.find('\n') { + let line = accumulated[..newline_pos].to_string(); + accumulated = accumulated[newline_pos + 1..].to_string(); + + let clean_line = strip_ansi_codes(&line); + if !clean_line.trim().is_empty() { + tracing::info!(line = %clean_line, "setup-token output"); + } + + // Look for OAuth URL if not found yet + if !url_sent { + if let Some(url) = extract_url(&line) { + tracing::info!(url = %url, "Found OAuth login URL"); + let _ = url_tx.send(url); + url_sent = true; + } + } + + // Check for success/failure messages + if clean_line.contains("successfully") || clean_line.contains("authenticated") || clean_line.contains("Success") { + tracing::info!("Authentication appears successful!"); + } + if clean_line.contains("error") || clean_line.contains("failed") || clean_line.contains("invalid") { + tracing::warn!(line = %clean_line, "setup-token may have encountered an error"); + } + } + } + Err(e) => { + tracing::warn!(error = %e, "PTY read error after {} reads", read_count); + break; + } + } + } + tracing::info!("setup-token reader thread ended"); + }); + + // Spawn writer thread - waits for auth code and writes it to PTY + std::thread::spawn(move || { + tracing::info!("setup-token writer thread started, waiting for auth code (10 min timeout)"); + + // Wait for auth code from frontend (with long timeout - user needs time to authenticate) + match code_rx.recv_timeout(std::time::Duration::from_secs(600)) { + Ok(code) => { + tracing::info!(code_len = code.len(), "Received auth code from frontend, writing to PTY"); + // Write code followed by carriage return (Enter key in raw terminal mode) + let code_with_enter = format!("{}\r", code); + if let Err(e) = writer.write_all(code_with_enter.as_bytes()) { + tracing::error!(error = %e, "Failed to write auth code to PTY"); + } else if let Err(e) = writer.flush() { + tracing::error!(error = %e, "Failed to flush PTY writer"); + } else { + tracing::info!("Auth code written to setup-token PTY successfully"); + // Give Ink a moment to process, then send another Enter in case first was buffered + std::thread::sleep(std::time::Duration::from_millis(100)); + let _ = writer.write_all(b"\r"); + let _ = writer.flush(); + tracing::info!("Sent additional Enter keypress"); + } + } + Err(e) => { + tracing::info!(error = %e, "Auth code receive ended (timeout or channel closed)"); + } + } + + // Wait for reader thread to finish + tracing::debug!("Waiting for reader thread to finish..."); + let _ = reader_handle.join(); + + // Wait for child to fully exit + tracing::debug!("Waiting for setup-token child process to exit..."); + match child.wait() { + Ok(status) => { + tracing::info!(exit_status = ?status, "setup-token process exited"); + } + Err(e) => { + tracing::error!(error = %e, "Failed to wait for setup-token process"); + } + } + }); + + // Wait for URL with timeout + match url_rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(url) => Some(url), + Err(e) => { + tracing::error!(error = %e, "Timed out waiting for OAuth login URL"); + None + } + } +} + +/// Strip ANSI escape codes from a string for cleaner logging. +fn strip_ansi_codes(s: &str) -> String { + let mut result = String::with_capacity(s.len()); + let mut chars = s.chars().peekable(); + + while let Some(c) = chars.next() { + if c == '\x1b' { + // Check what type of escape sequence + match chars.peek() { + Some(&'[') => { + // CSI sequence: ESC [ ... letter + chars.next(); // consume '[' + while let Some(&next) = chars.peek() { + chars.next(); + if next.is_ascii_alphabetic() { + break; + } + } + } + Some(&']') => { + // OSC sequence: ESC ] ... ST (where ST is BEL or ESC \) + chars.next(); // consume ']' + while let Some(&next) = chars.peek() { + if next == '\x07' { + chars.next(); // consume BEL (string terminator) + break; + } + if next == '\x1b' { + chars.next(); // consume ESC + if chars.peek() == Some(&'\\') { + chars.next(); // consume \ (string terminator) + } + break; + } + chars.next(); + } + } + _ => { + // Unknown escape, skip just the ESC + } + } + } else if !c.is_control() || c == '\n' { + result.push(c); + } + } + + result +} + +/// System prompt for regular (non-orchestrator) subtasks. +/// This ensures subtasks work only within their isolated worktree directory. +const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. + +## IMPORTANT: Directory Restrictions + +**You MUST only work within the current working directory (your worktree).** + +- DO NOT use `cd` to navigate to directories outside your worktree +- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) +- DO NOT modify files in parent directories or sibling directories +- All your file operations should be relative to the current directory + +Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. + +**Why?** Your worktree is isolated so that: +1. Your changes don't affect other running tasks +2. Changes can be reviewed before integration +3. Multiple tasks can work on the codebase in parallel without conflicts + +--- + +"#; + +/// The orchestrator system prompt that tells Claude how to use the helper script. +const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. + +## FIRST STEP + +Start by checking if you have existing subtasks: + +```bash +# List all subtasks to see what work needs to be done +./.makima/orchestrate.sh list +``` + +If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. + +--- + +## Creating Subtasks + +You can create new subtasks to break down work: + +```bash +# Create a new subtask with a name and plan +./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." + +# The command returns the new subtask ID - use it to start the subtask +./.makima/orchestrate.sh start +``` + +Create subtasks when you need to: +- Break down complex work into smaller pieces +- Run multiple tasks in parallel on different parts of the codebase +- Delegate specific implementation work + +## Task Continuation (Sequential Dependencies) + +When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: + +```bash +# Create Task B that continues from Task A's worktree +./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from +``` + +This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. + +**When to use continuation:** +- Sequential work: Task B needs Task A's output files +- Staged implementation: Building features incrementally +- Fix-and-extend: One task fixes issues, another adds features on top + +**When NOT to use continuation:** +- Parallel tasks working on different files +- Independent subtasks that can be merged separately + +**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. + +## Sharing Files with Subtasks + +Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: + +```bash +# Create subtask with specific files copied from orchestrator +./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" + +# Copy multiple files (comma-separated) +./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" + +# Combine with --continue-from to share files AND continue from another task +./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from --files "requirements.md" +``` + +**Use cases for --files:** +- Share a PLAN.md with detailed implementation steps +- Distribute configuration or spec files +- Pass generated data or intermediate results + +## How Subtasks Work + +Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: +- Their work remains in the worktree files (NOT committed to git) +- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree +- You can view and copy files from subtask worktrees using their paths +- The worktree path is returned when you get subtask status + +**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. + +## Subtask Commands +```bash +# List all subtasks and their current status +./.makima/orchestrate.sh list + +# Create a new subtask (returns the subtask ID) +./.makima/orchestrate.sh create "Name" "Plan/description" + +# Create a subtask that continues from another task's worktree +./.makima/orchestrate.sh create "Name" "Plan" --continue-from + +# Create a subtask with specific files copied from orchestrator worktree +./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" + +# Start a specific subtask (it will run in its own Claude instance) +./.makima/orchestrate.sh start + +# Stop a running subtask +./.makima/orchestrate.sh stop + +# Get detailed status of a subtask (includes worktree_path when available) +./.makima/orchestrate.sh status + +# Get the output/logs of a subtask +./.makima/orchestrate.sh output + +# Get the worktree path for a subtask +./.makima/orchestrate.sh worktree +``` + +## Integrating Subtask Work + +When subtasks complete, their changes exist as files in their worktree directories: +- Files are NOT committed to git branches +- You must copy/integrate files from subtask worktrees into your worktree +- Use standard file operations (cp, cat, etc.) to review and integrate changes + +### Handling Continuation Chains + +**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. + +Example chain: Task A → Task B (continues from A) → Task C (continues from B) +- Task C's worktree contains ALL changes from A, B, and C +- You should ONLY integrate Task C's worktree +- DO NOT integrate Task A or Task B separately (their changes are already in C) + +**How to track continuation chains:** +1. When you create tasks with `--continue-from`, note which task continues from which +2. Build a mental model: Independent tasks (no continuation) + Continuation chains +3. For each chain, only integrate the LAST task in the chain + +**Example with mixed independent and chained tasks:** +``` +Independent tasks (integrate all): +- Task X: API endpoints +- Task Y: Database models + +Continuation chain (integrate ONLY the last one): +- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) + Only integrate Task C! +``` + +### Integration Examples + +For independent subtasks (no continuation): +```bash +# Get the worktree path for a completed subtask +SUBTASK_PATH=$(./.makima/orchestrate.sh worktree ) + +# View what files were changed +ls -la "$SUBTASK_PATH" +diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima + +# Copy specific files from subtask +cp "$SUBTASK_PATH/src/new_file.rs" ./src/ +cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ + +# Or use diff/patch for more control +diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch +patch -p0 < changes.patch +``` + +For continuation chains (only integrate the final task): +```bash +# If you have: Task A → Task B → Task C (each continues from previous) +# ONLY get and integrate Task C's worktree - it has everything! + +FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree ) + +# Copy all changes from the final task +rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ +``` + +## Completion +```bash +# Mark yourself as complete after integrating all subtask work +./.makima/orchestrate.sh done "Summary of what was accomplished" +``` + +## Workflow +1. **List existing subtasks**: Run `list` to see current subtasks +2. **Create subtasks if needed**: Use `create` to add new subtasks for the work + - For independent parallel work: create without `--continue-from` + - For sequential dependencies: use `--continue-from ` + - Track which tasks continue from which (continuation chains) +3. **Start subtasks**: Run `start` for each subtask +4. **Monitor progress**: Check status and output as subtasks run +5. **Integrate work**: When subtasks complete: + - For independent tasks: integrate each one's worktree + - For continuation chains: ONLY integrate the FINAL task (it has all changes) + - Get worktree path with `worktree ` + - Copy or merge files into your worktree +6. **Complete**: Call `done` once all work is integrated + +## Important Notes +- Subtask files are in worktrees, NOT committed git branches +- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work +- You can read files from subtask worktrees using their paths +- Use standard file tools (cp, diff, cat, rsync) to integrate changes +- You should NOT edit files directly - that's what subtasks are for +- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. +- When you call `done`, YOUR worktree may be used for the final PR/merge +"#; + + +/// System prompt for supervisor tasks (contract orchestrators). +/// Supervisors monitor all tasks in a contract, create new tasks, and drive the contract to completion. +const SUPERVISOR_SYSTEM_PROMPT: &str = r#"You are the SUPERVISOR for this contract. Your ONLY job is to coordinate work by spawning tasks, waiting for them to complete, and managing git operations. + +## CRITICAL RULES - READ CAREFULLY + +1. **NEVER write code or edit files yourself** - you are a coordinator ONLY +2. **NEVER make commits yourself** - tasks do their own commits +3. **ALWAYS spawn tasks** for ANY work that involves: + - Writing or editing code + - Creating or modifying files + - Making implementation changes + - Any actual development work +4. **ALWAYS wait for tasks to complete** - you MUST use `wait` after spawning +5. **Your role is ONLY to**: + - Analyze the contract goal and break it into tasks + - Spawn tasks AND wait for them to complete + - Review completed task results + - Merge completed work using `merge` + - Create PRs when ready using `pr` + +## REQUIRED WORKFLOW - Follow This Pattern + +For EVERY task you spawn, you MUST: +1. Spawn the task with `spawn` +2. IMMEDIATELY call `wait` to block until completion +3. Check the result and handle success/failure +4. Merge if successful + +```bash +# CORRECT PATTERN - spawn then wait +RESULT=$(makima supervisor spawn "Task Name" "Detailed plan...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +echo "Spawned task: $TASK_ID" + +# MUST wait for the task - DO NOT skip this step! +makima supervisor wait "$TASK_ID" + +# Check result, view diff, merge if successful +makima supervisor diff "$TASK_ID" +makima supervisor merge "$TASK_ID" +``` + +## Example - Full Workflow + +Goal: "Add user authentication" + +```bash +# Step 1: Create a makima branch for this work (use makima/{name} convention) +makima supervisor branch "makima/user-authentication" + +# Step 2: Spawn tasks, wait for each, and merge to the branch + +# Task 1: Research (spawn and wait) +RESULT=$(makima supervisor spawn "Research auth patterns" "Explore the codebase for existing authentication. Document findings.") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +# Review findings before continuing + +# Task 2: Login endpoint (spawn and wait) +RESULT=$(makima supervisor spawn "Implement login" "Create POST /api/login endpoint...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +makima supervisor diff "$TASK_ID" +makima supervisor merge "$TASK_ID" --to "makima/user-authentication" + +# Task 3: Logout endpoint (spawn and wait) +RESULT=$(makima supervisor spawn "Implement logout" "Create POST /api/logout endpoint...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +makima supervisor merge "$TASK_ID" --to "makima/user-authentication" + +# Step 3: All tasks complete - create PR from makima branch +makima supervisor pr "makima/user-authentication" --title "Add user authentication" --base main +``` + +## Available Tools (via makima supervisor) + +### Task Management +```bash +# List all tasks in this contract +makima supervisor tasks + +# Spawn a new task (returns JSON with taskId) +makima supervisor spawn "Task Name" "Detailed plan..." + +# IMPORTANT: Wait for task to complete (blocks until done/failed) +makima supervisor wait [timeout_seconds] + +# Read a file from any task's worktree +makima supervisor read-file + +# Get the full task tree structure +makima supervisor tree +``` + +### Git Operations +```bash +# Create a new branch +makima supervisor branch [--from ] + +# Merge a task's changes to a branch +makima supervisor merge [--to ] [--squash] + +# Create a pull request +makima supervisor pr --title "Title" [--body "Body"] [--base main] + +# View a task's diff +makima supervisor diff + +# Create a git checkpoint +makima supervisor checkpoint "Checkpoint message" + +# List checkpoints for a task +makima supervisor checkpoints [task_id] +``` + +### Contract +```bash +# Get contract status +makima supervisor status +``` + +## Key Points + +1. **Create a makima branch first** - use `branch "makima/{name}"` for the contract's work +2. **spawn returns immediately** - the task runs in the background +3. **wait blocks until complete** - you MUST call this to know when a task finishes +4. **Never fire-and-forget** - always wait for each task before moving on +5. **Merge to your makima branch** - use `merge --to "makima/{name}"` to collect completed work +6. **Create PR when done** - use `pr "makima/{name}" --title "..." --base main` + +## Standard Workflow + +1. `branch "makima/{name}"` - Create branch (e.g., "makima/add-auth") +2. For each piece of work: + - `spawn` - Create task + - `wait` - Block until complete + - `merge --to "makima/{name}"` - Merge to branch +3. `pr "makima/{name}" --title "..." --base main` - Create PR + +## Important Reminders + +- **ONLY YOU can spawn tasks** - regular tasks cannot create children +- **NEVER implement anything yourself** - always spawn tasks +- **ALWAYS create a makima branch** - use `makima/{name}` naming convention +- Tasks run independently - you just coordinate +- You will be resumed if interrupted - your conversation is preserved +- Create checkpoints before major transitions + +--- + +"#; + +/// System prompt for tasks that are part of a contract. +/// This tells the task about contract.sh and how to use it to interact with the contract. +const CONTRACT_INTEGRATION_PROMPT: &str = r##" +## Contract Integration + +This task is part of a contract. You have access to contract tools via the `makima contract` CLI. + +### Contract Commands + +```bash +# Get contract context (name, phase, goals) +makima contract status + +# Get phase checklist and deliverables +makima contract checklist + +# List contract files +makima contract files + +# Read a specific file content +makima contract file + +# Report progress to the contract +makima contract report "Completed X, working on Y..." + +# Create a new contract file (content via stdin) +echo "# New Documentation" | makima contract create-file "New Document" + +# Update an existing contract file (content via stdin) +cat updated_content.md | makima contract update-file + +# Get suggested next action when done +makima contract suggest-action + +# Report completion with metrics +makima contract completion-action --files "file1.rs,file2.rs" --code +``` + +### What You Should Do + +**Before starting:** +1. Run `makima contract status` to understand the contract context +2. Run `makima contract checklist` to see phase deliverables +3. Run `makima contract files` to see existing documentation + +**While working:** +- Report significant progress with `makima contract report "..."` + +**When completing:** +1. If your work should be documented, create or update contract files +2. Run `makima contract completion-action` to see recommended next steps +3. Consider what contract files or phases might need updating + +**Important:** Your work should contribute to the contract's goals. Check the contract status to understand what's expected. + +--- + +"##; + +/// Tracks merge state for an orchestrator task. +#[derive(Default)] +struct MergeTracker { + /// Subtask branches that have been successfully merged. + merged_subtasks: HashSet, + /// Subtask branches that were explicitly skipped (with reason). + skipped_subtasks: HashMap, +} + +/// Managed task information. +#[derive(Clone)] +pub struct ManagedTask { + /// Task ID. + pub id: Uuid, + /// Human-readable task name. + pub task_name: String, + /// Current state. + pub state: TaskState, + /// Worktree info if created. + pub worktree: Option, + /// Task plan. + pub plan: String, + /// Repository URL or path. + pub repo_source: Option, + /// Base branch. + pub base_branch: Option, + /// Target branch to merge into. + pub target_branch: Option, + /// Parent task ID if this is a subtask. + pub parent_task_id: Option, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + pub depth: i32, + /// Whether this task runs as an orchestrator (coordinates subtasks). + pub is_orchestrator: bool, + /// Whether this task is a supervisor (long-running contract orchestrator). + pub is_supervisor: bool, + /// Path to target repository for completion actions. + pub target_repo_path: Option, + /// Completion action: "none", "branch", "merge", "pr". + pub completion_action: Option, + /// Task ID to continue from (copy worktree from this task). + pub continue_from_task_id: Option, + /// Files to copy from parent task's worktree. + pub copy_files: Option>, + /// Contract ID if this task is associated with a contract. + pub contract_id: Option, + /// Time task was created. + pub created_at: Instant, + /// Time task started running. + pub started_at: Option, + /// Time task completed. + pub completed_at: Option, + /// Error message if failed. + pub error: Option, +} + +/// Configuration for task execution. +#[derive(Clone)] +pub struct TaskConfig { + /// Maximum concurrent tasks. + pub max_concurrent_tasks: u32, + /// Base directory for worktrees. + pub worktree_base_dir: PathBuf, + /// Environment variables to pass to Claude. + pub env_vars: HashMap, + /// Claude command path. + pub claude_command: String, + /// Additional arguments to pass to Claude Code. + pub claude_args: Vec, + /// Arguments to pass before defaults. + pub claude_pre_args: Vec, + /// Enable Claude's permission system. + pub enable_permissions: bool, + /// Disable verbose output. + pub disable_verbose: bool, +} + +impl Default for TaskConfig { + fn default() -> Self { + Self { + max_concurrent_tasks: 4, + worktree_base_dir: WorktreeManager::default_base_dir(), + env_vars: HashMap::new(), + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + } + } +} + +/// Task manager for handling task lifecycle. +pub struct TaskManager { + /// Worktree manager. + worktree_manager: Arc, + /// Process manager. + process_manager: Arc, + /// Temp directory manager. + temp_manager: Arc, + /// Task configuration. + #[allow(dead_code)] + config: TaskConfig, + /// Active tasks. + tasks: Arc>>, + /// Channel to send messages to server. + ws_tx: mpsc::Sender, + /// Semaphore for limiting concurrent tasks. + semaphore: Arc, + /// Channels for sending input to running tasks. + /// Each sender allows sending messages to the stdin of a running Claude process. + task_inputs: Arc>>>, + /// Tracks merge state per orchestrator task (for completion gate). + merge_trackers: Arc>>, +} + +impl TaskManager { + /// Create a new task manager. + pub fn new(config: TaskConfig, ws_tx: mpsc::Sender) -> Self { + let max_concurrent = config.max_concurrent_tasks as usize; + let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); + let process_manager = Arc::new( + ProcessManager::with_command(config.claude_command.clone()) + .with_args(config.claude_args.clone()) + .with_pre_args(config.claude_pre_args.clone()) + .with_permissions_enabled(config.enable_permissions) + .with_verbose_disabled(config.disable_verbose) + .with_env(config.env_vars.clone()), + ); + let temp_manager = Arc::new(TempManager::new()); + + Self { + worktree_manager, + process_manager, + temp_manager, + config, + tasks: Arc::new(RwLock::new(HashMap::new())), + ws_tx, + semaphore: Arc::new(Semaphore::new(max_concurrent)), + task_inputs: Arc::new(RwLock::new(HashMap::new())), + merge_trackers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Handle a command from the server. + pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { + tracing::info!("Received command from server: {:?}", command); + + match command { + DaemonCommand::SpawnTask { + task_id, + task_name, + plan, + repo_url, + base_branch, + target_branch, + parent_task_id, + depth, + is_orchestrator, + target_repo_path, + completion_action, + continue_from_task_id, + copy_files, + contract_id, + is_supervisor, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + repo_url = ?repo_url, + base_branch = ?base_branch, + target_branch = ?target_branch, + parent_task_id = ?parent_task_id, + depth = depth, + is_orchestrator = is_orchestrator, + is_supervisor = is_supervisor, + target_repo_path = ?target_repo_path, + completion_action = ?completion_action, + continue_from_task_id = ?continue_from_task_id, + copy_files = ?copy_files, + contract_id = ?contract_id, + plan_len = plan.len(), + "Spawning new task" + ); + self.spawn_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + parent_task_id, depth, is_orchestrator, is_supervisor, + target_repo_path, completion_action, continue_from_task_id, + copy_files, contract_id + ).await?; + } + DaemonCommand::PauseTask { task_id } => { + tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); + // Subprocesses don't support pause, just log and ignore + } + DaemonCommand::ResumeTask { task_id } => { + tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); + // Subprocesses don't support resume, just log and ignore + } + DaemonCommand::InterruptTask { task_id, graceful: _ } => { + tracing::info!(task_id = %task_id, "Interrupting task"); + self.interrupt_task(task_id).await?; + } + DaemonCommand::SendMessage { task_id, message } => { + // Check if this is an auth code message + if message.starts_with("AUTH_CODE:") { + let code = message.strip_prefix("AUTH_CODE:").unwrap_or("").trim(); + tracing::info!(task_id = %task_id, "Received auth code from frontend"); + if send_auth_code(code) { + tracing::info!(task_id = %task_id, "Auth code forwarded to setup-token"); + } else { + tracing::warn!(task_id = %task_id, "No pending auth flow to receive code"); + } + } else { + // Regular message - send to task's stdin + tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); + // Send message to the task's stdin via the input channel + let inputs = self.task_inputs.read().await; + if let Some(sender) = inputs.get(&task_id) { + if let Err(e) = sender.send(message).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); + } else { + tracing::info!(task_id = %task_id, "Message sent to task successfully"); + } + } else { + drop(inputs); // Release read lock before checking if we need to respawn + + // Check if this is a supervisor that needs to be respawned + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).cloned() + }; + + if let Some(task) = task_info { + if task.is_supervisor { + tracing::info!( + task_id = %task_id, + "Supervisor has no active Claude process, respawning with message" + ); + + // Respawn the supervisor with the new message as the plan + // Claude Code will use --continue to maintain conversation history + let inner = self.clone_inner(); + let task_name = task.task_name.clone(); + let repo_source = task.repo_source.clone(); + let base_branch = task.base_branch.clone(); + let target_branch = task.target_branch.clone(); + let target_repo_path = task.target_repo_path.clone(); + let completion_action = task.completion_action.clone(); + let contract_id = task.contract_id; + + // Spawn in background to not block the command handler + tokio::spawn(async move { + if let Err(e) = inner.run_task( + task_id, + task_name, + message, // Use the message as the new prompt + repo_source, + base_branch, + target_branch, + false, // is_orchestrator + true, // is_supervisor + target_repo_path, + completion_action, + None, // continue_from_task_id + None, // copy_files + contract_id, + ).await { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to respawn supervisor" + ); + } + }); + } else { + tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); + } + } else { + tracing::warn!(task_id = %task_id, "Task not found"); + } + } + } + } + DaemonCommand::InjectSiblingContext { task_id, .. } => { + tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); + } + DaemonCommand::Authenticated { daemon_id } => { + tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); + } + DaemonCommand::Error { code, message } => { + tracing::warn!(code = %code, message = %message, "Error command from server"); + } + + // ========================================================================= + // Merge Commands + // ========================================================================= + + DaemonCommand::ListBranches { task_id } => { + tracing::info!(task_id = %task_id, "Listing task branches"); + self.handle_list_branches(task_id).await?; + } + DaemonCommand::MergeStart { task_id, source_branch } => { + tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); + self.handle_merge_start(task_id, source_branch).await?; + } + DaemonCommand::MergeStatus { task_id } => { + tracing::info!(task_id = %task_id, "Getting merge status"); + self.handle_merge_status(task_id).await?; + } + DaemonCommand::MergeResolve { task_id, file, strategy } => { + tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); + self.handle_merge_resolve(task_id, file, strategy).await?; + } + DaemonCommand::MergeCommit { task_id, message } => { + tracing::info!(task_id = %task_id, "Committing merge"); + self.handle_merge_commit(task_id, message).await?; + } + DaemonCommand::MergeAbort { task_id } => { + tracing::info!(task_id = %task_id, "Aborting merge"); + self.handle_merge_abort(task_id).await?; + } + DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { + tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); + self.handle_merge_skip(task_id, subtask_id, reason).await?; + } + DaemonCommand::CheckMergeComplete { task_id } => { + tracing::info!(task_id = %task_id, "Checking merge completion"); + self.handle_check_merge_complete(task_id).await?; + } + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + DaemonCommand::RetryCompletionAction { + task_id, + task_name, + action, + target_repo_path, + target_branch, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + action = %action, + target_repo_path = %target_repo_path, + target_branch = ?target_branch, + "Retrying completion action" + ); + self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; + } + + DaemonCommand::CloneWorktree { task_id, target_dir } => { + tracing::info!( + task_id = %task_id, + target_dir = %target_dir, + "Cloning worktree to target directory" + ); + self.handle_clone_worktree(task_id, target_dir).await?; + } + + DaemonCommand::CheckTargetExists { task_id, target_dir } => { + tracing::debug!( + task_id = %task_id, + target_dir = %target_dir, + "Checking if target directory exists" + ); + self.handle_check_target_exists(task_id, target_dir).await?; + } + + // ========================================================================= + // Contract File Commands + // ========================================================================= + + DaemonCommand::ReadRepoFile { + request_id, + contract_id, + file_path, + repo_path, + } => { + tracing::info!( + request_id = %request_id, + contract_id = %contract_id, + file_path = %file_path, + repo_path = %repo_path, + "Reading file from repository" + ); + self.handle_read_repo_file(request_id, file_path, repo_path).await?; + } + DaemonCommand::CreateBranch { + task_id, + branch_name, + from_ref, + } => { + tracing::info!( + task_id = %task_id, + branch_name = %branch_name, + from_ref = ?from_ref, + "Creating branch" + ); + self.handle_create_branch(task_id, branch_name, from_ref).await?; + } + DaemonCommand::MergeTaskToTarget { + task_id, + target_branch, + squash, + } => { + tracing::info!( + task_id = %task_id, + target_branch = ?target_branch, + squash = squash, + "Merging task to target branch" + ); + self.handle_merge_task_to_target(task_id, target_branch, squash).await?; + } + DaemonCommand::CreatePR { + task_id, + title, + body, + base_branch, + } => { + tracing::info!( + task_id = %task_id, + title = %title, + base_branch = %base_branch, + "Creating pull request" + ); + self.handle_create_pr(task_id, title, body, base_branch).await?; + } + DaemonCommand::GetTaskDiff { + task_id, + } => { + tracing::info!(task_id = %task_id, "Getting task diff"); + self.handle_get_task_diff(task_id).await?; + } + } + Ok(()) + } + + /// Spawn a new task. + #[allow(clippy::too_many_arguments)] + pub async fn spawn_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_url: Option, + base_branch: Option, + target_branch: Option, + parent_task_id: Option, + depth: i32, + is_orchestrator: bool, + is_supervisor: bool, + target_repo_path: Option, + completion_action: Option, + continue_from_task_id: Option, + copy_files: Option>, + contract_id: Option, + ) -> TaskResult<()> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); + + // Check if task already exists - allow re-spawning if in terminal state + { + let mut tasks = self.tasks.write().await; + if let Some(existing) = tasks.get(&task_id) { + if existing.state.is_terminal() { + // Task exists but is in terminal state (completed, failed, interrupted) + // Remove it so we can re-spawn + tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + tasks.remove(&task_id); + } else { + // Task is still active, reject + tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); + return Err(TaskError::AlreadyExists(task_id)); + } + } + } + + // Acquire semaphore permit + tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); + let permit = self + .semaphore + .clone() + .try_acquire_owned() + .map_err(|_| { + tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); + TaskError::ConcurrencyLimit + })?; + tracing::info!(task_id = %task_id, "Concurrency permit acquired"); + + // Create task entry + tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); + let task = ManagedTask { + id: task_id, + task_name: task_name.clone(), + state: TaskState::Initializing, + worktree: None, + plan: plan.clone(), + repo_source: repo_url.clone(), + base_branch: base_branch.clone(), + target_branch: target_branch.clone(), + parent_task_id, + depth, + is_orchestrator, + is_supervisor, + target_repo_path: target_repo_path.clone(), + completion_action: completion_action.clone(), + continue_from_task_id, + copy_files: copy_files.clone(), + contract_id, + created_at: Instant::now(), + started_at: None, + completed_at: None, + error: None, + }; + + self.tasks.write().await.insert(task_id, task); + tracing::info!(task_id = %task_id, "Task entry created and stored"); + + // Notify server of status change + tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); + self.send_status_change(task_id, "pending", "initializing").await; + + // Spawn task in background + tracing::info!(task_id = %task_id, "Spawning background task runner"); + let inner = self.clone_inner(); + tokio::spawn(async move { + let _permit = permit; // Hold permit until done + tracing::info!(task_id = %task_id, "Background task runner started"); + + if let Err(e) = inner.run_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + is_orchestrator, is_supervisor, target_repo_path, completion_action, + continue_from_task_id, copy_files, contract_id + ).await { + tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); + inner.mark_failed(task_id, &e.to_string()).await; + } + tracing::info!(task_id = %task_id, "Background task runner completed"); + }); + + tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); + Ok(()) + } + + /// Clone inner state for spawned tasks. + fn clone_inner(&self) -> TaskManagerInner { + TaskManagerInner { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } + + /// Interrupt a task. + pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { + let mut tasks = self.tasks.write().await; + let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; + + if task.state.is_terminal() { + return Ok(()); // Already done + } + + let old_state = task.state; + task.state = TaskState::Interrupted; + task.completed_at = Some(Instant::now()); + + // Notify server + drop(tasks); + self.send_status_change(task_id, old_state.as_str(), "interrupted").await; + + // Note: The process will be killed when the ClaudeProcess is dropped + // Worktrees are kept until explicitly deleted + + Ok(()) + } + + /// Get list of active task IDs. + pub async fn active_task_ids(&self) -> Vec { + self.tasks + .read() + .await + .iter() + .filter(|(_, t)| t.state.is_active()) + .map(|(id, _)| *id) + .collect() + } + + /// Get task state. + pub async fn get_task_state(&self, task_id: Uuid) -> Option { + self.tasks.read().await.get(&task_id).map(|t| t.state) + } + + /// Send status change notification to server. + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + // ========================================================================= + // Merge Handler Methods + // ========================================================================= + + /// Get worktree path for a task, or return error if not found. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn get_task_worktree_path(&self, task_id: Uuid) -> Result { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(DaemonError::Task(TaskError::SetupFailed( + format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) + ))) + } + + /// Handle ListBranches command. + async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(branches) => { + let branch_infos: Vec = branches + .into_iter() + .map(|b| BranchInfo { + name: b.name, + task_id: b.task_id, + is_merged: b.is_merged, + last_commit: b.last_commit, + last_commit_message: b.last_commit_message, + }) + .collect(); + + let msg = DaemonMessage::BranchList { + task_id, + branches: branch_infos, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStart command. + async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { + Ok(None) => { + // Merge succeeded without conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge completed without conflicts".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Ok(Some(conflicts)) => { + // Merge has conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Merge has {} conflicts", conflicts.len()), + commit_sha: None, + conflicts: Some(conflicts), + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStatus command. + async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.get_merge_state(&worktree_path).await { + Ok(state) => { + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: state.in_progress, + source_branch: if state.in_progress { Some(state.source_branch) } else { None }, + conflicted_files: state.conflicted_files, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: false, + source_branch: None, + conflicted_files: vec![], + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeResolve command. + async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + let resolution = match strategy.to_lowercase().as_str() { + "ours" => ConflictResolution::Ours, + "theirs" => ConflictResolution::Theirs, + _ => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Resolved conflict in {}", file), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeCommit command. + async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.commit_merge(&worktree_path, &message).await { + Ok(commit_sha) => { + // Track this merge as completed (extract subtask ID from branch if possible) + // For now, we'll track it when MergeSkip is called or based on branch names + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge committed successfully".to_string(), + commit_sha: Some(commit_sha), + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeAbort command. + async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.abort_merge(&worktree_path).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge aborted".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeSkip command. + async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { + // Record that this subtask was skipped + { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); + tracker.skipped_subtasks.insert(subtask_id, reason.clone()); + } + + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Subtask {} skipped: {}", subtask_id, reason), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckMergeComplete command. + async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Get all task branches + let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(b) => b, + Err(e) => { + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete: false, + unmerged_branches: vec![format!("Error listing branches: {}", e)], + merged_count: 0, + skipped_count: 0, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get tracker state + let trackers = self.merge_trackers.read().await; + let empty_merged: HashSet = HashSet::new(); + let empty_skipped: HashMap = HashMap::new(); + let tracker = trackers.get(&task_id); + let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); + let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); + + let mut merged_count = 0u32; + let mut skipped_count = 0u32; + let mut unmerged_branches = Vec::new(); + + for branch in &branches { + if branch.is_merged { + merged_count += 1; + } else if let Some(subtask_id) = branch.task_id { + if merged_set.contains(&subtask_id) { + merged_count += 1; + } else if skipped_set.contains_key(&subtask_id) { + skipped_count += 1; + } else { + unmerged_branches.push(branch.name.clone()); + } + } else { + // Branch without task ID - check if it's merged + unmerged_branches.push(branch.name.clone()); + } + } + + let can_complete = unmerged_branches.is_empty(); + + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete, + unmerged_branches, + merged_count, + skipped_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Mark a subtask as merged in the tracker. + #[allow(dead_code)] + pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); + tracker.merged_subtasks.insert(subtask_id); + } + + // ========================================================================= + // Completion Action Handler Methods + // ========================================================================= + + /// Handle RetryCompletionAction command. + async fn handle_retry_completion_action( + &self, + task_id: Uuid, + task_name: String, + action: String, + target_repo_path: String, + target_branch: Option, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Execute the completion action + let inner = self.clone_inner(); + let result = inner.execute_completion_action( + task_id, + &task_name, + &worktree_path, + &action, + Some(target_repo_path.as_str()), + target_branch.as_deref(), + ).await; + + // Send result back to server + let msg = match result { + Ok(pr_url) => DaemonMessage::CompletionActionResult { + task_id, + success: true, + message: match action.as_str() { + "branch" => format!("Branch pushed to {}", target_repo_path), + "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), + "pr" => format!("Pull request created"), + _ => format!("Completion action '{}' executed", action), + }, + pr_url, + }, + Err(e) => DaemonMessage::CompletionActionResult { + task_id, + success: false, + message: e, + pr_url: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CloneWorktree command. + async fn handle_clone_worktree( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Expand tilde in target path + let target_path = crate::daemon::worktree::expand_tilde(&target_dir); + + // Clone the worktree to target directory + let result = self.worktree_manager.clone_worktree_to_directory( + &worktree_path, + &target_path, + ).await; + + // Send result back to server + let msg = match result { + Ok(message) => DaemonMessage::CloneWorktreeResult { + task_id, + success: true, + message, + target_dir: Some(target_path.to_string_lossy().to_string()), + }, + Err(e) => DaemonMessage::CloneWorktreeResult { + task_id, + success: false, + message: e.to_string(), + target_dir: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckTargetExists command. + async fn handle_check_target_exists( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Expand tilde in target path + let target_path = crate::daemon::worktree::expand_tilde(&target_dir); + + // Check if target exists + let exists = self.worktree_manager.target_directory_exists(&target_path).await; + + // Send result back to server + let msg = DaemonMessage::CheckTargetExistsResult { + task_id, + exists, + target_dir: target_path.to_string_lossy().to_string(), + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle ReadRepoFile command. + /// + /// Reads a file from a repository on the daemon's filesystem and sends + /// the content back to the server for syncing contract files. + async fn handle_read_repo_file( + &self, + request_id: Uuid, + file_path: String, + repo_path: String, + ) -> Result<(), DaemonError> { + // Expand tilde in repo path + let repo_path_expanded = crate::daemon::worktree::expand_tilde(&repo_path); + + // Construct full file path + let full_path = repo_path_expanded.join(&file_path); + + // Try to read the file + let (content, success, error) = match tokio::fs::read_to_string(&full_path).await { + Ok(content) => (Some(content), true, None), + Err(e) => { + tracing::warn!( + request_id = %request_id, + file_path = %file_path, + repo_path = %repo_path, + full_path = %full_path.display(), + error = %e, + "Failed to read repo file" + ); + (None, false, Some(e.to_string())) + } + }; + + // Send result back to server + let msg = DaemonMessage::RepoFileContent { + request_id, + file_path, + content, + success, + error, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CreateBranch command - create a new branch in a task's worktree. + async fn handle_create_branch( + &self, + task_id: Uuid, + branch_name: String, + from_ref: Option, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, message) = if let Some(path) = worktree_path { + // Build git checkout command + let mut cmd = tokio::process::Command::new("git"); + cmd.current_dir(&path); + cmd.arg("checkout").arg("-b").arg(&branch_name); + + if let Some(ref from) = from_ref { + cmd.arg(from); + } + + match cmd.output().await { + Ok(output) => { + if output.status.success() { + (true, format!("Branch '{}' created successfully", branch_name)) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to create branch: {}", stderr)) + } + } + Err(e) => (false, format!("Failed to execute git: {}", e)), + } + } else { + (false, format!("Task {} not found or has no worktree", task_id)) + }; + + let msg = DaemonMessage::BranchCreated { + task_id, + success, + branch_name, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle MergeTaskToTarget command - merge a task's changes to a target branch. + async fn handle_merge_task_to_target( + &self, + task_id: Uuid, + target_branch: Option, + squash: bool, + ) -> Result<(), DaemonError> { + // Get task info + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| ( + t.worktree.as_ref().map(|w| w.path.clone()), + t.base_branch.clone(), + )) + }; + + let (success, message, commit_sha, conflicts) = match task_info { + Some((Some(worktree_path), base)) => { + let target = target_branch.unwrap_or_else(|| base.unwrap_or_else(|| "main".to_string())); + + // First, stage and commit any uncommitted changes + let add_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["add", "-A"]) + .output() + .await; + + if let Err(e) = add_result { + (false, format!("Failed to stage changes: {}", e), None, None) + } else { + // Commit if there are staged changes + let commit_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"]) + .output() + .await; + + if let Err(e) = commit_result { + tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)"); + } + + // Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .output() + .await; + + let source_branch = branch_output + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + + // Checkout target branch + let checkout = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["checkout", &target]) + .output() + .await; + + match checkout { + Ok(output) if output.status.success() => { + // Merge the source branch + let mut merge_cmd = tokio::process::Command::new("git"); + merge_cmd.current_dir(&worktree_path); + merge_cmd.arg("merge"); + if squash { + merge_cmd.arg("--squash"); + } + merge_cmd.arg(&source_branch); + merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target)); + + match merge_cmd.output().await { + Ok(output) if output.status.success() => { + // Get the commit SHA + let sha_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + + let sha = sha_output + .ok() + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + + if squash { + // For squash merge, we need to commit + let _ = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)]) + .output() + .await; + } + + (true, format!("Merged {} into {}", source_branch, target), sha, None) + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + // Check for merge conflicts + if stderr.contains("CONFLICT") { + let conflict_files = stderr + .lines() + .filter(|l| l.contains("CONFLICT")) + .map(|l| l.to_string()) + .collect::>(); + (false, "Merge conflicts detected".to_string(), None, Some(conflict_files)) + } else { + (false, format!("Merge failed: {}", stderr), None, None) + } + } + Err(e) => (false, format!("Failed to merge: {}", e), None, None), + } + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to checkout target branch: {}", stderr), None, None) + } + Err(e) => (false, format!("Failed to checkout: {}", e), None, None), + } + } + } + Some((None, _)) => (false, format!("Task {} has no worktree", task_id), None, None), + None => (false, format!("Task {} not found", task_id), None, None), + }; + + let msg = DaemonMessage::MergeToTargetResult { + task_id, + success, + message, + commit_sha, + conflicts, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CreatePR command - create a pull request for a task's changes. + async fn handle_create_pr( + &self, + task_id: Uuid, + title: String, + body: Option, + base_branch: String, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, message, pr_url, pr_number) = if let Some(path) = worktree_path { + // Push the current branch first + let push_result = tokio::process::Command::new("git") + .current_dir(&path) + .args(["push", "-u", "origin", "HEAD"]) + .output() + .await; + + if let Err(e) = push_result { + (false, format!("Failed to push branch: {}", e), None, None) + } else { + // Create PR using gh CLI + let mut pr_cmd = tokio::process::Command::new("gh"); + pr_cmd.current_dir(&path); + pr_cmd.args(["pr", "create", "--title", &title, "--base", &base_branch]); + + if let Some(ref body_text) = body { + pr_cmd.args(["--body", body_text]); + } else { + pr_cmd.args(["--body", ""]); + } + + match pr_cmd.output().await { + Ok(output) if output.status.success() => { + let stdout = String::from_utf8_lossy(&output.stdout); + // gh pr create outputs the PR URL + let url = stdout.lines().last().map(|s| s.trim().to_string()); + // Extract PR number from URL + let number = url.as_ref().and_then(|u| { + u.split('/').last().and_then(|n| n.parse::().ok()) + }); + (true, "Pull request created".to_string(), url, number) + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to create PR: {}", stderr), None, None) + } + Err(e) => (false, format!("Failed to run gh: {}", e), None, None), + } + } + } else { + (false, format!("Task {} not found or has no worktree", task_id), None, None) + }; + + let msg = DaemonMessage::PRCreated { + task_id, + success, + message, + pr_url, + pr_number, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle GetTaskDiff command - get the diff for a task's changes. + async fn handle_get_task_diff( + &self, + task_id: Uuid, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, diff, error) = if let Some(path) = worktree_path { + // Get diff of all changes (staged and unstaged) + let diff_result = tokio::process::Command::new("git") + .current_dir(&path) + .args(["diff", "HEAD"]) + .output() + .await; + + match diff_result { + Ok(output) if output.status.success() => { + let diff_text = String::from_utf8_lossy(&output.stdout).to_string(); + if diff_text.is_empty() { + // No uncommitted changes, show diff from base + let base_diff = tokio::process::Command::new("git") + .current_dir(&path) + .args(["log", "-p", "--reverse", "HEAD~10..HEAD", "--"]) + .output() + .await; + + match base_diff { + Ok(o) => (true, Some(String::from_utf8_lossy(&o.stdout).to_string()), None), + Err(e) => (false, None, Some(format!("Failed to get diff: {}", e))), + } + } else { + (true, Some(diff_text), None) + } + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, None, Some(format!("Git diff failed: {}", stderr))) + } + Err(e) => (false, None, Some(format!("Failed to run git: {}", e))), + } + } else { + (false, None, Some(format!("Task {} not found or has no worktree", task_id))) + }; + + let msg = DaemonMessage::TaskDiff { + task_id, + success, + diff, + error, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } +} + +/// Inner state for spawned tasks (cloneable). +struct TaskManagerInner { + worktree_manager: Arc, + process_manager: Arc, + temp_manager: Arc, + tasks: Arc>>, + ws_tx: mpsc::Sender, + task_inputs: Arc>>>, +} + +impl TaskManagerInner { + /// Run a task to completion. + #[allow(clippy::too_many_arguments)] + async fn run_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_source: Option, + base_branch: Option, + target_branch: Option, + is_orchestrator: bool, + is_supervisor: bool, + target_repo_path: Option, + completion_action: Option, + continue_from_task_id: Option, + copy_files: Option>, + contract_id: Option, + ) -> Result<(), DaemonError> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); + + // Determine working directory + let working_dir = if let Some(ref source) = repo_source { + if is_new_repo_request(source) { + // Explicit new repo request: new:// or new://project-name + tracing::info!( + task_id = %task_id, + source = %source, + "Creating new git repository" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Initializing new git repository...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let worktree_info = self.worktree_manager + .init_new_repo(task_id, source) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "New repository created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Repository ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } else { + // Send progress message + let msg = DaemonMessage::task_output( + task_id, + format!("Setting up worktree from {}...\n", source), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Ensure source repo exists (clone if URL, verify if path) + let source_repo = self.worktree_manager.ensure_repo(source).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + // Detect or use provided base branch + let branch = if let Some(ref b) = base_branch { + b.clone() + } else { + self.worktree_manager.detect_default_branch(&source_repo).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + source = %source, + branch = %branch, + continue_from_task_id = ?continue_from_task_id, + "Setting up worktree" + ); + + // Create worktree - either from scratch or copying from another task + let task_name = format!("task-{}", &task_id.to_string()[..8]); + let worktree_info = if let Some(from_task_id) = continue_from_task_id { + // Find the source task's worktree path + let source_worktree = self.find_worktree_for_task(from_task_id).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed( + format!("Cannot continue from task {}: {}", from_task_id, e) + )))?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Create worktree by copying from source task + self.worktree_manager + .create_worktree_from_task(&source_worktree, task_id, &task_name) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + } else { + // Create fresh worktree from repo + self.worktree_manager + .create_worktree(&source_repo, task_id, &task_name, &branch) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_info.path.display(), + branch = %worktree_info.branch, + continued_from = ?continue_from_task_id, + "Worktree created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } + } else { + // No repo specified - use managed temp directory in ~/.makima/temp/ + tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); + + let msg = DaemonMessage::task_output( + task_id, + "Creating temporary working directory...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let temp_dir = self.temp_manager.create_task_dir(task_id).await?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Working directory ready at {}\n", temp_dir.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + temp_dir + }; + + // Copy files from parent task's worktree if specified + if let Some(ref files) = copy_files { + if !files.is_empty() { + // Get the parent task ID to find its worktree + let parent_task_id = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).and_then(|t| t.parent_task_id) + }; + + if let Some(parent_id) = parent_task_id { + match self.find_worktree_for_task(parent_id).await { + Ok(parent_worktree) => { + let msg = DaemonMessage::task_output( + task_id, + format!("Copying {} files from orchestrator...\n", files.len()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + for file_path in files { + let source = parent_worktree.join(file_path); + let dest = working_dir.join(file_path); + + // Create parent directories if needed + if let Some(parent) = dest.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + tracing::warn!( + task_id = %task_id, + file = %file_path, + error = %e, + "Failed to create parent directory for file" + ); + continue; + } + } + + // Copy the file + match tokio::fs::copy(&source, &dest).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + "Copied file from orchestrator" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + error = %e, + "Failed to copy file from orchestrator" + ); + // Notify but don't fail - the file might be optional + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Could not copy {}: {}\n", file_path, e), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + } + } + + let msg = DaemonMessage::task_output( + task_id, + "Files copied from orchestrator.\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + parent_id = %parent_id, + error = %e, + "Could not find parent task worktree for file copying" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + "copy_files specified but no parent_task_id" + ); + } + } + } + + // Update state to Starting + tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); + self.update_state(task_id, TaskState::Starting).await; + self.send_status_change(task_id, "initializing", "starting").await; + + // Check Claude is available + match self.process_manager.check_claude_available().await { + Ok(version) => { + tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); + let msg = DaemonMessage::task_output( + task_id, + format!("Claude Code {} ready\n", version), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let err_msg = format!("Claude Code not available: {}", e); + tracing::error!(task_id = %task_id, error = %err_msg); + return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); + } + } + + // Set up supervisor, orchestrator, or subtask mode + let (extra_env, full_plan, system_prompt) = if is_supervisor { + // Supervisor mode: long-running contract orchestrator + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up supervisor mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up supervisor environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for supervisor"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Set up environment variables for makima CLI + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + // Supervisor needs contract ID for its tools + if let Some(cid) = contract_id { + env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); + } + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set supervisor environment variables" + ); + + // For supervisor, pass instructions as SYSTEM PROMPT (not user message) + // This ensures Claude treats them as behavioral constraints + let supervisor_user_plan = format!( + "Contract goal:\n{}", + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + "Supervisor environment ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Return system prompt separately - it will be passed via --system-prompt flag + (Some(env), supervisor_user_plan, Some(SUPERVISOR_SYSTEM_PROMPT.to_string())) + } else if is_orchestrator { + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up orchestrator environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Set up environment variables for makima CLI + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set orchestrator environment variables" + ); + + // For orchestrator, pass instructions as SYSTEM PROMPT + let orchestrator_user_plan = format!( + "Your task:\n{}", + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + "Orchestrator environment ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + (Some(env), orchestrator_user_plan, Some(ORCHESTRATOR_SYSTEM_PROMPT.to_string())) + } else { + tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); + // For subtasks, pass worktree isolation instructions as system prompt + let subtask_user_plan = format!( + "Your task:\n{}", + plan + ); + (None, subtask_user_plan, Some(SUBTASK_SYSTEM_PROMPT.to_string())) + }; + + // Add contract environment if task has contract_id (skip for supervisors - they already have it) + let (extra_env, full_plan, system_prompt) = if let Some(cid) = contract_id { + if is_supervisor { + // Supervisors already have contract ID and API access set up + tracing::info!(task_id = %task_id, contract_id = %cid, "Supervisor already has contract integration"); + (extra_env, full_plan, system_prompt) + } else { + tracing::info!(task_id = %task_id, contract_id = %cid, "Setting up contract integration"); + + // Set up environment variables for makima CLI + let mut env = extra_env.unwrap_or_default(); + env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); + + // If not already an orchestrator, we need API access for makima CLI + if !is_orchestrator { + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, "Generated tool key for contract access"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register contract tool key"); + } + + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + } + + let msg = DaemonMessage::task_output( + task_id, + "Contract integration ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Prepend contract integration prompt to the plan so the task knows to use makima CLI + let contract_plan = format!( + "{}{}", + CONTRACT_INTEGRATION_PROMPT, + full_plan + ); + + (Some(env), contract_plan, system_prompt) + } + } else { + (extra_env, full_plan, system_prompt) + }; + + // Spawn Claude process + let plan_bytes = full_plan.len(); + let plan_chars = full_plan.chars().count(); + // Rough token estimate: ~4 chars per token for English + let estimated_tokens = plan_chars / 4; + + tracing::info!( + task_id = %task_id, + working_dir = %working_dir.display(), + is_orchestrator = is_orchestrator, + plan_bytes = plan_bytes, + plan_chars = plan_chars, + estimated_tokens = estimated_tokens, + "Spawning Claude process" + ); + + // Warn if plan is very large (Claude's context is typically 100k-200k tokens) + if estimated_tokens > 50_000 { + tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + + let msg = DaemonMessage::task_output( + task_id, + if is_orchestrator { + format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) + } else { + format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) + }, + false, + ); + let _ = self.ws_tx.send(msg).await; + + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); + let mut process = self.process_manager + .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Set up input channel for this task so we can send messages to its stdin + tracing::debug!(task_id = %task_id, "Setting up input channel..."); + let (input_tx, mut input_rx) = mpsc::channel::(100); + tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); + self.task_inputs.write().await.insert(task_id, input_tx); + tracing::debug!(task_id = %task_id, "Input channel registered"); + + // Get stdin handle for input forwarding and completion signaling + let stdin_handle = process.stdin_handle(); + let stdin_handle_for_completion = stdin_handle.clone(); + + tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); + tokio::spawn(async move { + tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); + while let Some(msg) = input_rx.recv().await { + tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); + + // Format as JSON user message for stream-json input protocol + let json_msg = ClaudeInputMessage::user(&msg); + let json_line = match json_msg.to_json_line() { + Ok(line) => line, + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); + continue; + } + }; + + tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); + + let mut stdin_guard = stdin_handle.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); + if stdin.write_all(json_line.as_bytes()).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); + break; + } + if stdin.flush().await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); + break; + } + tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); + } else { + tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); + break; + } + } + tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); + }); + + // Update state to Running + { + tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + task.started_at = Some(Instant::now()); + } + tracing::debug!(task_id = %task_id, "Released tasks write lock"); + } + tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); + self.send_status_change(task_id, "starting", "running").await; + tracing::debug!(task_id = %task_id, "Sent status change notification"); + + // Stream output with startup timeout check + tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); + tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); + let ws_tx = self.ws_tx.clone(); + + // For auth error detection + let claude_command = self.process_manager.claude_command().to_string(); + let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); + let mut auth_error_handled = false; + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; + + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } + + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); + + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + // Check for OAuth auth error before sending output + let content_for_auth_check = line.content.clone(); + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + + // Detect OAuth token expiration and trigger remote login flow + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check) { + auth_error_handled = true; + tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); + + // Spawn claude setup-token to get login URL + if let Some(login_url) = get_oauth_login_url(&claude_command).await { + tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); + let auth_msg = DaemonMessage::AuthenticationRequired { + task_id: Some(task_id), + login_url, + hostname: daemon_hostname.clone(), + }; + if ws_tx.send(auth_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + } + } else { + tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); + let fallback_msg = DaemonMessage::task_output( + task_id, + format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(fallback_msg).await; + } + } + } + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } + } + } + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), + false, + ); + let _ = ws_tx.send(msg).await; + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); + } + } + } + } + } + + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); + + // Update state based on exit code + let success = exit_code == 0; + let new_state = if success { + TaskState::Completed + } else { + TaskState::Failed + }; + + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + success = success, + new_state = ?new_state, + "Claude process exited, updating task state" + ); + + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = new_state; + task.completed_at = Some(Instant::now()); + if !success { + task.error = Some(format!("Process exited with code {}", exit_code)); + } + } + } + + // Execute completion action if task succeeded + let completion_result = if success { + if let Some(ref action) = completion_action { + if action != "none" { + self.execute_completion_action( + task_id, + &task_name, + &working_dir, + action, + target_repo_path.as_deref(), + target_branch.as_deref(), + ).await + } else { + Ok(None) + } + } else { + Ok(None) + } + } else { + Ok(None) + }; + + // Log completion action result + match &completion_result { + Ok(Some(pr_url)) => { + tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); + } + } + + // Notify server - but NOT for supervisors which should never complete + if is_supervisor { + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + "Supervisor Claude process exited - NOT marking as complete" + ); + // Update local state to reflect it's paused/waiting for input + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; // Keep it as running, not completed + task.completed_at = None; + } + } + // Send a status message to let the frontend know supervisor is ready for more input + let msg = DaemonMessage::task_output( + task_id, + "\n[Supervisor ready for next instruction]\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } else { + let error = if success { + None + } else { + Some(format!("Exit code: {}", exit_code)) + }; + tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); + let msg = DaemonMessage::task_complete(task_id, success, error); + let _ = self.ws_tx.send(msg).await; + } + + // Note: Worktrees are kept until explicitly deleted (per user preference) + // This allows inspection, PR creation, etc. + + tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); + Ok(()) + } + + /// Execute the completion action for a task. + async fn execute_completion_action( + &self, + task_id: Uuid, + task_name: &str, + worktree_path: &std::path::Path, + action: &str, + target_repo_path: Option<&str>, + target_branch: Option<&str>, + ) -> Result, String> { + let target_repo = match target_repo_path { + Some(path) => crate::daemon::worktree::expand_tilde(path), + None => { + tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); + return Ok(None); + } + }; + + if !target_repo.exists() { + return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); + } + + // Get the branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!( + "makima/{}-{}", + crate::daemon::worktree::sanitize_name(task_name), + crate::daemon::worktree::short_uuid(task_id) + ); + + // Determine target branch - use provided value or detect default branch of target repo + let target_branch = match target_branch { + Some(branch) => branch.to_string(), + None => { + // Detect default branch (main, master, develop, etc.) + self.worktree_manager + .detect_default_branch(&target_repo) + .await + .unwrap_or_else(|_| "master".to_string()) + } + }; + + let msg = DaemonMessage::task_output( + task_id, + format!("Executing completion action: {}...\n", action), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match action { + "branch" => { + // Just push the branch to target repo + self.worktree_manager + .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "merge" => { + // Push and merge into target branch + let commit_sha = self.worktree_manager + .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "pr" => { + // Push and create PR + let title = task_name.to_string(); + let body = format!( + "Automated PR from makima task.\n\nTask ID: `{}`", + task_id + ); + let pr_url = self.worktree_manager + .create_pull_request( + worktree_path, + &target_repo, + &branch_name, + &target_branch, + &title, + &body, + ) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Pull request created: {}\n", pr_url), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(Some(pr_url)) + } + _ => { + tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); + Ok(None) + } + } + } + + /// Find worktree path for a task ID. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn find_worktree_for_task(&self, task_id: Uuid) -> Result { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(format!( + "No worktree found for task {}. The worktree may have been cleaned up.", + task_id + )) + } + + async fn update_state(&self, task_id: Uuid, state: TaskState) { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = state; + } + } + + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + /// Mark task as failed. + async fn mark_failed(&self, task_id: Uuid, error: &str) { + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Failed; + task.error = Some(error.to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server + let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); + let _ = self.ws_tx.send(msg).await; + } +} + +impl Clone for TaskManagerInner { + fn clone(&self) -> Self { + Self { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } +} -- cgit v1.2.3