diff options
Diffstat (limited to 'makima/daemon/src/task')
| -rw-r--r-- | makima/daemon/src/task/manager.rs | 2248 | ||||
| -rw-r--r-- | makima/daemon/src/task/mod.rs | 7 | ||||
| -rw-r--r-- | makima/daemon/src/task/state.rs | 161 |
3 files changed, 0 insertions, 2416 deletions
diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs deleted file mode 100644 index 4979ce7..0000000 --- a/makima/daemon/src/task/manager.rs +++ /dev/null @@ -1,2248 +0,0 @@ -//! Task lifecycle manager using git worktrees and Claude Code subprocesses. - -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Instant; - -use rand::Rng; -use tokio::io::AsyncWriteExt; -use tokio::sync::{mpsc, RwLock, Semaphore}; -use uuid::Uuid; - -use std::collections::HashSet; - -use super::state::TaskState; -use crate::error::{DaemonError, TaskError, TaskResult}; -use crate::process::{ClaudeInputMessage, ProcessManager}; -use crate::temp::TempManager; -use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; -use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage}; - -/// Generate a secure random API key for orchestrator tool access. -fn generate_tool_key() -> String { - let mut rng = rand::rng(); - let bytes: [u8; 32] = rng.random(); - hex::encode(bytes) -} - -/// System prompt for regular (non-orchestrator) subtasks. -/// This ensures subtasks work only within their isolated worktree directory. -const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. - -## IMPORTANT: Directory Restrictions - -**You MUST only work within the current working directory (your worktree).** - -- DO NOT use `cd` to navigate to directories outside your worktree -- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) -- DO NOT modify files in parent directories or sibling directories -- All your file operations should be relative to the current directory - -Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. - -**Why?** Your worktree is isolated so that: -1. Your changes don't affect other running tasks -2. Changes can be reviewed before integration -3. Multiple tasks can work on the codebase in parallel without conflicts - ---- - -"#; - -/// The orchestrator system prompt that tells Claude how to use the helper script. -const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. - -## FIRST STEP - -Start by checking if you have existing subtasks: - -```bash -# List all subtasks to see what work needs to be done -./.makima/orchestrate.sh list -``` - -If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. - ---- - -## Creating Subtasks - -You can create new subtasks to break down work: - -```bash -# Create a new subtask with a name and plan -./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." - -# The command returns the new subtask ID - use it to start the subtask -./.makima/orchestrate.sh start <new_subtask_id> -``` - -Create subtasks when you need to: -- Break down complex work into smaller pieces -- Run multiple tasks in parallel on different parts of the codebase -- Delegate specific implementation work - -## Task Continuation (Sequential Dependencies) - -When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: - -```bash -# Create Task B that continues from Task A's worktree -./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id> -``` - -This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. - -**When to use continuation:** -- Sequential work: Task B needs Task A's output files -- Staged implementation: Building features incrementally -- Fix-and-extend: One task fixes issues, another adds features on top - -**When NOT to use continuation:** -- Parallel tasks working on different files -- Independent subtasks that can be merged separately - -**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. - -## Sharing Files with Subtasks - -Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: - -```bash -# Create subtask with specific files copied from orchestrator -./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" - -# Copy multiple files (comma-separated) -./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" - -# Combine with --continue-from to share files AND continue from another task -./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md" -``` - -**Use cases for --files:** -- Share a PLAN.md with detailed implementation steps -- Distribute configuration or spec files -- Pass generated data or intermediate results - -## How Subtasks Work - -Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: -- Their work remains in the worktree files (NOT committed to git) -- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree -- You can view and copy files from subtask worktrees using their paths -- The worktree path is returned when you get subtask status - -**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. - -## Subtask Commands -```bash -# List all subtasks and their current status -./.makima/orchestrate.sh list - -# Create a new subtask (returns the subtask ID) -./.makima/orchestrate.sh create "Name" "Plan/description" - -# Create a subtask that continues from another task's worktree -./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id> - -# Create a subtask with specific files copied from orchestrator worktree -./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" - -# Start a specific subtask (it will run in its own Claude instance) -./.makima/orchestrate.sh start <subtask_id> - -# Stop a running subtask -./.makima/orchestrate.sh stop <subtask_id> - -# Get detailed status of a subtask (includes worktree_path when available) -./.makima/orchestrate.sh status <subtask_id> - -# Get the output/logs of a subtask -./.makima/orchestrate.sh output <subtask_id> - -# Get the worktree path for a subtask -./.makima/orchestrate.sh worktree <subtask_id> -``` - -## Integrating Subtask Work - -When subtasks complete, their changes exist as files in their worktree directories: -- Files are NOT committed to git branches -- You must copy/integrate files from subtask worktrees into your worktree -- Use standard file operations (cp, cat, etc.) to review and integrate changes - -### Handling Continuation Chains - -**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. - -Example chain: Task A → Task B (continues from A) → Task C (continues from B) -- Task C's worktree contains ALL changes from A, B, and C -- You should ONLY integrate Task C's worktree -- DO NOT integrate Task A or Task B separately (their changes are already in C) - -**How to track continuation chains:** -1. When you create tasks with `--continue-from`, note which task continues from which -2. Build a mental model: Independent tasks (no continuation) + Continuation chains -3. For each chain, only integrate the LAST task in the chain - -**Example with mixed independent and chained tasks:** -``` -Independent tasks (integrate all): -- Task X: API endpoints -- Task Y: Database models - -Continuation chain (integrate ONLY the last one): -- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) - Only integrate Task C! -``` - -### Integration Examples - -For independent subtasks (no continuation): -```bash -# Get the worktree path for a completed subtask -SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>) - -# View what files were changed -ls -la "$SUBTASK_PATH" -diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima - -# Copy specific files from subtask -cp "$SUBTASK_PATH/src/new_file.rs" ./src/ -cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ - -# Or use diff/patch for more control -diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch -patch -p0 < changes.patch -``` - -For continuation chains (only integrate the final task): -```bash -# If you have: Task A → Task B → Task C (each continues from previous) -# ONLY get and integrate Task C's worktree - it has everything! - -FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>) - -# Copy all changes from the final task -rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ -``` - -## Completion -```bash -# Mark yourself as complete after integrating all subtask work -./.makima/orchestrate.sh done "Summary of what was accomplished" -``` - -## Workflow -1. **List existing subtasks**: Run `list` to see current subtasks -2. **Create subtasks if needed**: Use `create` to add new subtasks for the work - - For independent parallel work: create without `--continue-from` - - For sequential dependencies: use `--continue-from <previous_task_id>` - - Track which tasks continue from which (continuation chains) -3. **Start subtasks**: Run `start` for each subtask -4. **Monitor progress**: Check status and output as subtasks run -5. **Integrate work**: When subtasks complete: - - For independent tasks: integrate each one's worktree - - For continuation chains: ONLY integrate the FINAL task (it has all changes) - - Get worktree path with `worktree <subtask_id>` - - Copy or merge files into your worktree -6. **Complete**: Call `done` once all work is integrated - -## Important Notes -- Subtask files are in worktrees, NOT committed git branches -- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work -- You can read files from subtask worktrees using their paths -- Use standard file tools (cp, diff, cat, rsync) to integrate changes -- You should NOT edit files directly - that's what subtasks are for -- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. -- When you call `done`, YOUR worktree may be used for the final PR/merge -"#; - -/// Content of the helper bash script that orchestrators use to call the API. -const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash -# Makima Orchestrator Helper Script -# Usage: ./orchestrate.sh <command> [args...] - -API_URL="${MAKIMA_API_URL:-http://localhost:8080}" -API_KEY="${MAKIMA_API_KEY}" -TASK_ID="${MAKIMA_TASK_ID}" - -if [ -z "$API_KEY" ]; then - echo "Error: MAKIMA_API_KEY not set" >&2 - exit 1 -fi - -if [ -z "$TASK_ID" ]; then - echo "Error: MAKIMA_TASK_ID not set" >&2 - exit 1 -fi - -# Helper function to make API calls and check for errors -api_call() { - local method="$1" - local url="$2" - local data="$3" - local response - local http_code - - if [ -n "$data" ]; then - response=$(curl -s -w "\n%{http_code}" -X "$method" \ - -H "X-Makima-Tool-Key: $API_KEY" \ - -H "Content-Type: application/json" \ - -d "$data" \ - "$url") - else - response=$(curl -s -w "\n%{http_code}" -X "$method" \ - -H "X-Makima-Tool-Key: $API_KEY" \ - "$url") - fi - - # Extract HTTP code (last line) and body (everything else) - http_code=$(echo "$response" | tail -n1) - body=$(echo "$response" | sed '$d') - - # Check for curl errors or non-2xx status - if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then - echo "Error: API request failed with HTTP $http_code" >&2 - echo "URL: $url" >&2 - echo "Response: $body" >&2 - echo "$body" - return 1 - fi - - echo "$body" - return 0 -} - -case "$1" in - list) - api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks" - ;; - create) - # Parse arguments: create "name" "plan" [--continue-from <task_id>] [--files "file1,file2"] - if [ -z "$2" ] || [ -z "$3" ]; then - echo "Usage: $0 create \"<name>\" \"<plan>\" [--continue-from <task_id>] [--files \"file1,file2\"]" >&2 - exit 1 - fi - NAME="$2" - PLAN="$3" - CONTINUE_FROM="" - COPY_FILES="" - - # Parse optional flags (can be in any order after name and plan) - shift 3 - while [ $# -gt 0 ]; do - case "$1" in - --continue-from) - CONTINUE_FROM="$2" - shift 2 - ;; - --files) - COPY_FILES="$2" - shift 2 - ;; - *) - echo "Unknown option: $1" >&2 - exit 1 - ;; - esac - done - - # Escape quotes in name and plan for JSON - NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') - PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') - - # Build JSON body - JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\"" - - if [ -n "$CONTINUE_FROM" ]; then - echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2 - JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\"" - else - echo "Creating subtask: $NAME..." >&2 - fi - - if [ -n "$COPY_FILES" ]; then - # Convert comma-separated file list to JSON array - FILES_JSON="[" - first=true - IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES" - for file in "${FILE_ARRAY[@]}"; do - file=$(echo "$file" | xargs) # trim whitespace - if [ "$first" = true ]; then - FILES_JSON="$FILES_JSON\"$file\"" - first=false - else - FILES_JSON="$FILES_JSON,\"$file\"" - fi - done - FILES_JSON="$FILES_JSON]" - JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON" - echo " (copying files: $COPY_FILES)" >&2 - fi - - JSON_BODY="$JSON_BODY}" - api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY" - ;; - start) - if [ -z "$2" ]; then - echo "Usage: $0 start <subtask_id>" >&2 - exit 1 - fi - echo "Starting subtask $2..." >&2 - api_call POST "$API_URL/api/v1/mesh/tasks/$2/start" - ;; - stop) - if [ -z "$2" ]; then - echo "Usage: $0 stop <subtask_id>" >&2 - exit 1 - fi - api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop" - ;; - status) - if [ -z "$2" ]; then - echo "Usage: $0 status <subtask_id>" >&2 - exit 1 - fi - api_call GET "$API_URL/api/v1/mesh/tasks/$2" - ;; - output) - if [ -z "$2" ]; then - echo "Usage: $0 output <subtask_id>" >&2 - exit 1 - fi - api_call GET "$API_URL/api/v1/mesh/tasks/$2/output" - ;; - worktree) - if [ -z "$2" ]; then - echo "Usage: $0 worktree <subtask_id>" >&2 - exit 1 - fi - # Get the worktree path from the task's overlayPath field via API - TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2") - if [ $? -ne 0 ]; then - echo "Error: Failed to get task info" >&2 - exit 1 - fi - WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4) - if [ -z "$WORKTREE_PATH" ]; then - echo "Error: Task has no worktree path (may not have started yet)" >&2 - exit 1 - fi - if [ -d "$WORKTREE_PATH" ]; then - echo "$WORKTREE_PATH" - else - echo "Error: Worktree not found at $WORKTREE_PATH" >&2 - echo "The worktree may have been cleaned up." >&2 - exit 1 - fi - ;; - done) - SUMMARY="${2:-Task completed}" - api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}" - ;; - *) - echo "Makima Orchestrator Helper" - echo "" - echo "Usage: $0 <command> [args...]" - echo "" - echo "Subtask Commands:" - echo " list List all subtasks and their status" - echo " create \"<name>\" \"<plan>\" Create a new subtask" - echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree" - echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree" - echo " start <subtask_id> Start a subtask" - echo " stop <subtask_id> Stop a running subtask" - echo " status <subtask_id> Get detailed subtask status" - echo " output <subtask_id> Get subtask output history" - echo " worktree <subtask_id> Get path to subtask's worktree" - echo "" - echo "Completion:" - echo " done [summary] Mark orchestrator as complete" - echo "" - echo "Examples:" - echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\"" - echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\"" - ;; -esac -"#; - -/// Tracks merge state for an orchestrator task. -#[derive(Default)] -struct MergeTracker { - /// Subtask branches that have been successfully merged. - merged_subtasks: HashSet<Uuid>, - /// Subtask branches that were explicitly skipped (with reason). - skipped_subtasks: HashMap<Uuid, String>, -} - -/// Managed task information. -pub struct ManagedTask { - /// Task ID. - pub id: Uuid, - /// Current state. - pub state: TaskState, - /// Worktree info if created. - pub worktree: Option<WorktreeInfo>, - /// Task plan. - pub plan: String, - /// Repository URL or path. - pub repo_source: Option<String>, - /// Base branch. - pub base_branch: Option<String>, - /// Target branch to merge into. - pub target_branch: Option<String>, - /// Parent task ID if this is a subtask. - pub parent_task_id: Option<Uuid>, - /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). - pub depth: i32, - /// Whether this task runs as an orchestrator (coordinates subtasks). - pub is_orchestrator: bool, - /// Path to target repository for completion actions. - pub target_repo_path: Option<String>, - /// Completion action: "none", "branch", "merge", "pr". - pub completion_action: Option<String>, - /// Task ID to continue from (copy worktree from this task). - pub continue_from_task_id: Option<Uuid>, - /// Files to copy from parent task's worktree. - pub copy_files: Option<Vec<String>>, - /// Time task was created. - pub created_at: Instant, - /// Time task started running. - pub started_at: Option<Instant>, - /// Time task completed. - pub completed_at: Option<Instant>, - /// Error message if failed. - pub error: Option<String>, -} - -/// Configuration for task execution. -#[derive(Clone)] -pub struct TaskConfig { - /// Maximum concurrent tasks. - pub max_concurrent_tasks: u32, - /// Base directory for worktrees. - pub worktree_base_dir: PathBuf, - /// Environment variables to pass to Claude. - pub env_vars: HashMap<String, String>, - /// Claude command path. - pub claude_command: String, - /// Additional arguments to pass to Claude Code. - pub claude_args: Vec<String>, - /// Arguments to pass before defaults. - pub claude_pre_args: Vec<String>, - /// Enable Claude's permission system. - pub enable_permissions: bool, - /// Disable verbose output. - pub disable_verbose: bool, -} - -impl Default for TaskConfig { - fn default() -> Self { - Self { - max_concurrent_tasks: 4, - worktree_base_dir: WorktreeManager::default_base_dir(), - env_vars: HashMap::new(), - claude_command: "claude".to_string(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - } - } -} - -/// Task manager for handling task lifecycle. -pub struct TaskManager { - /// Worktree manager. - worktree_manager: Arc<WorktreeManager>, - /// Process manager. - process_manager: Arc<ProcessManager>, - /// Temp directory manager. - temp_manager: Arc<TempManager>, - /// Task configuration. - #[allow(dead_code)] - config: TaskConfig, - /// Active tasks. - tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, - /// Channel to send messages to server. - ws_tx: mpsc::Sender<DaemonMessage>, - /// Semaphore for limiting concurrent tasks. - semaphore: Arc<Semaphore>, - /// Channels for sending input to running tasks. - /// Each sender allows sending messages to the stdin of a running Claude process. - task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, - /// Tracks merge state per orchestrator task (for completion gate). - merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>, -} - -impl TaskManager { - /// Create a new task manager. - pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { - let max_concurrent = config.max_concurrent_tasks as usize; - let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); - let process_manager = Arc::new( - ProcessManager::with_command(config.claude_command.clone()) - .with_args(config.claude_args.clone()) - .with_pre_args(config.claude_pre_args.clone()) - .with_permissions_enabled(config.enable_permissions) - .with_verbose_disabled(config.disable_verbose) - .with_env(config.env_vars.clone()), - ); - let temp_manager = Arc::new(TempManager::new()); - - Self { - worktree_manager, - process_manager, - temp_manager, - config, - tasks: Arc::new(RwLock::new(HashMap::new())), - ws_tx, - semaphore: Arc::new(Semaphore::new(max_concurrent)), - task_inputs: Arc::new(RwLock::new(HashMap::new())), - merge_trackers: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Handle a command from the server. - pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { - tracing::info!("Received command from server: {:?}", command); - - match command { - DaemonCommand::SpawnTask { - task_id, - task_name, - plan, - repo_url, - base_branch, - target_branch, - parent_task_id, - depth, - is_orchestrator, - target_repo_path, - completion_action, - continue_from_task_id, - copy_files, - } => { - tracing::info!( - task_id = %task_id, - task_name = %task_name, - repo_url = ?repo_url, - base_branch = ?base_branch, - target_branch = ?target_branch, - parent_task_id = ?parent_task_id, - depth = depth, - is_orchestrator = is_orchestrator, - target_repo_path = ?target_repo_path, - completion_action = ?completion_action, - continue_from_task_id = ?continue_from_task_id, - copy_files = ?copy_files, - plan_len = plan.len(), - "Spawning new task" - ); - self.spawn_task( - task_id, task_name, plan, repo_url, base_branch, target_branch, - parent_task_id, depth, is_orchestrator, - target_repo_path, completion_action, continue_from_task_id, - copy_files - ).await?; - } - DaemonCommand::PauseTask { task_id } => { - tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); - // Subprocesses don't support pause, just log and ignore - } - DaemonCommand::ResumeTask { task_id } => { - tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); - // Subprocesses don't support resume, just log and ignore - } - DaemonCommand::InterruptTask { task_id, graceful: _ } => { - tracing::info!(task_id = %task_id, "Interrupting task"); - self.interrupt_task(task_id).await?; - } - DaemonCommand::SendMessage { task_id, message } => { - tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); - // Send message to the task's stdin via the input channel - let inputs = self.task_inputs.read().await; - if let Some(sender) = inputs.get(&task_id) { - if let Err(e) = sender.send(message).await { - tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); - } else { - tracing::info!(task_id = %task_id, "Message sent to task successfully"); - } - } else { - tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); - } - } - DaemonCommand::InjectSiblingContext { task_id, .. } => { - tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); - } - DaemonCommand::Authenticated { daemon_id } => { - tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); - } - DaemonCommand::Error { code, message } => { - tracing::warn!(code = %code, message = %message, "Error command from server"); - } - - // ========================================================================= - // Merge Commands - // ========================================================================= - - DaemonCommand::ListBranches { task_id } => { - tracing::info!(task_id = %task_id, "Listing task branches"); - self.handle_list_branches(task_id).await?; - } - DaemonCommand::MergeStart { task_id, source_branch } => { - tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); - self.handle_merge_start(task_id, source_branch).await?; - } - DaemonCommand::MergeStatus { task_id } => { - tracing::info!(task_id = %task_id, "Getting merge status"); - self.handle_merge_status(task_id).await?; - } - DaemonCommand::MergeResolve { task_id, file, strategy } => { - tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); - self.handle_merge_resolve(task_id, file, strategy).await?; - } - DaemonCommand::MergeCommit { task_id, message } => { - tracing::info!(task_id = %task_id, "Committing merge"); - self.handle_merge_commit(task_id, message).await?; - } - DaemonCommand::MergeAbort { task_id } => { - tracing::info!(task_id = %task_id, "Aborting merge"); - self.handle_merge_abort(task_id).await?; - } - DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { - tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); - self.handle_merge_skip(task_id, subtask_id, reason).await?; - } - DaemonCommand::CheckMergeComplete { task_id } => { - tracing::info!(task_id = %task_id, "Checking merge completion"); - self.handle_check_merge_complete(task_id).await?; - } - - // ========================================================================= - // Completion Action Commands - // ========================================================================= - - DaemonCommand::RetryCompletionAction { - task_id, - task_name, - action, - target_repo_path, - target_branch, - } => { - tracing::info!( - task_id = %task_id, - task_name = %task_name, - action = %action, - target_repo_path = %target_repo_path, - target_branch = ?target_branch, - "Retrying completion action" - ); - self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; - } - - DaemonCommand::CloneWorktree { task_id, target_dir } => { - tracing::info!( - task_id = %task_id, - target_dir = %target_dir, - "Cloning worktree to target directory" - ); - self.handle_clone_worktree(task_id, target_dir).await?; - } - - DaemonCommand::CheckTargetExists { task_id, target_dir } => { - tracing::debug!( - task_id = %task_id, - target_dir = %target_dir, - "Checking if target directory exists" - ); - self.handle_check_target_exists(task_id, target_dir).await?; - } - } - Ok(()) - } - - /// Spawn a new task. - #[allow(clippy::too_many_arguments)] - pub async fn spawn_task( - &self, - task_id: Uuid, - task_name: String, - plan: String, - repo_url: Option<String>, - base_branch: Option<String>, - target_branch: Option<String>, - parent_task_id: Option<Uuid>, - depth: i32, - is_orchestrator: bool, - target_repo_path: Option<String>, - completion_action: Option<String>, - continue_from_task_id: Option<Uuid>, - copy_files: Option<Vec<String>>, - ) -> TaskResult<()> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ==="); - - // Check if task already exists - allow re-spawning if in terminal state - { - let mut tasks = self.tasks.write().await; - if let Some(existing) = tasks.get(&task_id) { - if existing.state.is_terminal() { - // Task exists but is in terminal state (completed, failed, interrupted) - // Remove it so we can re-spawn - tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); - tasks.remove(&task_id); - } else { - // Task is still active, reject - tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); - return Err(TaskError::AlreadyExists(task_id)); - } - } - } - - // Acquire semaphore permit - tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); - let permit = self - .semaphore - .clone() - .try_acquire_owned() - .map_err(|_| { - tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); - TaskError::ConcurrencyLimit - })?; - tracing::info!(task_id = %task_id, "Concurrency permit acquired"); - - // Create task entry - tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); - let task = ManagedTask { - id: task_id, - state: TaskState::Initializing, - worktree: None, - plan: plan.clone(), - repo_source: repo_url.clone(), - base_branch: base_branch.clone(), - target_branch: target_branch.clone(), - parent_task_id, - depth, - is_orchestrator, - target_repo_path: target_repo_path.clone(), - completion_action: completion_action.clone(), - continue_from_task_id, - copy_files: copy_files.clone(), - created_at: Instant::now(), - started_at: None, - completed_at: None, - error: None, - }; - - self.tasks.write().await.insert(task_id, task); - tracing::info!(task_id = %task_id, "Task entry created and stored"); - - // Notify server of status change - tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); - self.send_status_change(task_id, "pending", "initializing").await; - - // Spawn task in background - tracing::info!(task_id = %task_id, "Spawning background task runner"); - let inner = self.clone_inner(); - tokio::spawn(async move { - let _permit = permit; // Hold permit until done - tracing::info!(task_id = %task_id, "Background task runner started"); - - if let Err(e) = inner.run_task( - task_id, task_name, plan, repo_url, base_branch, target_branch, - is_orchestrator, target_repo_path, completion_action, - continue_from_task_id, copy_files - ).await { - tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); - inner.mark_failed(task_id, &e.to_string()).await; - } - tracing::info!(task_id = %task_id, "Background task runner completed"); - }); - - tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); - Ok(()) - } - - /// Clone inner state for spawned tasks. - fn clone_inner(&self) -> TaskManagerInner { - TaskManagerInner { - worktree_manager: self.worktree_manager.clone(), - process_manager: self.process_manager.clone(), - temp_manager: self.temp_manager.clone(), - tasks: self.tasks.clone(), - ws_tx: self.ws_tx.clone(), - task_inputs: self.task_inputs.clone(), - } - } - - /// Interrupt a task. - pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { - let mut tasks = self.tasks.write().await; - let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; - - if task.state.is_terminal() { - return Ok(()); // Already done - } - - let old_state = task.state; - task.state = TaskState::Interrupted; - task.completed_at = Some(Instant::now()); - - // Notify server - drop(tasks); - self.send_status_change(task_id, old_state.as_str(), "interrupted").await; - - // Note: The process will be killed when the ClaudeProcess is dropped - // Worktrees are kept until explicitly deleted - - Ok(()) - } - - /// Get list of active task IDs. - pub async fn active_task_ids(&self) -> Vec<Uuid> { - self.tasks - .read() - .await - .iter() - .filter(|(_, t)| t.state.is_active()) - .map(|(id, _)| *id) - .collect() - } - - /// Get task state. - pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> { - self.tasks.read().await.get(&task_id).map(|t| t.state) - } - - /// Send status change notification to server. - async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { - let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); - let _ = self.ws_tx.send(msg).await; - } - - // ========================================================================= - // Merge Handler Methods - // ========================================================================= - - /// Get worktree path for a task, or return error if not found. - /// First checks in-memory tasks, then scans the worktrees directory. - async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> { - // First try to get from in-memory tasks - { - let tasks = self.tasks.read().await; - if let Some(task) = tasks.get(&task_id) { - if let Some(ref worktree) = task.worktree { - return Ok(worktree.path.clone()); - } - } - } - - // Task not in memory - scan worktrees directory for matching task ID - let short_id = &task_id.to_string()[..8]; - let worktrees_dir = self.worktree_manager.base_dir(); - - if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { - while let Ok(Some(entry)) = entries.next_entry().await { - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - if name_str.starts_with(short_id) { - let path = entry.path(); - // Verify it's a valid git directory - if path.join(".git").exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %path.display(), - "Found worktree by scanning directory" - ); - return Ok(path); - } - } - } - } - - Err(DaemonError::Task(TaskError::SetupFailed( - format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) - ))) - } - - /// Handle ListBranches command. - async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.list_task_branches(&worktree_path).await { - Ok(branches) => { - let branch_infos: Vec<BranchInfo> = branches - .into_iter() - .map(|b| BranchInfo { - name: b.name, - task_id: b.task_id, - is_merged: b.is_merged, - last_commit: b.last_commit, - last_commit_message: b.last_commit_message, - }) - .collect(); - - let msg = DaemonMessage::BranchList { - task_id, - branches: branch_infos, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeStart command. - async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { - Ok(None) => { - // Merge succeeded without conflicts - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge completed without conflicts".to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Ok(Some(conflicts)) => { - // Merge has conflicts - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: format!("Merge has {} conflicts", conflicts.len()), - commit_sha: None, - conflicts: Some(conflicts), - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeStatus command. - async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.get_merge_state(&worktree_path).await { - Ok(state) => { - let msg = DaemonMessage::MergeStatusResponse { - task_id, - in_progress: state.in_progress, - source_branch: if state.in_progress { Some(state.source_branch) } else { None }, - conflicted_files: state.conflicted_files, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); - let msg = DaemonMessage::MergeStatusResponse { - task_id, - in_progress: false, - source_branch: None, - conflicted_files: vec![], - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeResolve command. - async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - let resolution = match strategy.to_lowercase().as_str() { - "ours" => ConflictResolution::Ours, - "theirs" => ConflictResolution::Theirs, - _ => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - return Ok(()); - } - }; - - match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { - Ok(()) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: format!("Resolved conflict in {}", file), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeCommit command. - async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.commit_merge(&worktree_path, &message).await { - Ok(commit_sha) => { - // Track this merge as completed (extract subtask ID from branch if possible) - // For now, we'll track it when MergeSkip is called or based on branch names - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge committed successfully".to_string(), - commit_sha: Some(commit_sha), - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeAbort command. - async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.abort_merge(&worktree_path).await { - Ok(()) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge aborted".to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeSkip command. - async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { - // Record that this subtask was skipped - { - let mut trackers = self.merge_trackers.write().await; - let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); - tracker.skipped_subtasks.insert(subtask_id, reason.clone()); - } - - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: format!("Subtask {} skipped: {}", subtask_id, reason), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CheckMergeComplete command. - async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Get all task branches - let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { - Ok(b) => b, - Err(e) => { - let msg = DaemonMessage::MergeCompleteCheck { - task_id, - can_complete: false, - unmerged_branches: vec![format!("Error listing branches: {}", e)], - merged_count: 0, - skipped_count: 0, - }; - let _ = self.ws_tx.send(msg).await; - return Ok(()); - } - }; - - // Get tracker state - let trackers = self.merge_trackers.read().await; - let empty_merged: HashSet<Uuid> = HashSet::new(); - let empty_skipped: HashMap<Uuid, String> = HashMap::new(); - let tracker = trackers.get(&task_id); - let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); - let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); - - let mut merged_count = 0u32; - let mut skipped_count = 0u32; - let mut unmerged_branches = Vec::new(); - - for branch in &branches { - if branch.is_merged { - merged_count += 1; - } else if let Some(subtask_id) = branch.task_id { - if merged_set.contains(&subtask_id) { - merged_count += 1; - } else if skipped_set.contains_key(&subtask_id) { - skipped_count += 1; - } else { - unmerged_branches.push(branch.name.clone()); - } - } else { - // Branch without task ID - check if it's merged - unmerged_branches.push(branch.name.clone()); - } - } - - let can_complete = unmerged_branches.is_empty(); - - let msg = DaemonMessage::MergeCompleteCheck { - task_id, - can_complete, - unmerged_branches, - merged_count, - skipped_count, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Mark a subtask as merged in the tracker. - #[allow(dead_code)] - pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { - let mut trackers = self.merge_trackers.write().await; - let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); - tracker.merged_subtasks.insert(subtask_id); - } - - // ========================================================================= - // Completion Action Handler Methods - // ========================================================================= - - /// Handle RetryCompletionAction command. - async fn handle_retry_completion_action( - &self, - task_id: Uuid, - task_name: String, - action: String, - target_repo_path: String, - target_branch: Option<String>, - ) -> Result<(), DaemonError> { - // Get the task's worktree path - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Execute the completion action - let inner = self.clone_inner(); - let result = inner.execute_completion_action( - task_id, - &task_name, - &worktree_path, - &action, - Some(target_repo_path.as_str()), - target_branch.as_deref(), - ).await; - - // Send result back to server - let msg = match result { - Ok(pr_url) => DaemonMessage::CompletionActionResult { - task_id, - success: true, - message: match action.as_str() { - "branch" => format!("Branch pushed to {}", target_repo_path), - "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), - "pr" => format!("Pull request created"), - _ => format!("Completion action '{}' executed", action), - }, - pr_url, - }, - Err(e) => DaemonMessage::CompletionActionResult { - task_id, - success: false, - message: e, - pr_url: None, - }, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CloneWorktree command. - async fn handle_clone_worktree( - &self, - task_id: Uuid, - target_dir: String, - ) -> Result<(), DaemonError> { - // Get the task's worktree path - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Expand tilde in target path - let target_path = crate::worktree::expand_tilde(&target_dir); - - // Clone the worktree to target directory - let result = self.worktree_manager.clone_worktree_to_directory( - &worktree_path, - &target_path, - ).await; - - // Send result back to server - let msg = match result { - Ok(message) => DaemonMessage::CloneWorktreeResult { - task_id, - success: true, - message, - target_dir: Some(target_path.to_string_lossy().to_string()), - }, - Err(e) => DaemonMessage::CloneWorktreeResult { - task_id, - success: false, - message: e.to_string(), - target_dir: None, - }, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CheckTargetExists command. - async fn handle_check_target_exists( - &self, - task_id: Uuid, - target_dir: String, - ) -> Result<(), DaemonError> { - // Expand tilde in target path - let target_path = crate::worktree::expand_tilde(&target_dir); - - // Check if target exists - let exists = self.worktree_manager.target_directory_exists(&target_path).await; - - // Send result back to server - let msg = DaemonMessage::CheckTargetExistsResult { - task_id, - exists, - target_dir: target_path.to_string_lossy().to_string(), - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } -} - -/// Inner state for spawned tasks (cloneable). -struct TaskManagerInner { - worktree_manager: Arc<WorktreeManager>, - process_manager: Arc<ProcessManager>, - temp_manager: Arc<TempManager>, - tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, - ws_tx: mpsc::Sender<DaemonMessage>, - task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, -} - -impl TaskManagerInner { - /// Run a task to completion. - #[allow(clippy::too_many_arguments)] - async fn run_task( - &self, - task_id: Uuid, - task_name: String, - plan: String, - repo_source: Option<String>, - base_branch: Option<String>, - target_branch: Option<String>, - is_orchestrator: bool, - target_repo_path: Option<String>, - completion_action: Option<String>, - continue_from_task_id: Option<Uuid>, - copy_files: Option<Vec<String>>, - ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ==="); - - // Determine working directory - let working_dir = if let Some(ref source) = repo_source { - if is_new_repo_request(source) { - // Explicit new repo request: new:// or new://project-name - tracing::info!( - task_id = %task_id, - source = %source, - "Creating new git repository" - ); - - let msg = DaemonMessage::task_output( - task_id, - format!("Initializing new git repository...\n"), - false, - ); - let _ = self.ws_tx.send(msg).await; - - let worktree_info = self.worktree_manager - .init_new_repo(task_id, source) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; - - tracing::info!( - task_id = %task_id, - path = %worktree_info.path.display(), - "New repository created" - ); - - // Store worktree info - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.worktree = Some(worktree_info.clone()); - } - } - - let msg = DaemonMessage::task_output( - task_id, - format!("Repository ready at {}\n", worktree_info.path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - worktree_info.path - } else { - // Send progress message - let msg = DaemonMessage::task_output( - task_id, - format!("Setting up worktree from {}...\n", source), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Ensure source repo exists (clone if URL, verify if path) - let source_repo = self.worktree_manager.ensure_repo(source).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; - - // Detect or use provided base branch - let branch = if let Some(ref b) = base_branch { - b.clone() - } else { - self.worktree_manager.detect_default_branch(&source_repo).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - }; - - tracing::info!( - task_id = %task_id, - source = %source, - branch = %branch, - continue_from_task_id = ?continue_from_task_id, - "Setting up worktree" - ); - - // Create worktree - either from scratch or copying from another task - let task_name = format!("task-{}", &task_id.to_string()[..8]); - let worktree_info = if let Some(from_task_id) = continue_from_task_id { - // Find the source task's worktree path - let source_worktree = self.find_worktree_for_task(from_task_id).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed( - format!("Cannot continue from task {}: {}", from_task_id, e) - )))?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Create worktree by copying from source task - self.worktree_manager - .create_worktree_from_task(&source_worktree, task_id, &task_name) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - } else { - // Create fresh worktree from repo - self.worktree_manager - .create_worktree(&source_repo, task_id, &task_name, &branch) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - }; - - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_info.path.display(), - branch = %worktree_info.branch, - continued_from = ?continue_from_task_id, - "Worktree created" - ); - - // Store worktree info - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.worktree = Some(worktree_info.clone()); - } - } - - let msg = DaemonMessage::task_output( - task_id, - format!("Worktree ready at {}\n", worktree_info.path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - worktree_info.path - } - } else { - // No repo specified - use managed temp directory in ~/.makima/temp/ - tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); - - let msg = DaemonMessage::task_output( - task_id, - "Creating temporary working directory...\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - - let temp_dir = self.temp_manager.create_task_dir(task_id).await?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Working directory ready at {}\n", temp_dir.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - temp_dir - }; - - // Copy files from parent task's worktree if specified - if let Some(ref files) = copy_files { - if !files.is_empty() { - // Get the parent task ID to find its worktree - let parent_task_id = { - let tasks = self.tasks.read().await; - tasks.get(&task_id).and_then(|t| t.parent_task_id) - }; - - if let Some(parent_id) = parent_task_id { - match self.find_worktree_for_task(parent_id).await { - Ok(parent_worktree) => { - let msg = DaemonMessage::task_output( - task_id, - format!("Copying {} files from orchestrator...\n", files.len()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - for file_path in files { - let source = parent_worktree.join(file_path); - let dest = working_dir.join(file_path); - - // Create parent directories if needed - if let Some(parent) = dest.parent() { - if let Err(e) = tokio::fs::create_dir_all(parent).await { - tracing::warn!( - task_id = %task_id, - file = %file_path, - error = %e, - "Failed to create parent directory for file" - ); - continue; - } - } - - // Copy the file - match tokio::fs::copy(&source, &dest).await { - Ok(_) => { - tracing::info!( - task_id = %task_id, - source = %source.display(), - dest = %dest.display(), - "Copied file from orchestrator" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - source = %source.display(), - dest = %dest.display(), - error = %e, - "Failed to copy file from orchestrator" - ); - // Notify but don't fail - the file might be optional - let msg = DaemonMessage::task_output( - task_id, - format!("Warning: Could not copy {}: {}\n", file_path, e), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - } - } - - let msg = DaemonMessage::task_output( - task_id, - "Files copied from orchestrator.\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - parent_id = %parent_id, - error = %e, - "Could not find parent task worktree for file copying" - ); - } - } - } else { - tracing::warn!( - task_id = %task_id, - "copy_files specified but no parent_task_id" - ); - } - } - } - - // Update state to Starting - tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); - self.update_state(task_id, TaskState::Starting).await; - self.send_status_change(task_id, "initializing", "starting").await; - - // Check Claude is available - match self.process_manager.check_claude_available().await { - Ok(version) => { - tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); - let msg = DaemonMessage::task_output( - task_id, - format!("Claude Code {} ready\n", version), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let err_msg = format!("Claude Code not available: {}", e); - tracing::error!(task_id = %task_id, error = %err_msg); - return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); - } - } - - // Set up orchestrator mode if needed - let (extra_env, full_plan) = if is_orchestrator { - tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); - - let msg = DaemonMessage::task_output( - task_id, - "Setting up orchestrator environment...\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Generate tool key for API access - let tool_key = generate_tool_key(); - tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); - - // Register tool key with server - let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); - if self.ws_tx.send(register_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to register tool key"); - } else { - tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); - } - - // Create .makima directory and helper script - let makima_dir = working_dir.join(".makima"); - if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await { - tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e); - } else { - tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory"); - } - - let script_path = makima_dir.join("orchestrate.sh"); - if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await { - tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e); - } else { - tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh"); - // Make script executable - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) { - tracing::warn!(task_id = %task_id, "Failed to set script permissions: {}", e); - } else { - tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)"); - } - } - } - - // Set up environment variables - let mut env = HashMap::new(); - // TODO: Make API URL configurable - env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); - env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); - env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); - - tracing::info!( - task_id = %task_id, - api_url = "http://localhost:8080", - tool_key_preview = &tool_key[..8.min(tool_key.len())], - "Set orchestrator environment variables" - ); - - // Prepend orchestrator instructions to the plan - let orchestrator_plan = format!( - "{}\n\n---\n\nYour task:\n{}", - ORCHESTRATOR_SYSTEM_PROMPT, - plan - ); - - let msg = DaemonMessage::task_output( - task_id, - format!("Orchestrator environment ready (script at {})\n", script_path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - (Some(env), orchestrator_plan) - } else { - tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); - // Prepend subtask instructions to ensure worktree isolation - let subtask_plan = format!( - "{}\nYour task:\n{}", - SUBTASK_SYSTEM_PROMPT, - plan - ); - (None, subtask_plan) - }; - - // Spawn Claude process - let plan_bytes = full_plan.len(); - let plan_chars = full_plan.chars().count(); - // Rough token estimate: ~4 chars per token for English - let estimated_tokens = plan_chars / 4; - - tracing::info!( - task_id = %task_id, - working_dir = %working_dir.display(), - is_orchestrator = is_orchestrator, - plan_bytes = plan_bytes, - plan_chars = plan_chars, - estimated_tokens = estimated_tokens, - "Spawning Claude process" - ); - - // Warn if plan is very large (Claude's context is typically 100k-200k tokens) - if estimated_tokens > 50_000 { - tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - - let msg = DaemonMessage::task_output( - task_id, - if is_orchestrator { - format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) - } else { - format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) - }, - false, - ); - let _ = self.ws_tx.send(msg).await; - - tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()..."); - let mut process = self.process_manager - .spawn(&working_dir, &full_plan, extra_env) - .await - .map_err(|e| { - tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); - DaemonError::Task(TaskError::SetupFailed(e.to_string())) - })?; - tracing::info!(task_id = %task_id, "Claude process spawned successfully"); - - // Set up input channel for this task so we can send messages to its stdin - tracing::debug!(task_id = %task_id, "Setting up input channel..."); - let (input_tx, mut input_rx) = mpsc::channel::<String>(100); - tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); - self.task_inputs.write().await.insert(task_id, input_tx); - tracing::debug!(task_id = %task_id, "Input channel registered"); - - // Get stdin handle for input forwarding and completion signaling - let stdin_handle = process.stdin_handle(); - let stdin_handle_for_completion = stdin_handle.clone(); - - tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); - tokio::spawn(async move { - tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); - while let Some(msg) = input_rx.recv().await { - tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); - - // Format as JSON user message for stream-json input protocol - let json_msg = ClaudeInputMessage::user(&msg); - let json_line = match json_msg.to_json_line() { - Ok(line) => line, - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); - continue; - } - }; - - tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); - - let mut stdin_guard = stdin_handle.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); - if stdin.write_all(json_line.as_bytes()).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); - break; - } - if stdin.flush().await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); - break; - } - tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); - } else { - tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); - break; - } - } - tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); - }); - - // Update state to Running - { - tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = TaskState::Running; - task.started_at = Some(Instant::now()); - } - tracing::debug!(task_id = %task_id, "Released tasks write lock"); - } - tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); - self.send_status_change(task_id, "starting", "running").await; - tracing::debug!(task_id = %task_id, "Sent status change notification"); - - // Stream output with startup timeout check - tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); - tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); - let ws_tx = self.ws_tx.clone(); - - let mut output_count = 0u64; - let mut output_bytes = 0usize; - let startup_timeout = tokio::time::Duration::from_secs(30); - let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); - startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let startup_deadline = tokio::time::Instant::now() + startup_timeout; - - loop { - tokio::select! { - maybe_line = process.next_output() => { - match maybe_line { - Some(line) => { - output_count += 1; - output_bytes += line.content.len(); - - if output_count == 1 { - tracing::info!(task_id = %task_id, "Received first output line from Claude"); - } - if output_count % 100 == 0 { - tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); - } - - // Log output details for debugging - tracing::trace!( - task_id = %task_id, - line_num = output_count, - content_len = line.content.len(), - is_stdout = line.is_stdout, - json_type = ?line.json_type, - "Forwarding output to WebSocket" - ); - - // Check if this is a "result" message indicating task completion - // With --input-format=stream-json, Claude waits for more input after completion - // We close stdin to signal EOF and let the process exit - if line.json_type.as_deref() == Some("result") { - tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); - let mut stdin_guard = stdin_handle_for_completion.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; - } - } - - let msg = DaemonMessage::task_output(task_id, line.content, false); - if ws_tx.send(msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); - break; - } - } - None => { - tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); - break; - } - } - } - _ = startup_check.tick(), if output_count == 0 => { - // Check if process is still alive - match process.try_wait() { - Ok(Some(exit_code)) => { - tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), - false, - ); - let _ = ws_tx.send(msg).await; - break; - } - Ok(None) => { - // Still running but no output - if tokio::time::Instant::now() > startup_deadline { - tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); - let msg = DaemonMessage::task_output( - task_id, - "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), - false, - ); - let _ = ws_tx.send(msg).await; - } else { - tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); - } - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); - } - } - } - } - } - - // Wait for process to exit - let exit_code = process.wait().await.unwrap_or(-1); - - // Clean up input channel for this task - self.task_inputs.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Removed task input channel"); - - // Update state based on exit code - let success = exit_code == 0; - let new_state = if success { - TaskState::Completed - } else { - TaskState::Failed - }; - - tracing::info!( - task_id = %task_id, - exit_code = exit_code, - success = success, - new_state = ?new_state, - "Claude process exited, updating task state" - ); - - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = new_state; - task.completed_at = Some(Instant::now()); - if !success { - task.error = Some(format!("Process exited with code {}", exit_code)); - } - } - } - - // Execute completion action if task succeeded - let completion_result = if success { - if let Some(ref action) = completion_action { - if action != "none" { - self.execute_completion_action( - task_id, - &task_name, - &working_dir, - action, - target_repo_path.as_deref(), - target_branch.as_deref(), - ).await - } else { - Ok(None) - } - } else { - Ok(None) - } - } else { - Ok(None) - }; - - // Log completion action result - match &completion_result { - Ok(Some(pr_url)) => { - tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); - } - Ok(None) => {} - Err(e) => { - tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); - } - } - - // Notify server - let error = if success { - None - } else { - Some(format!("Exit code: {}", exit_code)) - }; - tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); - let msg = DaemonMessage::task_complete(task_id, success, error); - let _ = self.ws_tx.send(msg).await; - - // Note: Worktrees are kept until explicitly deleted (per user preference) - // This allows inspection, PR creation, etc. - - tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); - Ok(()) - } - - /// Execute the completion action for a task. - async fn execute_completion_action( - &self, - task_id: Uuid, - task_name: &str, - worktree_path: &std::path::Path, - action: &str, - target_repo_path: Option<&str>, - target_branch: Option<&str>, - ) -> Result<Option<String>, String> { - let target_repo = match target_repo_path { - Some(path) => crate::worktree::expand_tilde(path), - None => { - tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); - return Ok(None); - } - }; - - if !target_repo.exists() { - return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); - } - - // Get the branch name: makima/{task-name-with-dashes}-{short-id} - let branch_name = format!( - "makima/{}-{}", - crate::worktree::sanitize_name(task_name), - crate::worktree::short_uuid(task_id) - ); - - // Determine target branch - use provided value or detect default branch of target repo - let target_branch = match target_branch { - Some(branch) => branch.to_string(), - None => { - // Detect default branch (main, master, develop, etc.) - self.worktree_manager - .detect_default_branch(&target_repo) - .await - .unwrap_or_else(|_| "master".to_string()) - } - }; - - let msg = DaemonMessage::task_output( - task_id, - format!("Executing completion action: {}...\n", action), - false, - ); - let _ = self.ws_tx.send(msg).await; - - match action { - "branch" => { - // Just push the branch to target repo - self.worktree_manager - .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(None) - } - "merge" => { - // Push and merge into target branch - let commit_sha = self.worktree_manager - .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(None) - } - "pr" => { - // Push and create PR - let title = task_name.to_string(); - let body = format!( - "Automated PR from makima task.\n\nTask ID: `{}`", - task_id - ); - let pr_url = self.worktree_manager - .create_pull_request( - worktree_path, - &target_repo, - &branch_name, - &target_branch, - &title, - &body, - ) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Pull request created: {}\n", pr_url), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(Some(pr_url)) - } - _ => { - tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); - Ok(None) - } - } - } - - /// Find worktree path for a task ID. - /// First checks in-memory tasks, then scans the worktrees directory. - async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> { - // First try to get from in-memory tasks - { - let tasks = self.tasks.read().await; - if let Some(task) = tasks.get(&task_id) { - if let Some(ref worktree) = task.worktree { - return Ok(worktree.path.clone()); - } - } - } - - // Task not in memory - scan worktrees directory for matching task ID - let short_id = &task_id.to_string()[..8]; - let worktrees_dir = self.worktree_manager.base_dir(); - - if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { - while let Ok(Some(entry)) = entries.next_entry().await { - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - if name_str.starts_with(short_id) { - let path = entry.path(); - // Verify it's a valid git directory - if path.join(".git").exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %path.display(), - "Found worktree by scanning directory" - ); - return Ok(path); - } - } - } - } - - Err(format!( - "No worktree found for task {}. The worktree may have been cleaned up.", - task_id - )) - } - - async fn update_state(&self, task_id: Uuid, state: TaskState) { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = state; - } - } - - async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { - let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); - let _ = self.ws_tx.send(msg).await; - } - - /// Mark task as failed. - async fn mark_failed(&self, task_id: Uuid, error: &str) { - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = TaskState::Failed; - task.error = Some(error.to_string()); - task.completed_at = Some(Instant::now()); - } - } - - // Notify server - let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); - let _ = self.ws_tx.send(msg).await; - } -} - -impl Clone for TaskManagerInner { - fn clone(&self) -> Self { - Self { - worktree_manager: self.worktree_manager.clone(), - process_manager: self.process_manager.clone(), - temp_manager: self.temp_manager.clone(), - tasks: self.tasks.clone(), - ws_tx: self.ws_tx.clone(), - task_inputs: self.task_inputs.clone(), - } - } -} diff --git a/makima/daemon/src/task/mod.rs b/makima/daemon/src/task/mod.rs deleted file mode 100644 index 29c261e..0000000 --- a/makima/daemon/src/task/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Task management and execution. - -pub mod manager; -pub mod state; - -pub use manager::{ManagedTask, TaskConfig, TaskManager}; -pub use state::TaskState; diff --git a/makima/daemon/src/task/state.rs b/makima/daemon/src/task/state.rs deleted file mode 100644 index fe73de1..0000000 --- a/makima/daemon/src/task/state.rs +++ /dev/null @@ -1,161 +0,0 @@ -//! Task state machine. - -use std::fmt; - -/// Task execution state. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum TaskState { - /// Task received, preparing overlay. - Initializing, - /// Overlay ready, starting container. - Starting, - /// Container running. - Running, - /// Container paused. - Paused, - /// Waiting for sibling or resource. - Blocked, - /// Task completed successfully. - Completed, - /// Task failed with error. - Failed, - /// Task interrupted by user. - Interrupted, -} - -impl TaskState { - /// Check if a state transition is valid. - pub fn can_transition_to(&self, target: TaskState) -> bool { - use TaskState::*; - - matches!( - (self, target), - // From Initializing - (Initializing, Starting) - | (Initializing, Failed) - | (Initializing, Interrupted) - // From Starting - | (Starting, Running) - | (Starting, Failed) - | (Starting, Interrupted) - // From Running - | (Running, Paused) - | (Running, Blocked) - | (Running, Completed) - | (Running, Failed) - | (Running, Interrupted) - // From Paused - | (Paused, Running) - | (Paused, Interrupted) - | (Paused, Failed) - // From Blocked - | (Blocked, Running) - | (Blocked, Failed) - | (Blocked, Interrupted) - ) - } - - /// Check if this state is terminal (no more transitions possible). - pub fn is_terminal(&self) -> bool { - matches!( - self, - TaskState::Completed | TaskState::Failed | TaskState::Interrupted - ) - } - - /// Check if the task is currently active (running or paused). - pub fn is_active(&self) -> bool { - matches!( - self, - TaskState::Initializing - | TaskState::Starting - | TaskState::Running - | TaskState::Paused - | TaskState::Blocked - ) - } - - /// Check if the task is running. - pub fn is_running(&self) -> bool { - matches!(self, TaskState::Running) - } - - /// Convert to string for protocol messages. - pub fn as_str(&self) -> &'static str { - match self { - TaskState::Initializing => "initializing", - TaskState::Starting => "starting", - TaskState::Running => "running", - TaskState::Paused => "paused", - TaskState::Blocked => "blocked", - TaskState::Completed => "done", - TaskState::Failed => "failed", - TaskState::Interrupted => "interrupted", - } - } - - /// Parse from string. - pub fn from_str(s: &str) -> Option<Self> { - match s.to_lowercase().as_str() { - "initializing" => Some(TaskState::Initializing), - "starting" => Some(TaskState::Starting), - "running" => Some(TaskState::Running), - "paused" => Some(TaskState::Paused), - "blocked" => Some(TaskState::Blocked), - "done" | "completed" => Some(TaskState::Completed), - "failed" => Some(TaskState::Failed), - "interrupted" => Some(TaskState::Interrupted), - _ => None, - } - } -} - -impl fmt::Display for TaskState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.as_str()) - } -} - -impl Default for TaskState { - fn default() -> Self { - TaskState::Initializing - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_valid_transitions() { - use TaskState::*; - - // Valid transitions - assert!(Initializing.can_transition_to(Starting)); - assert!(Starting.can_transition_to(Running)); - assert!(Running.can_transition_to(Completed)); - assert!(Running.can_transition_to(Paused)); - assert!(Paused.can_transition_to(Running)); - - // Invalid transitions - assert!(!Completed.can_transition_to(Running)); - assert!(!Failed.can_transition_to(Running)); - assert!(!Running.can_transition_to(Initializing)); - } - - #[test] - fn test_terminal_states() { - assert!(TaskState::Completed.is_terminal()); - assert!(TaskState::Failed.is_terminal()); - assert!(TaskState::Interrupted.is_terminal()); - assert!(!TaskState::Running.is_terminal()); - assert!(!TaskState::Paused.is_terminal()); - } - - #[test] - fn test_parse() { - assert_eq!(TaskState::from_str("running"), Some(TaskState::Running)); - assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed)); - assert_eq!(TaskState::from_str("invalid"), None); - } -} |
