summaryrefslogtreecommitdiff
path: root/makima/daemon/src/task
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/daemon/src/task
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/daemon/src/task')
-rw-r--r--makima/daemon/src/task/manager.rs2248
-rw-r--r--makima/daemon/src/task/mod.rs7
-rw-r--r--makima/daemon/src/task/state.rs161
3 files changed, 0 insertions, 2416 deletions
diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs
deleted file mode 100644
index 4979ce7..0000000
--- a/makima/daemon/src/task/manager.rs
+++ /dev/null
@@ -1,2248 +0,0 @@
-//! Task lifecycle manager using git worktrees and Claude Code subprocesses.
-
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::Instant;
-
-use rand::Rng;
-use tokio::io::AsyncWriteExt;
-use tokio::sync::{mpsc, RwLock, Semaphore};
-use uuid::Uuid;
-
-use std::collections::HashSet;
-
-use super::state::TaskState;
-use crate::error::{DaemonError, TaskError, TaskResult};
-use crate::process::{ClaudeInputMessage, ProcessManager};
-use crate::temp::TempManager;
-use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
-use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage};
-
-/// Generate a secure random API key for orchestrator tool access.
-fn generate_tool_key() -> String {
- let mut rng = rand::rng();
- let bytes: [u8; 32] = rng.random();
- hex::encode(bytes)
-}
-
-/// System prompt for regular (non-orchestrator) subtasks.
-/// This ensures subtasks work only within their isolated worktree directory.
-const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase.
-
-## IMPORTANT: Directory Restrictions
-
-**You MUST only work within the current working directory (your worktree).**
-
-- DO NOT use `cd` to navigate to directories outside your worktree
-- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository)
-- DO NOT modify files in parent directories or sibling directories
-- All your file operations should be relative to the current directory
-
-Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator.
-
-**Why?** Your worktree is isolated so that:
-1. Your changes don't affect other running tasks
-2. Changes can be reviewed before integration
-3. Multiple tasks can work on the codebase in parallel without conflicts
-
----
-
-"#;
-
-/// The orchestrator system prompt that tells Claude how to use the helper script.
-const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly.
-
-## FIRST STEP
-
-Start by checking if you have existing subtasks:
-
-```bash
-# List all subtasks to see what work needs to be done
-./.makima/orchestrate.sh list
-```
-
-If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them.
-
----
-
-## Creating Subtasks
-
-You can create new subtasks to break down work:
-
-```bash
-# Create a new subtask with a name and plan
-./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..."
-
-# The command returns the new subtask ID - use it to start the subtask
-./.makima/orchestrate.sh start <new_subtask_id>
-```
-
-Create subtasks when you need to:
-- Break down complex work into smaller pieces
-- Run multiple tasks in parallel on different parts of the codebase
-- Delegate specific implementation work
-
-## Task Continuation (Sequential Dependencies)
-
-When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`:
-
-```bash
-# Create Task B that continues from Task A's worktree
-./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id>
-```
-
-This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes.
-
-**When to use continuation:**
-- Sequential work: Task B needs Task A's output files
-- Staged implementation: Building features incrementally
-- Fix-and-extend: One task fixes issues, another adds features on top
-
-**When NOT to use continuation:**
-- Parallel tasks working on different files
-- Independent subtasks that can be merged separately
-
-**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees.
-
-## Sharing Files with Subtasks
-
-Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files:
-
-```bash
-# Create subtask with specific files copied from orchestrator
-./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md"
-
-# Copy multiple files (comma-separated)
-./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts"
-
-# Combine with --continue-from to share files AND continue from another task
-./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md"
-```
-
-**Use cases for --files:**
-- Share a PLAN.md with detailed implementation steps
-- Distribute configuration or spec files
-- Pass generated data or intermediate results
-
-## How Subtasks Work
-
-Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete:
-- Their work remains in the worktree files (NOT committed to git)
-- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree
-- You can view and copy files from subtask worktrees using their paths
-- The worktree path is returned when you get subtask status
-
-**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work.
-
-## Subtask Commands
-```bash
-# List all subtasks and their current status
-./.makima/orchestrate.sh list
-
-# Create a new subtask (returns the subtask ID)
-./.makima/orchestrate.sh create "Name" "Plan/description"
-
-# Create a subtask that continues from another task's worktree
-./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id>
-
-# Create a subtask with specific files copied from orchestrator worktree
-./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml"
-
-# Start a specific subtask (it will run in its own Claude instance)
-./.makima/orchestrate.sh start <subtask_id>
-
-# Stop a running subtask
-./.makima/orchestrate.sh stop <subtask_id>
-
-# Get detailed status of a subtask (includes worktree_path when available)
-./.makima/orchestrate.sh status <subtask_id>
-
-# Get the output/logs of a subtask
-./.makima/orchestrate.sh output <subtask_id>
-
-# Get the worktree path for a subtask
-./.makima/orchestrate.sh worktree <subtask_id>
-```
-
-## Integrating Subtask Work
-
-When subtasks complete, their changes exist as files in their worktree directories:
-- Files are NOT committed to git branches
-- You must copy/integrate files from subtask worktrees into your worktree
-- Use standard file operations (cp, cat, etc.) to review and integrate changes
-
-### Handling Continuation Chains
-
-**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain.
-
-Example chain: Task A → Task B (continues from A) → Task C (continues from B)
-- Task C's worktree contains ALL changes from A, B, and C
-- You should ONLY integrate Task C's worktree
-- DO NOT integrate Task A or Task B separately (their changes are already in C)
-
-**How to track continuation chains:**
-1. When you create tasks with `--continue-from`, note which task continues from which
-2. Build a mental model: Independent tasks (no continuation) + Continuation chains
-3. For each chain, only integrate the LAST task in the chain
-
-**Example with mixed independent and chained tasks:**
-```
-Independent tasks (integrate all):
-- Task X: API endpoints
-- Task Y: Database models
-
-Continuation chain (integrate ONLY the last one):
-- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B)
- Only integrate Task C!
-```
-
-### Integration Examples
-
-For independent subtasks (no continuation):
-```bash
-# Get the worktree path for a completed subtask
-SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>)
-
-# View what files were changed
-ls -la "$SUBTASK_PATH"
-diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima
-
-# Copy specific files from subtask
-cp "$SUBTASK_PATH/src/new_file.rs" ./src/
-cp "$SUBTASK_PATH/src/modified_file.rs" ./src/
-
-# Or use diff/patch for more control
-diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch
-patch -p0 < changes.patch
-```
-
-For continuation chains (only integrate the final task):
-```bash
-# If you have: Task A → Task B → Task C (each continues from previous)
-# ONLY get and integrate Task C's worktree - it has everything!
-
-FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>)
-
-# Copy all changes from the final task
-rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./
-```
-
-## Completion
-```bash
-# Mark yourself as complete after integrating all subtask work
-./.makima/orchestrate.sh done "Summary of what was accomplished"
-```
-
-## Workflow
-1. **List existing subtasks**: Run `list` to see current subtasks
-2. **Create subtasks if needed**: Use `create` to add new subtasks for the work
- - For independent parallel work: create without `--continue-from`
- - For sequential dependencies: use `--continue-from <previous_task_id>`
- - Track which tasks continue from which (continuation chains)
-3. **Start subtasks**: Run `start` for each subtask
-4. **Monitor progress**: Check status and output as subtasks run
-5. **Integrate work**: When subtasks complete:
- - For independent tasks: integrate each one's worktree
- - For continuation chains: ONLY integrate the FINAL task (it has all changes)
- - Get worktree path with `worktree <subtask_id>`
- - Copy or merge files into your worktree
-6. **Complete**: Call `done` once all work is integrated
-
-## Important Notes
-- Subtask files are in worktrees, NOT committed git branches
-- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work
-- You can read files from subtask worktrees using their paths
-- Use standard file tools (cp, diff, cat, rsync) to integrate changes
-- You should NOT edit files directly - that's what subtasks are for
-- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement.
-- When you call `done`, YOUR worktree may be used for the final PR/merge
-"#;
-
-/// Content of the helper bash script that orchestrators use to call the API.
-const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash
-# Makima Orchestrator Helper Script
-# Usage: ./orchestrate.sh <command> [args...]
-
-API_URL="${MAKIMA_API_URL:-http://localhost:8080}"
-API_KEY="${MAKIMA_API_KEY}"
-TASK_ID="${MAKIMA_TASK_ID}"
-
-if [ -z "$API_KEY" ]; then
- echo "Error: MAKIMA_API_KEY not set" >&2
- exit 1
-fi
-
-if [ -z "$TASK_ID" ]; then
- echo "Error: MAKIMA_TASK_ID not set" >&2
- exit 1
-fi
-
-# Helper function to make API calls and check for errors
-api_call() {
- local method="$1"
- local url="$2"
- local data="$3"
- local response
- local http_code
-
- if [ -n "$data" ]; then
- response=$(curl -s -w "\n%{http_code}" -X "$method" \
- -H "X-Makima-Tool-Key: $API_KEY" \
- -H "Content-Type: application/json" \
- -d "$data" \
- "$url")
- else
- response=$(curl -s -w "\n%{http_code}" -X "$method" \
- -H "X-Makima-Tool-Key: $API_KEY" \
- "$url")
- fi
-
- # Extract HTTP code (last line) and body (everything else)
- http_code=$(echo "$response" | tail -n1)
- body=$(echo "$response" | sed '$d')
-
- # Check for curl errors or non-2xx status
- if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then
- echo "Error: API request failed with HTTP $http_code" >&2
- echo "URL: $url" >&2
- echo "Response: $body" >&2
- echo "$body"
- return 1
- fi
-
- echo "$body"
- return 0
-}
-
-case "$1" in
- list)
- api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks"
- ;;
- create)
- # Parse arguments: create "name" "plan" [--continue-from <task_id>] [--files "file1,file2"]
- if [ -z "$2" ] || [ -z "$3" ]; then
- echo "Usage: $0 create \"<name>\" \"<plan>\" [--continue-from <task_id>] [--files \"file1,file2\"]" >&2
- exit 1
- fi
- NAME="$2"
- PLAN="$3"
- CONTINUE_FROM=""
- COPY_FILES=""
-
- # Parse optional flags (can be in any order after name and plan)
- shift 3
- while [ $# -gt 0 ]; do
- case "$1" in
- --continue-from)
- CONTINUE_FROM="$2"
- shift 2
- ;;
- --files)
- COPY_FILES="$2"
- shift 2
- ;;
- *)
- echo "Unknown option: $1" >&2
- exit 1
- ;;
- esac
- done
-
- # Escape quotes in name and plan for JSON
- NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g')
- PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g')
-
- # Build JSON body
- JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\""
-
- if [ -n "$CONTINUE_FROM" ]; then
- echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2
- JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\""
- else
- echo "Creating subtask: $NAME..." >&2
- fi
-
- if [ -n "$COPY_FILES" ]; then
- # Convert comma-separated file list to JSON array
- FILES_JSON="["
- first=true
- IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES"
- for file in "${FILE_ARRAY[@]}"; do
- file=$(echo "$file" | xargs) # trim whitespace
- if [ "$first" = true ]; then
- FILES_JSON="$FILES_JSON\"$file\""
- first=false
- else
- FILES_JSON="$FILES_JSON,\"$file\""
- fi
- done
- FILES_JSON="$FILES_JSON]"
- JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON"
- echo " (copying files: $COPY_FILES)" >&2
- fi
-
- JSON_BODY="$JSON_BODY}"
- api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY"
- ;;
- start)
- if [ -z "$2" ]; then
- echo "Usage: $0 start <subtask_id>" >&2
- exit 1
- fi
- echo "Starting subtask $2..." >&2
- api_call POST "$API_URL/api/v1/mesh/tasks/$2/start"
- ;;
- stop)
- if [ -z "$2" ]; then
- echo "Usage: $0 stop <subtask_id>" >&2
- exit 1
- fi
- api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop"
- ;;
- status)
- if [ -z "$2" ]; then
- echo "Usage: $0 status <subtask_id>" >&2
- exit 1
- fi
- api_call GET "$API_URL/api/v1/mesh/tasks/$2"
- ;;
- output)
- if [ -z "$2" ]; then
- echo "Usage: $0 output <subtask_id>" >&2
- exit 1
- fi
- api_call GET "$API_URL/api/v1/mesh/tasks/$2/output"
- ;;
- worktree)
- if [ -z "$2" ]; then
- echo "Usage: $0 worktree <subtask_id>" >&2
- exit 1
- fi
- # Get the worktree path from the task's overlayPath field via API
- TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2")
- if [ $? -ne 0 ]; then
- echo "Error: Failed to get task info" >&2
- exit 1
- fi
- WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4)
- if [ -z "$WORKTREE_PATH" ]; then
- echo "Error: Task has no worktree path (may not have started yet)" >&2
- exit 1
- fi
- if [ -d "$WORKTREE_PATH" ]; then
- echo "$WORKTREE_PATH"
- else
- echo "Error: Worktree not found at $WORKTREE_PATH" >&2
- echo "The worktree may have been cleaned up." >&2
- exit 1
- fi
- ;;
- done)
- SUMMARY="${2:-Task completed}"
- api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}"
- ;;
- *)
- echo "Makima Orchestrator Helper"
- echo ""
- echo "Usage: $0 <command> [args...]"
- echo ""
- echo "Subtask Commands:"
- echo " list List all subtasks and their status"
- echo " create \"<name>\" \"<plan>\" Create a new subtask"
- echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree"
- echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree"
- echo " start <subtask_id> Start a subtask"
- echo " stop <subtask_id> Stop a running subtask"
- echo " status <subtask_id> Get detailed subtask status"
- echo " output <subtask_id> Get subtask output history"
- echo " worktree <subtask_id> Get path to subtask's worktree"
- echo ""
- echo "Completion:"
- echo " done [summary] Mark orchestrator as complete"
- echo ""
- echo "Examples:"
- echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\""
- echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\""
- ;;
-esac
-"#;
-
-/// Tracks merge state for an orchestrator task.
-#[derive(Default)]
-struct MergeTracker {
- /// Subtask branches that have been successfully merged.
- merged_subtasks: HashSet<Uuid>,
- /// Subtask branches that were explicitly skipped (with reason).
- skipped_subtasks: HashMap<Uuid, String>,
-}
-
-/// Managed task information.
-pub struct ManagedTask {
- /// Task ID.
- pub id: Uuid,
- /// Current state.
- pub state: TaskState,
- /// Worktree info if created.
- pub worktree: Option<WorktreeInfo>,
- /// Task plan.
- pub plan: String,
- /// Repository URL or path.
- pub repo_source: Option<String>,
- /// Base branch.
- pub base_branch: Option<String>,
- /// Target branch to merge into.
- pub target_branch: Option<String>,
- /// Parent task ID if this is a subtask.
- pub parent_task_id: Option<Uuid>,
- /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
- pub depth: i32,
- /// Whether this task runs as an orchestrator (coordinates subtasks).
- pub is_orchestrator: bool,
- /// Path to target repository for completion actions.
- pub target_repo_path: Option<String>,
- /// Completion action: "none", "branch", "merge", "pr".
- pub completion_action: Option<String>,
- /// Task ID to continue from (copy worktree from this task).
- pub continue_from_task_id: Option<Uuid>,
- /// Files to copy from parent task's worktree.
- pub copy_files: Option<Vec<String>>,
- /// Time task was created.
- pub created_at: Instant,
- /// Time task started running.
- pub started_at: Option<Instant>,
- /// Time task completed.
- pub completed_at: Option<Instant>,
- /// Error message if failed.
- pub error: Option<String>,
-}
-
-/// Configuration for task execution.
-#[derive(Clone)]
-pub struct TaskConfig {
- /// Maximum concurrent tasks.
- pub max_concurrent_tasks: u32,
- /// Base directory for worktrees.
- pub worktree_base_dir: PathBuf,
- /// Environment variables to pass to Claude.
- pub env_vars: HashMap<String, String>,
- /// Claude command path.
- pub claude_command: String,
- /// Additional arguments to pass to Claude Code.
- pub claude_args: Vec<String>,
- /// Arguments to pass before defaults.
- pub claude_pre_args: Vec<String>,
- /// Enable Claude's permission system.
- pub enable_permissions: bool,
- /// Disable verbose output.
- pub disable_verbose: bool,
-}
-
-impl Default for TaskConfig {
- fn default() -> Self {
- Self {
- max_concurrent_tasks: 4,
- worktree_base_dir: WorktreeManager::default_base_dir(),
- env_vars: HashMap::new(),
- claude_command: "claude".to_string(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- }
- }
-}
-
-/// Task manager for handling task lifecycle.
-pub struct TaskManager {
- /// Worktree manager.
- worktree_manager: Arc<WorktreeManager>,
- /// Process manager.
- process_manager: Arc<ProcessManager>,
- /// Temp directory manager.
- temp_manager: Arc<TempManager>,
- /// Task configuration.
- #[allow(dead_code)]
- config: TaskConfig,
- /// Active tasks.
- tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
- /// Channel to send messages to server.
- ws_tx: mpsc::Sender<DaemonMessage>,
- /// Semaphore for limiting concurrent tasks.
- semaphore: Arc<Semaphore>,
- /// Channels for sending input to running tasks.
- /// Each sender allows sending messages to the stdin of a running Claude process.
- task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
- /// Tracks merge state per orchestrator task (for completion gate).
- merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>,
-}
-
-impl TaskManager {
- /// Create a new task manager.
- pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
- let max_concurrent = config.max_concurrent_tasks as usize;
- let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
- let process_manager = Arc::new(
- ProcessManager::with_command(config.claude_command.clone())
- .with_args(config.claude_args.clone())
- .with_pre_args(config.claude_pre_args.clone())
- .with_permissions_enabled(config.enable_permissions)
- .with_verbose_disabled(config.disable_verbose)
- .with_env(config.env_vars.clone()),
- );
- let temp_manager = Arc::new(TempManager::new());
-
- Self {
- worktree_manager,
- process_manager,
- temp_manager,
- config,
- tasks: Arc::new(RwLock::new(HashMap::new())),
- ws_tx,
- semaphore: Arc::new(Semaphore::new(max_concurrent)),
- task_inputs: Arc::new(RwLock::new(HashMap::new())),
- merge_trackers: Arc::new(RwLock::new(HashMap::new())),
- }
- }
-
- /// Handle a command from the server.
- pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
- tracing::info!("Received command from server: {:?}", command);
-
- match command {
- DaemonCommand::SpawnTask {
- task_id,
- task_name,
- plan,
- repo_url,
- base_branch,
- target_branch,
- parent_task_id,
- depth,
- is_orchestrator,
- target_repo_path,
- completion_action,
- continue_from_task_id,
- copy_files,
- } => {
- tracing::info!(
- task_id = %task_id,
- task_name = %task_name,
- repo_url = ?repo_url,
- base_branch = ?base_branch,
- target_branch = ?target_branch,
- parent_task_id = ?parent_task_id,
- depth = depth,
- is_orchestrator = is_orchestrator,
- target_repo_path = ?target_repo_path,
- completion_action = ?completion_action,
- continue_from_task_id = ?continue_from_task_id,
- copy_files = ?copy_files,
- plan_len = plan.len(),
- "Spawning new task"
- );
- self.spawn_task(
- task_id, task_name, plan, repo_url, base_branch, target_branch,
- parent_task_id, depth, is_orchestrator,
- target_repo_path, completion_action, continue_from_task_id,
- copy_files
- ).await?;
- }
- DaemonCommand::PauseTask { task_id } => {
- tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks");
- // Subprocesses don't support pause, just log and ignore
- }
- DaemonCommand::ResumeTask { task_id } => {
- tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks");
- // Subprocesses don't support resume, just log and ignore
- }
- DaemonCommand::InterruptTask { task_id, graceful: _ } => {
- tracing::info!(task_id = %task_id, "Interrupting task");
- self.interrupt_task(task_id).await?;
- }
- DaemonCommand::SendMessage { task_id, message } => {
- tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task");
- // Send message to the task's stdin via the input channel
- let inputs = self.task_inputs.read().await;
- if let Some(sender) = inputs.get(&task_id) {
- if let Err(e) = sender.send(message).await {
- tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel");
- } else {
- tracing::info!(task_id = %task_id, "Message sent to task successfully");
- }
- } else {
- tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)");
- }
- }
- DaemonCommand::InjectSiblingContext { task_id, .. } => {
- tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks");
- }
- DaemonCommand::Authenticated { daemon_id } => {
- tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)");
- }
- DaemonCommand::Error { code, message } => {
- tracing::warn!(code = %code, message = %message, "Error command from server");
- }
-
- // =========================================================================
- // Merge Commands
- // =========================================================================
-
- DaemonCommand::ListBranches { task_id } => {
- tracing::info!(task_id = %task_id, "Listing task branches");
- self.handle_list_branches(task_id).await?;
- }
- DaemonCommand::MergeStart { task_id, source_branch } => {
- tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge");
- self.handle_merge_start(task_id, source_branch).await?;
- }
- DaemonCommand::MergeStatus { task_id } => {
- tracing::info!(task_id = %task_id, "Getting merge status");
- self.handle_merge_status(task_id).await?;
- }
- DaemonCommand::MergeResolve { task_id, file, strategy } => {
- tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict");
- self.handle_merge_resolve(task_id, file, strategy).await?;
- }
- DaemonCommand::MergeCommit { task_id, message } => {
- tracing::info!(task_id = %task_id, "Committing merge");
- self.handle_merge_commit(task_id, message).await?;
- }
- DaemonCommand::MergeAbort { task_id } => {
- tracing::info!(task_id = %task_id, "Aborting merge");
- self.handle_merge_abort(task_id).await?;
- }
- DaemonCommand::MergeSkip { task_id, subtask_id, reason } => {
- tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge");
- self.handle_merge_skip(task_id, subtask_id, reason).await?;
- }
- DaemonCommand::CheckMergeComplete { task_id } => {
- tracing::info!(task_id = %task_id, "Checking merge completion");
- self.handle_check_merge_complete(task_id).await?;
- }
-
- // =========================================================================
- // Completion Action Commands
- // =========================================================================
-
- DaemonCommand::RetryCompletionAction {
- task_id,
- task_name,
- action,
- target_repo_path,
- target_branch,
- } => {
- tracing::info!(
- task_id = %task_id,
- task_name = %task_name,
- action = %action,
- target_repo_path = %target_repo_path,
- target_branch = ?target_branch,
- "Retrying completion action"
- );
- self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?;
- }
-
- DaemonCommand::CloneWorktree { task_id, target_dir } => {
- tracing::info!(
- task_id = %task_id,
- target_dir = %target_dir,
- "Cloning worktree to target directory"
- );
- self.handle_clone_worktree(task_id, target_dir).await?;
- }
-
- DaemonCommand::CheckTargetExists { task_id, target_dir } => {
- tracing::debug!(
- task_id = %task_id,
- target_dir = %target_dir,
- "Checking if target directory exists"
- );
- self.handle_check_target_exists(task_id, target_dir).await?;
- }
- }
- Ok(())
- }
-
- /// Spawn a new task.
- #[allow(clippy::too_many_arguments)]
- pub async fn spawn_task(
- &self,
- task_id: Uuid,
- task_name: String,
- plan: String,
- repo_url: Option<String>,
- base_branch: Option<String>,
- target_branch: Option<String>,
- parent_task_id: Option<Uuid>,
- depth: i32,
- is_orchestrator: bool,
- target_repo_path: Option<String>,
- completion_action: Option<String>,
- continue_from_task_id: Option<Uuid>,
- copy_files: Option<Vec<String>>,
- ) -> TaskResult<()> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ===");
-
- // Check if task already exists - allow re-spawning if in terminal state
- {
- let mut tasks = self.tasks.write().await;
- if let Some(existing) = tasks.get(&task_id) {
- if existing.state.is_terminal() {
- // Task exists but is in terminal state (completed, failed, interrupted)
- // Remove it so we can re-spawn
- tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn");
- tasks.remove(&task_id);
- } else {
- // Task is still active, reject
- tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn");
- return Err(TaskError::AlreadyExists(task_id));
- }
- }
- }
-
- // Acquire semaphore permit
- tracing::info!(task_id = %task_id, "Acquiring concurrency permit...");
- let permit = self
- .semaphore
- .clone()
- .try_acquire_owned()
- .map_err(|_| {
- tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task");
- TaskError::ConcurrencyLimit
- })?;
- tracing::info!(task_id = %task_id, "Concurrency permit acquired");
-
- // Create task entry
- tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
- let task = ManagedTask {
- id: task_id,
- state: TaskState::Initializing,
- worktree: None,
- plan: plan.clone(),
- repo_source: repo_url.clone(),
- base_branch: base_branch.clone(),
- target_branch: target_branch.clone(),
- parent_task_id,
- depth,
- is_orchestrator,
- target_repo_path: target_repo_path.clone(),
- completion_action: completion_action.clone(),
- continue_from_task_id,
- copy_files: copy_files.clone(),
- created_at: Instant::now(),
- started_at: None,
- completed_at: None,
- error: None,
- };
-
- self.tasks.write().await.insert(task_id, task);
- tracing::info!(task_id = %task_id, "Task entry created and stored");
-
- // Notify server of status change
- tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing");
- self.send_status_change(task_id, "pending", "initializing").await;
-
- // Spawn task in background
- tracing::info!(task_id = %task_id, "Spawning background task runner");
- let inner = self.clone_inner();
- tokio::spawn(async move {
- let _permit = permit; // Hold permit until done
- tracing::info!(task_id = %task_id, "Background task runner started");
-
- if let Err(e) = inner.run_task(
- task_id, task_name, plan, repo_url, base_branch, target_branch,
- is_orchestrator, target_repo_path, completion_action,
- continue_from_task_id, copy_files
- ).await {
- tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
- inner.mark_failed(task_id, &e.to_string()).await;
- }
- tracing::info!(task_id = %task_id, "Background task runner completed");
- });
-
- tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ===");
- Ok(())
- }
-
- /// Clone inner state for spawned tasks.
- fn clone_inner(&self) -> TaskManagerInner {
- TaskManagerInner {
- worktree_manager: self.worktree_manager.clone(),
- process_manager: self.process_manager.clone(),
- temp_manager: self.temp_manager.clone(),
- tasks: self.tasks.clone(),
- ws_tx: self.ws_tx.clone(),
- task_inputs: self.task_inputs.clone(),
- }
- }
-
- /// Interrupt a task.
- pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> {
- let mut tasks = self.tasks.write().await;
- let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?;
-
- if task.state.is_terminal() {
- return Ok(()); // Already done
- }
-
- let old_state = task.state;
- task.state = TaskState::Interrupted;
- task.completed_at = Some(Instant::now());
-
- // Notify server
- drop(tasks);
- self.send_status_change(task_id, old_state.as_str(), "interrupted").await;
-
- // Note: The process will be killed when the ClaudeProcess is dropped
- // Worktrees are kept until explicitly deleted
-
- Ok(())
- }
-
- /// Get list of active task IDs.
- pub async fn active_task_ids(&self) -> Vec<Uuid> {
- self.tasks
- .read()
- .await
- .iter()
- .filter(|(_, t)| t.state.is_active())
- .map(|(id, _)| *id)
- .collect()
- }
-
- /// Get task state.
- pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> {
- self.tasks.read().await.get(&task_id).map(|t| t.state)
- }
-
- /// Send status change notification to server.
- async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
- let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
- let _ = self.ws_tx.send(msg).await;
- }
-
- // =========================================================================
- // Merge Handler Methods
- // =========================================================================
-
- /// Get worktree path for a task, or return error if not found.
- /// First checks in-memory tasks, then scans the worktrees directory.
- async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> {
- // First try to get from in-memory tasks
- {
- let tasks = self.tasks.read().await;
- if let Some(task) = tasks.get(&task_id) {
- if let Some(ref worktree) = task.worktree {
- return Ok(worktree.path.clone());
- }
- }
- }
-
- // Task not in memory - scan worktrees directory for matching task ID
- let short_id = &task_id.to_string()[..8];
- let worktrees_dir = self.worktree_manager.base_dir();
-
- if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
- while let Ok(Some(entry)) = entries.next_entry().await {
- let name = entry.file_name();
- let name_str = name.to_string_lossy();
- if name_str.starts_with(short_id) {
- let path = entry.path();
- // Verify it's a valid git directory
- if path.join(".git").exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %path.display(),
- "Found worktree by scanning directory"
- );
- return Ok(path);
- }
- }
- }
- }
-
- Err(DaemonError::Task(TaskError::SetupFailed(
- format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id)
- )))
- }
-
- /// Handle ListBranches command.
- async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.list_task_branches(&worktree_path).await {
- Ok(branches) => {
- let branch_infos: Vec<BranchInfo> = branches
- .into_iter()
- .map(|b| BranchInfo {
- name: b.name,
- task_id: b.task_id,
- is_merged: b.is_merged,
- last_commit: b.last_commit,
- last_commit_message: b.last_commit_message,
- })
- .collect();
-
- let msg = DaemonMessage::BranchList {
- task_id,
- branches: branch_infos,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to list branches");
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeStart command.
- async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await {
- Ok(None) => {
- // Merge succeeded without conflicts
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge completed without conflicts".to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Ok(Some(conflicts)) => {
- // Merge has conflicts
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: format!("Merge has {} conflicts", conflicts.len()),
- commit_sha: None,
- conflicts: Some(conflicts),
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeStatus command.
- async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.get_merge_state(&worktree_path).await {
- Ok(state) => {
- let msg = DaemonMessage::MergeStatusResponse {
- task_id,
- in_progress: state.in_progress,
- source_branch: if state.in_progress { Some(state.source_branch) } else { None },
- conflicted_files: state.conflicted_files,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status");
- let msg = DaemonMessage::MergeStatusResponse {
- task_id,
- in_progress: false,
- source_branch: None,
- conflicted_files: vec![],
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeResolve command.
- async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- let resolution = match strategy.to_lowercase().as_str() {
- "ours" => ConflictResolution::Ours,
- "theirs" => ConflictResolution::Theirs,
- _ => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- return Ok(());
- }
- };
-
- match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await {
- Ok(()) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: format!("Resolved conflict in {}", file),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeCommit command.
- async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.commit_merge(&worktree_path, &message).await {
- Ok(commit_sha) => {
- // Track this merge as completed (extract subtask ID from branch if possible)
- // For now, we'll track it when MergeSkip is called or based on branch names
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge committed successfully".to_string(),
- commit_sha: Some(commit_sha),
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeAbort command.
- async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.abort_merge(&worktree_path).await {
- Ok(()) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge aborted".to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeSkip command.
- async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> {
- // Record that this subtask was skipped
- {
- let mut trackers = self.merge_trackers.write().await;
- let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default);
- tracker.skipped_subtasks.insert(subtask_id, reason.clone());
- }
-
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: format!("Subtask {} skipped: {}", subtask_id, reason),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CheckMergeComplete command.
- async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Get all task branches
- let branches = match self.worktree_manager.list_task_branches(&worktree_path).await {
- Ok(b) => b,
- Err(e) => {
- let msg = DaemonMessage::MergeCompleteCheck {
- task_id,
- can_complete: false,
- unmerged_branches: vec![format!("Error listing branches: {}", e)],
- merged_count: 0,
- skipped_count: 0,
- };
- let _ = self.ws_tx.send(msg).await;
- return Ok(());
- }
- };
-
- // Get tracker state
- let trackers = self.merge_trackers.read().await;
- let empty_merged: HashSet<Uuid> = HashSet::new();
- let empty_skipped: HashMap<Uuid, String> = HashMap::new();
- let tracker = trackers.get(&task_id);
- let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged);
- let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped);
-
- let mut merged_count = 0u32;
- let mut skipped_count = 0u32;
- let mut unmerged_branches = Vec::new();
-
- for branch in &branches {
- if branch.is_merged {
- merged_count += 1;
- } else if let Some(subtask_id) = branch.task_id {
- if merged_set.contains(&subtask_id) {
- merged_count += 1;
- } else if skipped_set.contains_key(&subtask_id) {
- skipped_count += 1;
- } else {
- unmerged_branches.push(branch.name.clone());
- }
- } else {
- // Branch without task ID - check if it's merged
- unmerged_branches.push(branch.name.clone());
- }
- }
-
- let can_complete = unmerged_branches.is_empty();
-
- let msg = DaemonMessage::MergeCompleteCheck {
- task_id,
- can_complete,
- unmerged_branches,
- merged_count,
- skipped_count,
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Mark a subtask as merged in the tracker.
- #[allow(dead_code)]
- pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) {
- let mut trackers = self.merge_trackers.write().await;
- let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default);
- tracker.merged_subtasks.insert(subtask_id);
- }
-
- // =========================================================================
- // Completion Action Handler Methods
- // =========================================================================
-
- /// Handle RetryCompletionAction command.
- async fn handle_retry_completion_action(
- &self,
- task_id: Uuid,
- task_name: String,
- action: String,
- target_repo_path: String,
- target_branch: Option<String>,
- ) -> Result<(), DaemonError> {
- // Get the task's worktree path
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Execute the completion action
- let inner = self.clone_inner();
- let result = inner.execute_completion_action(
- task_id,
- &task_name,
- &worktree_path,
- &action,
- Some(target_repo_path.as_str()),
- target_branch.as_deref(),
- ).await;
-
- // Send result back to server
- let msg = match result {
- Ok(pr_url) => DaemonMessage::CompletionActionResult {
- task_id,
- success: true,
- message: match action.as_str() {
- "branch" => format!("Branch pushed to {}", target_repo_path),
- "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")),
- "pr" => format!("Pull request created"),
- _ => format!("Completion action '{}' executed", action),
- },
- pr_url,
- },
- Err(e) => DaemonMessage::CompletionActionResult {
- task_id,
- success: false,
- message: e,
- pr_url: None,
- },
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CloneWorktree command.
- async fn handle_clone_worktree(
- &self,
- task_id: Uuid,
- target_dir: String,
- ) -> Result<(), DaemonError> {
- // Get the task's worktree path
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Expand tilde in target path
- let target_path = crate::worktree::expand_tilde(&target_dir);
-
- // Clone the worktree to target directory
- let result = self.worktree_manager.clone_worktree_to_directory(
- &worktree_path,
- &target_path,
- ).await;
-
- // Send result back to server
- let msg = match result {
- Ok(message) => DaemonMessage::CloneWorktreeResult {
- task_id,
- success: true,
- message,
- target_dir: Some(target_path.to_string_lossy().to_string()),
- },
- Err(e) => DaemonMessage::CloneWorktreeResult {
- task_id,
- success: false,
- message: e.to_string(),
- target_dir: None,
- },
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CheckTargetExists command.
- async fn handle_check_target_exists(
- &self,
- task_id: Uuid,
- target_dir: String,
- ) -> Result<(), DaemonError> {
- // Expand tilde in target path
- let target_path = crate::worktree::expand_tilde(&target_dir);
-
- // Check if target exists
- let exists = self.worktree_manager.target_directory_exists(&target_path).await;
-
- // Send result back to server
- let msg = DaemonMessage::CheckTargetExistsResult {
- task_id,
- exists,
- target_dir: target_path.to_string_lossy().to_string(),
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-}
-
-/// Inner state for spawned tasks (cloneable).
-struct TaskManagerInner {
- worktree_manager: Arc<WorktreeManager>,
- process_manager: Arc<ProcessManager>,
- temp_manager: Arc<TempManager>,
- tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
- ws_tx: mpsc::Sender<DaemonMessage>,
- task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
-}
-
-impl TaskManagerInner {
- /// Run a task to completion.
- #[allow(clippy::too_many_arguments)]
- async fn run_task(
- &self,
- task_id: Uuid,
- task_name: String,
- plan: String,
- repo_source: Option<String>,
- base_branch: Option<String>,
- target_branch: Option<String>,
- is_orchestrator: bool,
- target_repo_path: Option<String>,
- completion_action: Option<String>,
- continue_from_task_id: Option<Uuid>,
- copy_files: Option<Vec<String>>,
- ) -> Result<(), DaemonError> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ===");
-
- // Determine working directory
- let working_dir = if let Some(ref source) = repo_source {
- if is_new_repo_request(source) {
- // Explicit new repo request: new:// or new://project-name
- tracing::info!(
- task_id = %task_id,
- source = %source,
- "Creating new git repository"
- );
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Initializing new git repository...\n"),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- let worktree_info = self.worktree_manager
- .init_new_repo(task_id, source)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
-
- tracing::info!(
- task_id = %task_id,
- path = %worktree_info.path.display(),
- "New repository created"
- );
-
- // Store worktree info
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.worktree = Some(worktree_info.clone());
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Repository ready at {}\n", worktree_info.path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- worktree_info.path
- } else {
- // Send progress message
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Setting up worktree from {}...\n", source),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Ensure source repo exists (clone if URL, verify if path)
- let source_repo = self.worktree_manager.ensure_repo(source).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
-
- // Detect or use provided base branch
- let branch = if let Some(ref b) = base_branch {
- b.clone()
- } else {
- self.worktree_manager.detect_default_branch(&source_repo).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- };
-
- tracing::info!(
- task_id = %task_id,
- source = %source,
- branch = %branch,
- continue_from_task_id = ?continue_from_task_id,
- "Setting up worktree"
- );
-
- // Create worktree - either from scratch or copying from another task
- let task_name = format!("task-{}", &task_id.to_string()[..8]);
- let worktree_info = if let Some(from_task_id) = continue_from_task_id {
- // Find the source task's worktree path
- let source_worktree = self.find_worktree_for_task(from_task_id).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(
- format!("Cannot continue from task {}: {}", from_task_id, e)
- )))?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Create worktree by copying from source task
- self.worktree_manager
- .create_worktree_from_task(&source_worktree, task_id, &task_name)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- } else {
- // Create fresh worktree from repo
- self.worktree_manager
- .create_worktree(&source_repo, task_id, &task_name, &branch)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- };
-
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_info.path.display(),
- branch = %worktree_info.branch,
- continued_from = ?continue_from_task_id,
- "Worktree created"
- );
-
- // Store worktree info
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.worktree = Some(worktree_info.clone());
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Worktree ready at {}\n", worktree_info.path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- worktree_info.path
- }
- } else {
- // No repo specified - use managed temp directory in ~/.makima/temp/
- tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)");
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Creating temporary working directory...\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- let temp_dir = self.temp_manager.create_task_dir(task_id).await?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Working directory ready at {}\n", temp_dir.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- temp_dir
- };
-
- // Copy files from parent task's worktree if specified
- if let Some(ref files) = copy_files {
- if !files.is_empty() {
- // Get the parent task ID to find its worktree
- let parent_task_id = {
- let tasks = self.tasks.read().await;
- tasks.get(&task_id).and_then(|t| t.parent_task_id)
- };
-
- if let Some(parent_id) = parent_task_id {
- match self.find_worktree_for_task(parent_id).await {
- Ok(parent_worktree) => {
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Copying {} files from orchestrator...\n", files.len()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- for file_path in files {
- let source = parent_worktree.join(file_path);
- let dest = working_dir.join(file_path);
-
- // Create parent directories if needed
- if let Some(parent) = dest.parent() {
- if let Err(e) = tokio::fs::create_dir_all(parent).await {
- tracing::warn!(
- task_id = %task_id,
- file = %file_path,
- error = %e,
- "Failed to create parent directory for file"
- );
- continue;
- }
- }
-
- // Copy the file
- match tokio::fs::copy(&source, &dest).await {
- Ok(_) => {
- tracing::info!(
- task_id = %task_id,
- source = %source.display(),
- dest = %dest.display(),
- "Copied file from orchestrator"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- source = %source.display(),
- dest = %dest.display(),
- error = %e,
- "Failed to copy file from orchestrator"
- );
- // Notify but don't fail - the file might be optional
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Warning: Could not copy {}: {}\n", file_path, e),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Files copied from orchestrator.\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- parent_id = %parent_id,
- error = %e,
- "Could not find parent task worktree for file copying"
- );
- }
- }
- } else {
- tracing::warn!(
- task_id = %task_id,
- "copy_files specified but no parent_task_id"
- );
- }
- }
- }
-
- // Update state to Starting
- tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting");
- self.update_state(task_id, TaskState::Starting).await;
- self.send_status_change(task_id, "initializing", "starting").await;
-
- // Check Claude is available
- match self.process_manager.check_claude_available().await {
- Ok(version) => {
- tracing::info!(task_id = %task_id, version = %version, "Claude Code available");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Claude Code {} ready\n", version),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let err_msg = format!("Claude Code not available: {}", e);
- tracing::error!(task_id = %task_id, error = %err_msg);
- return Err(DaemonError::Task(TaskError::SetupFailed(err_msg)));
- }
- }
-
- // Set up orchestrator mode if needed
- let (extra_env, full_plan) = if is_orchestrator {
- tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode");
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Setting up orchestrator environment...\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Generate tool key for API access
- let tool_key = generate_tool_key();
- tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator");
-
- // Register tool key with server
- let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
- if self.ws_tx.send(register_msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to register tool key");
- } else {
- tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
- }
-
- // Create .makima directory and helper script
- let makima_dir = working_dir.join(".makima");
- if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await {
- tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e);
- } else {
- tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory");
- }
-
- let script_path = makima_dir.join("orchestrate.sh");
- if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await {
- tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e);
- } else {
- tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh");
- // Make script executable
- #[cfg(unix)]
- {
- use std::os::unix::fs::PermissionsExt;
- if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) {
- tracing::warn!(task_id = %task_id, "Failed to set script permissions: {}", e);
- } else {
- tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)");
- }
- }
- }
-
- // Set up environment variables
- let mut env = HashMap::new();
- // TODO: Make API URL configurable
- env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string());
- env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
- env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
-
- tracing::info!(
- task_id = %task_id,
- api_url = "http://localhost:8080",
- tool_key_preview = &tool_key[..8.min(tool_key.len())],
- "Set orchestrator environment variables"
- );
-
- // Prepend orchestrator instructions to the plan
- let orchestrator_plan = format!(
- "{}\n\n---\n\nYour task:\n{}",
- ORCHESTRATOR_SYSTEM_PROMPT,
- plan
- );
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Orchestrator environment ready (script at {})\n", script_path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- (Some(env), orchestrator_plan)
- } else {
- tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)");
- // Prepend subtask instructions to ensure worktree isolation
- let subtask_plan = format!(
- "{}\nYour task:\n{}",
- SUBTASK_SYSTEM_PROMPT,
- plan
- );
- (None, subtask_plan)
- };
-
- // Spawn Claude process
- let plan_bytes = full_plan.len();
- let plan_chars = full_plan.chars().count();
- // Rough token estimate: ~4 chars per token for English
- let estimated_tokens = plan_chars / 4;
-
- tracing::info!(
- task_id = %task_id,
- working_dir = %working_dir.display(),
- is_orchestrator = is_orchestrator,
- plan_bytes = plan_bytes,
- plan_chars = plan_chars,
- estimated_tokens = estimated_tokens,
- "Spawning Claude process"
- );
-
- // Warn if plan is very large (Claude's context is typically 100k-200k tokens)
- if estimated_tokens > 50_000 {
- tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- if is_orchestrator {
- format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens)
- } else {
- format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens)
- },
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()...");
- let mut process = self.process_manager
- .spawn(&working_dir, &full_plan, extra_env)
- .await
- .map_err(|e| {
- tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
- DaemonError::Task(TaskError::SetupFailed(e.to_string()))
- })?;
- tracing::info!(task_id = %task_id, "Claude process spawned successfully");
-
- // Set up input channel for this task so we can send messages to its stdin
- tracing::debug!(task_id = %task_id, "Setting up input channel...");
- let (input_tx, mut input_rx) = mpsc::channel::<String>(100);
- tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock...");
- self.task_inputs.write().await.insert(task_id, input_tx);
- tracing::debug!(task_id = %task_id, "Input channel registered");
-
- // Get stdin handle for input forwarding and completion signaling
- let stdin_handle = process.stdin_handle();
- let stdin_handle_for_completion = stdin_handle.clone();
-
- tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
- tokio::spawn(async move {
- tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages...");
- while let Some(msg) = input_rx.recv().await {
- tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel");
-
- // Format as JSON user message for stream-json input protocol
- let json_msg = ClaudeInputMessage::user(&msg);
- let json_line = match json_msg.to_json_line() {
- Ok(line) => line,
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message");
- continue;
- }
- };
-
- tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin");
-
- let mut stdin_guard = stdin_handle.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing...");
- if stdin.write_all(json_line.as_bytes()).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking");
- break;
- }
- if stdin.flush().await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking");
- break;
- }
- tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin");
- } else {
- tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message");
- break;
- }
- }
- tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)");
- });
-
- // Update state to Running
- {
- tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update");
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = TaskState::Running;
- task.started_at = Some(Instant::now());
- }
- tracing::debug!(task_id = %task_id, "Released tasks write lock");
- }
- tracing::info!(task_id = %task_id, "Updating state: Starting -> Running");
- self.send_status_change(task_id, "starting", "running").await;
- tracing::debug!(task_id = %task_id, "Sent status change notification");
-
- // Stream output with startup timeout check
- tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output...");
- tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server");
- let ws_tx = self.ws_tx.clone();
-
- let mut output_count = 0u64;
- let mut output_bytes = 0usize;
- let startup_timeout = tokio::time::Duration::from_secs(30);
- let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
- startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- let startup_deadline = tokio::time::Instant::now() + startup_timeout;
-
- loop {
- tokio::select! {
- maybe_line = process.next_output() => {
- match maybe_line {
- Some(line) => {
- output_count += 1;
- output_bytes += line.content.len();
-
- if output_count == 1 {
- tracing::info!(task_id = %task_id, "Received first output line from Claude");
- }
- if output_count % 100 == 0 {
- tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
- }
-
- // Log output details for debugging
- tracing::trace!(
- task_id = %task_id,
- line_num = output_count,
- content_len = line.content.len(),
- is_stdout = line.is_stdout,
- json_type = ?line.json_type,
- "Forwarding output to WebSocket"
- );
-
- // Check if this is a "result" message indicating task completion
- // With --input-format=stream-json, Claude waits for more input after completion
- // We close stdin to signal EOF and let the process exit
- if line.json_type.as_deref() == Some("result") {
- tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
- let mut stdin_guard = stdin_handle_for_completion.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
- }
- }
-
- let msg = DaemonMessage::task_output(task_id, line.content, false);
- if ws_tx.send(msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
- break;
- }
- }
- None => {
- tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
- break;
- }
- }
- }
- _ = startup_check.tick(), if output_count == 0 => {
- // Check if process is still alive
- match process.try_wait() {
- Ok(Some(exit_code)) => {
- tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
- false,
- );
- let _ = ws_tx.send(msg).await;
- break;
- }
- Ok(None) => {
- // Still running but no output
- if tokio::time::Instant::now() > startup_deadline {
- tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
- let msg = DaemonMessage::task_output(
- task_id,
- "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
- false,
- );
- let _ = ws_tx.send(msg).await;
- } else {
- tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
- }
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
- }
- }
- }
- }
- }
-
- // Wait for process to exit
- let exit_code = process.wait().await.unwrap_or(-1);
-
- // Clean up input channel for this task
- self.task_inputs.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Removed task input channel");
-
- // Update state based on exit code
- let success = exit_code == 0;
- let new_state = if success {
- TaskState::Completed
- } else {
- TaskState::Failed
- };
-
- tracing::info!(
- task_id = %task_id,
- exit_code = exit_code,
- success = success,
- new_state = ?new_state,
- "Claude process exited, updating task state"
- );
-
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = new_state;
- task.completed_at = Some(Instant::now());
- if !success {
- task.error = Some(format!("Process exited with code {}", exit_code));
- }
- }
- }
-
- // Execute completion action if task succeeded
- let completion_result = if success {
- if let Some(ref action) = completion_action {
- if action != "none" {
- self.execute_completion_action(
- task_id,
- &task_name,
- &working_dir,
- action,
- target_repo_path.as_deref(),
- target_branch.as_deref(),
- ).await
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- };
-
- // Log completion action result
- match &completion_result {
- Ok(Some(pr_url)) => {
- tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR");
- }
- Ok(None) => {}
- Err(e) => {
- tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)");
- }
- }
-
- // Notify server
- let error = if success {
- None
- } else {
- Some(format!("Exit code: {}", exit_code))
- };
- tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
- let msg = DaemonMessage::task_complete(task_id, success, error);
- let _ = self.ws_tx.send(msg).await;
-
- // Note: Worktrees are kept until explicitly deleted (per user preference)
- // This allows inspection, PR creation, etc.
-
- tracing::info!(task_id = %task_id, "=== RUN_TASK END ===");
- Ok(())
- }
-
- /// Execute the completion action for a task.
- async fn execute_completion_action(
- &self,
- task_id: Uuid,
- task_name: &str,
- worktree_path: &std::path::Path,
- action: &str,
- target_repo_path: Option<&str>,
- target_branch: Option<&str>,
- ) -> Result<Option<String>, String> {
- let target_repo = match target_repo_path {
- Some(path) => crate::worktree::expand_tilde(path),
- None => {
- tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action");
- return Ok(None);
- }
- };
-
- if !target_repo.exists() {
- return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path));
- }
-
- // Get the branch name: makima/{task-name-with-dashes}-{short-id}
- let branch_name = format!(
- "makima/{}-{}",
- crate::worktree::sanitize_name(task_name),
- crate::worktree::short_uuid(task_id)
- );
-
- // Determine target branch - use provided value or detect default branch of target repo
- let target_branch = match target_branch {
- Some(branch) => branch.to_string(),
- None => {
- // Detect default branch (main, master, develop, etc.)
- self.worktree_manager
- .detect_default_branch(&target_repo)
- .await
- .unwrap_or_else(|_| "master".to_string())
- }
- };
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Executing completion action: {}...\n", action),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- match action {
- "branch" => {
- // Just push the branch to target repo
- self.worktree_manager
- .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name)
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(None)
- }
- "merge" => {
- // Push and merge into target branch
- let commit_sha = self.worktree_manager
- .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name)
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(None)
- }
- "pr" => {
- // Push and create PR
- let title = task_name.to_string();
- let body = format!(
- "Automated PR from makima task.\n\nTask ID: `{}`",
- task_id
- );
- let pr_url = self.worktree_manager
- .create_pull_request(
- worktree_path,
- &target_repo,
- &branch_name,
- &target_branch,
- &title,
- &body,
- )
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Pull request created: {}\n", pr_url),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(Some(pr_url))
- }
- _ => {
- tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action");
- Ok(None)
- }
- }
- }
-
- /// Find worktree path for a task ID.
- /// First checks in-memory tasks, then scans the worktrees directory.
- async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> {
- // First try to get from in-memory tasks
- {
- let tasks = self.tasks.read().await;
- if let Some(task) = tasks.get(&task_id) {
- if let Some(ref worktree) = task.worktree {
- return Ok(worktree.path.clone());
- }
- }
- }
-
- // Task not in memory - scan worktrees directory for matching task ID
- let short_id = &task_id.to_string()[..8];
- let worktrees_dir = self.worktree_manager.base_dir();
-
- if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
- while let Ok(Some(entry)) = entries.next_entry().await {
- let name = entry.file_name();
- let name_str = name.to_string_lossy();
- if name_str.starts_with(short_id) {
- let path = entry.path();
- // Verify it's a valid git directory
- if path.join(".git").exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %path.display(),
- "Found worktree by scanning directory"
- );
- return Ok(path);
- }
- }
- }
- }
-
- Err(format!(
- "No worktree found for task {}. The worktree may have been cleaned up.",
- task_id
- ))
- }
-
- async fn update_state(&self, task_id: Uuid, state: TaskState) {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = state;
- }
- }
-
- async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
- let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
- let _ = self.ws_tx.send(msg).await;
- }
-
- /// Mark task as failed.
- async fn mark_failed(&self, task_id: Uuid, error: &str) {
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = TaskState::Failed;
- task.error = Some(error.to_string());
- task.completed_at = Some(Instant::now());
- }
- }
-
- // Notify server
- let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string()));
- let _ = self.ws_tx.send(msg).await;
- }
-}
-
-impl Clone for TaskManagerInner {
- fn clone(&self) -> Self {
- Self {
- worktree_manager: self.worktree_manager.clone(),
- process_manager: self.process_manager.clone(),
- temp_manager: self.temp_manager.clone(),
- tasks: self.tasks.clone(),
- ws_tx: self.ws_tx.clone(),
- task_inputs: self.task_inputs.clone(),
- }
- }
-}
diff --git a/makima/daemon/src/task/mod.rs b/makima/daemon/src/task/mod.rs
deleted file mode 100644
index 29c261e..0000000
--- a/makima/daemon/src/task/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-//! Task management and execution.
-
-pub mod manager;
-pub mod state;
-
-pub use manager::{ManagedTask, TaskConfig, TaskManager};
-pub use state::TaskState;
diff --git a/makima/daemon/src/task/state.rs b/makima/daemon/src/task/state.rs
deleted file mode 100644
index fe73de1..0000000
--- a/makima/daemon/src/task/state.rs
+++ /dev/null
@@ -1,161 +0,0 @@
-//! Task state machine.
-
-use std::fmt;
-
-/// Task execution state.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum TaskState {
- /// Task received, preparing overlay.
- Initializing,
- /// Overlay ready, starting container.
- Starting,
- /// Container running.
- Running,
- /// Container paused.
- Paused,
- /// Waiting for sibling or resource.
- Blocked,
- /// Task completed successfully.
- Completed,
- /// Task failed with error.
- Failed,
- /// Task interrupted by user.
- Interrupted,
-}
-
-impl TaskState {
- /// Check if a state transition is valid.
- pub fn can_transition_to(&self, target: TaskState) -> bool {
- use TaskState::*;
-
- matches!(
- (self, target),
- // From Initializing
- (Initializing, Starting)
- | (Initializing, Failed)
- | (Initializing, Interrupted)
- // From Starting
- | (Starting, Running)
- | (Starting, Failed)
- | (Starting, Interrupted)
- // From Running
- | (Running, Paused)
- | (Running, Blocked)
- | (Running, Completed)
- | (Running, Failed)
- | (Running, Interrupted)
- // From Paused
- | (Paused, Running)
- | (Paused, Interrupted)
- | (Paused, Failed)
- // From Blocked
- | (Blocked, Running)
- | (Blocked, Failed)
- | (Blocked, Interrupted)
- )
- }
-
- /// Check if this state is terminal (no more transitions possible).
- pub fn is_terminal(&self) -> bool {
- matches!(
- self,
- TaskState::Completed | TaskState::Failed | TaskState::Interrupted
- )
- }
-
- /// Check if the task is currently active (running or paused).
- pub fn is_active(&self) -> bool {
- matches!(
- self,
- TaskState::Initializing
- | TaskState::Starting
- | TaskState::Running
- | TaskState::Paused
- | TaskState::Blocked
- )
- }
-
- /// Check if the task is running.
- pub fn is_running(&self) -> bool {
- matches!(self, TaskState::Running)
- }
-
- /// Convert to string for protocol messages.
- pub fn as_str(&self) -> &'static str {
- match self {
- TaskState::Initializing => "initializing",
- TaskState::Starting => "starting",
- TaskState::Running => "running",
- TaskState::Paused => "paused",
- TaskState::Blocked => "blocked",
- TaskState::Completed => "done",
- TaskState::Failed => "failed",
- TaskState::Interrupted => "interrupted",
- }
- }
-
- /// Parse from string.
- pub fn from_str(s: &str) -> Option<Self> {
- match s.to_lowercase().as_str() {
- "initializing" => Some(TaskState::Initializing),
- "starting" => Some(TaskState::Starting),
- "running" => Some(TaskState::Running),
- "paused" => Some(TaskState::Paused),
- "blocked" => Some(TaskState::Blocked),
- "done" | "completed" => Some(TaskState::Completed),
- "failed" => Some(TaskState::Failed),
- "interrupted" => Some(TaskState::Interrupted),
- _ => None,
- }
- }
-}
-
-impl fmt::Display for TaskState {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "{}", self.as_str())
- }
-}
-
-impl Default for TaskState {
- fn default() -> Self {
- TaskState::Initializing
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_valid_transitions() {
- use TaskState::*;
-
- // Valid transitions
- assert!(Initializing.can_transition_to(Starting));
- assert!(Starting.can_transition_to(Running));
- assert!(Running.can_transition_to(Completed));
- assert!(Running.can_transition_to(Paused));
- assert!(Paused.can_transition_to(Running));
-
- // Invalid transitions
- assert!(!Completed.can_transition_to(Running));
- assert!(!Failed.can_transition_to(Running));
- assert!(!Running.can_transition_to(Initializing));
- }
-
- #[test]
- fn test_terminal_states() {
- assert!(TaskState::Completed.is_terminal());
- assert!(TaskState::Failed.is_terminal());
- assert!(TaskState::Interrupted.is_terminal());
- assert!(!TaskState::Running.is_terminal());
- assert!(!TaskState::Paused.is_terminal());
- }
-
- #[test]
- fn test_parse() {
- assert_eq!(TaskState::from_str("running"), Some(TaskState::Running));
- assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed));
- assert_eq!(TaskState::from_str("invalid"), None);
- }
-}