diff options
| author | soryu <soryu@soryu.co> | 2026-01-06 04:08:11 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-11 03:01:13 +0000 |
| commit | 8b17a175c3e7e27b789812eba4e3cd760beadb10 (patch) | |
| tree | 7864dcaa2fa9db47fdfd4e8bfdb0b1dde832aa33 /makima/daemon/src/task/manager.rs | |
| parent | f79c416c58557d2f946aa5332989afdfa8c021cd (diff) | |
| download | soryu-8b17a175c3e7e27b789812eba4e3cd760beadb10.tar.gz soryu-8b17a175c3e7e27b789812eba4e3cd760beadb10.zip | |
Initial Control system
Diffstat (limited to 'makima/daemon/src/task/manager.rs')
| -rw-r--r-- | makima/daemon/src/task/manager.rs | 2248 |
1 files changed, 2248 insertions, 0 deletions
diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs new file mode 100644 index 0000000..4979ce7 --- /dev/null +++ b/makima/daemon/src/task/manager.rs @@ -0,0 +1,2248 @@ +//! Task lifecycle manager using git worktrees and Claude Code subprocesses. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use rand::Rng; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, RwLock, Semaphore}; +use uuid::Uuid; + +use std::collections::HashSet; + +use super::state::TaskState; +use crate::error::{DaemonError, TaskError, TaskResult}; +use crate::process::{ClaudeInputMessage, ProcessManager}; +use crate::temp::TempManager; +use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage}; + +/// Generate a random API key for orchestrator tool access. +/// +/// Draws 32 random bytes from `rand::rng()` and returns them hex-encoded. +fn generate_tool_key() -> String { + let key_bytes = rand::rng().random::<[u8; 32]>(); + hex::encode(key_bytes) +} + +/// System prompt for regular (non-orchestrator) subtasks. +/// This ensures subtasks work only within their isolated worktree directory. +const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. + +## IMPORTANT: Directory Restrictions + +**You MUST only work within the current working directory (your worktree).** + +- DO NOT use `cd` to navigate to directories outside your worktree +- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) +- DO NOT modify files in parent directories or sibling directories +- All your file operations should be relative to the current directory + +Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. + +**Why?** Your worktree is isolated so that: +1. 
Your changes don't affect other running tasks +2. Changes can be reviewed before integration +3. Multiple tasks can work on the codebase in parallel without conflicts + +--- + +"#; + +/// The orchestrator system prompt that tells Claude how to use the helper script. +const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. + +## FIRST STEP + +Start by checking if you have existing subtasks: + +```bash +# List all subtasks to see what work needs to be done +./.makima/orchestrate.sh list +``` + +If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. + +--- + +## Creating Subtasks + +You can create new subtasks to break down work: + +```bash +# Create a new subtask with a name and plan +./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." + +# The command returns the new subtask ID - use it to start the subtask +./.makima/orchestrate.sh start <new_subtask_id> +``` + +Create subtasks when you need to: +- Break down complex work into smaller pieces +- Run multiple tasks in parallel on different parts of the codebase +- Delegate specific implementation work + +## Task Continuation (Sequential Dependencies) + +When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: + +```bash +# Create Task B that continues from Task A's worktree +./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id> +``` + +This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. 
+ +**When to use continuation:** +- Sequential work: Task B needs Task A's output files +- Staged implementation: Building features incrementally +- Fix-and-extend: One task fixes issues, another adds features on top + +**When NOT to use continuation:** +- Parallel tasks working on different files +- Independent subtasks that can be merged separately + +**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. + +## Sharing Files with Subtasks + +Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: + +```bash +# Create subtask with specific files copied from orchestrator +./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" + +# Copy multiple files (comma-separated) +./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" + +# Combine with --continue-from to share files AND continue from another task +./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md" +``` + +**Use cases for --files:** +- Share a PLAN.md with detailed implementation steps +- Distribute configuration or spec files +- Pass generated data or intermediate results + +## How Subtasks Work + +Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: +- Their work remains in the worktree files (NOT committed to git) +- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree +- You can view and copy files from subtask worktrees using their paths +- The worktree path is returned when you get subtask status + +**IMPORTANT:** Subtasks never create PRs or merge to the target repository. 
Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. + +## Subtask Commands +```bash +# List all subtasks and their current status +./.makima/orchestrate.sh list + +# Create a new subtask (returns the subtask ID) +./.makima/orchestrate.sh create "Name" "Plan/description" + +# Create a subtask that continues from another task's worktree +./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id> + +# Create a subtask with specific files copied from orchestrator worktree +./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" + +# Start a specific subtask (it will run in its own Claude instance) +./.makima/orchestrate.sh start <subtask_id> + +# Stop a running subtask +./.makima/orchestrate.sh stop <subtask_id> + +# Get detailed status of a subtask (includes worktree_path when available) +./.makima/orchestrate.sh status <subtask_id> + +# Get the output/logs of a subtask +./.makima/orchestrate.sh output <subtask_id> + +# Get the worktree path for a subtask +./.makima/orchestrate.sh worktree <subtask_id> +``` + +## Integrating Subtask Work + +When subtasks complete, their changes exist as files in their worktree directories: +- Files are NOT committed to git branches +- You must copy/integrate files from subtask worktrees into your worktree +- Use standard file operations (cp, cat, etc.) to review and integrate changes + +### Handling Continuation Chains + +**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. + +Example chain: Task A → Task B (continues from A) → Task C (continues from B) +- Task C's worktree contains ALL changes from A, B, and C +- You should ONLY integrate Task C's worktree +- DO NOT integrate Task A or Task B separately (their changes are already in C) + +**How to track continuation chains:** +1. 
When you create tasks with `--continue-from`, note which task continues from which +2. Build a mental model: Independent tasks (no continuation) + Continuation chains +3. For each chain, only integrate the LAST task in the chain + +**Example with mixed independent and chained tasks:** +``` +Independent tasks (integrate all): +- Task X: API endpoints +- Task Y: Database models + +Continuation chain (integrate ONLY the last one): +- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) + Only integrate Task C! +``` + +### Integration Examples + +For independent subtasks (no continuation): +```bash +# Get the worktree path for a completed subtask +SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>) + +# View what files were changed +ls -la "$SUBTASK_PATH" +diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima + +# Copy specific files from subtask +cp "$SUBTASK_PATH/src/new_file.rs" ./src/ +cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ + +# Or use diff/patch for more control +diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch +patch -p0 < changes.patch +``` + +For continuation chains (only integrate the final task): +```bash +# If you have: Task A → Task B → Task C (each continues from previous) +# ONLY get and integrate Task C's worktree - it has everything! + +FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>) + +# Copy all changes from the final task +rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ +``` + +## Completion +```bash +# Mark yourself as complete after integrating all subtask work +./.makima/orchestrate.sh done "Summary of what was accomplished" +``` + +## Workflow +1. **List existing subtasks**: Run `list` to see current subtasks +2. 
**Create subtasks if needed**: Use `create` to add new subtasks for the work + - For independent parallel work: create without `--continue-from` + - For sequential dependencies: use `--continue-from <previous_task_id>` + - Track which tasks continue from which (continuation chains) +3. **Start subtasks**: Run `start` for each subtask +4. **Monitor progress**: Check status and output as subtasks run +5. **Integrate work**: When subtasks complete: + - For independent tasks: integrate each one's worktree + - For continuation chains: ONLY integrate the FINAL task (it has all changes) + - Get worktree path with `worktree <subtask_id>` + - Copy or merge files into your worktree +6. **Complete**: Call `done` once all work is integrated + +## Important Notes +- Subtask files are in worktrees, NOT committed git branches +- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work +- You can read files from subtask worktrees using their paths +- Use standard file tools (cp, diff, cat, rsync) to integrate changes +- You should NOT edit files directly - that's what subtasks are for +- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. +- When you call `done`, YOUR worktree may be used for the final PR/merge +"#; + +/// Content of the helper bash script that orchestrators use to call the API. +const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash +# Makima Orchestrator Helper Script +# Usage: ./orchestrate.sh <command> [args...] 
+ +API_URL="${MAKIMA_API_URL:-http://localhost:8080}" +API_KEY="${MAKIMA_API_KEY}" +TASK_ID="${MAKIMA_TASK_ID}" + +if [ -z "$API_KEY" ]; then + echo "Error: MAKIMA_API_KEY not set" >&2 + exit 1 +fi + +if [ -z "$TASK_ID" ]; then + echo "Error: MAKIMA_TASK_ID not set" >&2 + exit 1 +fi + +# Helper function to make API calls and check for errors +api_call() { + local method="$1" + local url="$2" + local data="$3" + local response + local http_code + + if [ -n "$data" ]; then + response=$(curl -s -w "\n%{http_code}" -X "$method" \ + -H "X-Makima-Tool-Key: $API_KEY" \ + -H "Content-Type: application/json" \ + -d "$data" \ + "$url") + else + response=$(curl -s -w "\n%{http_code}" -X "$method" \ + -H "X-Makima-Tool-Key: $API_KEY" \ + "$url") + fi + + # Extract HTTP code (last line) and body (everything else) + http_code=$(echo "$response" | tail -n1) + body=$(echo "$response" | sed '$d') + + # Check for curl errors or non-2xx status + if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then + echo "Error: API request failed with HTTP $http_code" >&2 + echo "URL: $url" >&2 + echo "Response: $body" >&2 + echo "$body" + return 1 + fi + + echo "$body" + return 0 +} + +case "$1" in + list) + api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks" + ;; + create) + # Parse arguments: create "name" "plan" [--continue-from <task_id>] [--files "file1,file2"] + if [ -z "$2" ] || [ -z "$3" ]; then + echo "Usage: $0 create \"<name>\" \"<plan>\" [--continue-from <task_id>] [--files \"file1,file2\"]" >&2 + exit 1 + fi + NAME="$2" + PLAN="$3" + CONTINUE_FROM="" + COPY_FILES="" + + # Parse optional flags (can be in any order after name and plan) + shift 3 + while [ $# -gt 0 ]; do + case "$1" in + --continue-from) + CONTINUE_FROM="$2" + shift 2 + ;; + --files) + COPY_FILES="$2" + shift 2 + ;; + *) + echo "Unknown option: $1" >&2 + exit 1 + ;; + esac + done + + # Escape quotes in name and plan for JSON + NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') + 
PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') + + # Build JSON body + JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\"" + + if [ -n "$CONTINUE_FROM" ]; then + echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2 + JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\"" + else + echo "Creating subtask: $NAME..." >&2 + fi + + if [ -n "$COPY_FILES" ]; then + # Convert comma-separated file list to JSON array + FILES_JSON="[" + first=true + IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES" + for file in "${FILE_ARRAY[@]}"; do + file=$(echo "$file" | xargs) # trim whitespace + if [ "$first" = true ]; then + FILES_JSON="$FILES_JSON\"$file\"" + first=false + else + FILES_JSON="$FILES_JSON,\"$file\"" + fi + done + FILES_JSON="$FILES_JSON]" + JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON" + echo " (copying files: $COPY_FILES)" >&2 + fi + + JSON_BODY="$JSON_BODY}" + api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY" + ;; + start) + if [ -z "$2" ]; then + echo "Usage: $0 start <subtask_id>" >&2 + exit 1 + fi + echo "Starting subtask $2..." >&2 + api_call POST "$API_URL/api/v1/mesh/tasks/$2/start" + ;; + stop) + if [ -z "$2" ]; then + echo "Usage: $0 stop <subtask_id>" >&2 + exit 1 + fi + api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop" + ;; + status) + if [ -z "$2" ]; then + echo "Usage: $0 status <subtask_id>" >&2 + exit 1 + fi + api_call GET "$API_URL/api/v1/mesh/tasks/$2" + ;; + output) + if [ -z "$2" ]; then + echo "Usage: $0 output <subtask_id>" >&2 + exit 1 + fi + api_call GET "$API_URL/api/v1/mesh/tasks/$2/output" + ;; + worktree) + if [ -z "$2" ]; then + echo "Usage: $0 worktree <subtask_id>" >&2 + exit 1 + fi + # Get the worktree path from the task's overlayPath field via API + TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2") + if [ $? 
-ne 0 ]; then + echo "Error: Failed to get task info" >&2 + exit 1 + fi + WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4) + if [ -z "$WORKTREE_PATH" ]; then + echo "Error: Task has no worktree path (may not have started yet)" >&2 + exit 1 + fi + if [ -d "$WORKTREE_PATH" ]; then + echo "$WORKTREE_PATH" + else + echo "Error: Worktree not found at $WORKTREE_PATH" >&2 + echo "The worktree may have been cleaned up." >&2 + exit 1 + fi + ;; + done) + SUMMARY="${2:-Task completed}" + api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}" + ;; + *) + echo "Makima Orchestrator Helper" + echo "" + echo "Usage: $0 <command> [args...]" + echo "" + echo "Subtask Commands:" + echo " list List all subtasks and their status" + echo " create \"<name>\" \"<plan>\" Create a new subtask" + echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree" + echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree" + echo " start <subtask_id> Start a subtask" + echo " stop <subtask_id> Stop a running subtask" + echo " status <subtask_id> Get detailed subtask status" + echo " output <subtask_id> Get subtask output history" + echo " worktree <subtask_id> Get path to subtask's worktree" + echo "" + echo "Completion:" + echo " done [summary] Mark orchestrator as complete" + echo "" + echo "Examples:" + echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\"" + echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\"" + ;; +esac +"#; + +/// Tracks merge state for an orchestrator task. +#[derive(Default)] +struct MergeTracker { + /// Subtask branches that have been successfully merged. + merged_subtasks: HashSet<Uuid>, + /// Subtask branches that were explicitly skipped (with reason). + skipped_subtasks: HashMap<Uuid, String>, +} + +/// Managed task information. 
+pub struct ManagedTask { + /// Task ID. + pub id: Uuid, + /// Current state. + pub state: TaskState, + /// Worktree info if created. + pub worktree: Option<WorktreeInfo>, + /// Task plan. + pub plan: String, + /// Repository URL or path. + pub repo_source: Option<String>, + /// Base branch. + pub base_branch: Option<String>, + /// Target branch to merge into. + pub target_branch: Option<String>, + /// Parent task ID if this is a subtask. + pub parent_task_id: Option<Uuid>, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + pub depth: i32, + /// Whether this task runs as an orchestrator (coordinates subtasks). + pub is_orchestrator: bool, + /// Path to target repository for completion actions. + pub target_repo_path: Option<String>, + /// Completion action: "none", "branch", "merge", "pr". + pub completion_action: Option<String>, + /// Task ID to continue from (copy worktree from this task). + pub continue_from_task_id: Option<Uuid>, + /// Files to copy from parent task's worktree. + pub copy_files: Option<Vec<String>>, + /// Time task was created. + pub created_at: Instant, + /// Time task started running. + pub started_at: Option<Instant>, + /// Time task completed. + pub completed_at: Option<Instant>, + /// Error message if failed. + pub error: Option<String>, +} + +/// Configuration for task execution. +#[derive(Clone)] +pub struct TaskConfig { + /// Maximum concurrent tasks. + pub max_concurrent_tasks: u32, + /// Base directory for worktrees. + pub worktree_base_dir: PathBuf, + /// Environment variables to pass to Claude. + pub env_vars: HashMap<String, String>, + /// Claude command path. + pub claude_command: String, + /// Additional arguments to pass to Claude Code. + pub claude_args: Vec<String>, + /// Arguments to pass before defaults. + pub claude_pre_args: Vec<String>, + /// Enable Claude's permission system. + pub enable_permissions: bool, + /// Disable verbose output. 
+ pub disable_verbose: bool, +} + +impl Default for TaskConfig { + fn default() -> Self { + Self { + max_concurrent_tasks: 4, + worktree_base_dir: WorktreeManager::default_base_dir(), + env_vars: HashMap::new(), + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + } + } +} + +/// Task manager for handling task lifecycle. +pub struct TaskManager { + /// Worktree manager. + worktree_manager: Arc<WorktreeManager>, + /// Process manager. + process_manager: Arc<ProcessManager>, + /// Temp directory manager. + temp_manager: Arc<TempManager>, + /// Task configuration. + #[allow(dead_code)] + config: TaskConfig, + /// Active tasks. + tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, + /// Channel to send messages to server. + ws_tx: mpsc::Sender<DaemonMessage>, + /// Semaphore for limiting concurrent tasks. + semaphore: Arc<Semaphore>, + /// Channels for sending input to running tasks. + /// Each sender allows sending messages to the stdin of a running Claude process. + task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, + /// Tracks merge state per orchestrator task (for completion gate). + merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>, +} + +impl TaskManager { + /// Create a new task manager. 
+ pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { + let max_concurrent = config.max_concurrent_tasks as usize; + let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); + let process_manager = Arc::new( + ProcessManager::with_command(config.claude_command.clone()) + .with_args(config.claude_args.clone()) + .with_pre_args(config.claude_pre_args.clone()) + .with_permissions_enabled(config.enable_permissions) + .with_verbose_disabled(config.disable_verbose) + .with_env(config.env_vars.clone()), + ); + let temp_manager = Arc::new(TempManager::new()); + + Self { + worktree_manager, + process_manager, + temp_manager, + config, + tasks: Arc::new(RwLock::new(HashMap::new())), + ws_tx, + semaphore: Arc::new(Semaphore::new(max_concurrent)), + task_inputs: Arc::new(RwLock::new(HashMap::new())), + merge_trackers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Handle a command from the server. + pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { + tracing::info!("Received command from server: {:?}", command); + + match command { + DaemonCommand::SpawnTask { + task_id, + task_name, + plan, + repo_url, + base_branch, + target_branch, + parent_task_id, + depth, + is_orchestrator, + target_repo_path, + completion_action, + continue_from_task_id, + copy_files, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + repo_url = ?repo_url, + base_branch = ?base_branch, + target_branch = ?target_branch, + parent_task_id = ?parent_task_id, + depth = depth, + is_orchestrator = is_orchestrator, + target_repo_path = ?target_repo_path, + completion_action = ?completion_action, + continue_from_task_id = ?continue_from_task_id, + copy_files = ?copy_files, + plan_len = plan.len(), + "Spawning new task" + ); + self.spawn_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + parent_task_id, depth, is_orchestrator, + target_repo_path, 
completion_action, continue_from_task_id, + copy_files + ).await?; + } + DaemonCommand::PauseTask { task_id } => { + tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); + // Subprocesses don't support pause, just log and ignore + } + DaemonCommand::ResumeTask { task_id } => { + tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); + // Subprocesses don't support resume, just log and ignore + } + DaemonCommand::InterruptTask { task_id, graceful: _ } => { + tracing::info!(task_id = %task_id, "Interrupting task"); + self.interrupt_task(task_id).await?; + } + DaemonCommand::SendMessage { task_id, message } => { + tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); + // Send message to the task's stdin via the input channel + let inputs = self.task_inputs.read().await; + if let Some(sender) = inputs.get(&task_id) { + if let Err(e) = sender.send(message).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); + } else { + tracing::info!(task_id = %task_id, "Message sent to task successfully"); + } + } else { + tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); + } + } + DaemonCommand::InjectSiblingContext { task_id, .. 
} => { + tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); + } + DaemonCommand::Authenticated { daemon_id } => { + tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); + } + DaemonCommand::Error { code, message } => { + tracing::warn!(code = %code, message = %message, "Error command from server"); + } + + // ========================================================================= + // Merge Commands + // ========================================================================= + + DaemonCommand::ListBranches { task_id } => { + tracing::info!(task_id = %task_id, "Listing task branches"); + self.handle_list_branches(task_id).await?; + } + DaemonCommand::MergeStart { task_id, source_branch } => { + tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); + self.handle_merge_start(task_id, source_branch).await?; + } + DaemonCommand::MergeStatus { task_id } => { + tracing::info!(task_id = %task_id, "Getting merge status"); + self.handle_merge_status(task_id).await?; + } + DaemonCommand::MergeResolve { task_id, file, strategy } => { + tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); + self.handle_merge_resolve(task_id, file, strategy).await?; + } + DaemonCommand::MergeCommit { task_id, message } => { + tracing::info!(task_id = %task_id, "Committing merge"); + self.handle_merge_commit(task_id, message).await?; + } + DaemonCommand::MergeAbort { task_id } => { + tracing::info!(task_id = %task_id, "Aborting merge"); + self.handle_merge_abort(task_id).await?; + } + DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { + tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); + self.handle_merge_skip(task_id, subtask_id, reason).await?; + } + DaemonCommand::CheckMergeComplete { task_id } => { + tracing::info!(task_id = %task_id, "Checking merge completion"); + 
self.handle_check_merge_complete(task_id).await?; + } + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + DaemonCommand::RetryCompletionAction { + task_id, + task_name, + action, + target_repo_path, + target_branch, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + action = %action, + target_repo_path = %target_repo_path, + target_branch = ?target_branch, + "Retrying completion action" + ); + self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; + } + + DaemonCommand::CloneWorktree { task_id, target_dir } => { + tracing::info!( + task_id = %task_id, + target_dir = %target_dir, + "Cloning worktree to target directory" + ); + self.handle_clone_worktree(task_id, target_dir).await?; + } + + DaemonCommand::CheckTargetExists { task_id, target_dir } => { + tracing::debug!( + task_id = %task_id, + target_dir = %target_dir, + "Checking if target directory exists" + ); + self.handle_check_target_exists(task_id, target_dir).await?; + } + } + Ok(()) + } + + /// Spawn a new task. 
+ #[allow(clippy::too_many_arguments)] + pub async fn spawn_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_url: Option<String>, + base_branch: Option<String>, + target_branch: Option<String>, + parent_task_id: Option<Uuid>, + depth: i32, + is_orchestrator: bool, + target_repo_path: Option<String>, + completion_action: Option<String>, + continue_from_task_id: Option<Uuid>, + copy_files: Option<Vec<String>>, + ) -> TaskResult<()> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ==="); + + // Check if task already exists - allow re-spawning if in terminal state + { + let mut tasks = self.tasks.write().await; + if let Some(existing) = tasks.get(&task_id) { + if existing.state.is_terminal() { + // Task exists but is in terminal state (completed, failed, interrupted) + // Remove it so we can re-spawn + tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + tasks.remove(&task_id); + } else { + // Task is still active, reject + tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); + return Err(TaskError::AlreadyExists(task_id)); + } + } + } + + // Acquire semaphore permit + tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); + let permit = self + .semaphore + .clone() + .try_acquire_owned() + .map_err(|_| { + tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); + TaskError::ConcurrencyLimit + })?; + tracing::info!(task_id = %task_id, "Concurrency permit acquired"); + + // Create task entry + tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); + let task = ManagedTask { + id: task_id, + state: TaskState::Initializing, + worktree: None, + plan: plan.clone(), + repo_source: repo_url.clone(), + base_branch: base_branch.clone(), + target_branch: target_branch.clone(), + parent_task_id, + 
depth, + is_orchestrator, + target_repo_path: target_repo_path.clone(), + completion_action: completion_action.clone(), + continue_from_task_id, + copy_files: copy_files.clone(), + created_at: Instant::now(), + started_at: None, + completed_at: None, + error: None, + }; + + self.tasks.write().await.insert(task_id, task); + tracing::info!(task_id = %task_id, "Task entry created and stored"); + + // Notify server of status change + tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); + self.send_status_change(task_id, "pending", "initializing").await; + + // Spawn task in background + tracing::info!(task_id = %task_id, "Spawning background task runner"); + let inner = self.clone_inner(); + tokio::spawn(async move { + let _permit = permit; // Hold permit until done + tracing::info!(task_id = %task_id, "Background task runner started"); + + if let Err(e) = inner.run_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + is_orchestrator, target_repo_path, completion_action, + continue_from_task_id, copy_files + ).await { + tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); + inner.mark_failed(task_id, &e.to_string()).await; + } + tracing::info!(task_id = %task_id, "Background task runner completed"); + }); + + tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); + Ok(()) + } + + /// Clone inner state for spawned tasks. + fn clone_inner(&self) -> TaskManagerInner { + TaskManagerInner { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } + + /// Interrupt a task. 
+ pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { + let mut tasks = self.tasks.write().await; + let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; + + if task.state.is_terminal() { + return Ok(()); // Already done + } + + let old_state = task.state; + task.state = TaskState::Interrupted; + task.completed_at = Some(Instant::now()); + + // Notify server + drop(tasks); + self.send_status_change(task_id, old_state.as_str(), "interrupted").await; + + // Note: The process will be killed when the ClaudeProcess is dropped + // Worktrees are kept until explicitly deleted + + Ok(()) + } + + /// Get list of active task IDs. + pub async fn active_task_ids(&self) -> Vec<Uuid> { + self.tasks + .read() + .await + .iter() + .filter(|(_, t)| t.state.is_active()) + .map(|(id, _)| *id) + .collect() + } + + /// Get task state. + pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> { + self.tasks.read().await.get(&task_id).map(|t| t.state) + } + + /// Send status change notification to server. + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + // ========================================================================= + // Merge Handler Methods + // ========================================================================= + + /// Get worktree path for a task, or return error if not found. + /// First checks in-memory tasks, then scans the worktrees directory. 
+ async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(DaemonError::Task(TaskError::SetupFailed( + format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) + ))) + } + + /// Handle ListBranches command. 
+ async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(branches) => { + let branch_infos: Vec<BranchInfo> = branches + .into_iter() + .map(|b| BranchInfo { + name: b.name, + task_id: b.task_id, + is_merged: b.is_merged, + last_commit: b.last_commit, + last_commit_message: b.last_commit_message, + }) + .collect(); + + let msg = DaemonMessage::BranchList { + task_id, + branches: branch_infos, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStart command. + async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { + Ok(None) => { + // Merge succeeded without conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge completed without conflicts".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Ok(Some(conflicts)) => { + // Merge has conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Merge has {} conflicts", conflicts.len()), + commit_sha: None, + conflicts: Some(conflicts), + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStatus command. 
+ async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.get_merge_state(&worktree_path).await { + Ok(state) => { + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: state.in_progress, + source_branch: if state.in_progress { Some(state.source_branch) } else { None }, + conflicted_files: state.conflicted_files, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: false, + source_branch: None, + conflicted_files: vec![], + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeResolve command. + async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + let resolution = match strategy.to_lowercase().as_str() { + "ours" => ConflictResolution::Ours, + "theirs" => ConflictResolution::Theirs, + _ => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Resolved conflict in {}", file), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeCommit command. 
+ async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.commit_merge(&worktree_path, &message).await { + Ok(commit_sha) => { + // Track this merge as completed (extract subtask ID from branch if possible) + // For now, we'll track it when MergeSkip is called or based on branch names + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge committed successfully".to_string(), + commit_sha: Some(commit_sha), + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeAbort command. + async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.abort_merge(&worktree_path).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge aborted".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeSkip command. 
+ async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { + // Record that this subtask was skipped + { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); + tracker.skipped_subtasks.insert(subtask_id, reason.clone()); + } + + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Subtask {} skipped: {}", subtask_id, reason), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckMergeComplete command. + async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Get all task branches + let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(b) => b, + Err(e) => { + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete: false, + unmerged_branches: vec![format!("Error listing branches: {}", e)], + merged_count: 0, + skipped_count: 0, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get tracker state + let trackers = self.merge_trackers.read().await; + let empty_merged: HashSet<Uuid> = HashSet::new(); + let empty_skipped: HashMap<Uuid, String> = HashMap::new(); + let tracker = trackers.get(&task_id); + let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); + let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); + + let mut merged_count = 0u32; + let mut skipped_count = 0u32; + let mut unmerged_branches = Vec::new(); + + for branch in &branches { + if branch.is_merged { + merged_count += 1; + } else if let Some(subtask_id) = branch.task_id { + if merged_set.contains(&subtask_id) { + merged_count += 1; + } else if skipped_set.contains_key(&subtask_id) { + skipped_count += 1; + } else { + 
unmerged_branches.push(branch.name.clone()); + } + } else { + // Branch without task ID - check if it's merged + unmerged_branches.push(branch.name.clone()); + } + } + + let can_complete = unmerged_branches.is_empty(); + + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete, + unmerged_branches, + merged_count, + skipped_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Mark a subtask as merged in the tracker. + #[allow(dead_code)] + pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); + tracker.merged_subtasks.insert(subtask_id); + } + + // ========================================================================= + // Completion Action Handler Methods + // ========================================================================= + + /// Handle RetryCompletionAction command. 
+ async fn handle_retry_completion_action( + &self, + task_id: Uuid, + task_name: String, + action: String, + target_repo_path: String, + target_branch: Option<String>, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Execute the completion action + let inner = self.clone_inner(); + let result = inner.execute_completion_action( + task_id, + &task_name, + &worktree_path, + &action, + Some(target_repo_path.as_str()), + target_branch.as_deref(), + ).await; + + // Send result back to server + let msg = match result { + Ok(pr_url) => DaemonMessage::CompletionActionResult { + task_id, + success: true, + message: match action.as_str() { + "branch" => format!("Branch pushed to {}", target_repo_path), + "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), + "pr" => format!("Pull request created"), + _ => format!("Completion action '{}' executed", action), + }, + pr_url, + }, + Err(e) => DaemonMessage::CompletionActionResult { + task_id, + success: false, + message: e, + pr_url: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CloneWorktree command. 
+ async fn handle_clone_worktree( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Expand tilde in target path + let target_path = crate::worktree::expand_tilde(&target_dir); + + // Clone the worktree to target directory + let result = self.worktree_manager.clone_worktree_to_directory( + &worktree_path, + &target_path, + ).await; + + // Send result back to server + let msg = match result { + Ok(message) => DaemonMessage::CloneWorktreeResult { + task_id, + success: true, + message, + target_dir: Some(target_path.to_string_lossy().to_string()), + }, + Err(e) => DaemonMessage::CloneWorktreeResult { + task_id, + success: false, + message: e.to_string(), + target_dir: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckTargetExists command. + async fn handle_check_target_exists( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Expand tilde in target path + let target_path = crate::worktree::expand_tilde(&target_dir); + + // Check if target exists + let exists = self.worktree_manager.target_directory_exists(&target_path).await; + + // Send result back to server + let msg = DaemonMessage::CheckTargetExistsResult { + task_id, + exists, + target_dir: target_path.to_string_lossy().to_string(), + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } +} + +/// Inner state for spawned tasks (cloneable). +struct TaskManagerInner { + worktree_manager: Arc<WorktreeManager>, + process_manager: Arc<ProcessManager>, + temp_manager: Arc<TempManager>, + tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, + ws_tx: mpsc::Sender<DaemonMessage>, + task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, +} + +impl TaskManagerInner { + /// Run a task to completion. 
+ #[allow(clippy::too_many_arguments)] + async fn run_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_source: Option<String>, + base_branch: Option<String>, + target_branch: Option<String>, + is_orchestrator: bool, + target_repo_path: Option<String>, + completion_action: Option<String>, + continue_from_task_id: Option<Uuid>, + copy_files: Option<Vec<String>>, + ) -> Result<(), DaemonError> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ==="); + + // Determine working directory + let working_dir = if let Some(ref source) = repo_source { + if is_new_repo_request(source) { + // Explicit new repo request: new:// or new://project-name + tracing::info!( + task_id = %task_id, + source = %source, + "Creating new git repository" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Initializing new git repository...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let worktree_info = self.worktree_manager + .init_new_repo(task_id, source) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "New repository created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Repository ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } else { + // Send progress message + let msg = DaemonMessage::task_output( + task_id, + format!("Setting up worktree from {}...\n", source), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Ensure source repo exists (clone if URL, verify if path) + let source_repo = self.worktree_manager.ensure_repo(source).await + .map_err(|e| 
DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + // Detect or use provided base branch + let branch = if let Some(ref b) = base_branch { + b.clone() + } else { + self.worktree_manager.detect_default_branch(&source_repo).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + source = %source, + branch = %branch, + continue_from_task_id = ?continue_from_task_id, + "Setting up worktree" + ); + + // Create worktree - either from scratch or copying from another task + let task_name = format!("task-{}", &task_id.to_string()[..8]); + let worktree_info = if let Some(from_task_id) = continue_from_task_id { + // Find the source task's worktree path + let source_worktree = self.find_worktree_for_task(from_task_id).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed( + format!("Cannot continue from task {}: {}", from_task_id, e) + )))?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Create worktree by copying from source task + self.worktree_manager + .create_worktree_from_task(&source_worktree, task_id, &task_name) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + } else { + // Create fresh worktree from repo + self.worktree_manager + .create_worktree(&source_repo, task_id, &task_name, &branch) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? 
+ }; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_info.path.display(), + branch = %worktree_info.branch, + continued_from = ?continue_from_task_id, + "Worktree created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } + } else { + // No repo specified - use managed temp directory in ~/.makima/temp/ + tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); + + let msg = DaemonMessage::task_output( + task_id, + "Creating temporary working directory...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let temp_dir = self.temp_manager.create_task_dir(task_id).await?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Working directory ready at {}\n", temp_dir.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + temp_dir + }; + + // Copy files from parent task's worktree if specified + if let Some(ref files) = copy_files { + if !files.is_empty() { + // Get the parent task ID to find its worktree + let parent_task_id = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).and_then(|t| t.parent_task_id) + }; + + if let Some(parent_id) = parent_task_id { + match self.find_worktree_for_task(parent_id).await { + Ok(parent_worktree) => { + let msg = DaemonMessage::task_output( + task_id, + format!("Copying {} files from orchestrator...\n", files.len()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + for file_path in files { + let source = parent_worktree.join(file_path); + let dest = working_dir.join(file_path); + + // Create parent directories if needed + if let Some(parent) = dest.parent() { + if let Err(e) = 
tokio::fs::create_dir_all(parent).await { + tracing::warn!( + task_id = %task_id, + file = %file_path, + error = %e, + "Failed to create parent directory for file" + ); + continue; + } + } + + // Copy the file + match tokio::fs::copy(&source, &dest).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + "Copied file from orchestrator" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + error = %e, + "Failed to copy file from orchestrator" + ); + // Notify but don't fail - the file might be optional + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Could not copy {}: {}\n", file_path, e), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + } + } + + let msg = DaemonMessage::task_output( + task_id, + "Files copied from orchestrator.\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + parent_id = %parent_id, + error = %e, + "Could not find parent task worktree for file copying" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + "copy_files specified but no parent_task_id" + ); + } + } + } + + // Update state to Starting + tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); + self.update_state(task_id, TaskState::Starting).await; + self.send_status_change(task_id, "initializing", "starting").await; + + // Check Claude is available + match self.process_manager.check_claude_available().await { + Ok(version) => { + tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); + let msg = DaemonMessage::task_output( + task_id, + format!("Claude Code {} ready\n", version), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let err_msg = format!("Claude Code not available: {}", e); + tracing::error!(task_id = %task_id, error = %err_msg); + return 
Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); + } + } + + // Set up orchestrator mode if needed + let (extra_env, full_plan) = if is_orchestrator { + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up orchestrator environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Create .makima directory and helper script + let makima_dir = working_dir.join(".makima"); + if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await { + tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e); + } else { + tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory"); + } + + let script_path = makima_dir.join("orchestrate.sh"); + if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await { + tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e); + } else { + tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh"); + // Make script executable + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) { + tracing::warn!(task_id = %task_id, "Failed to set 
script permissions: {}", e); + } else { + tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)"); + } + } + } + + // Set up environment variables + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set orchestrator environment variables" + ); + + // Prepend orchestrator instructions to the plan + let orchestrator_plan = format!( + "{}\n\n---\n\nYour task:\n{}", + ORCHESTRATOR_SYSTEM_PROMPT, + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Orchestrator environment ready (script at {})\n", script_path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + (Some(env), orchestrator_plan) + } else { + tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); + // Prepend subtask instructions to ensure worktree isolation + let subtask_plan = format!( + "{}\nYour task:\n{}", + SUBTASK_SYSTEM_PROMPT, + plan + ); + (None, subtask_plan) + }; + + // Spawn Claude process + let plan_bytes = full_plan.len(); + let plan_chars = full_plan.chars().count(); + // Rough token estimate: ~4 chars per token for English + let estimated_tokens = plan_chars / 4; + + tracing::info!( + task_id = %task_id, + working_dir = %working_dir.display(), + is_orchestrator = is_orchestrator, + plan_bytes = plan_bytes, + plan_chars = plan_chars, + estimated_tokens = estimated_tokens, + "Spawning Claude process" + ); + + // Warn if plan is very large (Claude's context is typically 100k-200k tokens) + if estimated_tokens > 50_000 { + tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); + let msg 
= DaemonMessage::task_output( + task_id, + format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + + let msg = DaemonMessage::task_output( + task_id, + if is_orchestrator { + format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) + } else { + format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) + }, + false, + ); + let _ = self.ws_tx.send(msg).await; + + tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()..."); + let mut process = self.process_manager + .spawn(&working_dir, &full_plan, extra_env) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Set up input channel for this task so we can send messages to its stdin + tracing::debug!(task_id = %task_id, "Setting up input channel..."); + let (input_tx, mut input_rx) = mpsc::channel::<String>(100); + tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); + self.task_inputs.write().await.insert(task_id, input_tx); + tracing::debug!(task_id = %task_id, "Input channel registered"); + + // Get stdin handle for input forwarding and completion signaling + let stdin_handle = process.stdin_handle(); + let stdin_handle_for_completion = stdin_handle.clone(); + + tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); + tokio::spawn(async move { + tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); + while let Some(msg) = input_rx.recv().await { + tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); + + // Format as JSON user message for stream-json input 
protocol + let json_msg = ClaudeInputMessage::user(&msg); + let json_line = match json_msg.to_json_line() { + Ok(line) => line, + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); + continue; + } + }; + + tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); + + let mut stdin_guard = stdin_handle.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); + if stdin.write_all(json_line.as_bytes()).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); + break; + } + if stdin.flush().await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); + break; + } + tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); + } else { + tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); + break; + } + } + tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); + }); + + // Update state to Running + { + tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + task.started_at = Some(Instant::now()); + } + tracing::debug!(task_id = %task_id, "Released tasks write lock"); + } + tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); + self.send_status_change(task_id, "starting", "running").await; + tracing::debug!(task_id = %task_id, "Sent status change notification"); + + // Stream output with startup timeout check + tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); + tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); + let ws_tx = 
self.ws_tx.clone(); + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; + + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } + + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); + + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + } + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } + } + } + _ = startup_check.tick(), if output_count == 0 => { + 
// Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), + false, + ); + let _ = ws_tx.send(msg).await; + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); + } + } + } + } + } + + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); + + // Update state based on exit code + let success = exit_code == 0; + let new_state = if success { + TaskState::Completed + } else { + TaskState::Failed + }; + + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + success = success, + new_state = ?new_state, + "Claude process exited, updating task state" + ); + + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = new_state; + task.completed_at = Some(Instant::now()); + if !success { + task.error = Some(format!("Process exited with code {}", exit_code)); + } + } + } + + // Execute completion action if task succeeded + let completion_result = if 
success { + if let Some(ref action) = completion_action { + if action != "none" { + self.execute_completion_action( + task_id, + &task_name, + &working_dir, + action, + target_repo_path.as_deref(), + target_branch.as_deref(), + ).await + } else { + Ok(None) + } + } else { + Ok(None) + } + } else { + Ok(None) + }; + + // Log completion action result + match &completion_result { + Ok(Some(pr_url)) => { + tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); + } + } + + // Notify server + let error = if success { + None + } else { + Some(format!("Exit code: {}", exit_code)) + }; + tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); + let msg = DaemonMessage::task_complete(task_id, success, error); + let _ = self.ws_tx.send(msg).await; + + // Note: Worktrees are kept until explicitly deleted (per user preference) + // This allows inspection, PR creation, etc. + + tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); + Ok(()) + } + + /// Execute the completion action for a task. 
+ async fn execute_completion_action( + &self, + task_id: Uuid, + task_name: &str, + worktree_path: &std::path::Path, + action: &str, + target_repo_path: Option<&str>, + target_branch: Option<&str>, + ) -> Result<Option<String>, String> { + let target_repo = match target_repo_path { + Some(path) => crate::worktree::expand_tilde(path), + None => { + tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); + return Ok(None); + } + }; + + if !target_repo.exists() { + return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); + } + + // Get the branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!( + "makima/{}-{}", + crate::worktree::sanitize_name(task_name), + crate::worktree::short_uuid(task_id) + ); + + // Determine target branch - use provided value or detect default branch of target repo + let target_branch = match target_branch { + Some(branch) => branch.to_string(), + None => { + // Detect default branch (main, master, develop, etc.) 
+ self.worktree_manager + .detect_default_branch(&target_repo) + .await + .unwrap_or_else(|_| "master".to_string()) + } + }; + + let msg = DaemonMessage::task_output( + task_id, + format!("Executing completion action: {}...\n", action), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match action { + "branch" => { + // Just push the branch to target repo + self.worktree_manager + .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "merge" => { + // Push and merge into target branch + let commit_sha = self.worktree_manager + .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "pr" => { + // Push and create PR + let title = task_name.to_string(); + let body = format!( + "Automated PR from makima task.\n\nTask ID: `{}`", + task_id + ); + let pr_url = self.worktree_manager + .create_pull_request( + worktree_path, + &target_repo, + &branch_name, + &target_branch, + &title, + &body, + ) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Pull request created: {}\n", pr_url), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(Some(pr_url)) + } + _ => { + tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); + Ok(None) + } + } + } + + /// Find worktree path for a task ID. + /// First checks in-memory tasks, then scans the worktrees directory. 
+ async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(format!( + "No worktree found for task {}. The worktree may have been cleaned up.", + task_id + )) + } + + async fn update_state(&self, task_id: Uuid, state: TaskState) { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = state; + } + } + + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + /// Mark task as failed. 
+ async fn mark_failed(&self, task_id: Uuid, error: &str) { + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Failed; + task.error = Some(error.to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server + let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); + let _ = self.ws_tx.send(msg).await; + } +} + +impl Clone for TaskManagerInner { + fn clone(&self) -> Self { + Self { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } +} |
