//! Task lifecycle manager using git worktrees and Claude Code subprocesses.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use base64::Engine;
use rand::Rng;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use std::collections::HashSet;
use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
use crate::daemon::config::CheckpointPatchConfig;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
use crate::daemon::storage;
use crate::daemon::temp::TempManager;
use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeError, WorktreeInfo, WorktreeManager};
use crate::daemon::db::local::LocalDb;
use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
/// Produce a fresh secure random API key for orchestrator tool access.
///
/// Draws 32 random bytes from the thread-local generator and returns them
/// hex-encoded.
fn generate_tool_key() -> String {
    let key_material = rand::thread_rng().r#gen::<[u8; 32]>();
    hex::encode(key_material)
}
/// Decide whether a chunk of Claude Code output signals an OAuth/authentication failure.
///
/// Messages typed `assistant`, `user` or `result` are never treated as auth
/// errors, because conversational text may legitimately mention them. Every
/// other message (system/error JSON, non-JSON stdout, stderr) is scanned for a
/// handful of marker combinations specific enough not to occur in normal chat.
///
/// * `output` - raw text of the message or line.
/// * `json_type` - the JSON `type` field when the line parsed as JSON.
/// * `is_stdout` - whether the text came from stdout; it currently does not
///   change the outcome.
fn is_oauth_auth_error(output: &str, json_type: Option<&str>, is_stdout: bool) -> bool {
    // Conversation-bearing message types are excluded up front.
    if matches!(json_type, Some("assistant" | "user" | "result")) {
        return false;
    }

    // Non-JSON stdout is matched with the same strict patterns as everything
    // else, so the stream origin makes no difference here.
    let _ = is_stdout;

    let has = |needle: &str| output.contains(needle);

    // Login prompt emitted by Claude Code.
    (has("Please run /login") && has("authenticate"))
        // Rejected API key.
        || (has("Invalid API key") && has("ANTHROPIC_API_KEY"))
        // Expired or revoked OAuth token.
        || (has("authentication_error")
            && (has("OAuth token has expired") || has("Please obtain a new token")))
        // Claude Code's JSON error envelope.
        || (has("\"type\":\"error\"") && has("authentication"))
}
/// Pull the first OAuth login URL out of `text`.
///
/// Recognises URLs that start with the claude.ai or console.anthropic.com
/// OAuth prefixes (tried in that order). The URL ends at the first whitespace,
/// quote, closing bracket, BEL or ESC character. Terminal hyperlinks can emit
/// the same URL twice back to back; in that case only the first copy is kept.
///
/// Returns `None` when neither prefix occurs in `text`.
fn extract_url(text: &str) -> Option<String> {
    const PREFIXES: [&str; 2] = [
        "https://claude.ai/oauth",
        "https://console.anthropic.com/oauth",
    ];
    // Characters (besides whitespace) that cannot be part of the URL.
    const TERMINATORS: [char; 7] = ['"', '\'', '>', ')', ']', '\x07', '\x1b'];

    for prefix in PREFIXES {
        let Some(begin) = text.find(prefix) else {
            continue;
        };
        let tail = &text[begin..];
        let stop = tail
            .find(|c: char| c.is_whitespace() || TERMINATORS.contains(&c))
            .unwrap_or(tail.len());
        let mut candidate = &tail[..stop];

        // Detect a duplicated URL glued onto the first one. The search starts
        // 20 bytes in so the URL's own scheme is not matched.
        if candidate.len() > 30 {
            if let Some(dup_offset) = candidate[20..].find("https://") {
                candidate = &candidate[..dup_offset + 20];
            }
        }

        if candidate.len() > 20 {
            return Some(candidate.to_owned());
        }
    }

    None
}
/// Process-wide slot holding the sender half of the pending OAuth code channel.
/// There is a single slot per daemon; storing a new sender replaces the previous one.
static PENDING_AUTH_FLOW: std::sync::OnceLock<std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>>> =
    std::sync::OnceLock::new();

/// Return the global auth-flow slot, creating it (empty) on first access.
fn get_auth_flow_storage() -> &'static std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>> {
    // `Mutex<Option<_>>::default()` is a mutex wrapping `None`.
    PENDING_AUTH_FLOW.get_or_init(Default::default)
}
/// Forward an auth code to the pending OAuth flow, if there is one.
///
/// Takes the stored sender out of the global slot (so it is used at most once)
/// and pushes `code` through it. Returns `true` only when the code was handed
/// to a live receiver; returns `false` when no flow is pending, the slot's lock
/// is poisoned, or the receiving side has already gone away.
pub fn send_auth_code(code: &str) -> bool {
    let pending = get_auth_flow_storage()
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());

    match pending {
        Some(sender) if sender.send(code.to_string()).is_ok() => {
            tracing::info!("Auth code sent to setup-token process");
            true
        }
        _ => {
            tracing::warn!("No pending auth flow to send code to");
            false
        }
    }
}
/// Recognise an OAuth token on a single line of setup-token output.
///
/// The line is trimmed; if what remains begins with the `sk-ant-oat01-`
/// prefix, the whole trimmed line is returned as the token. Any other line
/// yields `None`.
fn extract_oauth_token(line: &str) -> Option<String> {
    const TOKEN_PREFIX: &str = "sk-ant-oat01-";

    Some(line.trim())
        .filter(|candidate| candidate.starts_with(TOKEN_PREFIX))
        .map(str::to_owned)
}
/// Save an OAuth token to `~/.makima/claude_oauth_token` for later use by spawned Claude processes.
///
/// The `.makima` directory is created if needed. On Unix the file is created
/// with mode `0o600` from the start, and a pre-existing file is tightened to
/// `0o600` before the secret is written, so the token is never on disk with
/// broader permissions.
///
/// # Errors
///
/// Returns `ErrorKind::NotFound` when the home directory cannot be determined.
/// The token is deliberately not written relative to the current working
/// directory in that case, since `load_oauth_token` only looks under the home
/// directory. Any I/O error from creating the directory, opening the file,
/// changing permissions, or writing is propagated.
fn save_oauth_token(token: &str) -> std::io::Result<()> {
    use std::io::Write;

    let home = dirs::home_dir().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "could not determine home directory for storing the OAuth token",
        )
    })?;
    let makima_dir = home.join(".makima");
    std::fs::create_dir_all(&makima_dir)?;
    let token_path = makima_dir.join("claude_oauth_token");

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    // Apply restrictive permissions at creation time on Unix.
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o600);
    }
    let mut file = options.open(&token_path)?;

    // `mode` only applies to newly created files; tighten an existing file
    // before any secret bytes are written to it.
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        file.set_permissions(std::fs::Permissions::from_mode(0o600))?;
    }

    file.write_all(token.as_bytes())?;

    tracing::info!(path = %token_path.display(), "Saved OAuth token to disk");
    Ok(())
}
/// Read the OAuth token stored at `~/.makima/claude_oauth_token`.
///
/// Surrounding whitespace is trimmed. Returns `None` when the home directory
/// is unknown, the file cannot be read, or the trimmed content is empty.
pub fn load_oauth_token() -> Option<String> {
    let mut token_path = dirs::home_dir()?;
    token_path.push(".makima");
    token_path.push("claude_oauth_token");

    let contents = std::fs::read_to_string(&token_path).ok()?;
    let token = contents.trim();
    if token.is_empty() {
        None
    } else {
        Some(token.to_string())
    }
}
/// Result of the OAuth login flow initiated by `get_oauth_login_url`.
/// Contains the URL for the user to visit, plus a receiver for when the token is saved.
///
/// The sending half of `token_rx` is owned by the background thread that reads
/// the `claude setup-token` output. If that thread ends without seeing a token,
/// the sender is dropped and awaiting the receiver yields an error.
struct OAuthFlowResult {
/// The OAuth login URL the user should visit.
login_url: String,
/// Receiver that will yield the saved token once authentication completes.
/// The token is sent even when writing it to disk failed (the failure is only logged).
token_rx: tokio::sync::oneshot::Receiver<String>,
}
/// Spawn `claude setup-token` to initiate OAuth flow and capture the login URL.
/// This spawns the process in a PTY (required by Ink) and reads output until we find a URL.
///
/// The new `claude setup-token` flow outputs a token directly (sk-ant-oat01-...) after
/// the user completes browser authentication, so no code submission is needed.
/// The token is automatically detected, saved to disk, and reported via the token_rx channel.
async fn get_oauth_login_url(claude_command: &str) -> Option<OAuthFlowResult> {
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use std::io::{Read, Write};
tracing::info!("Spawning claude setup-token in PTY to get OAuth login URL");
// Create a PTY - Ink requires a real terminal
let pty_system = native_pty_system();
let pair = match pty_system.openpty(PtySize {
rows: 24,
cols: 200, // Wide enough to avoid line wrapping for long URLs/codes
pixel_width: 0,
pixel_height: 0,
}) {
Ok(pair) => pair,
Err(e) => {
tracing::error!(error = %e, "Failed to open PTY");
return None;
}
};
// Build the command
let mut cmd = CommandBuilder::new(claude_command);
cmd.arg("setup-token");
// Set environment variables to prevent browser from opening and disable fancy output
// Use "false" so the browser command fails, forcing setup-token to show URL and wait for manual input
cmd.env("BROWSER", "false");
cmd.env("TERM", "dumb"); // Disable hyperlinks and fancy terminal features
cmd.env("NO_COLOR", "1"); // Disable colors
// Spawn the process in the PTY
let mut child = match pair.slave.spawn_command(cmd) {
Ok(child) => child,
Err(e) => {
tracing::error!(error = %e, "Failed to spawn claude setup-token in PTY");
return None;
}
};
// Get the reader from the master side
let mut reader = match pair.master.try_clone_reader() {
Ok(reader) => reader,
Err(e) => {
tracing::error!(error = %e, "Failed to clone PTY reader");
return None;
}
};
let mut writer = match pair.master.take_writer() {
Ok(writer) => writer,
Err(e) => {
tracing::error!(error = %e, "Failed to take PTY writer");
return None;
}
};
// Create channels for communication
let (code_tx, code_rx) = std::sync::mpsc::channel::<String>();
let (url_tx, url_rx) = std::sync::mpsc::channel::<String>();
let (token_tx, token_rx) = tokio::sync::oneshot::channel::<String>();
// Store the code sender globally so it can be used when AUTH_CODE message arrives
{
let storage = get_auth_flow_storage();
if let Ok(mut guard) = storage.lock() {
*guard = Some(code_tx);
}
}
// Spawn reader thread - reads PTY output, sends URL when found, and watches for token
let reader_handle = std::thread::spawn(move || {
let mut buffer = [0u8; 4096];
let mut accumulated = String::new();
let mut url_sent = false;
let mut token_saved = false;
let mut token_tx = Some(token_tx);
let mut read_count = 0;
tracing::info!("setup-token reader thread started");
loop {
match reader.read(&mut buffer) {
Ok(0) => {
tracing::info!("setup-token PTY EOF reached after {} reads", read_count);
break;
}
Ok(n) => {
read_count += 1;
let chunk = String::from_utf8_lossy(&buffer[..n]);
accumulated.push_str(&chunk);
// Process complete lines
while let Some(newline_pos) = accumulated.find('\n') {
let line = accumulated[..newline_pos].to_string();
accumulated = accumulated[newline_pos + 1..].to_string();
let clean_line = strip_ansi_codes(&line);
if !clean_line.trim().is_empty() {
tracing::info!(line = %clean_line, "setup-token output");
}
// Look for OAuth URL if not found yet
if !url_sent {
if let Some(url) = extract_url(&line) {
tracing::info!(url = %url, "Found OAuth login URL");
let _ = url_tx.send(url);
url_sent = true;
}
}
// Look for OAuth token in output (new setup-token format)
if !token_saved {
if let Some(token) = extract_oauth_token(&clean_line) {
tracing::info!("Found OAuth token in setup-token output");
if let Err(e) = save_oauth_token(&token) {
tracing::error!(error = %e, "Failed to save OAuth token");
} else {
tracing::info!("OAuth token saved successfully");
}
if let Some(tx) = token_tx.take() {
let _ = tx.send(token);
}
token_saved = true;
}
}
// Check for success/failure messages
if clean_line.contains("successfully") || clean_line.contains("authenticated") || clean_line.contains("Success") {
tracing::info!("Authentication appears successful!");
}
if clean_line.contains("error") || clean_line.contains("failed") || clean_line.contains("invalid") {
tracing::warn!(line = %clean_line, "setup-token may have encountered an error");
}
}
}
Err(e) => {
tracing::warn!(error = %e, "PTY read error after {} reads", read_count);
break;
}
}
}
tracing::info!("setup-token reader thread ended (token_saved={})", token_saved);
});
// Spawn writer thread - waits for auth code and writes it to PTY
std::thread::spawn(move || {
tracing::info!("setup-token writer thread started, waiting for auth code (10 min timeout)");
// Wait for auth code from frontend (with long timeout - user needs time to authenticate)
match code_rx.recv_timeout(std::time::Duration::from_secs(600)) {
Ok(code) => {
tracing::info!(code_len = code.len(), "Received auth code from frontend, writing to PTY");
// Write code followed by carriage return (Enter key in raw terminal mode)
let code_with_enter = format!("{}\r", code);
if let Err(e) = writer.write_all(code_with_enter.as_bytes()) {
tracing::error!(error = %e, "Failed to write auth code to PTY");
} else if let Err(e) = writer.flush() {
tracing::error!(error = %e, "Failed to flush PTY writer");
} else {
tracing::info!("Auth code written to setup-token PTY successfully");
// Give Ink a moment to process, then send another Enter in case first was buffered
std::thread::sleep(std::time::Duration::from_millis(100));
let _ = writer.write_all(b"\r");
let _ = writer.flush();
tracing::info!("Sent additional Enter keypress");
}
}
Err(e) => {
tracing::info!(error = %e, "Auth code receive ended (timeout or channel closed)");
}
}
// Wait for reader thread to finish
tracing::debug!("Waiting for reader thread to finish...");
let _ = reader_handle.join();
// Wait for child to fully exit
tracing::debug!("Waiting for setup-token child process to exit...");
match child.wait() {
Ok(status) => {
tracing::info!(exit_status = ?status, "setup-token process exited");
}
Err(e) => {
tracing::error!(error = %e, "Failed to wait for setup-token process");
}
}
});
// Wait for URL with timeout
match url_rx.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(login_url) => Some(OAuthFlowResult {
login_url,
token_rx,
}),
Err(e) => {
tracing::error!(error = %e, "Timed out waiting for OAuth login URL");
None
}
}
}
/// Strip ANSI escape codes from a string for cleaner logging.
///
/// Handles two kinds of escape sequence:
/// * CSI (`ESC [ ... final`): everything up to and including the final byte is
///   dropped. A final byte is any character in `0x40..=0x7E`, so sequences
///   ending in `~` or `@` are terminated correctly, not only those ending in a letter.
/// * OSC (`ESC ] ... ST`): dropped up to the string terminator, which is either
///   BEL (`0x07`) or `ESC \`.
///
/// For any other escape only the ESC character itself is removed. All remaining
/// control characters except `\n` are dropped as well (this includes `\t` and `\r`).
fn strip_ansi_codes(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    let mut chars = s.chars().peekable();

    while let Some(c) = chars.next() {
        if c != '\x1b' {
            if !c.is_control() || c == '\n' {
                result.push(c);
            }
            continue;
        }

        // Check what type of escape sequence follows the ESC.
        match chars.peek() {
            Some(&'[') => {
                // CSI sequence: ESC [ parameters... final-byte
                chars.next(); // consume '['
                for next in chars.by_ref() {
                    if ('\x40'..='\x7e').contains(&next) {
                        break;
                    }
                }
            }
            Some(&']') => {
                // OSC sequence: ESC ] ... ST (where ST is BEL or ESC \)
                chars.next(); // consume ']'
                while let Some(next) = chars.next() {
                    match next {
                        '\x07' => break, // BEL string terminator
                        '\x1b' => {
                            // ESC \ string terminator; a lone ESC also ends the sequence.
                            if chars.peek() == Some(&'\\') {
                                chars.next();
                            }
                            break;
                        }
                        _ => {}
                    }
                }
            }
            _ => {
                // Unknown escape: only the ESC itself is skipped.
            }
        }
    }

    result
}
/// System prompt for regular (non-orchestrator) subtasks.
/// This tells subtasks they share a worktree with the supervisor and other tasks.
///
/// The text instructs the task not to commit (the system auto-commits on
/// completion) and to stay inside the worktree directory.
/// It ends with a `---` separator line — presumably so the task's own plan can
/// be appended after it; confirm at the use site.
const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in a shared worktree directory with other tasks in this contract.
## IMPORTANT: Shared Worktree
**You share this worktree with the supervisor and other tasks in the contract.**
- Work within your assigned area (files/modules specified in your task plan)
- Be aware other tasks may be modifying other parts of the codebase
- Your changes will be auto-committed when your task completes
- DO NOT make commits yourself - the system handles this
## Directory Restrictions
- DO NOT use `cd` to navigate outside your worktree
- DO NOT use absolute paths pointing outside the worktree
- All file operations should be relative to the current directory
## Your Role
1. Complete the specific task assigned to you
2. Stay focused on your task plan
3. The system will commit and integrate your changes automatically
---
"#;
/// The orchestrator system prompt that tells Claude how to use the helper script.
///
/// Documents the `./.makima/orchestrate.sh` sub-commands (`list`, `create`,
/// `start`, `stop`, `status`, `output`, `worktree`, `done`), the
/// `--continue-from` and `--files` options, and how to integrate finished
/// subtask worktrees by copying files.
///
/// NOTE(review): this prompt describes each subtask as running in its own
/// worktree with uncommitted changes, whereas `SUBTASK_SYSTEM_PROMPT` describes
/// a shared worktree with auto-commits — presumably the two prompts target
/// different execution modes; confirm they cannot both apply to the same task.
const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly.
## FIRST STEP
Start by checking if you have existing subtasks:
```bash
# List all subtasks to see what work needs to be done
./.makima/orchestrate.sh list
```
If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them.
---
## Creating Subtasks
You can create new subtasks to break down work:
```bash
# Create a new subtask with a name and plan
./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..."
# The command returns the new subtask ID - use it to start the subtask
./.makima/orchestrate.sh start <new_subtask_id>
```
Create subtasks when you need to:
- Break down complex work into smaller pieces
- Run multiple tasks in parallel on different parts of the codebase
- Delegate specific implementation work
## Task Continuation (Sequential Dependencies)
When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`:
```bash
# Create Task B that continues from Task A's worktree
./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id>
```
This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes.
**When to use continuation:**
- Sequential work: Task B needs Task A's output files
- Staged implementation: Building features incrementally
- Fix-and-extend: One task fixes issues, another adds features on top
**When NOT to use continuation:**
- Parallel tasks working on different files
- Independent subtasks that can be merged separately
**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees.
## Sharing Files with Subtasks
Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files:
```bash
# Create subtask with specific files copied from orchestrator
./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md"
# Copy multiple files (comma-separated)
./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts"
# Combine with --continue-from to share files AND continue from another task
./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md"
```
**Use cases for --files:**
- Share a PLAN.md with detailed implementation steps
- Distribute configuration or spec files
- Pass generated data or intermediate results
## How Subtasks Work
Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete:
- Their work remains in the worktree files (NOT committed to git)
- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree
- You can view and copy files from subtask worktrees using their paths
- The worktree path is returned when you get subtask status
**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work.
## Subtask Commands
```bash
# List all subtasks and their current status
./.makima/orchestrate.sh list
# Create a new subtask (returns the subtask ID)
./.makima/orchestrate.sh create "Name" "Plan/description"
# Create a subtask that continues from another task's worktree
./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id>
# Create a subtask with specific files copied from orchestrator worktree
./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml"
# Start a specific subtask (it will run in its own Claude instance)
./.makima/orchestrate.sh start <subtask_id>
# Stop a running subtask
./.makima/orchestrate.sh stop <subtask_id>
# Get detailed status of a subtask (includes worktree_path when available)
./.makima/orchestrate.sh status <subtask_id>
# Get the output/logs of a subtask
./.makima/orchestrate.sh output <subtask_id>
# Get the worktree path for a subtask
./.makima/orchestrate.sh worktree <subtask_id>
```
## Integrating Subtask Work
When subtasks complete, their changes exist as files in their worktree directories:
- Files are NOT committed to git branches
- You must copy/integrate files from subtask worktrees into your worktree
- Use standard file operations (cp, cat, etc.) to review and integrate changes
### Handling Continuation Chains
**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain.
Example chain: Task A → Task B (continues from A) → Task C (continues from B)
- Task C's worktree contains ALL changes from A, B, and C
- You should ONLY integrate Task C's worktree
- DO NOT integrate Task A or Task B separately (their changes are already in C)
**How to track continuation chains:**
1. When you create tasks with `--continue-from`, note which task continues from which
2. Build a mental model: Independent tasks (no continuation) + Continuation chains
3. For each chain, only integrate the LAST task in the chain
**Example with mixed independent and chained tasks:**
```
Independent tasks (integrate all):
- Task X: API endpoints
- Task Y: Database models
Continuation chain (integrate ONLY the last one):
- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B)
Only integrate Task C!
```
### Integration Examples
For independent subtasks (no continuation):
```bash
# Get the worktree path for a completed subtask
SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>)
# View what files were changed
ls -la "$SUBTASK_PATH"
diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima
# Copy specific files from subtask
cp "$SUBTASK_PATH/src/new_file.rs" ./src/
cp "$SUBTASK_PATH/src/modified_file.rs" ./src/
# Or use diff/patch for more control
diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch
patch -p0 < changes.patch
```
For continuation chains (only integrate the final task):
```bash
# If you have: Task A → Task B → Task C (each continues from previous)
# ONLY get and integrate Task C's worktree - it has everything!
FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>)
# Copy all changes from the final task
rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./
```
## Completion
```bash
# Mark yourself as complete after integrating all subtask work
./.makima/orchestrate.sh done "Summary of what was accomplished"
```
## Workflow
1. **List existing subtasks**: Run `list` to see current subtasks
2. **Create subtasks if needed**: Use `create` to add new subtasks for the work
- For independent parallel work: create without `--continue-from`
- For sequential dependencies: use `--continue-from <previous_task_id>`
- Track which tasks continue from which (continuation chains)
3. **Start subtasks**: Run `start` for each subtask
4. **Monitor progress**: Check status and output as subtasks run
5. **Integrate work**: When subtasks complete:
- For independent tasks: integrate each one's worktree
- For continuation chains: ONLY integrate the FINAL task (it has all changes)
- Get worktree path with `worktree <subtask_id>`
- Copy or merge files into your worktree
6. **Complete**: Call `done` once all work is integrated
## Important Notes
- Subtask files are in worktrees, NOT committed git branches
- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work
- You can read files from subtask worktrees using their paths
- Use standard file tools (cp, diff, cat, rsync) to integrate changes
- You should NOT edit files directly - that's what subtasks are for
- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement.
- When you call `done`, YOUR worktree may be used for the final PR/merge
"#;
/// System prompt for supervisor tasks (contract orchestrators).
/// Supervisors coordinate work by spawning tasks and responding to user questions.
/// Git operations and phase advancement are handled automatically by the system.
///
/// Documents the `makima supervisor` sub-commands (`spawn`, `wait`, `tasks`,
/// `tree`, `diff`, `read-file`, `ask`, `status`) and the spawn/wait/diff workflow.
///
/// NOTE(review): the prompt tells the supervisor to finish with
/// `makima supervisor complete`, but that command is not listed in its
/// "AVAILABLE COMMANDS" section — consider adding it there.
const SUPERVISOR_SYSTEM_PROMPT: &str = r###"You are the SUPERVISOR for this contract. Your job is to coordinate work by spawning tasks and responding to user questions.
## WHAT YOU DO
1. Break down the contract goal into actionable tasks
2. Spawn tasks using `makima supervisor spawn "Task Name" "Detailed plan..."`
3. Wait for tasks to complete using `makima supervisor wait <task_id>`
4. Respond to user questions when asked
## WHAT THE SYSTEM HANDLES AUTOMATICALLY
- **Phase advancement** - When deliverables are complete, the system advances the phase
- **Git commits** - Tasks auto-commit their changes on completion
- **Pull requests** - System auto-creates PR when execute phase completes
- **You will be notified** when phases advance so you know to continue
## CRITICAL RULES
1. **NEVER write code or edit files yourself** - you are a coordinator ONLY
2. **ALWAYS spawn tasks** for ANY work that involves writing or editing code
3. **ALWAYS wait for tasks to complete** - you MUST use `wait` after spawning
## AVAILABLE COMMANDS
### Task Management
```bash
makima supervisor spawn "Task Name" "Detailed plan..." # Create and start a task
makima supervisor wait <task_id> [timeout_seconds] # Wait for task completion
makima supervisor tasks # List all tasks
makima supervisor tree # View task tree
makima supervisor diff <task_id> # View task changes
makima supervisor read-file <task_id> <file_path> # Read file from task
```
### User Interaction
```bash
makima supervisor ask "Your question" [--choices "A,B,C"] # Ask user
makima supervisor status # Contract status (read-only)
```
## WORKFLOW PATTERN
```bash
# 1. Spawn a task
RESULT=$(makima supervisor spawn "Implement feature X" "Details...")
TASK_ID=$(echo "$RESULT" | jq -r '.taskId')
# 2. Wait for it
makima supervisor wait "$TASK_ID"
# 3. Check result
makima supervisor diff "$TASK_ID"
# 4. Repeat for more tasks
# System handles commits, merging, and PR creation automatically
```
## MULTI-PHASE PLANS
When the plan document contains multiple implementation phases (Phase 1, Phase 2, etc.):
1. **Read the plan** to identify ALL phases
2. **Execute phases SEQUENTIALLY** - complete Phase 1 before Phase 2
3. **Track your progress** - keep track of which phases are done
4. **Confirm between phases** - use `ask` to confirm before proceeding
5. The system will auto-create PR when ALL phases are complete
## IMPORTANT NOTES
- DO NOT call advance-phase - the system does this automatically
- DO NOT manage git operations (branch, merge, pr) - the system handles this
- Focus ONLY on spawning tasks and responding to users
- You share a worktree with all tasks - changes are visible immediately
- If you need user input, use `makima supervisor ask`
- When all work is complete, use `makima supervisor complete` to finish
## WHEN TASKS COMPLETE
When a task completes:
1. Check the result with `makima supervisor diff <task_id>`
2. If more work needed, spawn another task
3. The system automatically commits changes
When ALL work is complete:
- Use `makima supervisor complete` to mark the contract done
- The system will auto-create PR (for remote repos)
"###;
/// System prompt for tasks that are part of a contract.
/// This tells the task about the `makima contract` CLI and how to use it to interact with the contract
/// (status, checklist, files, progress reports, file creation/update, completion actions).
///
/// The text starts with a newline and ends with a `---` separator line —
/// presumably so it can be concatenated with other prompt sections; confirm at
/// the use site.
const CONTRACT_INTEGRATION_PROMPT: &str = r##"
## Contract Integration
This task is part of a contract. You have access to contract tools via the `makima contract` CLI.
### Contract Commands
```bash
# Get contract context (name, phase, goals)
makima contract status
# Get phase checklist and deliverables
makima contract checklist
# List contract files
makima contract files
# Read a specific file content
makima contract file <file_id>
# Report progress to the contract
makima contract report "Completed X, working on Y..."
# Create a new contract file (content via stdin)
echo "# New Documentation" | makima contract create-file "New Document"
# Update an existing contract file (content via stdin)
cat updated_content.md | makima contract update-file <file_id>
# Get suggested next action when done
makima contract suggest-action
# Report completion with metrics
makima contract completion-action --files "file1.rs,file2.rs" --code
```
### What You Should Do
**Before starting:**
1. Run `makima contract status` to understand the contract context
2. Run `makima contract checklist` to see phase deliverables
3. Run `makima contract files` to see existing documentation
**While working:**
- Report significant progress with `makima contract report "..."`
**When completing:**
1. If your work should be documented, create or update contract files
2. Run `makima contract completion-action` to see recommended next steps
3. Consider what contract files or phases might need updating
**Important:** Your work should contribute to the contract's goals. Check the contract status to understand what's expected.
---
"##;
/// Tracks merge state for an orchestrator task.
///
/// Derives `Default`, so a fresh tracker starts with both collections empty.
/// Entries are keyed by `Uuid` — presumably the subtask's task ID; confirm
/// against the code that populates the tracker.
#[derive(Default)]
struct MergeTracker {
/// Subtask branches that have been successfully merged.
merged_subtasks: HashSet<Uuid>,
/// Subtask branches that were explicitly skipped (with reason).
/// The map value is the human-readable reason for skipping.
skipped_subtasks: HashMap<Uuid, String>,
}
/// Managed task information.
///
/// One record per task tracked by the task manager: identity and plan,
/// repository/branch settings, position in the task hierarchy, contract
/// linkage, completion behaviour, and timing. The struct derives `Clone`, so
/// copies are snapshots and do not observe later state changes.
#[derive(Clone)]
pub struct ManagedTask {
/// Task ID.
pub id: Uuid,
/// Human-readable task name.
pub task_name: String,
/// Current state.
pub state: TaskState,
/// Worktree info if created.
pub worktree: Option<WorktreeInfo>,
/// Task plan.
pub plan: String,
/// Repository URL or path.
pub repo_source: Option<String>,
/// Base branch.
pub base_branch: Option<String>,
/// Target branch to merge into.
pub target_branch: Option<String>,
/// Parent task ID if this is a subtask.
pub parent_task_id: Option<Uuid>,
/// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
pub depth: i32,
/// Whether this task runs as an orchestrator (coordinates subtasks).
pub is_orchestrator: bool,
/// Whether this task is a supervisor (long-running contract orchestrator).
pub is_supervisor: bool,
/// Path to target repository for completion actions.
pub target_repo_path: Option<String>,
/// Completion action: "none", "branch", "merge", "pr".
// NOTE(review): stringly-typed; an enum would rule out unknown values — consider for a follow-up.
pub completion_action: Option<String>,
/// Task ID to continue from (copy worktree from this task).
pub continue_from_task_id: Option<Uuid>,
/// Files to copy from parent task's worktree.
pub copy_files: Option<Vec<String>>,
/// Contract ID if this task is associated with a contract.
pub contract_id: Option<Uuid>,
/// Key used for concurrency tracking (contract_id or task_id for standalone).
pub concurrency_key: Uuid,
/// Whether to run in autonomous loop mode.
pub autonomous_loop: bool,
/// Whether the contract is in local-only mode (skips automatic completion actions).
pub local_only: bool,
/// Whether to auto-merge to target branch locally when local_only mode is enabled.
pub auto_merge_local: bool,
/// If set, merge this task's changes to the supervisor's worktree on completion (cross-daemon case).
pub merge_to_supervisor_task_id: Option<Uuid>,
/// If set, this task shares the worktree of the specified supervisor task.
pub supervisor_worktree_task_id: Option<Uuid>,
/// Time task was created.
pub created_at: Instant,
/// Time task started running.
/// `None` until the task has started.
pub started_at: Option<Instant>,
/// Time task completed.
/// `None` until the task has completed.
pub completed_at: Option<Instant>,
/// Error message if failed.
pub error: Option<String>,
}
/// Configuration for task execution.
///
/// Covers concurrency limits, worktree location, how the Claude process is
/// launched, API access for spawned tasks, and checkpointing. The `Default`
/// implementation supplies the baseline values (e.g. limits of 10, command
/// `"claude"`, an empty API key).
#[derive(Clone)]
pub struct TaskConfig {
/// Maximum concurrent tasks (global cap).
pub max_concurrent_tasks: u32,
/// Maximum concurrent tasks per contract/supervisor.
pub max_tasks_per_contract: u32,
/// Base directory for worktrees.
pub worktree_base_dir: PathBuf,
/// Environment variables to pass to Claude.
pub env_vars: HashMap<String, String>,
/// Claude command path.
pub claude_command: String,
/// Additional arguments to pass to Claude Code.
pub claude_args: Vec<String>,
/// Arguments to pass before defaults.
pub claude_pre_args: Vec<String>,
/// Enable Claude's permission system.
pub enable_permissions: bool,
/// Disable verbose output.
pub disable_verbose: bool,
/// Bubblewrap sandbox configuration.
/// `None` means no sandbox configuration is supplied.
pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>,
/// API URL for spawned tasks (HTTP endpoint for makima CLI).
pub api_url: String,
/// API key for making authenticated API calls.
/// Defaults to an empty string.
pub api_key: String,
/// Interval in seconds between heartbeat commits (WIP checkpoints).
/// Set to 0 to disable. Default: 300 (5 minutes).
pub heartbeat_commit_interval_secs: u64,
/// Checkpoint patch storage configuration.
pub checkpoint_patches: CheckpointPatchConfig,
}
/// Baseline configuration: up to 10 concurrent tasks globally and per
/// contract, the stock `claude` command with no extra arguments, permissions
/// and sandboxing off, the hosted makima API endpoint with an empty key, and a
/// heartbeat commit every five minutes.
impl Default for TaskConfig {
    fn default() -> Self {
        // Shared default for both the global and the per-contract task cap.
        const DEFAULT_TASK_LIMIT: u32 = 10;
        // Five minutes between WIP checkpoint commits.
        const DEFAULT_HEARTBEAT_SECS: u64 = 300;

        Self {
            // Concurrency limits.
            max_concurrent_tasks: DEFAULT_TASK_LIMIT,
            max_tasks_per_contract: DEFAULT_TASK_LIMIT,
            // Filesystem layout.
            worktree_base_dir: WorktreeManager::default_base_dir(),
            // Claude process launch settings.
            claude_command: String::from("claude"),
            claude_pre_args: vec![],
            claude_args: vec![],
            env_vars: HashMap::default(),
            enable_permissions: false,
            disable_verbose: false,
            bubblewrap: None,
            // API access for spawned tasks.
            api_url: String::from("https://api.makima.jp"),
            api_key: String::default(),
            // Checkpointing.
            heartbeat_commit_interval_secs: DEFAULT_HEARTBEAT_SECS,
            checkpoint_patches: CheckpointPatchConfig::default(),
        }
    }
}
/// Task manager for handling task lifecycle.
///
/// Owns the shared state for every task the daemon is running: the task table,
/// per-contract concurrency counters, stdin channels, merge trackers and the
/// PIDs of live subprocesses. All async-shared maps are wrapped in
/// `Arc<RwLock<..>>` (tokio), while the local database uses a blocking
/// `std::sync::Mutex`.
pub struct TaskManager {
/// Worktree manager (built from `TaskConfig::worktree_base_dir`).
worktree_manager: Arc<WorktreeManager>,
/// Process manager (built from the Claude-related `TaskConfig` fields).
process_manager: Arc<ProcessManager>,
/// Temp directory manager.
temp_manager: Arc<TempManager>,
/// Task configuration.
config: TaskConfig,
/// Active tasks, keyed by task ID.
tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
/// Channel to send messages to server.
ws_tx: mpsc::Sender<DaemonMessage>,
/// Tracks running task count per contract (or per standalone task).
/// Key is contract_id for contract tasks, or task_id for standalone tasks.
/// The sum of all values is compared against the global cap.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
/// Channels for sending input to running tasks.
/// Each sender allows sending messages to the stdin of a running Claude process.
task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
/// Tracks merge state per orchestrator task (for completion gate).
merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>,
/// Active process PIDs, keyed by task ID. Used for graceful shutdown and
/// for delivering pause/resume signals.
active_pids: Arc<RwLock<HashMap<Uuid, u32>>>,
/// Inherited git user.email for worktrees.
git_user_email: Arc<RwLock<Option<String>>>,
/// Inherited git user.name for worktrees.
git_user_name: Arc<RwLock<Option<String>>>,
/// Local SQLite database for crash recovery.
/// NOTE: this is a blocking `std::sync::Mutex`; keep guards out of `.await` scopes.
local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManager {
/// Create a new task manager with local database for crash recovery.
pub fn new(
config: TaskConfig,
ws_tx: mpsc::Sender<DaemonMessage>,
local_db: Arc<std::sync::Mutex<LocalDb>>,
) -> Self {
let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
let process_manager = Arc::new(
ProcessManager::with_command(config.claude_command.clone())
.with_args(config.claude_args.clone())
.with_pre_args(config.claude_pre_args.clone())
.with_permissions_enabled(config.enable_permissions)
.with_verbose_disabled(config.disable_verbose)
.with_env(config.env_vars.clone())
.with_bubblewrap(config.bubblewrap.clone()),
);
let temp_manager = Arc::new(TempManager::new());
Self {
worktree_manager,
process_manager,
temp_manager,
config,
tasks: Arc::new(RwLock::new(HashMap::new())),
ws_tx,
contract_task_counts: Arc::new(RwLock::new(HashMap::new())),
task_inputs: Arc::new(RwLock::new(HashMap::new())),
merge_trackers: Arc::new(RwLock::new(HashMap::new())),
active_pids: Arc::new(RwLock::new(HashMap::new())),
git_user_email: Arc::new(RwLock::new(None)),
git_user_name: Arc::new(RwLock::new(None)),
local_db,
}
}
/// Persist task state to local SQLite database for crash recovery.
fn persist_task_to_local_db(&self, task: &ManagedTask) {
use crate::daemon::db::local::LocalTask;
let local_task = LocalTask {
id: task.id,
server_task_id: task.id, // Same as task id
state: task.state.clone(),
container_id: None,
overlay_path: task.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()),
repo_url: task.repo_source.clone(),
base_branch: task.base_branch.clone(),
plan: task.plan.clone(),
created_at: chrono::Utc::now(),
started_at: task.started_at.map(|_| chrono::Utc::now()),
completed_at: task.completed_at.map(|_| chrono::Utc::now()),
error_message: task.error.clone(),
};
if let Ok(db) = self.local_db.lock() {
if let Err(e) = db.save_task(&local_task) {
tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database");
} else {
tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database");
}
}
}
/// Delete a task's row from the local database once it no longer needs
/// crash-recovery tracking.
///
/// Best-effort: if the database lock cannot be taken nothing happens, and a
/// failed delete is only logged.
fn remove_task_from_local_db(&self, task_id: Uuid) {
    let Ok(db) = self.local_db.lock() else {
        return;
    };

    match db.delete_task(task_id) {
        Ok(_) => {
            tracing::debug!(task_id = %task_id, "Removed task from local database");
        }
        Err(e) => {
            tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
        }
    }
}
/// Recover orphaned tasks from local database after daemon restart.
///
/// Loads every task the local database still considers active and checks
/// whether its worktree survives on disk:
///
/// * If the task recorded an `overlay_path`, that directory must exist and
///   contain a `.git` entry.
/// * Otherwise the worktree base directory is scanned for an entry whose name
///   starts with the first 8 characters of the task ID and contains `.git`.
///
/// Tasks with an intact worktree are reported to the server via a
/// `task_recovery_detected` message and returned. Tasks without one are marked
/// `Failed` in the local database; a failure to record that is logged so the
/// task does not silently reappear as "orphaned" on every restart.
///
/// Returns list of task IDs that have worktrees and can potentially be recovered.
/// Returns an empty list if the database cannot be locked or queried.
pub async fn recover_orphaned_tasks(&self) -> Vec<Uuid> {
    tracing::info!("=== STARTING ORPHANED TASK RECOVERY ===");

    // The std mutex guard is confined to this block so it is released before
    // any `.await` below.
    let active_tasks = {
        let db = match self.local_db.lock() {
            Ok(db) => db,
            Err(e) => {
                tracing::error!(error = %e, "Failed to lock local database for recovery");
                return Vec::new();
            }
        };
        match db.get_active_tasks() {
            Ok(tasks) => tasks,
            Err(e) => {
                tracing::error!(error = %e, "Failed to load active tasks from local database");
                return Vec::new();
            }
        }
    };

    if active_tasks.is_empty() {
        tracing::info!("No orphaned tasks found in local database");
        return Vec::new();
    }
    tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database");

    let mut recoverable_task_ids = Vec::new();
    for local_task in active_tasks {
        tracing::info!(
            task_id = %local_task.id,
            state = ?local_task.state,
            overlay_path = ?local_task.overlay_path,
            "Checking orphaned task"
        );

        // Check if worktree exists on filesystem
        let worktree_exists = if let Some(ref path) = local_task.overlay_path {
            let path = std::path::PathBuf::from(path);
            path.exists() && path.join(".git").exists()
        } else {
            // Try to find worktree by task ID pattern (scan worktrees directory)
            let short_id = &local_task.id.to_string()[..8];
            let worktrees_dir = self.worktree_manager.base_dir();
            let mut found = false;
            if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
                while let Ok(Some(entry)) = entries.next_entry().await {
                    let name = entry.file_name();
                    if name.to_string_lossy().starts_with(short_id)
                        && entry.path().join(".git").exists()
                    {
                        found = true;
                        break;
                    }
                }
            }
            found
        };

        if worktree_exists {
            tracing::info!(
                task_id = %local_task.id,
                "Found worktree for orphaned task - can be recovered"
            );
            recoverable_task_ids.push(local_task.id);
            // Send structured recovery notification to server
            let msg = DaemonMessage::task_recovery_detected(
                local_task.id,
                local_task.state.as_str(),
                true, // worktree intact
                local_task.overlay_path.clone(),
                false, // doesn't need patch since worktree is intact
            );
            let _ = self.ws_tx.send(msg).await;
        } else {
            tracing::warn!(
                task_id = %local_task.id,
                "Worktree missing for orphaned task - marking as lost"
            );
            // Update local db to mark as failed, surfacing any failure instead
            // of discarding it.
            match self.local_db.lock() {
                Ok(db) => {
                    if let Err(e) = db.update_task_state(local_task.id, TaskState::Failed) {
                        tracing::warn!(
                            task_id = %local_task.id,
                            error = %e,
                            "Failed to mark lost task as failed in local database"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        task_id = %local_task.id,
                        error = %e,
                        "Local database lock poisoned; could not mark lost task as failed"
                    );
                }
            }
        }
    }

    tracing::info!(
        recoverable = recoverable_task_ids.len(),
        "=== ORPHANED TASK RECOVERY COMPLETE ==="
    );
    recoverable_task_ids
}
/// Check worktree health for all running tasks.
/// If a worktree is missing, marks the task as interrupted and notifies the server.
/// This allows the retry orchestrator to pick up the task and restore it from checkpoint.
pub async fn check_worktree_health(&self) -> Vec<Uuid> {
let mut affected_task_ids = Vec::new();
// Get all running tasks with their worktree info and supervisor worktree task ID
let tasks_snapshot: Vec<(Uuid, Option<PathBuf>, Option<Uuid>)> = {
let tasks = self.tasks.read().await;
tasks
.iter()
.filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting))
.map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()), t.supervisor_worktree_task_id))
.collect()
};
if tasks_snapshot.is_empty() {
return affected_task_ids;
}
for (task_id, worktree_path, supervisor_worktree_task_id) in tasks_snapshot {
let worktree_exists = if let Some(ref path) = worktree_path {
path.exists() && path.join(".git").exists()
} else if let Some(supervisor_task_id) = supervisor_worktree_task_id {
// Task uses shared supervisor worktree - check the supervisor's worktree
// First try to get from in-memory tasks
let supervisor_worktree_path: Option<PathBuf> = {
let tasks = self.tasks.read().await;
tasks.get(&supervisor_task_id)
.and_then(|t| t.worktree.as_ref().map(|w| w.path.clone()))
};
if let Some(path) = supervisor_worktree_path {
path.exists() && path.join(".git").exists()
} else {
// Supervisor not in memory - scan worktrees directory
let short_id = &supervisor_task_id.to_string()[..8];
let worktrees_dir = self.worktree_manager.base_dir();
let mut found = false;
if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with(short_id) {
let path = entry.path();
if path.join(".git").exists() {
found = true;
break;
}
}
}
}
found
}
} else {
// No worktree set - scan by task ID
let short_id = &task_id.to_string()[..8];
let worktrees_dir = self.worktree_manager.base_dir();
let mut found = false;
if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with(short_id) {
let path = entry.path();
if path.join(".git").exists() {
found = true;
break;
}
}
}
}
found
};
if !worktree_exists {
tracing::warn!(
task_id = %task_id,
worktree_path = ?worktree_path,
"Worktree missing for running task - marking as interrupted for retry"
);
affected_task_ids.push(task_id);
// Update task state to interrupted
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.state = TaskState::Interrupted;
task.error = Some("Worktree directory was deleted".to_string());
task.completed_at = Some(Instant::now());
}
}
// Notify server - task needs recovery/retry
let msg = DaemonMessage::task_complete(
task_id,
false,
Some("Worktree deleted - task interrupted for recovery".to_string()),
);
let _ = self.ws_tx.send(msg).await;
// Remove from local db since server will handle retry
self.remove_task_from_local_db(task_id);
}
}
if !affected_task_ids.is_empty() {
tracing::info!(
count = affected_task_ids.len(),
"Worktree health check found missing worktrees"
);
}
affected_task_ids
}
/// Reserve a concurrency slot for a task about to be spawned.
///
/// Slots are counted per "concurrency key": the `contract_id` for contract
/// tasks, or the task's own ID for standalone tasks. The request is rejected
/// with `TaskError::ConcurrencyLimit` when the sum over all keys has reached
/// `max_concurrent_tasks`, or with `TaskError::ContractConcurrencyLimit` when
/// this key alone has reached `max_tasks_per_contract`. The global cap is
/// checked first.
///
/// On success the key's counter is incremented and the key is returned.
async fn try_acquire_concurrency_slot(
    &self,
    contract_id: Option<Uuid>,
    task_id: Uuid,
) -> TaskResult<Uuid> {
    // Hold the write lock for the whole check-and-increment so it is atomic.
    let mut counts = self.contract_task_counts.write().await;

    // Standalone tasks act as their own single-member "contract".
    let concurrency_key = match contract_id {
        Some(contract) => contract,
        None => task_id,
    };

    // Global cap.
    let total = counts.values().copied().sum::<usize>();
    if total >= self.config.max_concurrent_tasks as usize {
        tracing::warn!(
            task_id = %task_id,
            total_running = total,
            max = self.config.max_concurrent_tasks,
            "Global concurrency limit reached, cannot spawn task"
        );
        return Err(TaskError::ConcurrencyLimit);
    }

    // Per-contract cap.
    let contract_count = counts.get(&concurrency_key).map_or(0, |running| *running);
    if contract_count >= self.config.max_tasks_per_contract as usize {
        tracing::warn!(
            task_id = %task_id,
            contract_id = ?contract_id,
            concurrency_key = %concurrency_key,
            contract_running = contract_count,
            max_per_contract = self.config.max_tasks_per_contract,
            "Contract concurrency limit reached, cannot spawn task"
        );
        return Err(TaskError::ContractConcurrencyLimit);
    }

    // Both checks passed: claim the slot.
    let new_count = {
        let slot = counts.entry(concurrency_key).or_insert(0);
        *slot += 1;
        *slot
    };
    tracing::debug!(
        task_id = %task_id,
        concurrency_key = %concurrency_key,
        new_count = new_count,
        total = total + 1,
        "Acquired concurrency slot"
    );
    Ok(concurrency_key)
}
/// Gracefully shutdown all running Claude processes and their children.
///
/// This sends SIGTERM to all active process groups, waits for them to exit gracefully,
/// and then sends SIGKILL to any that don't exit within the timeout.
/// Uses process groups to ensure all child processes (bash commands, etc.) are also killed.
///
/// "Exited" is judged by `active_pids` becoming empty, which is polled every
/// 100 ms; this function never removes entries itself. A process group that
/// has already disappeared (`ESRCH`) is treated as exited for both signals
/// rather than reported as a failure.
///
/// * `timeout` - how long to wait after SIGTERM before escalating to SIGKILL.
#[cfg(unix)]
pub async fn shutdown_all_processes(&self, timeout: std::time::Duration) {
    use nix::errno::Errno;
    use nix::sys::signal::{killpg, Signal};
    use nix::unistd::Pid;

    let pids: Vec<(Uuid, u32)> = {
        let guard = self.active_pids.read().await;
        guard.iter().map(|(k, v)| (*k, *v)).collect()
    };
    if pids.is_empty() {
        tracing::info!("No active Claude processes to shutdown");
        return;
    }

    tracing::info!(count = pids.len(), "Sending SIGTERM to all Claude process groups");
    // Send SIGTERM to all process groups (each Claude process is its own group leader)
    for (task_id, pid) in &pids {
        match killpg(Pid::from_raw(*pid as i32), Signal::SIGTERM) {
            Ok(()) => {
                tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGTERM to process group");
            }
            Err(Errno::ESRCH) => {
                tracing::debug!(task_id = %task_id, pid = pid, "Process group already exited");
            }
            Err(e) => {
                tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGTERM to process group");
            }
        }
    }

    // Wait for processes to exit with timeout
    let start = std::time::Instant::now();
    let check_interval = std::time::Duration::from_millis(100);
    while start.elapsed() < timeout {
        // Only emptiness matters here, so check it under the lock without
        // copying the PID list; the guard is released before sleeping.
        let all_exited = self.active_pids.read().await.is_empty();
        if all_exited {
            tracing::info!("All Claude processes exited gracefully");
            return;
        }
        tokio::time::sleep(check_interval).await;
    }

    // Send SIGKILL to any remaining process groups
    let remaining: Vec<(Uuid, u32)> = {
        let guard = self.active_pids.read().await;
        guard.iter().map(|(k, v)| (*k, *v)).collect()
    };
    if !remaining.is_empty() {
        tracing::warn!(
            count = remaining.len(),
            "Some process groups did not exit gracefully, sending SIGKILL"
        );
        for (task_id, pid) in &remaining {
            match killpg(Pid::from_raw(*pid as i32), Signal::SIGKILL) {
                Ok(()) => {
                    tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGKILL to process group");
                }
                // The group can vanish between the last poll and this call;
                // that is the desired outcome, not a failure.
                Err(Errno::ESRCH) => {
                    tracing::debug!(task_id = %task_id, pid = pid, "Process group already exited");
                }
                Err(e) => {
                    tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGKILL to process group");
                }
            }
        }
    }
}
/// Gracefully shutdown all running Claude processes (no-op on non-Unix).
///
/// Signal-based shutdown is only implemented for Unix targets. On other
/// platforms this logs a warning and returns immediately; `_timeout` is
/// ignored and no process is signalled.
#[cfg(not(unix))]
pub async fn shutdown_all_processes(&self, _timeout: std::time::Duration) {
tracing::warn!("Graceful shutdown not supported on this platform");
}
/// Suspend a running task by delivering SIGSTOP to its process.
///
/// Succeeds immediately if the task is already `Paused`. On success the task's
/// state becomes `Paused` and the server is told about the
/// `running -> paused` transition.
///
/// # Errors
/// * `TaskError::NotFound` - no task with this ID.
/// * `TaskError::InvalidStateTransition` - task is neither running nor paused.
/// * `TaskError::ExecutionFailed` - no PID is registered for the task, the
///   process no longer exists, or the signal could not be sent.
#[cfg(unix)]
pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
    use nix::errno::Errno;
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    // Gate on the current state; only `Running` falls through.
    let observed_state = self.tasks.read().await.get(&task_id).map(|t| t.state);
    let Some(state) = observed_state else {
        tracing::warn!(task_id = %task_id, "Task not found");
        return Err(TaskError::NotFound(task_id));
    };
    match state {
        TaskState::Running => {}
        TaskState::Paused => {
            tracing::debug!(task_id = %task_id, "Task already paused");
            return Ok(());
        }
        state => {
            tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state");
            return Err(TaskError::InvalidStateTransition {
                from: format!("{:?}", state),
                to: "Paused".to_string(),
            });
        }
    }

    // Look up the process to signal.
    let registered_pid = self.active_pids.read().await.get(&task_id).copied();
    let Some(pid) = registered_pid else {
        tracing::warn!(task_id = %task_id, "No PID found for task");
        return Err(TaskError::ExecutionFailed(
            "No active process for task".to_string(),
        ));
    };

    // Deliver SIGSTOP, translating signal errors into task errors.
    if let Err(errno) = kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) {
        let failure = match errno {
            Errno::ESRCH => {
                tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
                TaskError::ExecutionFailed("Process not found".to_string())
            }
            e => {
                tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP");
                TaskError::ExecutionFailed(format!("Failed to pause: {}", e))
            }
        };
        return Err(failure);
    }
    tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process");

    // Record the new state; the write guard is dropped before notifying.
    if let Some(task) = self.tasks.write().await.get_mut(&task_id) {
        task.state = TaskState::Paused;
    }

    // Notify server of state change
    let notice = DaemonMessage::task_status_change(task_id, "running", "paused");
    let _ = self.ws_tx.send(notice).await;
    Ok(())
}
/// Pause a task (unsupported on non-Unix).
///
/// Pausing relies on Unix signals, so on other platforms this only logs a
/// warning and always returns `TaskError::ExecutionFailed`.
#[cfg(not(unix))]
pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
    tracing::warn!(task_id = %task_id, "Pause not supported on this platform");
    let unsupported = String::from("Pause not supported on this platform");
    Err(TaskError::ExecutionFailed(unsupported))
}
/// Wake a paused task by delivering SIGCONT to its process.
///
/// Succeeds immediately if the task is already `Running`. On success the
/// task's state becomes `Running` and the server is told about the
/// `paused -> running` transition.
///
/// # Errors
/// * `TaskError::NotFound` - no task with this ID.
/// * `TaskError::InvalidStateTransition` - task is neither paused nor running.
/// * `TaskError::ExecutionFailed` - no PID is registered for the task, the
///   process no longer exists, or the signal could not be sent.
#[cfg(unix)]
pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
    use nix::errno::Errno;
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    // Gate on the current state; only `Paused` falls through.
    let observed_state = self.tasks.read().await.get(&task_id).map(|t| t.state);
    let Some(state) = observed_state else {
        tracing::warn!(task_id = %task_id, "Task not found");
        return Err(TaskError::NotFound(task_id));
    };
    match state {
        TaskState::Paused => {}
        TaskState::Running => {
            tracing::debug!(task_id = %task_id, "Task already running");
            return Ok(());
        }
        state => {
            tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state");
            return Err(TaskError::InvalidStateTransition {
                from: format!("{:?}", state),
                to: "Running".to_string(),
            });
        }
    }

    // Look up the process to signal.
    let registered_pid = self.active_pids.read().await.get(&task_id).copied();
    let Some(pid) = registered_pid else {
        tracing::warn!(task_id = %task_id, "No PID found for task");
        return Err(TaskError::ExecutionFailed(
            "No active process for task".to_string(),
        ));
    };

    // Deliver SIGCONT, translating signal errors into task errors.
    if let Err(errno) = kill(Pid::from_raw(pid as i32), Signal::SIGCONT) {
        let failure = match errno {
            Errno::ESRCH => {
                tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
                TaskError::ExecutionFailed("Process not found".to_string())
            }
            e => {
                tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT");
                TaskError::ExecutionFailed(format!("Failed to resume: {}", e))
            }
        };
        return Err(failure);
    }
    tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process");

    // Record the new state; the write guard is dropped before notifying.
    if let Some(task) = self.tasks.write().await.get_mut(&task_id) {
        task.state = TaskState::Running;
    }

    // Notify server of state change
    let notice = DaemonMessage::task_status_change(task_id, "paused", "running");
    let _ = self.ws_tx.send(notice).await;
    Ok(())
}
/// Resume a task (unsupported on non-Unix).
///
/// Resuming relies on Unix signals, so on other platforms this only logs a
/// warning and always returns `TaskError::ExecutionFailed`.
#[cfg(not(unix))]
pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
    tracing::warn!(task_id = %task_id, "Resume not supported on this platform");
    let unsupported = String::from("Resume not supported on this platform");
    Err(TaskError::ExecutionFailed(unsupported))
}
/// Handle a command from the server.
///
/// Dispatches a single `DaemonCommand` to the matching handler. Most arms log
/// a summary and delegate to a `handle_*` method, propagating its error with
/// `?`. Notable exceptions:
///
/// * `PauseTask` / `ResumeTask` - failures are logged, not propagated.
/// * `SendMessage` - messages starting with `AUTH_CODE:` are routed to the
///   pending auth flow; anything else is forwarded to the task's stdin
///   channel (auto-resuming a paused task first). If the task has no input
///   channel and is a supervisor, it is respawned in the background with the
///   message as its new prompt.
/// * `TriggerReauth` / `SubmitAuthCode` - run in background tasks and report
///   progress via `DaemonMessage::ReauthStatus`.
/// * `RestartDaemon` - shuts down subprocesses and exits the process with
///   code 42; this arm never returns.
///
/// # Errors
/// Returns whatever error the delegated handler produced.
pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
    // The full Debug dump may contain plans, patch payloads, conversation
    // history and auth codes, which the per-arm logs below deliberately leave
    // out. Keep it available for deep debugging only, at trace level.
    tracing::trace!("Received command from server: {:?}", command);
    match command {
        DaemonCommand::SpawnTask {
            task_id,
            task_name,
            plan,
            repo_url,
            base_branch,
            target_branch,
            parent_task_id,
            depth,
            is_orchestrator,
            target_repo_path,
            completion_action,
            continue_from_task_id,
            copy_files,
            contract_id,
            is_supervisor,
            autonomous_loop,
            resume_session,
            conversation_history,
            patch_data,
            patch_base_sha,
            local_only,
            auto_merge_local,
            supervisor_worktree_task_id,
            directive_id,
        } => {
            tracing::info!(
                task_id = %task_id,
                task_name = %task_name,
                repo_url = ?repo_url,
                base_branch = ?base_branch,
                target_branch = ?target_branch,
                parent_task_id = ?parent_task_id,
                depth = depth,
                is_orchestrator = is_orchestrator,
                is_supervisor = is_supervisor,
                autonomous_loop = autonomous_loop,
                resume_session = resume_session,
                target_repo_path = ?target_repo_path,
                completion_action = ?completion_action,
                continue_from_task_id = ?continue_from_task_id,
                copy_files = ?copy_files,
                contract_id = ?contract_id,
                directive_id = ?directive_id,
                supervisor_worktree_task_id = ?supervisor_worktree_task_id,
                plan_len = plan.len(),
                "Spawning new task"
            );
            self.spawn_task(
                task_id, task_name, plan, repo_url, base_branch, target_branch,
                parent_task_id, depth, is_orchestrator, is_supervisor,
                target_repo_path, completion_action, continue_from_task_id,
                copy_files, contract_id, autonomous_loop, resume_session,
                conversation_history, patch_data, patch_base_sha, local_only, auto_merge_local,
                supervisor_worktree_task_id, directive_id,
            ).await?;
        }
        DaemonCommand::PauseTask { task_id } => {
            tracing::info!(task_id = %task_id, "Pausing task");
            if let Err(e) = self.pause_task(task_id).await {
                tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task");
            }
        }
        DaemonCommand::ResumeTask { task_id } => {
            tracing::info!(task_id = %task_id, "Resuming task");
            if let Err(e) = self.resume_task(task_id).await {
                tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task");
            }
        }
        DaemonCommand::InterruptTask { task_id, graceful: _ } => {
            tracing::info!(task_id = %task_id, "Interrupting task");
            self.interrupt_task(task_id).await?;
        }
        DaemonCommand::SendMessage { task_id, message } => {
            // Check if this is an auth code message
            if let Some(code) = message.strip_prefix("AUTH_CODE:") {
                let code = code.trim();
                tracing::info!(task_id = %task_id, "Received auth code from frontend");
                if send_auth_code(code) {
                    tracing::info!(task_id = %task_id, "Auth code forwarded to setup-token");
                } else {
                    tracing::warn!(task_id = %task_id, "No pending auth flow to receive code");
                }
            } else {
                // Check if task is paused - auto-resume before sending message
                let task_state = {
                    let tasks = self.tasks.read().await;
                    tasks.get(&task_id).map(|t| t.state)
                };
                if task_state == Some(TaskState::Paused) {
                    tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message");
                    if let Err(e) = self.resume_task(task_id).await {
                        tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task");
                    }
                }
                // Regular message - send to task's stdin
                tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task stdin");
                // Clone the sender out of the map so the `task_inputs` read lock
                // is released before awaiting on a possibly-full channel;
                // otherwise writers registering/removing channels would stall.
                let sender = self.task_inputs.read().await.get(&task_id).cloned();
                if let Some(sender) = sender {
                    if let Err(e) = sender.send(message).await {
                        tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel (channel may be closed, stdin forwarder may have exited)");
                    } else {
                        tracing::info!(task_id = %task_id, "Message sent to task input channel successfully, will be forwarded to Claude stdin");
                    }
                } else {
                    // Check if this is a supervisor that needs to be respawned
                    let task_info = {
                        let tasks = self.tasks.read().await;
                        tasks.get(&task_id).cloned()
                    };
                    if let Some(task) = task_info {
                        if task.is_supervisor {
                            tracing::info!(
                                task_id = %task_id,
                                "Supervisor has no active Claude process, respawning with message"
                            );
                            // Respawn the supervisor with the new message as the plan
                            // Claude Code will use --continue to maintain conversation history
                            let inner = self.clone_inner();
                            let task_name = task.task_name.clone();
                            let repo_source = task.repo_source.clone();
                            let base_branch = task.base_branch.clone();
                            let target_branch = task.target_branch.clone();
                            let target_repo_path = task.target_repo_path.clone();
                            let completion_action = task.completion_action.clone();
                            let contract_id = task.contract_id;
                            let local_only = task.local_only;
                            let auto_merge_local = task.auto_merge_local;
                            // Spawn in background to not block the command handler
                            tokio::spawn(async move {
                                if let Err(e) = inner.run_task(
                                    task_id,
                                    task_name,
                                    message, // Use the message as the new prompt
                                    repo_source,
                                    base_branch,
                                    target_branch,
                                    false, // is_orchestrator
                                    true, // is_supervisor
                                    target_repo_path,
                                    completion_action,
                                    None, // continue_from_task_id
                                    None, // copy_files
                                    contract_id,
                                    false, // autonomous_loop - supervisors don't use this
                                    false, // resume_session - respawning from scratch
                                    None, // conversation_history - not needed for fresh respawn
                                    None, // patch_data - not available for respawn
                                    None, // patch_base_sha - not available for respawn
                                    local_only,
                                    auto_merge_local,
                                    None, // supervisor_worktree_task_id - supervisors use their own worktree
                                    None, // directive_id
                                ).await {
                                    tracing::error!(
                                        task_id = %task_id,
                                        error = %e,
                                        "Failed to respawn supervisor"
                                    );
                                }
                            });
                        } else {
                            tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)");
                        }
                    } else {
                        tracing::warn!(task_id = %task_id, "Task not found");
                    }
                }
            }
        }
        DaemonCommand::InjectSiblingContext { task_id, .. } => {
            tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks");
        }
        DaemonCommand::Authenticated { daemon_id } => {
            tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)");
        }
        DaemonCommand::Error { code, message } => {
            tracing::warn!(code = %code, message = %message, "Error command from server");
        }
        // =========================================================================
        // Merge Commands
        // =========================================================================
        DaemonCommand::ListBranches { task_id } => {
            tracing::info!(task_id = %task_id, "Listing task branches");
            self.handle_list_branches(task_id).await?;
        }
        DaemonCommand::MergeStart { task_id, source_branch } => {
            tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge");
            self.handle_merge_start(task_id, source_branch).await?;
        }
        DaemonCommand::MergeStatus { task_id } => {
            tracing::info!(task_id = %task_id, "Getting merge status");
            self.handle_merge_status(task_id).await?;
        }
        DaemonCommand::MergeResolve { task_id, file, strategy } => {
            tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict");
            self.handle_merge_resolve(task_id, file, strategy).await?;
        }
        DaemonCommand::MergeCommit { task_id, message } => {
            tracing::info!(task_id = %task_id, "Committing merge");
            self.handle_merge_commit(task_id, message).await?;
        }
        DaemonCommand::MergeAbort { task_id } => {
            tracing::info!(task_id = %task_id, "Aborting merge");
            self.handle_merge_abort(task_id).await?;
        }
        DaemonCommand::MergeSkip { task_id, subtask_id, reason } => {
            tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge");
            self.handle_merge_skip(task_id, subtask_id, reason).await?;
        }
        DaemonCommand::CheckMergeComplete { task_id } => {
            tracing::info!(task_id = %task_id, "Checking merge completion");
            self.handle_check_merge_complete(task_id).await?;
        }
        // =========================================================================
        // Completion Action Commands
        // =========================================================================
        DaemonCommand::RetryCompletionAction {
            task_id,
            task_name,
            action,
            target_repo_path,
            target_branch,
        } => {
            tracing::info!(
                task_id = %task_id,
                task_name = %task_name,
                action = %action,
                target_repo_path = %target_repo_path,
                target_branch = ?target_branch,
                "Retrying completion action"
            );
            self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?;
        }
        DaemonCommand::CloneWorktree { task_id, target_dir } => {
            tracing::info!(
                task_id = %task_id,
                target_dir = %target_dir,
                "Cloning worktree to target directory"
            );
            self.handle_clone_worktree(task_id, target_dir).await?;
        }
        DaemonCommand::CheckTargetExists { task_id, target_dir } => {
            tracing::debug!(
                task_id = %task_id,
                target_dir = %target_dir,
                "Checking if target directory exists"
            );
            self.handle_check_target_exists(task_id, target_dir).await?;
        }
        // =========================================================================
        // Contract File Commands
        // =========================================================================
        DaemonCommand::ReadRepoFile {
            request_id,
            contract_id,
            file_path,
            repo_path,
        } => {
            tracing::info!(
                request_id = %request_id,
                contract_id = %contract_id,
                file_path = %file_path,
                repo_path = %repo_path,
                "Reading file from repository"
            );
            self.handle_read_repo_file(request_id, file_path, repo_path).await?;
        }
        DaemonCommand::CreateBranch {
            task_id,
            branch_name,
            from_ref,
        } => {
            tracing::info!(
                task_id = %task_id,
                branch_name = %branch_name,
                from_ref = ?from_ref,
                "Creating branch"
            );
            self.handle_create_branch(task_id, branch_name, from_ref).await?;
        }
        DaemonCommand::MergeTaskToTarget {
            task_id,
            target_branch,
            squash,
        } => {
            tracing::info!(
                task_id = %task_id,
                target_branch = ?target_branch,
                squash = squash,
                "Merging task to target branch"
            );
            self.handle_merge_task_to_target(task_id, target_branch, squash).await?;
        }
        DaemonCommand::CreatePR {
            task_id,
            title,
            body,
            base_branch,
            branch,
        } => {
            tracing::info!(
                task_id = %task_id,
                title = %title,
                base_branch = ?base_branch,
                branch = %branch,
                "Creating pull request"
            );
            self.handle_create_pr(task_id, title, body, base_branch, branch).await?;
        }
        DaemonCommand::GetTaskDiff {
            task_id,
        } => {
            tracing::info!(task_id = %task_id, "Getting task diff");
            self.handle_get_task_diff(task_id).await?;
        }
        DaemonCommand::GetWorktreeInfo {
            task_id,
        } => {
            tracing::info!(task_id = %task_id, "Getting worktree info");
            self.handle_get_worktree_info(task_id).await?;
        }
        DaemonCommand::CommitWorktree { task_id, message } => {
            tracing::info!(task_id = %task_id, "Committing worktree changes");
            self.handle_commit_worktree(task_id, message).await?;
        }
        DaemonCommand::CreateCheckpoint {
            task_id,
            message,
        } => {
            tracing::info!(task_id = %task_id, "Creating checkpoint");
            self.handle_create_checkpoint(task_id, message).await?;
        }
        DaemonCommand::CleanupWorktree {
            task_id,
            delete_branch,
        } => {
            tracing::info!(
                task_id = %task_id,
                delete_branch = delete_branch,
                "Cleaning up worktree"
            );
            self.handle_cleanup_worktree(task_id, delete_branch).await?;
        }
        DaemonCommand::InheritGitConfig { source_dir } => {
            tracing::info!(source_dir = ?source_dir, "Inheriting git config");
            self.handle_inherit_git_config(source_dir).await?;
        }
        DaemonCommand::CreateExportPatch { task_id, base_sha } => {
            tracing::info!(task_id = %task_id, base_sha = ?base_sha, "Creating export patch");
            self.handle_create_export_patch(task_id, base_sha).await?;
        }
        DaemonCommand::RestartDaemon => {
            tracing::info!("Received restart command from server, initiating daemon restart...");
            // Shutdown all running tasks gracefully
            self.shutdown_all_processes(std::time::Duration::from_secs(5)).await;
            // Exit the process - the daemon should be restarted by a process manager
            // or the user can restart it manually
            tracing::info!("Daemon restart: exiting process with code 42 (restart requested)");
            std::process::exit(42);
        }
        DaemonCommand::TriggerReauth { request_id } => {
            tracing::info!(request_id = %request_id, "Received reauth trigger command from server");
            let claude_command = self.process_manager.claude_command().to_string();
            let ws_tx = self.ws_tx.clone();
            // Spawn in a task so it doesn't block command handling
            tokio::spawn(async move {
                match get_oauth_login_url(&claude_command).await {
                    Some(flow_result) => {
                        tracing::info!(request_id = %request_id, login_url = %flow_result.login_url, "Got OAuth login URL for reauth");
                        // Send url_ready status immediately
                        let msg = DaemonMessage::ReauthStatus {
                            request_id,
                            status: "url_ready".to_string(),
                            login_url: Some(flow_result.login_url),
                            error: None,
                            token_saved: false,
                        };
                        let _ = ws_tx.send(msg).await;
                        // Now wait for the token to be detected and saved (up to 10 minutes)
                        let ws_tx_token = ws_tx.clone();
                        tokio::spawn(async move {
                            match tokio::time::timeout(
                                std::time::Duration::from_secs(600),
                                flow_result.token_rx,
                            ).await {
                                Ok(Ok(_token)) => {
                                    tracing::info!(request_id = %request_id, "OAuth token received and saved, reporting completion");
                                    let msg = DaemonMessage::ReauthStatus {
                                        request_id,
                                        status: "completed".to_string(),
                                        login_url: None,
                                        error: None,
                                        token_saved: true,
                                    };
                                    let _ = ws_tx_token.send(msg).await;
                                }
                                Ok(Err(_)) => {
                                    tracing::warn!(request_id = %request_id, "Token channel closed without receiving token");
                                    let msg = DaemonMessage::ReauthStatus {
                                        request_id,
                                        status: "failed".to_string(),
                                        login_url: None,
                                        error: Some("setup-token process ended without producing a token".to_string()),
                                        token_saved: false,
                                    };
                                    let _ = ws_tx_token.send(msg).await;
                                }
                                Err(_) => {
                                    tracing::warn!(request_id = %request_id, "Timed out waiting for OAuth token (10 min)");
                                    let msg = DaemonMessage::ReauthStatus {
                                        request_id,
                                        status: "failed".to_string(),
                                        login_url: None,
                                        error: Some("Timed out waiting for authentication to complete".to_string()),
                                        token_saved: false,
                                    };
                                    let _ = ws_tx_token.send(msg).await;
                                }
                            }
                        });
                    }
                    None => {
                        tracing::error!(request_id = %request_id, "Failed to get OAuth login URL for reauth");
                        let msg = DaemonMessage::ReauthStatus {
                            request_id,
                            status: "failed".to_string(),
                            login_url: None,
                            error: Some("Failed to get OAuth login URL from setup-token".to_string()),
                            token_saved: false,
                        };
                        let _ = ws_tx.send(msg).await;
                    }
                }
            });
        }
        DaemonCommand::SubmitAuthCode { request_id, code } => {
            tracing::info!(request_id = %request_id, "Received auth code submission from server");
            let ws_tx = self.ws_tx.clone();
            if send_auth_code(&code) {
                tracing::info!(request_id = %request_id, "Auth code forwarded to setup-token for reauth");
                // Wait a short time then report completion
                // (the setup-token process takes a moment to complete)
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
                    let msg = DaemonMessage::ReauthStatus {
                        request_id,
                        status: "completed".to_string(),
                        login_url: None,
                        error: None,
                        token_saved: false,
                    };
                    let _ = ws_tx.send(msg).await;
                });
            } else {
                tracing::warn!(request_id = %request_id, "No pending auth flow to receive code for reauth");
                let msg = DaemonMessage::ReauthStatus {
                    request_id,
                    status: "failed".to_string(),
                    login_url: None,
                    error: Some("No pending auth flow to receive the code. Try triggering reauth again.".to_string()),
                    token_saved: false,
                };
                let _ = self.ws_tx.send(msg).await;
            }
        }
        DaemonCommand::ApplyPatchToWorktree {
            target_task_id,
            source_task_id,
            patch_data,
            base_sha,
        } => {
            tracing::info!(
                target_task_id = %target_task_id,
                source_task_id = %source_task_id,
                base_sha = %base_sha,
                "Applying patch from cross-daemon task to worktree"
            );
            self.handle_apply_patch_to_worktree(target_task_id, source_task_id, patch_data, base_sha).await?;
        }
    }
    Ok(())
}
/// Spawn a new task.
///
/// Registers a `ManagedTask` in the `Initializing` state, persists it to the
/// local database, notifies the server of the `pending -> initializing`
/// transition, and then runs `run_task` on a detached Tokio task.
///
/// `parent_task_id` and `depth` are only recorded on the `ManagedTask` entry;
/// `resume_session`, `conversation_history`, `patch_data`, `patch_base_sha`
/// and `directive_id` are only forwarded to `run_task`.
///
/// # Errors
///
/// * `TaskError::AlreadyExists` when a task with `task_id` is already tracked
///   and is neither in a terminal state nor a supervisor being resumed
///   (`resume_session == true`).
/// * Any error returned by `try_acquire_concurrency_slot`.
///
/// Failures inside the background runner are not returned to the caller; they
/// are logged and recorded through `mark_failed`.
#[allow(clippy::too_many_arguments)]
pub async fn spawn_task(
&self,
task_id: Uuid,
task_name: String,
plan: String,
repo_url: Option<String>,
base_branch: Option<String>,
target_branch: Option<String>,
parent_task_id: Option<Uuid>,
depth: i32,
is_orchestrator: bool,
is_supervisor: bool,
target_repo_path: Option<String>,
completion_action: Option<String>,
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
patch_data: Option<String>,
patch_base_sha: Option<String>,
local_only: bool,
auto_merge_local: bool,
supervisor_worktree_task_id: Option<Uuid>,
directive_id: Option<Uuid>,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ===");
// Check if task already exists - allow re-spawning if in terminal state
// or if resuming a supervisor (supervisors stay in Running state after Claude exits)
// The write lock is confined to this block and released before the
// concurrency slot is requested.
// NOTE(review): the existence check here and the insert further down use
// separate lock acquisitions, so two concurrent calls for the same task id
// could both pass this check - confirm callers serialize spawns per task.
{
let mut tasks = self.tasks.write().await;
if let Some(existing) = tasks.get(&task_id) {
let can_respawn = existing.state.is_terminal()
|| (resume_session && existing.is_supervisor);
if can_respawn {
// Task exists but can be re-spawned (terminal state or supervisor resume)
tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn");
tasks.remove(&task_id);
} else {
// Task is still active, reject
tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn");
return Err(TaskError::AlreadyExists(task_id));
}
}
}
// Acquire concurrency slot (contract-based concurrency)
// An error here is returned to the caller before any task entry is created.
tracing::info!(task_id = %task_id, contract_id = ?contract_id, "Acquiring concurrency slot...");
let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?;
tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired");
// Create task entry
// Values are cloned because the originals are moved into the background
// runner below.
tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
let task = ManagedTask {
id: task_id,
task_name: task_name.clone(),
state: TaskState::Initializing,
worktree: None,
plan: plan.clone(),
repo_source: repo_url.clone(),
base_branch: base_branch.clone(),
target_branch: target_branch.clone(),
parent_task_id,
depth,
is_orchestrator,
is_supervisor,
target_repo_path: target_repo_path.clone(),
completion_action: completion_action.clone(),
continue_from_task_id,
copy_files: copy_files.clone(),
contract_id,
concurrency_key,
autonomous_loop,
local_only,
auto_merge_local,
merge_to_supervisor_task_id: None, // Set later if cross-daemon
supervisor_worktree_task_id,
created_at: Instant::now(),
started_at: None,
completed_at: None,
error: None,
};
// Persist task to local database for crash recovery
self.persist_task_to_local_db(&task);
self.tasks.write().await.insert(task_id, task);
tracing::info!(task_id = %task_id, "Task entry created and stored");
// Notify server of status change
tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing");
self.send_status_change(task_id, "pending", "initializing").await;
// Spawn task in background
// The JoinHandle is discarded, so the runner is detached from this call.
tracing::info!(task_id = %task_id, "Spawning background task runner");
let inner = self.clone_inner();
tokio::spawn(async move {
tracing::info!(task_id = %task_id, "Background task runner started");
if let Err(e) = inner.run_task(
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session,
conversation_history, patch_data, patch_base_sha, local_only, auto_merge_local,
supervisor_worktree_task_id, directive_id,
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
}
// Release concurrency slot
// Runs whether `run_task` returned Ok or Err.
inner.release_concurrency_slot(concurrency_key).await;
tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Background task runner completed, concurrency slot released");
});
tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ===");
Ok(())
}
/// Build the `TaskManagerInner` value handed to spawned background tasks.
///
/// Each field is cloned from the manager itself or copied out of its
/// configuration, so the returned value can be moved into a spawned task.
fn clone_inner(&self) -> TaskManagerInner {
    let config = &self.config;
    TaskManagerInner {
        // Collaborating managers.
        worktree_manager: self.worktree_manager.clone(),
        process_manager: self.process_manager.clone(),
        temp_manager: self.temp_manager.clone(),
        // Task bookkeeping and the outbound message channel.
        tasks: self.tasks.clone(),
        ws_tx: self.ws_tx.clone(),
        task_inputs: self.task_inputs.clone(),
        active_pids: self.active_pids.clone(),
        // Git identity.
        git_user_email: self.git_user_email.clone(),
        git_user_name: self.git_user_name.clone(),
        // Values taken from the daemon configuration.
        api_url: config.api_url.clone(),
        api_key: config.api_key.clone(),
        heartbeat_commit_interval_secs: config.heartbeat_commit_interval_secs,
        contract_task_counts: self.contract_task_counts.clone(),
        checkpoint_patches: config.checkpoint_patches.clone(),
        local_db: self.local_db.clone(),
    }
}
/// Interrupt a task.
///
/// Marks the task as `Interrupted`, stamps its completion time and notifies
/// the server of the transition. A task that is already in a terminal state
/// is left untouched and `Ok(())` is returned.
///
/// # Errors
///
/// Returns `TaskError::NotFound` when no task with `task_id` is tracked.
pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> {
    // The write guard lives only inside this block so that it is released
    // before the server notification is sent.
    let previous_state = {
        let mut guard = self.tasks.write().await;
        let entry = match guard.get_mut(&task_id) {
            Some(entry) => entry,
            None => return Err(TaskError::NotFound(task_id)),
        };
        if entry.state.is_terminal() {
            // Already done, nothing to interrupt.
            return Ok(());
        }
        let previous = entry.state;
        entry.state = TaskState::Interrupted;
        entry.completed_at = Some(Instant::now());
        previous
    };

    self.send_status_change(task_id, previous_state.as_str(), "interrupted").await;

    // Note carried over from the original author: the process is expected to
    // be killed when its ClaudeProcess is dropped, and worktrees are kept
    // until explicitly deleted. Neither happens in this method.
    Ok(())
}
/// Collect the IDs of every tracked task whose state reports `is_active()`.
pub async fn active_task_ids(&self) -> Vec<Uuid> {
    let tasks = self.tasks.read().await;
    tasks
        .iter()
        .filter_map(|(id, task)| task.state.is_active().then_some(*id))
        .collect()
}
/// Look up the current state of a task, or `None` when the task is not tracked.
pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> {
    let tasks = self.tasks.read().await;
    let task = tasks.get(&task_id)?;
    Some(task.state)
}
/// Notify the server that a task moved from `old_status` to `new_status`.
///
/// The send is best effort: a failure to enqueue the message is ignored.
async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
    let _ = self
        .ws_tx
        .send(DaemonMessage::task_status_change(task_id, old_status, new_status))
        .await;
}
// =========================================================================
// Merge Handler Methods
// =========================================================================
/// Get worktree path for a task, or return error if not found.
///
/// Resolution order:
/// 1. The in-memory task table, when the task is tracked and has a worktree.
/// 2. A scan of the worktree base directory for an entry whose name starts
///    with the first 8 characters of the task ID and that contains a `.git`
///    entry.
///
/// Errors while reading the base directory are treated the same as "no match".
///
/// # Errors
///
/// Returns `TaskError::SetupFailed` (wrapped in `DaemonError::Task`) when
/// neither lookup yields a worktree.
async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> {
    // First try to get from in-memory tasks. The read guard is scoped so it
    // is released before any filesystem work starts.
    {
        let tasks = self.tasks.read().await;
        if let Some(worktree) = tasks.get(&task_id).and_then(|task| task.worktree.as_ref()) {
            return Ok(worktree.path.clone());
        }
    }

    // Task not in memory - scan worktrees directory for matching task ID.
    // A formatted UUID is ASCII, so slicing the first 8 bytes cannot split a
    // character.
    let id_string = task_id.to_string();
    let short_id = &id_string[..8];
    let worktrees_dir = self.worktree_manager.base_dir();
    if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
        while let Ok(Some(entry)) = entries.next_entry().await {
            let name = entry.file_name();
            if !name.to_string_lossy().starts_with(short_id) {
                continue;
            }
            let path = entry.path();
            // Verify it's a valid git directory. Uses the async metadata call
            // instead of the blocking `Path::exists()` so the runtime worker
            // is not stalled on filesystem I/O; both report "exists" exactly
            // when the metadata lookup succeeds.
            if tokio::fs::metadata(path.join(".git")).await.is_ok() {
                tracing::info!(
                    task_id = %task_id,
                    worktree_path = %path.display(),
                    "Found worktree by scanning directory"
                );
                return Ok(path);
            }
        }
    }

    Err(DaemonError::Task(TaskError::SetupFailed(
        format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id)
    )))
}
/// Handle ListBranches command.
///
/// Lists the task branches visible from the task's worktree and replies with
/// a `BranchList` message. When listing fails, the error is logged and a
/// failed `MergeResult` is sent instead.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path` when the task has no
/// worktree; listing failures are reported to the server, not returned.
async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let listing = self.worktree_manager.list_task_branches(&worktree_path).await;

    let reply = match listing {
        Ok(found) => DaemonMessage::BranchList {
            task_id,
            // Convert the worktree manager's branch records to the wire type.
            branches: found
                .into_iter()
                .map(|branch| BranchInfo {
                    name: branch.name,
                    task_id: branch.task_id,
                    is_merged: branch.is_merged,
                    last_commit: branch.last_commit,
                    last_commit_message: branch.last_commit_message,
                })
                .collect::<Vec<BranchInfo>>(),
        },
        Err(e) => {
            tracing::error!(task_id = %task_id, error = %e, "Failed to list branches");
            DaemonMessage::MergeResult {
                task_id,
                success: false,
                message: e.to_string(),
                commit_sha: None,
                conflicts: None,
            }
        }
    };

    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeStart command.
///
/// Merges `source_branch` into the task's worktree and reports the outcome
/// as a `MergeResult`: success when the merge returned no conflicts, failure
/// (with the conflict list) when it did, or failure with the error text.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; merge failures are
/// reported to the server, not returned.
async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let outcome = self.worktree_manager.merge_branch(&worktree_path, &source_branch).await;

    // Reduce the three outcomes to the fields that differ between them.
    let (success, message, conflicts) = match outcome {
        Ok(None) => (true, "Merge completed without conflicts".to_string(), None),
        Ok(Some(found)) => (false, format!("Merge has {} conflicts", found.len()), Some(found)),
        Err(e) => (false, e.to_string(), None),
    };

    let reply = DaemonMessage::MergeResult {
        task_id,
        success,
        message,
        commit_sha: None,
        conflicts,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeStatus command.
///
/// Reports the merge state of the task's worktree as a `MergeStatusResponse`.
/// If the state cannot be read, the error is logged and a "no merge in
/// progress" response with no conflicted files is sent.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`.
async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let lookup = self.worktree_manager.get_merge_state(&worktree_path).await;

    let reply = match lookup {
        Ok(state) => DaemonMessage::MergeStatusResponse {
            task_id,
            in_progress: state.in_progress,
            // The source branch is only reported while a merge is running.
            source_branch: state.in_progress.then_some(state.source_branch),
            conflicted_files: state.conflicted_files,
        },
        Err(e) => {
            tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status");
            DaemonMessage::MergeStatusResponse {
                task_id,
                in_progress: false,
                source_branch: None,
                conflicted_files: Vec::new(),
            }
        }
    };

    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeResolve command.
///
/// Resolves the conflict in `file` using `strategy`, which must be `ours` or
/// `theirs` (compared case-insensitively). The outcome, including an invalid
/// strategy, is reported as a `MergeResult`.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; everything else is
/// reported to the server, not returned.
async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;

    let normalized = strategy.to_lowercase();
    let resolution = if normalized == "ours" {
        Some(ConflictResolution::Ours)
    } else if normalized == "theirs" {
        Some(ConflictResolution::Theirs)
    } else {
        None
    };

    let (success, message) = match resolution {
        // Unknown strategy: report it without touching the worktree.
        None => (
            false,
            format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy),
        ),
        Some(resolution) => {
            match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await {
                Ok(()) => (true, format!("Resolved conflict in {}", file)),
                Err(e) => (false, e.to_string()),
            }
        }
    };

    let reply = DaemonMessage::MergeResult {
        task_id,
        success,
        message,
        commit_sha: None,
        conflicts: None,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeCommit command.
///
/// Commits the merge in the task's worktree with `message` and reports a
/// `MergeResult` carrying the new commit SHA on success or the error text on
/// failure. The merge tracker is not updated here.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; commit failures are
/// reported to the server, not returned.
async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let committed = self.worktree_manager.commit_merge(&worktree_path, &message).await;

    let (success, text, commit_sha) = match committed {
        Ok(sha) => (true, "Merge committed successfully".to_string(), Some(sha)),
        Err(e) => (false, e.to_string(), None),
    };

    let reply = DaemonMessage::MergeResult {
        task_id,
        success,
        message: text,
        commit_sha,
        conflicts: None,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeAbort command.
///
/// Aborts the merge in the task's worktree and reports the outcome as a
/// `MergeResult`.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; abort failures are
/// reported to the server, not returned.
async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let aborted = self.worktree_manager.abort_merge(&worktree_path).await;

    let (success, message) = match aborted {
        Ok(()) => (true, "Merge aborted".to_string()),
        Err(e) => (false, e.to_string()),
    };

    let reply = DaemonMessage::MergeResult {
        task_id,
        success,
        message,
        commit_sha: None,
        conflicts: None,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeSkip command.
///
/// Records `subtask_id` (with its `reason`) as skipped in the merge tracker
/// of `task_id`, creating the tracker if needed, and acknowledges with a
/// successful `MergeResult`.
async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> {
    // Format the acknowledgement first so `reason` can be moved into the tracker.
    let message = format!("Subtask {} skipped: {}", subtask_id, reason);

    // The write guard is a temporary of this statement and is released
    // before the reply is sent.
    self.merge_trackers
        .write()
        .await
        .entry(task_id)
        .or_insert_with(MergeTracker::default)
        .skipped_subtasks
        .insert(subtask_id, reason);

    let reply = DaemonMessage::MergeResult {
        task_id,
        success: true,
        message,
        commit_sha: None,
        conflicts: None,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CheckMergeComplete command.
///
/// Classifies every task branch of the worktree as merged, skipped or
/// unmerged and replies with a `MergeCompleteCheck`. A branch counts as
/// merged when git reports it merged or the merge tracker lists its subtask
/// as merged, and as skipped when the tracker lists its subtask as skipped.
/// Every other branch, including branches without a task ID, is reported as
/// unmerged. `can_complete` is true only when no unmerged branch remains.
///
/// If the branches cannot be listed, a `MergeCompleteCheck` with
/// `can_complete: false` and the error text in `unmerged_branches` is sent.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`.
async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;

    // Get all task branches
    let branches = match self.worktree_manager.list_task_branches(&worktree_path).await {
        Ok(b) => b,
        Err(e) => {
            let msg = DaemonMessage::MergeCompleteCheck {
                task_id,
                can_complete: false,
                unmerged_branches: vec![format!("Error listing branches: {}", e)],
                merged_count: 0,
                skipped_count: 0,
            };
            let _ = self.ws_tx.send(msg).await;
            return Ok(());
        }
    };

    // Classify branches while holding the tracker read lock. The guard is
    // confined to this block so it is released before the reply is sent;
    // previously it stayed held across the channel send, delaying any writer
    // of `merge_trackers` for as long as the send was pending.
    let (merged_count, skipped_count, unmerged_branches) = {
        let trackers = self.merge_trackers.read().await;
        let empty_merged: HashSet<Uuid> = HashSet::new();
        let empty_skipped: HashMap<Uuid, String> = HashMap::new();
        let tracker = trackers.get(&task_id);
        let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged);
        let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped);

        let mut merged_count = 0u32;
        let mut skipped_count = 0u32;
        let mut unmerged_branches = Vec::new();
        for branch in &branches {
            if branch.is_merged {
                merged_count += 1;
            } else if let Some(subtask_id) = branch.task_id {
                if merged_set.contains(&subtask_id) {
                    merged_count += 1;
                } else if skipped_set.contains_key(&subtask_id) {
                    skipped_count += 1;
                } else {
                    unmerged_branches.push(branch.name.clone());
                }
            } else {
                // Unmerged branch without a task ID: the tracker cannot
                // vouch for it, so it is reported as unmerged.
                unmerged_branches.push(branch.name.clone());
            }
        }
        (merged_count, skipped_count, unmerged_branches)
    };

    let can_complete = unmerged_branches.is_empty();
    let msg = DaemonMessage::MergeCompleteCheck {
        task_id,
        can_complete,
        unmerged_branches,
        merged_count,
        skipped_count,
    };
    let _ = self.ws_tx.send(msg).await;
    Ok(())
}
/// Record `subtask_id` as merged in the tracker of `orchestrator_task_id`,
/// creating the tracker on first use.
#[allow(dead_code)]
pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) {
    self.merge_trackers
        .write()
        .await
        .entry(orchestrator_task_id)
        .or_insert_with(MergeTracker::default)
        .merged_subtasks
        .insert(subtask_id);
}
// =========================================================================
// Completion Action Handler Methods
// =========================================================================
/// Handle RetryCompletionAction command.
///
/// Re-runs the completion `action` for the task's worktree and reports the
/// outcome as a `CompletionActionResult`. On success the message depends on
/// the action (`branch`, `merge`, `pr`, or a generic text for anything else)
/// and carries whatever PR URL the action returned.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; action failures are
/// reported to the server, not returned.
async fn handle_retry_completion_action(
    &self,
    task_id: Uuid,
    task_name: String,
    action: String,
    target_repo_path: String,
    target_branch: Option<String>,
) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;

    // The action itself is implemented on the inner state object.
    let outcome = self
        .clone_inner()
        .execute_completion_action(
            task_id,
            &task_name,
            &worktree_path,
            &action,
            Some(target_repo_path.as_str()),
            target_branch.as_deref(),
        )
        .await;

    let reply = match outcome {
        Ok(pr_url) => {
            let message = match action.as_str() {
                "branch" => format!("Branch pushed to {}", target_repo_path),
                "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")),
                "pr" => "Pull request created".to_string(),
                other => format!("Completion action '{}' executed", other),
            };
            DaemonMessage::CompletionActionResult {
                task_id,
                success: true,
                message,
                pr_url,
            }
        }
        Err(e) => DaemonMessage::CompletionActionResult {
            task_id,
            success: false,
            message: e,
            pr_url: None,
        },
    };

    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CloneWorktree command.
///
/// Copies the task's worktree to `target_dir` (with `~` expanded) and reports
/// the outcome as a `CloneWorktreeResult`; the expanded target path is only
/// included on success.
///
/// # Errors
///
/// Propagates the error from `get_task_worktree_path`; clone failures are
/// reported to the server, not returned.
async fn handle_clone_worktree(
    &self,
    task_id: Uuid,
    target_dir: String,
) -> Result<(), DaemonError> {
    let worktree_path = self.get_task_worktree_path(task_id).await?;
    let target_path = crate::daemon::worktree::expand_tilde(&target_dir);

    let cloned = self
        .worktree_manager
        .clone_worktree_to_directory(&worktree_path, &target_path)
        .await;

    let (success, message, reported_dir) = match cloned {
        Ok(message) => (true, message, Some(target_path.to_string_lossy().into_owned())),
        Err(e) => (false, e.to_string(), None),
    };

    let reply = DaemonMessage::CloneWorktreeResult {
        task_id,
        success,
        message,
        target_dir: reported_dir,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CheckTargetExists command.
///
/// Expands `~` in `target_dir`, asks the worktree manager whether that
/// directory exists and replies with a `CheckTargetExistsResult` carrying the
/// expanded path.
async fn handle_check_target_exists(
    &self,
    task_id: Uuid,
    target_dir: String,
) -> Result<(), DaemonError> {
    let target_path = crate::daemon::worktree::expand_tilde(&target_dir);
    let exists = self.worktree_manager.target_directory_exists(&target_path).await;

    let reply = DaemonMessage::CheckTargetExistsResult {
        task_id,
        exists,
        target_dir: target_path.to_string_lossy().into_owned(),
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CleanupWorktree command.
///
/// Removes a task's worktree (and its branch when `delete_branch` is set) and
/// drops the task from the in-memory tables. A missing worktree is not an
/// error: the in-memory state is still dropped and success is reported. When
/// removing the worktree fails, the in-memory state is left untouched and a
/// failure is reported.
///
/// The outcome is always sent to the server as a `CleanupWorktreeResult`.
async fn handle_cleanup_worktree(
    &self,
    task_id: Uuid,
    delete_branch: bool,
) -> Result<(), DaemonError> {
    let lookup = self.get_task_worktree_path(task_id).await;

    // `forget_task` says whether the in-memory tracking should be dropped.
    let (success, message, forget_task) = match lookup {
        Ok(worktree_path) => {
            let removal = self
                .worktree_manager
                .remove_worktree(&worktree_path, delete_branch)
                .await;
            match removal {
                Ok(()) => {
                    tracing::info!(
                        task_id = %task_id,
                        worktree_path = %worktree_path.display(),
                        delete_branch = delete_branch,
                        "Worktree cleaned up successfully"
                    );
                    (true, format!("Worktree cleaned up: {}", worktree_path.display()), true)
                }
                Err(e) => {
                    tracing::warn!(
                        task_id = %task_id,
                        worktree_path = %worktree_path.display(),
                        error = %e,
                        "Failed to remove worktree"
                    );
                    (false, format!("Failed to remove worktree: {}", e), false)
                }
            }
        }
        Err(_) => {
            // No worktree: it may already have been cleaned up, which is fine.
            tracing::debug!(
                task_id = %task_id,
                "No worktree found for task, may have already been cleaned up"
            );
            (true, "No worktree found, task tracking cleaned up".to_string(), true)
        }
    };

    if forget_task {
        // Each lock is taken and released in turn; none is held across another.
        self.tasks.write().await.remove(&task_id);
        self.task_inputs.write().await.remove(&task_id);
        self.merge_trackers.write().await.remove(&task_id);
        self.active_pids.write().await.remove(&task_id);
    }

    let reply = DaemonMessage::CleanupWorktreeResult {
        task_id,
        success,
        message,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CreateExportPatch command.
///
/// Creates an uncompressed, human-readable git patch for export.
async fn handle_create_export_patch(
&self,
task_id: Uuid,
base_sha: Option<String>,
) -> Result<(), DaemonError> {
// Get task's worktree path
let worktree_result = self.get_task_worktree_path(task_id).await;
let msg = match worktree_result {
Ok(worktree_path) => {
// Create the export patch
match storage::create_export_patch(&worktree_path, base_sha.as_deref()).await {
Ok(result) => {
tracing::info!(
task_id = %task_id,
files_count = result.files_count,
lines_added = result.lines_added,
lines_removed = result.lines_removed,
base_commit_sha = %result.base_commit_sha,
"Export patch created successfully"
);
DaemonMessage::ExportPatchCreated {
task_id,
success: true,
patch_content: Some(result.patch_content),
files_count: Some(result.files_count),
lines_added: Some(result.lines_added),
lines_removed: Some(result.lines_removed),
base_commit_sha: Some(result.base_commit_sha),
error: None,
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to create export patch"
);
DaemonMessage::ExportPatchCreated {
task_id,
success: false,
patch_content: None,
files_count: None,
lines_added: None,
lines_removed: None,
base_commit_sha: None,
error: Some(e.to_string()),
}
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to get worktree path for export patch"
);
DaemonMessage::ExportPatchCreated {
task_id,
success: false,
patch_content: None,
files_count: None,
lines_added: None,
lines_removed: None,
base_commit_sha: None,
error: Some(format!("Task not found or has no worktree: {}", e)),
}
}
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Handle ReadRepoFile command.
///
/// Reads a file from a repository on the daemon's filesystem and sends
/// the content back to the server for syncing contract files.
///
/// `repo_path` has `~` expanded; `file_path` must be relative to it. A
/// `file_path` that is absolute or contains `..` is rejected, because
/// `PathBuf::join` would otherwise resolve it outside the repository (an
/// absolute path replaces the base entirely). Symlinks inside the repository
/// are not resolved or checked.
///
/// The outcome is always reported as a `RepoFileContent` message: on success
/// it carries the file content, on failure `success: false` and an error text.
async fn handle_read_repo_file(
    &self,
    request_id: Uuid,
    file_path: String,
    repo_path: String,
) -> Result<(), DaemonError> {
    // Expand tilde in repo path
    let repo_path_expanded = crate::daemon::worktree::expand_tilde(&repo_path);

    // Only plain names and `.` are allowed as components; a root, a Windows
    // prefix or `..` would let the joined path leave the repository.
    let escapes_repo = std::path::Path::new(&file_path).components().any(|component| {
        !matches!(
            component,
            std::path::Component::Normal(_) | std::path::Component::CurDir
        )
    });

    let (content, success, error) = if escapes_repo {
        tracing::warn!(
            request_id = %request_id,
            file_path = %file_path,
            repo_path = %repo_path,
            "Rejected repo file path that escapes the repository"
        );
        (
            None,
            false,
            Some("File path must be relative and must not contain '..'".to_string()),
        )
    } else {
        // Construct full file path
        let full_path = repo_path_expanded.join(&file_path);
        // Try to read the file
        match tokio::fs::read_to_string(&full_path).await {
            Ok(content) => (Some(content), true, None),
            Err(e) => {
                tracing::warn!(
                    request_id = %request_id,
                    file_path = %file_path,
                    repo_path = %repo_path,
                    full_path = %full_path.display(),
                    error = %e,
                    "Failed to read repo file"
                );
                (None, false, Some(e.to_string()))
            }
        }
    };

    // Send result back to server
    let msg = DaemonMessage::RepoFileContent {
        request_id,
        file_path,
        content,
        success,
        error,
    };
    let _ = self.ws_tx.send(msg).await;
    Ok(())
}
/// Handle CreateBranch command - create a new branch in a task's worktree.
///
/// Runs `git checkout -b <branch_name> [from_ref]` in the worktree recorded
/// for the task in memory and replies with a `BranchCreated` message. Only
/// the in-memory task table is consulted; a task that is not tracked or has
/// no worktree yields a failure reply.
async fn handle_create_branch(
    &self,
    task_id: Uuid,
    branch_name: String,
    from_ref: Option<String>,
) -> Result<(), DaemonError> {
    // Copy the path out so the read guard is dropped before git runs.
    let worktree_path = {
        let tasks = self.tasks.read().await;
        tasks
            .get(&task_id)
            .and_then(|task| task.worktree.as_ref())
            .map(|worktree| worktree.path.clone())
    };

    let (success, message) = match worktree_path {
        None => (false, format!("Task {} not found or has no worktree", task_id)),
        Some(path) => {
            let mut git = tokio::process::Command::new("git");
            git.current_dir(&path)
                .arg("checkout")
                .arg("-b")
                .arg(&branch_name);
            if let Some(start_point) = from_ref.as_deref() {
                git.arg(start_point);
            }

            match git.output().await {
                Ok(output) if output.status.success() => {
                    (true, format!("Branch '{}' created successfully", branch_name))
                }
                Ok(output) => (
                    false,
                    format!(
                        "Failed to create branch: {}",
                        String::from_utf8_lossy(&output.stderr)
                    ),
                ),
                Err(e) => (false, format!("Failed to execute git: {}", e)),
            }
        }
    };

    let reply = DaemonMessage::BranchCreated {
        task_id,
        success,
        branch_name,
        message,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle MergeTaskToTarget command - merge a task's changes to a target branch.
async fn handle_merge_task_to_target(
&self,
task_id: Uuid,
target_branch: Option<String>,
squash: bool,
) -> Result<(), DaemonError> {
// Get worktree path - this works even for completed tasks by scanning worktrees directory
let worktree_path = match self.get_task_worktree_path(task_id).await {
Ok(path) => path,
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to find worktree for merge");
let msg = DaemonMessage::MergeToTargetResult {
task_id,
success: false,
message: format!("Task {} not found or has no worktree: {}", task_id, e),
commit_sha: None,
conflicts: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
};
// Get base_branch from in-memory tasks if available (for fallback target branch)
let base_branch = {
let tasks = self.tasks.read().await;
tasks.get(&task_id).and_then(|t| t.base_branch.clone())
};
let target = target_branch.unwrap_or_else(|| base_branch.unwrap_or_else(|| "main".to_string()));
tracing::info!(
task_id = %task_id,
worktree_path = %worktree_path.display(),
target_branch = %target,
squash = squash,
"Starting merge operation"
);
// First, stage and commit any uncommitted changes
let add_result = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["add", "-A"])
.output()
.await;
let (success, message, commit_sha, conflicts) = if let Err(e) = add_result {
(false, format!("Failed to stage changes: {}", e), None, None)
} else {
// Commit if there are staged changes
let commit_result = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["commit", "-m", "Task completion checkpoint", "--allow-empty"])
.output()
.await;
if let Err(e) = commit_result {
tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)");
}
// Get current branch name
let branch_output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["rev-parse", "--abbrev-ref", "HEAD"])
.output()
.await;
let source_branch = branch_output
.map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
.unwrap_or_else(|_| "unknown".to_string());
// Checkout target branch
let checkout = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["checkout", &target])
.output()
.await;
match checkout {
Ok(output) if output.status.success() => {
// Merge the source branch
let mut merge_cmd = tokio::process::Command::new("git");
merge_cmd.current_dir(&worktree_path);
merge_cmd.arg("merge");
if squash {
merge_cmd.arg("--squash");
}
merge_cmd.arg(&source_branch);
merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target));
match merge_cmd.output().await {
Ok(output) if output.status.success() => {
// Get the commit SHA
let sha_output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["rev-parse", "HEAD"])
.output()
.await;
let sha = sha_output
.ok()
.map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
if squash {
// For squash merge, we need to commit
let _ = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["commit", "-m", &format!("Squashed merge of task {}", task_id)])
.output()
.await;
}
(true, format!("Merged {} into {}", source_branch, target), sha, None)
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
// Check for merge conflicts
if stderr.contains("CONFLICT") {
let conflict_files = stderr
.lines()
.filter(|l| l.contains("CONFLICT"))
.map(|l| l.to_string())
.collect::<Vec<_>>();
(false, "Merge conflicts detected".to_string(), None, Some(conflict_files))
} else {
(false, format!("Merge failed: {}", stderr), None, None)
}
}
Err(e) => (false, format!("Failed to merge: {}", e), None, None),
}
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
(false, format!("Failed to checkout target branch: {}", stderr), None, None)
}
Err(e) => (false, format!("Failed to checkout: {}", e), None, None),
}
};
let msg = DaemonMessage::MergeToTargetResult {
task_id,
success,
message,
commit_sha,
conflicts,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Handle CreatePR command - create a pull request for a task's changes.
///
/// Pushes the worktree's `HEAD` to `origin` as `branch`, then runs
/// `gh pr create` against `base_branch` (auto-detected through the worktree
/// manager when not supplied). The PR URL is taken from the last line of
/// `gh`'s stdout and the PR number from the URL's final path segment.
///
/// Every outcome, including a missing worktree or a failed base-branch
/// detection, is reported as a `PRCreated` message rather than returned as
/// an error.
async fn handle_create_pr(
    &self,
    task_id: Uuid,
    title: String,
    body: Option<String>,
    base_branch: Option<String>,
    branch: String,
) -> Result<(), DaemonError> {
    // Resolve the worktree; the lookup also scans the worktrees directory.
    let worktree_path = match self.get_task_worktree_path(task_id).await {
        Ok(path) => path,
        Err(e) => {
            tracing::error!(task_id = %task_id, error = %e, "Failed to find worktree for PR creation");
            let reply = DaemonMessage::PRCreated {
                task_id,
                success: false,
                message: format!("Task {} not found or has no worktree: {}", task_id, e),
                pr_url: None,
                pr_number: None,
            };
            let _ = self.ws_tx.send(reply).await;
            return Ok(());
        }
    };

    // Use the requested base branch, or ask the worktree manager for the default.
    let base_branch = if let Some(requested) = base_branch {
        requested
    } else {
        match self.worktree_manager.detect_default_branch(&worktree_path).await {
            Ok(detected) => {
                tracing::info!(task_id = %task_id, detected_branch = %detected, "Auto-detected base branch");
                detected
            }
            Err(e) => {
                tracing::error!(task_id = %task_id, error = %e, "Failed to detect default branch");
                let reply = DaemonMessage::PRCreated {
                    task_id,
                    success: false,
                    message: format!("Failed to detect default branch: {}", e),
                    pr_url: None,
                    pr_number: None,
                };
                let _ = self.ws_tx.send(reply).await;
                return Ok(());
            }
        }
    };

    tracing::info!(
        task_id = %task_id,
        base_branch = %base_branch,
        branch = %branch,
        worktree_path = %worktree_path.display(),
        "Creating PR"
    );

    // Push HEAD to origin under the requested branch name.
    let push_refspec = format!("HEAD:refs/heads/{}", branch);
    let push_result = tokio::process::Command::new("git")
        .current_dir(&worktree_path)
        .args(["push", "-u", "origin", &push_refspec])
        .output()
        .await;

    let (success, message, pr_url, pr_number) = match push_result {
        Err(e) => {
            tracing::error!(error = %e, "Failed to execute git push");
            (false, format!("Failed to push branch: {}", e), None, None)
        }
        Ok(output) if !output.status.success() => {
            let stderr = String::from_utf8_lossy(&output.stderr);
            tracing::error!(stderr = %stderr, "git push failed");
            (false, format!("Failed to push branch: {}", stderr), None, None)
        }
        Ok(_) => {
            tracing::info!("Branch pushed successfully, creating PR");

            // Create PR using gh CLI; an absent body is passed as an empty string.
            let mut gh = tokio::process::Command::new("gh");
            gh.current_dir(&worktree_path)
                .args(["pr", "create", "--title", &title, "--base", &base_branch, "--head", &branch])
                .args(["--body", body.as_deref().unwrap_or("")]);

            match gh.output().await {
                Ok(output) if output.status.success() => {
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    // gh pr create outputs the PR URL; take the last line.
                    let url = stdout.lines().last().map(|line| line.trim().to_string());
                    // The PR number is the URL's final path segment, if numeric.
                    let number = url
                        .as_ref()
                        .and_then(|u| u.rsplit('/').next())
                        .and_then(|segment| segment.parse::<i32>().ok());
                    tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully");
                    (true, "Pull request created".to_string(), url, number)
                }
                Ok(output) => {
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    tracing::error!(stderr = %stderr, "gh pr create failed");
                    (false, format!("Failed to create PR: {}", stderr), None, None)
                }
                Err(e) => {
                    tracing::error!(error = %e, "Failed to execute gh command");
                    (false, format!("Failed to run gh: {}", e), None, None)
                }
            }
        }
    };

    tracing::info!(
        task_id = %task_id,
        success = success,
        message = %message,
        pr_url = ?pr_url,
        "PR creation completed"
    );

    let reply = DaemonMessage::PRCreated {
        task_id,
        success,
        message,
        pr_url,
        pr_number,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle GetTaskDiff command - report the diff of a task's worktree changes.
///
/// Runs `git diff HEAD` in the task's worktree. When that diff is empty, the
/// output of `git log -p --reverse HEAD~10..HEAD --` is reported instead.
/// The outcome is delivered as a `DaemonMessage::TaskDiff` on `ws_tx` (send
/// errors are ignored); the function itself always returns `Ok(())`.
async fn handle_get_task_diff(
    &self,
    task_id: Uuid,
) -> Result<(), DaemonError> {
    // Resolve the worktree directory; the read guard is released with this block.
    let located = {
        let registry = self.tasks.read().await;
        registry
            .get(&task_id)
            .and_then(|task| task.worktree.as_ref())
            .map(|wt| wt.path.clone())
    };
    // Ok(diff text) on success, Err(reason) on any failure.
    let outcome: Result<String, String> = match located {
        None => Err(format!("Task {} not found or has no worktree", task_id)),
        Some(dir) => {
            let working_diff = tokio::process::Command::new("git")
                .current_dir(&dir)
                .args(["diff", "HEAD"])
                .output()
                .await;
            match working_diff {
                Err(e) => Err(format!("Failed to run git: {}", e)),
                Ok(out) if !out.status.success() => {
                    let stderr = String::from_utf8_lossy(&out.stderr);
                    Err(format!("Git diff failed: {}", stderr))
                }
                Ok(out) => {
                    let pending = String::from_utf8_lossy(&out.stdout).to_string();
                    if !pending.is_empty() {
                        Ok(pending)
                    } else {
                        // Nothing uncommitted: fall back to the patches of recent commits.
                        // The exit status of this command is not inspected; only a
                        // failure to launch git is reported as an error.
                        tokio::process::Command::new("git")
                            .current_dir(&dir)
                            .args(["log", "-p", "--reverse", "HEAD~10..HEAD", "--"])
                            .output()
                            .await
                            .map(|history| String::from_utf8_lossy(&history.stdout).to_string())
                            .map_err(|e| format!("Failed to get diff: {}", e))
                    }
                }
            }
        }
    };
    let (success, diff, error) = match outcome {
        Ok(text) => (true, Some(text), None),
        Err(reason) => (false, None, Some(reason)),
    };
    let reply = DaemonMessage::TaskDiff {
        task_id,
        success,
        diff,
        error,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Handle CommitWorktree command - stage and commit changes in a task's worktree.
async fn handle_commit_worktree(
&self,
task_id: Uuid,
message: Option<String>,
) -> Result<(), DaemonError> {
// Get task's worktree path
let worktree_path = {
let tasks = self.tasks.read().await;
tasks.get(&task_id)
.and_then(|t| t.worktree.as_ref())
.map(|w| w.path.clone())
};
let (success, commit_sha, error) = if let Some(path) = worktree_path {
// Step 1: Check if there are changes to commit
let status_output = tokio::process::Command::new("git")
.current_dir(&path)
.args(["status", "--porcelain"])
.output()
.await;
let has_changes = match &status_output {
Ok(output) => !output.stdout.is_empty(),
Err(_) => false,
};
if !has_changes {
(true, None, Some("No changes to commit".to_string()))
} else {
// Step 2: Stage all changes
let add_result = tokio::process::Command::new("git")
.current_dir(&path)
.args(["add", "-A"])
.output()
.await;
match add_result {
Ok(output) if output.status.success() => {
// Step 3: Commit
let commit_msg = message.unwrap_or_else(|| "Worktree commit".to_string());
let commit_result = tokio::process::Command::new("git")
.current_dir(&path)
.args(["commit", "-m", &commit_msg])
.output()
.await;
match commit_result {
Ok(output) if output.status.success() => {
// Step 4: Get commit SHA
let sha_output = tokio::process::Command::new("git")
.current_dir(&path)
.args(["rev-parse", "HEAD"])
.output()
.await;
let sha = sha_output.ok()
.filter(|o| o.status.success())
.map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
(true, sha, None)
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
(false, None, Some(format!("Git commit failed: {}", stderr)))
}
Err(e) => (false, None, Some(format!("Failed to run git commit: {}", e))),
}
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
(false, None, Some(format!("Failed to stage changes: {}", stderr)))
}
Err(e) => (false, None, Some(format!("Failed to run git add: {}", e))),
}
}
} else {
(false, None, Some(format!("Task {} not found or has no worktree", task_id)))
};
let msg = DaemonMessage::WorktreeCommitResult {
task_id,
success,
commit_sha,
error,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Handle GetWorktreeInfo command - get worktree files, stats, branch info.
///
/// Collects, for the task's worktree (or its supervisor's worktree when
/// `supervisor_worktree_task_id` is set):
/// * the HEAD commit SHA (`git rev-parse HEAD`),
/// * the list of changed files with a status code, and
/// * per-file and total added/removed line counts (`git diff --numstat`).
///
/// Uncommitted changes (`git status --porcelain`) take precedence. Only when
/// there are none is the worktree compared against the base branch, using the
/// reference produced by `Self::resolve_diff_base`.
///
/// The result is sent as a `DaemonMessage::WorktreeInfoResult` on `ws_tx`
/// (send errors are ignored). Every path through this function reports
/// `success: true` and returns `Ok(())`; a missing task, missing worktree or
/// missing directory is signalled through `exists: false`.
async fn handle_get_worktree_info(
&self,
task_id: Uuid,
) -> Result<(), DaemonError> {
// Get task's worktree path, branch, and base_branch
// If the task shares a supervisor's worktree, use the supervisor's worktree info
let task_info = {
let tasks = self.tasks.read().await;
if let Some(task) = tasks.get(&task_id) {
// Check if this task shares a supervisor's worktree
if let Some(supervisor_task_id) = task.supervisor_worktree_task_id {
// Use the supervisor's worktree
// (yields None when the supervisor task is not in the map)
tasks.get(&supervisor_task_id).map(|supervisor| (
supervisor.worktree.as_ref().map(|w| w.path.clone()),
supervisor.worktree.as_ref().map(|w| w.branch.clone()),
supervisor.base_branch.clone(),
))
} else {
// Use the task's own worktree
Some((
task.worktree.as_ref().map(|w| w.path.clone()),
task.worktree.as_ref().map(|w| w.branch.clone()),
task.base_branch.clone(),
))
}
} else {
None
}
};
// Collapse "task unknown" and "task has no worktree" into the same all-None case.
let (worktree_path, branch, base_branch) = match task_info {
Some((Some(path), branch, base_branch)) => (Some(path), branch, base_branch),
Some((None, _, _)) => (None, None, None),
None => (None, None, None),
};
// No worktree known for this task: report an empty, non-existent worktree.
if worktree_path.is_none() {
let msg = DaemonMessage::WorktreeInfoResult {
task_id,
success: true,
worktree_path: None,
exists: false,
files_changed: 0,
insertions: 0,
deletions: 0,
files: None,
branch: None,
head_sha: None,
error: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
// Cannot panic: the None case returned just above.
let path = worktree_path.unwrap();
let path_str = path.to_string_lossy().to_string();
// Check if worktree exists
if !path.exists() {
let msg = DaemonMessage::WorktreeInfoResult {
task_id,
success: true,
worktree_path: Some(path_str),
exists: false,
files_changed: 0,
insertions: 0,
deletions: 0,
files: None,
branch,
head_sha: None,
error: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
// Get HEAD SHA (None if git cannot be run or exits non-zero)
let head_sha = match tokio::process::Command::new("git")
.current_dir(&path)
.args(["rev-parse", "HEAD"])
.output()
.await
{
Ok(output) if output.status.success() => {
Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
_ => None,
};
// Get changed files with status using git status --porcelain
let status_output = tokio::process::Command::new("git")
.current_dir(&path)
.args(["status", "--porcelain"])
.output()
.await;
// Each entry is (file path, status code). Any git failure yields an empty list.
let uncommitted_status_lines: Vec<(String, String)> = match status_output {
Ok(output) if output.status.success() => {
String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|line| {
// Lines shorter than 3 bytes are skipped.
if line.len() < 3 {
return None;
}
// The first two bytes are taken as the status code; the text from
// byte 3 onwards is taken verbatim as the path.
let status = line[0..2].trim().to_string();
let file_path = line[3..].to_string();
Some((file_path, status))
})
.collect()
}
_ => vec![],
};
// If there are uncommitted changes, use them. Otherwise, compare against base branch.
// Track effective_base_branch for reuse in numstat query
let (status_lines, effective_base_for_diff) = if !uncommitted_status_lines.is_empty() {
(uncommitted_status_lines, None)
} else {
// No uncommitted changes - try to get committed changes vs base branch
// First, try to detect the base branch if not provided
let effective_base_branch = if let Some(ref base) = base_branch {
Some(base.clone())
} else {
// Auto-detect the default branch
self.worktree_manager.detect_default_branch(&path).await.ok()
};
if let Some(ref base) = effective_base_branch {
// Resolve the best diff base reference, handling missing remote refs
let resolved_diff_base = Self::resolve_diff_base(&path, base).await;
if let Some(ref diff_base) = resolved_diff_base {
// Get committed changes using git diff --name-status
let name_status_output = tokio::process::Command::new("git")
.current_dir(&path)
.args(["diff", "--name-status", diff_base])
.output()
.await;
let committed_status_lines: Vec<(String, String)> = match name_status_output {
Ok(output) if output.status.success() => {
String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|line| {
// Split at the first tab only: status on the left, the rest
// of the line (kept verbatim) as the path on the right.
let parts: Vec<&str> = line.splitn(2, '\t').collect();
if parts.len() >= 2 {
let status = parts[0].trim().to_string();
let file_path = parts[1].to_string();
Some((file_path, status))
} else {
None
}
})
.collect()
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::warn!(
diff_base = %diff_base,
stderr = %stderr,
"git diff --name-status failed with resolved diff base",
);
vec![]
}
Err(e) => {
tracing::warn!(
error = %e,
diff_base = %diff_base,
"Failed to execute git diff --name-status",
);
vec![]
}
};
// Only keep the diff base when it actually produced entries, so the
// numstat query below falls back to `git diff HEAD` otherwise.
if !committed_status_lines.is_empty() {
(committed_status_lines, resolved_diff_base)
} else {
(vec![], None)
}
} else {
(vec![], None)
}
} else {
(vec![], None)
}
};
// Get numstat for line counts
// If we have effective_base_for_diff (a resolved diff base string), use it directly
// Otherwise compare against HEAD for uncommitted changes
// Maps file path -> (lines added, lines removed).
let mut file_stats: std::collections::HashMap<String, (i32, i32)> = std::collections::HashMap::new();
let numstat_output = if let Some(ref diff_base) = effective_base_for_diff {
tokio::process::Command::new("git")
.current_dir(&path)
.args(["diff", "--numstat", diff_base])
.output()
.await
} else {
tokio::process::Command::new("git")
.current_dir(&path)
.args(["diff", "HEAD", "--numstat"])
.output()
.await
};
if let Ok(output) = numstat_output {
if output.status.success() {
for line in String::from_utf8_lossy(&output.stdout).lines() {
// Expected fields: added <TAB> removed <TAB> path.
let parts: Vec<&str> = line.split('\t').collect();
if parts.len() >= 3 {
// Counts that do not parse as integers are recorded as 0.
let added = parts[0].parse::<i32>().unwrap_or(0);
let removed = parts[1].parse::<i32>().unwrap_or(0);
let file = parts[2].to_string();
file_stats.insert(file, (added, removed));
}
}
}
}
// Build file list with stats
let mut files_json = Vec::new();
let mut total_insertions = 0;
let mut total_deletions = 0;
for (file_path, status) in &status_lines {
// Files without a numstat entry under the same path count as (0, 0).
let (lines_added, lines_removed) = file_stats.get(file_path).copied().unwrap_or((0, 0));
total_insertions += lines_added;
total_deletions += lines_removed;
files_json.push(serde_json::json!({
"path": file_path,
"status": status,
"linesAdded": lines_added,
"linesRemoved": lines_removed,
}));
}
let msg = DaemonMessage::WorktreeInfoResult {
task_id,
success: true,
worktree_path: Some(path_str),
exists: true,
files_changed: status_lines.len() as i32,
insertions: total_insertions,
deletions: total_deletions,
files: Some(serde_json::Value::Array(files_json)),
branch,
head_sha,
error: None,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Resolve the best diff base reference for a given base branch.
/// Tries origin/{base} first, then falls back to local {base} branch.
async fn resolve_diff_base(path: &std::path::PathBuf, base: &str) -> Option<String> {
// Try origin/{base} first
let origin_ref = format!("origin/{}", base);
let check_origin = tokio::process::Command::new("git")
.current_dir(path)
.args(["rev-parse", "--verify", &origin_ref])
.output()
.await;
if let Ok(output) = check_origin {
if output.status.success() {
return Some(format!("origin/{}...HEAD", base));
}
}
// Fall back to local branch
let check_local = tokio::process::Command::new("git")
.current_dir(path)
.args(["rev-parse", "--verify", base])
.output()
.await;
if let Ok(output) = check_local {
if output.status.success() {
return Some(format!("{}...HEAD", base));
}
}
None
}
/// Handle GetWorktreeDiff command - get git diff for a task's worktree.
async fn handle_get_worktree_diff(
&self,
task_id: Uuid,
file_path: Option<String>,
) -> Result<(), DaemonError> {
// Get task's worktree path, branch, and base_branch
// If the task shares a supervisor's worktree, use the supervisor's worktree info
let task_info = {
let tasks = self.tasks.read().await;
if let Some(task) = tasks.get(&task_id) {
if let Some(supervisor_task_id) = task.supervisor_worktree_task_id {
tasks.get(&supervisor_task_id).map(|supervisor| (
supervisor.worktree.as_ref().map(|w| w.path.clone()),
supervisor.base_branch.clone(),
))
} else {
Some((
task.worktree.as_ref().map(|w| w.path.clone()),
task.base_branch.clone(),
))
}
} else {
None
}
};
let (worktree_path, base_branch) = match task_info {
Some((Some(path), base_branch)) => (path, base_branch),
_ => {
let msg = DaemonMessage::WorktreeDiffResult {
task_id,
success: true,
diff: Some(String::new()),
error: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
};
if !worktree_path.exists() {
let msg = DaemonMessage::WorktreeDiffResult {
task_id,
success: false,
diff: None,
error: Some("Worktree path does not exist".to_string()),
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
// Check for uncommitted changes first
let status_output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["status", "--porcelain"])
.output()
.await;
let has_uncommitted = match &status_output {
Ok(output) if output.status.success() => {
!String::from_utf8_lossy(&output.stdout).trim().is_empty()
}
_ => false,
};
let diff_result = if has_uncommitted {
// Get diff for uncommitted changes (both staged and unstaged)
let mut args = vec!["diff".to_string(), "HEAD".to_string()];
if let Some(ref fp) = file_path {
args.push("--".to_string());
args.push(fp.clone());
}
let output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(&args)
.output()
.await;
match output {
Ok(out) if out.status.success() => {
let diff = String::from_utf8_lossy(&out.stdout).to_string();
// If diff is empty (e.g., for new untracked files), try git diff (no HEAD)
// and also try to show untracked file content
if diff.is_empty() {
// Try to show untracked files as diffs
let mut args2 = vec!["diff".to_string()];
if let Some(ref fp) = file_path {
args2.push("--".to_string());
args2.push(fp.clone());
}
let output2 = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(&args2)
.output()
.await;
match output2 {
Ok(out2) if out2.status.success() => {
Ok(String::from_utf8_lossy(&out2.stdout).to_string())
}
_ => Ok(diff),
}
} else {
Ok(diff)
}
}
Ok(out) => Err(String::from_utf8_lossy(&out.stderr).to_string()),
Err(e) => Err(format!("Failed to run git diff: {}", e)),
}
} else {
// No uncommitted changes - compare against base branch
let effective_base_branch = if let Some(ref base) = base_branch {
Some(base.clone())
} else {
self.worktree_manager.detect_default_branch(&worktree_path).await.ok()
};
if let Some(ref base) = effective_base_branch {
let diff_base = format!("origin/{}...HEAD", base);
let mut args = vec!["diff".to_string(), diff_base];
if let Some(ref fp) = file_path {
args.push("--".to_string());
args.push(fp.clone());
}
let output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(&args)
.output()
.await;
match output {
Ok(out) if out.status.success() => {
Ok(String::from_utf8_lossy(&out.stdout).to_string())
}
Ok(out) => Err(String::from_utf8_lossy(&out.stderr).to_string()),
Err(e) => Err(format!("Failed to run git diff: {}", e)),
}
} else {
Ok(String::new())
}
};
let msg = match diff_result {
Ok(diff) => DaemonMessage::WorktreeDiffResult {
task_id,
success: true,
diff: Some(diff),
error: None,
},
Err(e) => DaemonMessage::WorktreeDiffResult {
task_id,
success: false,
diff: None,
error: Some(e),
},
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Handle CreateCheckpoint command - stage all changes, commit, and get stats.
async fn handle_create_checkpoint(
&self,
task_id: Uuid,
message: String,
) -> Result<(), DaemonError> {
// Get task's worktree path and branch name
let task_info = {
let tasks = self.tasks.read().await;
tasks.get(&task_id).map(|t| (
t.worktree.as_ref().map(|w| w.path.clone()),
t.worktree.as_ref().map(|w| w.branch.clone()),
))
};
let (worktree_path, branch_name) = match task_info {
Some((Some(path), Some(branch))) => (path, branch),
Some((Some(path), None)) => {
// Try to get current branch from git
let branch = self.get_current_branch(&path).await.unwrap_or_else(|| "unknown".to_string());
(path, branch)
}
_ => {
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: false,
commit_sha: None,
branch_name: None,
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: Some(format!("Task {} not found or has no worktree", task_id)),
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
};
// Step 1: Check if there are changes to commit
let status_output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["status", "--porcelain"])
.output()
.await;
let has_changes = match &status_output {
Ok(output) => !output.stdout.is_empty(),
Err(_) => false,
};
if !has_changes {
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: false,
commit_sha: None,
branch_name: Some(branch_name),
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: Some("No changes to checkpoint".to_string()),
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
// Step 2: Stage all changes
let add_result = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["add", "-A"])
.output()
.await;
if let Err(e) = add_result {
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: false,
commit_sha: None,
branch_name: Some(branch_name),
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: Some(format!("Failed to stage changes: {}", e)),
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
// Step 3: Get diff stats before commit
let (lines_added, lines_removed, files_changed) = self.get_staged_diff_stats(&worktree_path).await;
// Step 4: Create commit
let commit_result = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["commit", "-m", &message])
.output()
.await;
let commit_sha = match commit_result {
Ok(output) if output.status.success() => {
// Get the commit SHA
let sha_output = tokio::process::Command::new("git")
.current_dir(&worktree_path)
.args(["rev-parse", "HEAD"])
.output()
.await;
match sha_output {
Ok(o) => Some(String::from_utf8_lossy(&o.stdout).trim().to_string()),
Err(_) => None,
}
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: false,
commit_sha: None,
branch_name: Some(branch_name),
checkpoint_number: None,
files_changed: Some(files_changed),
lines_added: Some(lines_added),
lines_removed: Some(lines_removed),
error: Some(format!("Commit failed: {}", stderr)),
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
Err(e) => {
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: false,
commit_sha: None,
branch_name: Some(branch_name),
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: Some(format!("Failed to execute git commit: {}", e)),
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
}
};
// Success - send response (checkpoint_number will be assigned by server on DB insert)
// Note: Manual checkpoints don't include patches (only heartbeat commits do)
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: true,
commit_sha,
branch_name: Some(branch_name),
checkpoint_number: None, // Server will assign from DB
files_changed: Some(files_changed),
lines_added: Some(lines_added),
lines_removed: Some(lines_removed),
error: None,
message,
patch_data: None,
patch_base_sha: None,
patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
}
/// Ask git which branch is checked out in `worktree_path`.
///
/// Returns the trimmed output of `git branch --show-current`, or `None` when
/// git cannot be run or exits with a non-zero status.
async fn get_current_branch(&self, worktree_path: &std::path::PathBuf) -> Option<String> {
    let invocation = tokio::process::Command::new("git")
        .current_dir(worktree_path)
        .args(["branch", "--show-current"])
        .output()
        .await;
    match invocation {
        Ok(result) if result.status.success() => {
            let branch = String::from_utf8_lossy(&result.stdout).trim().to_string();
            Some(branch)
        }
        _ => None,
    }
}
/// Get diff stats for staged changes.
///
/// Returns `(lines_added, lines_removed, files)` where:
/// * the line totals are summed over `git diff --cached --numstat`; counts
///   that do not parse as integers are skipped;
/// * `files` is a JSON array of `{"action", "path"}` objects built from
///   `git diff --cached --name-status`.
///
/// A git invocation that cannot be run contributes nothing (zero totals /
/// empty array) rather than producing an error.
async fn get_staged_diff_stats(&self, worktree_path: &std::path::PathBuf) -> (i32, i32, serde_json::Value) {
    // Get numstat for lines added/removed
    let numstat = tokio::process::Command::new("git")
        .current_dir(worktree_path)
        .args(["diff", "--cached", "--numstat"])
        .output()
        .await;
    let (mut total_added, mut total_removed) = (0i32, 0i32);
    if let Ok(output) = numstat {
        for line in String::from_utf8_lossy(&output.stdout).lines() {
            // Only the first two fields (the counts) are needed here.
            let mut fields = line.split_whitespace();
            if let (Some(added), Some(removed)) = (fields.next(), fields.next()) {
                if let Ok(added) = added.parse::<i32>() {
                    total_added += added;
                }
                if let Ok(removed) = removed.parse::<i32>() {
                    total_removed += removed;
                }
            }
        }
    }
    // Get name-status for file changes
    let name_status = tokio::process::Command::new("git")
        .current_dir(worktree_path)
        .args(["diff", "--cached", "--name-status"])
        .output()
        .await;
    let mut files = Vec::new();
    if let Ok(output) = name_status {
        for line in String::from_utf8_lossy(&output.stdout).lines() {
            // Fields are tab-separated. Splitting on tabs (not on any whitespace)
            // keeps paths that contain spaces intact. As before, the field right
            // after the action is reported as the path.
            let mut fields = line.split('\t');
            if let (Some(action), Some(path)) = (fields.next(), fields.next()) {
                files.push(serde_json::json!({
                    "action": action,
                    "path": path
                }));
            }
        }
    }
    (total_added, total_removed, serde_json::json!(files))
}
/// Locate the worktree directory belonging to `task_id`.
///
/// The in-memory task map is consulted first. If the task is not there (or has
/// no worktree recorded), the worktree manager's base directory is scanned for
/// an entry whose name starts with the first 8 characters of the task id and
/// which contains a `.git` entry.
///
/// Returns an error string when nothing matches.
async fn find_worktree_for_task_tm(&self, task_id: Uuid) -> Result<PathBuf, String> {
    // In-memory lookup; the read guard is released with this block.
    let remembered = {
        let registry = self.tasks.read().await;
        registry
            .get(&task_id)
            .and_then(|task| task.worktree.as_ref())
            .map(|wt| wt.path.clone())
    };
    if let Some(found) = remembered {
        return Ok(found);
    }
    // Not in memory - scan the worktrees directory for a matching task id prefix.
    let id_text = task_id.to_string();
    let prefix = &id_text[..8];
    let worktrees_dir = self.worktree_manager.base_dir();
    if let Ok(mut listing) = tokio::fs::read_dir(worktrees_dir).await {
        // Iteration stops at the first entry that cannot be read.
        while let Ok(Some(entry)) = listing.next_entry().await {
            let entry_name = entry.file_name();
            if !entry_name.to_string_lossy().starts_with(prefix) {
                continue;
            }
            let candidate = entry.path();
            // Only accept directories that look like a git checkout.
            if !candidate.join(".git").exists() {
                continue;
            }
            tracing::info!(
                task_id = %task_id,
                worktree_path = %candidate.display(),
                "Found worktree by scanning directory"
            );
            return Ok(candidate);
        }
    }
    Err(format!(
        "No worktree found for task {}. The worktree may have been cleaned up.",
        task_id
    ))
}
/// Handle ApplyPatchToWorktree command - apply a patch from a cross-daemon task to a supervisor's worktree.
///
/// `patch_data` is expected to be a base64-encoded, gzip-compressed patch. It
/// is decoded, decompressed and piped into `git apply --3way -` inside the
/// target task's worktree. `base_sha` is only used for logging and for the
/// success message (shortened to at most its first 8 bytes).
///
/// Progress and failures are reported to the target task via
/// `DaemonMessage::task_output` (send errors are ignored); the function itself
/// always returns `Ok(())`.
async fn handle_apply_patch_to_worktree(
    &self,
    target_task_id: Uuid,
    source_task_id: Uuid,
    patch_data: String,
    base_sha: String,
) -> Result<(), DaemonError> {
    // Every failure is reported with the same prefix and a step-specific detail.
    let failure = |detail: String| {
        DaemonMessage::task_output(
            target_task_id,
            format!("Failed to apply patch from task {}: {}\n", source_task_id, detail),
            true,
        )
    };
    // Find the target task's worktree
    let worktree_path = match self.find_worktree_for_task_tm(target_task_id).await {
        Ok(path) => path,
        Err(e) => {
            tracing::error!(
                target_task_id = %target_task_id,
                error = %e,
                "Failed to find worktree for patch application"
            );
            let _ = self.ws_tx.send(failure(format!("worktree not found - {}", e))).await;
            return Ok(());
        }
    };
    tracing::info!(
        target_task_id = %target_task_id,
        source_task_id = %source_task_id,
        worktree = %worktree_path.display(),
        "Applying cross-daemon patch to worktree"
    );
    // Decode the base64-gzipped patch data
    let patch_bytes = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &patch_data) {
        Ok(bytes) => bytes,
        Err(e) => {
            tracing::error!(error = %e, "Failed to decode patch base64");
            let _ = self.ws_tx.send(failure(format!("base64 decode error - {}", e))).await;
            return Ok(());
        }
    };
    // Decompress the gzipped patch
    let patch_content = {
        use std::io::Read;
        let mut decoder = flate2::read::GzDecoder::new(&patch_bytes[..]);
        let mut content = String::new();
        match decoder.read_to_string(&mut content) {
            Ok(_) => content,
            Err(e) => {
                tracing::error!(error = %e, "Failed to decompress patch");
                let _ = self.ws_tx.send(failure(format!("decompress error - {}", e))).await;
                return Ok(());
            }
        }
    };
    // Check if patch is empty
    if patch_content.trim().is_empty() {
        tracing::info!(
            target_task_id = %target_task_id,
            source_task_id = %source_task_id,
            "Cross-daemon task had no changes to merge"
        );
        let msg = DaemonMessage::task_output(
            target_task_id,
            format!("Cross-daemon task {} completed with no changes to merge\n", source_task_id),
            false,
        );
        let _ = self.ws_tx.send(msg).await;
        return Ok(());
    }
    // Apply the patch using git apply, feeding the patch through stdin
    let mut child = match tokio::process::Command::new("git")
        .current_dir(&worktree_path)
        .args(["apply", "--3way", "-"])
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(e) => {
            tracing::error!(error = %e, "Failed to spawn git apply");
            let _ = self.ws_tx.send(failure(format!("spawn error - {}", e))).await;
            return Ok(());
        }
    };
    // Write patch to stdin. The handle is dropped at the end of this block,
    // which closes the pipe so git sees end-of-input.
    if let Some(mut stdin) = child.stdin.take() {
        use tokio::io::AsyncWriteExt;
        if let Err(e) = stdin.write_all(patch_content.as_bytes()).await {
            // Logged only; git's own exit status decides the final outcome.
            tracing::error!(error = %e, "Failed to write patch to git apply stdin");
        }
    }
    // Wait for completion
    let output = match child.wait_with_output().await {
        Ok(output) => output,
        Err(e) => {
            tracing::error!(error = %e, "Failed to wait for git apply");
            let _ = self.ws_tx.send(failure(format!("wait error - {}", e))).await;
            return Ok(());
        }
    };
    if output.status.success() {
        tracing::info!(
            target_task_id = %target_task_id,
            source_task_id = %source_task_id,
            base_sha = %base_sha,
            "Successfully applied cross-daemon patch"
        );
        // Checked slice: a base SHA shorter than 8 bytes (or one where byte 8 is
        // not a character boundary) is shown in full instead of panicking.
        let short_base = base_sha.get(..8).unwrap_or(&base_sha);
        let msg = DaemonMessage::task_output(
            target_task_id,
            format!("Successfully merged changes from cross-daemon task {} (base: {})\n", source_task_id, short_base),
            false,
        );
        let _ = self.ws_tx.send(msg).await;
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        tracing::error!(
            target_task_id = %target_task_id,
            source_task_id = %source_task_id,
            stderr = %stderr,
            "Failed to apply cross-daemon patch"
        );
        let _ = self.ws_tx.send(failure(stderr.to_string())).await;
    }
    Ok(())
}
/// Handle InheritGitConfig command - read git config from a directory and store it.
///
/// Reads `user.email` and `user.name` via `git config` in `source_dir` (or in
/// the process's current directory, falling back to `"."`, when no directory
/// is given). Any value found is stored on the manager. The outcome is sent as
/// a `DaemonMessage::GitConfigInherited` on `ws_tx` (send errors are ignored):
/// a failure when neither value is available, a success otherwise. The
/// function itself always returns `Ok(())`.
async fn handle_inherit_git_config(
    &self,
    source_dir: Option<String>,
) -> Result<(), DaemonError> {
    /// Read a single git config value in `dir`; `None` if git cannot be run,
    /// exits non-zero, or prints only whitespace.
    async fn read_config_value(dir: &std::path::Path, key: &str) -> Option<String> {
        let output = tokio::process::Command::new("git")
            .current_dir(dir)
            .args(["config", key])
            .output()
            .await
            .ok()?;
        if !output.status.success() {
            return None;
        }
        let value = String::from_utf8_lossy(&output.stdout).trim().to_string();
        (!value.is_empty()).then_some(value)
    }

    // Use provided directory or current working directory
    let dir = match source_dir {
        Some(given) => std::path::PathBuf::from(given),
        None => std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from(".")),
    };
    tracing::info!(dir = ?dir, "Reading git config from directory");
    let user_email = read_config_value(&dir, "user.email").await;
    let user_name = read_config_value(&dir, "user.name").await;
    // Nothing usable at all: report failure and leave stored config untouched.
    if user_email.is_none() && user_name.is_none() {
        let reply = DaemonMessage::GitConfigInherited {
            success: false,
            user_email: None,
            user_name: None,
            error: Some("No git config found in the specified directory".to_string()),
        };
        let _ = self.ws_tx.send(reply).await;
        return Ok(());
    }
    // Store whichever values were found
    if let Some(email) = user_email.as_ref() {
        *self.git_user_email.write().await = Some(email.clone());
        tracing::info!(email = %email, "Inherited git user.email");
    }
    if let Some(name) = user_name.as_ref() {
        *self.git_user_name.write().await = Some(name.clone());
        tracing::info!(name = %name, "Inherited git user.name");
    }
    // Send success response
    let reply = DaemonMessage::GitConfigInherited {
        success: true,
        user_email,
        user_name,
        error: None,
    };
    let _ = self.ws_tx.send(reply).await;
    Ok(())
}
/// Apply inherited git config to a worktree directory.
pub async fn apply_git_config(&self, worktree_path: &std::path::Path) -> Result<(), DaemonError> {
let email = self.git_user_email.read().await.clone();
let name = self.git_user_name.read().await.clone();
if let Some(email) = email {
let result = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["config", "user.email", &email])
.output()
.await;
if let Err(e) = result {
tracing::warn!(error = %e, "Failed to set git user.email in worktree");
}
}
if let Some(name) = name {
let result = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["config", "user.name", &name])
.output()
.await;
if let Err(e) = result {
tracing::warn!(error = %e, "Failed to set git user.name in worktree");
}
}
Ok(())
}
}
/// Inner state for spawned tasks (cloneable).
///
/// Most fields are `Arc`-wrapped handles or plain values.
/// NOTE(review): no `Clone` derive is visible on this definition; presumably
/// the value is assembled field-by-field from the owning manager — confirm.
struct TaskManagerInner {
/// Shared handle to the git worktree manager.
worktree_manager: Arc<WorktreeManager>,
/// Shared handle to the process manager.
process_manager: Arc<ProcessManager>,
/// Shared handle to the temp manager.
temp_manager: Arc<TempManager>,
/// Managed tasks, keyed by `Uuid` (presumably the task id — confirm).
tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
/// Channel for sending `DaemonMessage`s.
ws_tx: mpsc::Sender<DaemonMessage>,
/// Per-key senders of `String` input (presumably keyed by task id — confirm).
task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
/// `u32` values keyed by `Uuid`; by its name, process ids of running tasks — confirm.
active_pids: Arc<RwLock<HashMap<Uuid, u32>>>,
/// Stored git `user.email`, if one has been set.
git_user_email: Arc<RwLock<Option<String>>>,
/// Stored git `user.name`, if one has been set.
git_user_name: Arc<RwLock<Option<String>>>,
/// Base URL used when constructing the API client.
api_url: String,
/// API key used when constructing the API client; an empty string disables patch fetching.
api_key: String,
/// Heartbeat commit interval in seconds (by its name; usage not visible here).
heartbeat_commit_interval_secs: u64,
/// Shared contract task counts for releasing concurrency slots.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
/// Checkpoint patch storage configuration.
checkpoint_patches: CheckpointPatchConfig,
/// Local SQLite database for crash recovery.
local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManagerInner {
/// Give back one concurrency slot for `concurrency_key`.
///
/// Decrements the stored count (never below zero) and drops the map entry
/// once it reaches zero. Does nothing when the key has no entry.
async fn release_concurrency_slot(&self, concurrency_key: Uuid) {
    let mut slots = self.contract_task_counts.write().await;
    let Some(active) = slots.get_mut(&concurrency_key) else {
        return;
    };
    let new_count = active.saturating_sub(1);
    *active = new_count;
    // Remove exhausted entries so the map does not accumulate zero counts.
    if new_count == 0 {
        slots.remove(&concurrency_key);
    }
    tracing::debug!(
        concurrency_key = %concurrency_key,
        new_count = new_count,
        "Released concurrency slot (from TaskManagerInner)"
    );
}
/// Delete a task's row from the local database.
///
/// Best-effort: if the database mutex cannot be locked nothing happens, and a
/// failed delete is only logged as a warning.
fn remove_task_from_local_db(&self, task_id: Uuid) {
    let Ok(db) = self.local_db.lock() else {
        return;
    };
    match db.delete_task(task_id) {
        Ok(_) => {
            tracing::debug!(task_id = %task_id, "Removed task from local database");
        }
        Err(e) => {
            tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
        }
    }
}
/// Fetch the latest checkpoint patch from the server and restore a worktree.
async fn fetch_and_restore_patch(
&self,
task_id: Uuid,
task_name: &str,
repo_source: Option<&str>,
) -> Result<Option<std::path::PathBuf>, DaemonError> {
use crate::daemon::api::client::ApiClient;
if self.api_key.is_empty() {
tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch");
return Ok(None);
}
let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) {
Ok(c) => c,
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch");
return Ok(None);
}
};
let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id);
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct PatchDataResponse {
patch_data: String,
base_commit_sha: String,
repository_url: Option<String>,
}
let resp: PatchDataResponse = match client.get(&path).await {
Ok(r) => r,
Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => {
tracing::debug!(task_id = %task_id, "No checkpoint patch found on server");
return Ok(None);
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server");
return Ok(None);
}
};
// Determine repo source: prefer the one from run_task args, fall back to server response
let source = repo_source
.map(|s| s.to_string())
.or(resp.repository_url);
let source = match source {
Some(s) => s,
None => {
tracing::warn!(task_id = %task_id, "No repository URL available to restore patch");
return Ok(None);
}
};
tracing::info!(
task_id = %task_id,
base_sha = %resp.base_commit_sha,
"Fetched checkpoint patch from server, attempting restore"
);
// Decode base64 patch data
let patch_bytes = match base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&resp.patch_data,
) {
Ok(b) => b,
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data");
return Ok(None);
}
};
match self.worktree_manager.restore_from_patch(
&source,
task_id,
task_name,
&resp.base_commit_sha,
&patch_bytes,
).await {
Ok(worktree_info) => {
tracing::info!(
task_id = %task_id,
path = %worktree_info.path.display(),
"Successfully restored worktree from fetched patch"
);
Ok(Some(worktree_info.path))
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch");
Ok(None)
}
}
}
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
&self,
task_id: Uuid,
task_name: String,
plan: String,
repo_source: Option<String>,
base_branch: Option<String>,
target_branch: Option<String>,
is_orchestrator: bool,
is_supervisor: bool,
target_repo_path: Option<String>,
completion_action: Option<String>,
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
patch_data: Option<String>,
patch_base_sha: Option<String>,
local_only: bool,
auto_merge_local: bool,
supervisor_worktree_task_id: Option<Uuid>,
directive_id: Option<Uuid>,
) -> Result<(), DaemonError> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ===");
// If resuming session, try to find existing worktree first
let existing_worktree = if resume_session {
match self.find_worktree_for_task(task_id).await {
Ok(path) => {
tracing::info!(task_id = %task_id, path = %path.display(), "Found existing worktree for session resume");
Some(path)
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "No existing worktree found for resume, will create new");
None
}
}
} else {
None
};
// Try to restore from patch if worktree is missing but we have patch data
let restored_from_patch = if existing_worktree.is_none() {
if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) {
tracing::info!(
task_id = %task_id,
base_sha = %base_sha,
patch_len = patch_str.len(),
"Attempting to restore worktree from patch"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Restoring worktree from checkpoint patch...\n"),
false,
);
let _ = self.ws_tx.send(msg).await;
// Decode base64 patch data
match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) {
Ok(patch_bytes) => {
match self.worktree_manager.restore_from_patch(
source,
task_id,
&task_name,
base_sha,
&patch_bytes,
).await {
Ok(worktree_info) => {
tracing::info!(
task_id = %task_id,
path = %worktree_info.path.display(),
"Successfully restored worktree from patch"
);
// Store worktree info
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.worktree = Some(worktree_info.clone());
}
}
let msg = DaemonMessage::task_output(
task_id,
format!("Worktree restored at {}\n", worktree_info.path.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
Some(worktree_info.path)
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh");
let msg = DaemonMessage::task_output(
task_id,
format!("Warning: Failed to restore from patch ({}), starting fresh\n", e),
false,
);
let _ = self.ws_tx.send(msg).await;
None
}
}
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data");
None
}
}
} else {
None
}
} else {
None
};
// If resuming but no local worktree and no inline patch, try fetching from server
let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session {
tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch");
let msg = DaemonMessage::task_output(
task_id,
"Fetching checkpoint patch from server...\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await {
Ok(Some(path)) => {
// Store worktree info in tasks map
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.worktree = Some(WorktreeInfo {
path: path.clone(),
branch: format!("task/{}", task_id),
source_repo: repo_source.clone().unwrap_or_default().into(),
});
}
}
let msg = DaemonMessage::task_output(
task_id,
format!("Worktree restored from server patch at {}\n", path.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
Some(path)
}
Ok(None) => {
tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume");
let msg = DaemonMessage::task_output(
task_id,
"No checkpoint patch available on server, resuming with conversation history only\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
None
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server");
None
}
}
} else {
restored_from_patch
};
// Determine working directory
// First check if we should share a supervisor's worktree
// Track if we need to merge to supervisor on completion (cross-daemon case)
let mut merge_to_supervisor_on_completion: Option<Uuid> = None;
let shared_supervisor_worktree = if let Some(supervisor_task_id) = supervisor_worktree_task_id {
match self.find_worktree_for_task(supervisor_task_id).await {
Ok(path) => {
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
path = %path.display(),
"Using shared worktree from supervisor"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Using shared worktree from supervisor: {}\n", path.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
Some(path)
}
Err(_) => {
// Supervisor worktree not on this daemon (cross-daemon case)
// Will create own worktree and merge to supervisor on completion
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
"Supervisor worktree not on this daemon, will create own and merge on completion"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Supervisor on different daemon, will merge changes on completion\n"),
false,
);
let _ = self.ws_tx.send(msg).await;
// Mark for merge on completion
merge_to_supervisor_on_completion = Some(supervisor_task_id);
None
}
}
} else {
None
};
let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some() || shared_supervisor_worktree.is_some();
let working_dir = if let Some(shared_path) = shared_supervisor_worktree {
// Use supervisor's worktree directly (no copy, no new branch)
shared_path
} else if let Some(existing) = existing_worktree {
// Reuse existing worktree for session resume
let msg = DaemonMessage::task_output(
task_id,
format!("Resuming session in existing worktree: {}\n", existing.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
existing
} else if let Some(restored_path) = restored_from_patch {
// Already restored from patch above
restored_path
} else if let Some(ref source) = repo_source {
if is_new_repo_request(source) {
// Explicit new repo request: new:// or new://project-name
tracing::info!(
task_id = %task_id,
source = %source,
"Creating new git repository"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Initializing new git repository...\n"),
false,
);
let _ = self.ws_tx.send(msg).await;
let worktree_info = self.worktree_manager
.init_new_repo(task_id, source)
.await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
tracing::info!(
task_id = %task_id,
path = %worktree_info.path.display(),
"New repository created"
);
// Apply inherited git config to the new repo (overrides defaults)
self.apply_git_config(&worktree_info.path).await;
// Store worktree info
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.worktree = Some(worktree_info.clone());
}
}
let msg = DaemonMessage::task_output(
task_id,
format!("Repository ready at {}\n", worktree_info.path.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
worktree_info.path
} else {
// Send progress message
let msg = DaemonMessage::task_output(
task_id,
format!("Setting up worktree from {}...\n", source),
false,
);
let _ = self.ws_tx.send(msg).await;
// Ensure source repo exists (clone if URL, verify if path)
let source_repo = self.worktree_manager.ensure_repo(source).await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
// Detect or use provided base branch
let branch = if let Some(ref b) = base_branch {
b.clone()
} else {
self.worktree_manager.detect_default_branch(&source_repo).await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
};
tracing::info!(
task_id = %task_id,
source = %source,
branch = %branch,
continue_from_task_id = ?continue_from_task_id,
"Setting up worktree"
);
// Create worktree - either from scratch or copying from another task
let task_name = format!("task-{}", &task_id.to_string()[..8]);
let worktree_info = if let Some(from_task_id) = continue_from_task_id {
// Fallback chain for continuing from a previous task:
// 1. Try copying from existing worktree (fastest, preserves uncommitted changes)
// 2. Try creating from pushed branch (branch was pushed to remote)
// 3. Try restoring from saved patch data
// 4. Fail if none available
// Step 1: Try copying from existing worktree
let copy_result = match self.find_worktree_for_task(from_task_id).await {
Ok(source_worktree) => {
let msg = DaemonMessage::task_output(
task_id,
format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]),
false,
);
let _ = self.ws_tx.send(msg).await;
self.worktree_manager
.create_worktree_from_task(&source_worktree, task_id, &task_name)
.await
}
Err(e) => Err(crate::daemon::worktree::WorktreeError::RepoNotFound(e.to_string())),
};
match copy_result {
Ok(info) => info,
Err(copy_err) => {
tracing::warn!(
task_id = %task_id,
from_task_id = %from_task_id,
error = %copy_err,
"Failed to copy from source worktree, trying branch fallback"
);
// Step 2: Try creating from pushed branch
let msg = DaemonMessage::task_output(
task_id,
format!("Source worktree unavailable, checking for pushed branch...\n"),
false,
);
let _ = self.ws_tx.send(msg).await;
match self.worktree_manager
.create_worktree_from_task_branch(&source_repo, from_task_id, task_id, &task_name)
.await
{
Ok(info) => {
tracing::info!(
task_id = %task_id,
from_task_id = %from_task_id,
branch = %info.branch,
"Successfully created worktree from pushed branch"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Restored from pushed branch {}\n", info.branch),
false,
);
let _ = self.ws_tx.send(msg).await;
info
}
Err(branch_err) => {
tracing::warn!(
task_id = %task_id,
from_task_id = %from_task_id,
error = %branch_err,
"No pushed branch found, trying patch fallback"
);
// Step 3: Try restoring from saved patch data
if let (Some(patch_str), Some(base_sha)) = (&patch_data, &patch_base_sha) {
tracing::info!(
task_id = %task_id,
from_task_id = %from_task_id,
base_sha = %base_sha,
patch_len = patch_str.len(),
"Attempting to restore from checkpoint patch"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Restoring from checkpoint patch...\n"),
false,
);
let _ = self.ws_tx.send(msg).await;
match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) {
Ok(patch_bytes) => {
match self.worktree_manager.restore_from_patch(
source,
task_id,
&task_name,
base_sha,
&patch_bytes,
).await {
Ok(worktree_info) => {
tracing::info!(
task_id = %task_id,
path = %worktree_info.path.display(),
"Successfully restored worktree from patch"
);
worktree_info
}
Err(patch_err) => {
tracing::warn!(
task_id = %task_id,
from_task_id = %from_task_id,
error = %patch_err,
"Patch restore failed — falling back to fresh worktree"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Patch restore failed, starting fresh from {}\n", branch),
false,
);
let _ = self.ws_tx.send(msg).await;
self.worktree_manager
.create_worktree(&source_repo, task_id, &task_name, &branch)
.await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
}
}
}
Err(decode_err) => {
tracing::warn!(
task_id = %task_id,
from_task_id = %from_task_id,
error = %decode_err,
"Patch decode failed — falling back to fresh worktree"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Patch decode failed, starting fresh from {}\n", branch),
false,
);
let _ = self.ws_tx.send(msg).await;
self.worktree_manager
.create_worktree(&source_repo, task_id, &task_name, &branch)
.await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
}
}
} else {
// Step 4: Fall back to fresh worktree from base branch
tracing::warn!(
task_id = %task_id,
from_task_id = %from_task_id,
"All continue_from fallbacks failed — creating fresh worktree from base branch"
);
let msg = DaemonMessage::task_output(
task_id,
format!("Source task worktree unavailable, starting fresh from {}\n", branch),
false,
);
let _ = self.ws_tx.send(msg).await;
self.worktree_manager
.create_worktree(&source_repo, task_id, &task_name, &branch)
.await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
}
}
}
}
}
} else {
// Create fresh worktree from repo
self.worktree_manager
.create_worktree(&source_repo, task_id, &task_name, &branch)
.await
.map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
};
tracing::info!(
task_id = %task_id,
worktree_path = %worktree_info.path.display(),
branch = %worktree_info.branch,
continued_from = ?continue_from_task_id,
"Worktree created"
);
// Apply inherited git config to the worktree
self.apply_git_config(&worktree_info.path).await;
// Store worktree info
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.worktree = Some(worktree_info.clone());
}
}
let msg = DaemonMessage::task_output(
task_id,
format!("Worktree ready at {}\n", worktree_info.path.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
worktree_info.path
}
} else {
// No repo specified - use managed temp directory in ~/.makima/temp/
tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)");
let msg = DaemonMessage::task_output(
task_id,
"Creating temporary working directory...\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
let temp_dir = self.temp_manager.create_task_dir(task_id).await?;
let msg = DaemonMessage::task_output(
task_id,
format!("Working directory ready at {}\n", temp_dir.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
temp_dir
};
// Store merge target if cross-daemon (will merge to supervisor on completion)
if let Some(supervisor_task_id) = merge_to_supervisor_on_completion {
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.merge_to_supervisor_task_id = Some(supervisor_task_id);
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
"Task marked for merge to supervisor on completion"
);
}
}
// Copy files from parent task's worktree if specified
if let Some(ref files) = copy_files {
if !files.is_empty() {
// Get the parent task ID to find its worktree
let parent_task_id = {
let tasks = self.tasks.read().await;
tasks.get(&task_id).and_then(|t| t.parent_task_id)
};
if let Some(parent_id) = parent_task_id {
match self.find_worktree_for_task(parent_id).await {
Ok(parent_worktree) => {
let msg = DaemonMessage::task_output(
task_id,
format!("Copying {} files from orchestrator...\n", files.len()),
false,
);
let _ = self.ws_tx.send(msg).await;
for file_path in files {
let source = parent_worktree.join(file_path);
let dest = working_dir.join(file_path);
// Create parent directories if needed
if let Some(parent) = dest.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
tracing::warn!(
task_id = %task_id,
file = %file_path,
error = %e,
"Failed to create parent directory for file"
);
continue;
}
}
// Copy the file
match tokio::fs::copy(&source, &dest).await {
Ok(_) => {
tracing::info!(
task_id = %task_id,
source = %source.display(),
dest = %dest.display(),
"Copied file from orchestrator"
);
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
source = %source.display(),
dest = %dest.display(),
error = %e,
"Failed to copy file from orchestrator"
);
// Notify but don't fail - the file might be optional
let msg = DaemonMessage::task_output(
task_id,
format!("Warning: Could not copy {}: {}\n", file_path, e),
false,
);
let _ = self.ws_tx.send(msg).await;
}
}
}
let msg = DaemonMessage::task_output(
task_id,
"Files copied from orchestrator.\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
parent_id = %parent_id,
error = %e,
"Could not find parent task worktree for file copying"
);
}
}
} else {
tracing::warn!(
task_id = %task_id,
"copy_files specified but no parent_task_id"
);
}
}
}
// Update state to Starting
tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting");
self.update_state(task_id, TaskState::Starting).await;
self.send_status_change(task_id, "initializing", "starting").await;
// Check Claude is available
match self.process_manager.check_claude_available().await {
Ok(version) => {
tracing::info!(task_id = %task_id, version = %version, "Claude Code available");
let msg = DaemonMessage::task_output(
task_id,
format!("Claude Code {} ready\n", version),
false,
);
let _ = self.ws_tx.send(msg).await;
}
Err(e) => {
let err_msg = format!("Claude Code not available: {}", e);
tracing::error!(task_id = %task_id, error = %err_msg);
return Err(DaemonError::Task(TaskError::SetupFailed(err_msg)));
}
}
// Set up supervisor, orchestrator, or subtask mode
let (extra_env, full_plan, system_prompt) = if is_supervisor {
// Supervisor mode: long-running contract orchestrator
tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up supervisor mode");
let msg = DaemonMessage::task_output(
task_id,
"Setting up supervisor environment...\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
// Generate tool key for API access
let tool_key = generate_tool_key();
tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for supervisor");
// Register tool key with server
let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
if self.ws_tx.send(register_msg).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to register tool key");
} else {
tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
}
// Set up environment variables for makima CLI
let mut env = HashMap::new();
env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone());
env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
// Supervisor needs contract ID for its tools
if let Some(cid) = contract_id {
env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string());
}
tracing::info!(
task_id = %task_id,
api_url = %self.api_url,
tool_key_preview = &tool_key[..8.min(tool_key.len())],
"Set supervisor environment variables"
);
// For supervisor, pass instructions as SYSTEM PROMPT (not user message)
// This ensures Claude treats them as behavioral constraints
let supervisor_user_plan = format!(
"Contract goal:\n{}",
plan
);
let msg = DaemonMessage::task_output(
task_id,
"Supervisor environment ready (makima CLI available)\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
// Return system prompt separately - it will be passed via --system-prompt flag
(Some(env), supervisor_user_plan, Some(SUPERVISOR_SYSTEM_PROMPT.to_string()))
} else if is_orchestrator {
tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode");
let msg = DaemonMessage::task_output(
task_id,
"Setting up orchestrator environment...\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
// Generate tool key for API access
let tool_key = generate_tool_key();
tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator");
// Register tool key with server
let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
if self.ws_tx.send(register_msg).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to register tool key");
} else {
tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
}
// Set up environment variables for makima CLI
let mut env = HashMap::new();
env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone());
env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
tracing::info!(
task_id = %task_id,
api_url = %self.api_url,
tool_key_preview = &tool_key[..8.min(tool_key.len())],
"Set orchestrator environment variables"
);
// For orchestrator, pass instructions as SYSTEM PROMPT
let orchestrator_user_plan = format!(
"Your task:\n{}",
plan
);
let msg = DaemonMessage::task_output(
task_id,
"Orchestrator environment ready (makima CLI available)\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
(Some(env), orchestrator_user_plan, Some(ORCHESTRATOR_SYSTEM_PROMPT.to_string()))
} else {
tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)");
// For subtasks, pass worktree isolation instructions as system prompt
let subtask_user_plan = format!(
"Your task:\n{}",
plan
);
(None, subtask_user_plan, Some(SUBTASK_SYSTEM_PROMPT.to_string()))
};
// Add contract environment if task has contract_id (skip for supervisors - they already have it)
let (extra_env, full_plan, system_prompt) = if let Some(cid) = contract_id {
if is_supervisor {
// Supervisors already have contract ID and API access set up
tracing::info!(task_id = %task_id, contract_id = %cid, "Supervisor already has contract integration");
(extra_env, full_plan, system_prompt)
} else {
tracing::info!(task_id = %task_id, contract_id = %cid, "Setting up contract integration");
// Set up environment variables for makima CLI
let mut env = extra_env.unwrap_or_default();
env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string());
// If not already an orchestrator, we need API access for makima CLI
if !is_orchestrator {
// Generate tool key for API access
let tool_key = generate_tool_key();
tracing::info!(task_id = %task_id, "Generated tool key for contract access");
// Register tool key with server
let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
if self.ws_tx.send(register_msg).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to register contract tool key");
}
env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone());
env.insert("MAKIMA_API_KEY".to_string(), tool_key);
env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
}
let msg = DaemonMessage::task_output(
task_id,
"Contract integration ready (makima CLI available)\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
// Prepend contract integration prompt to the plan so the task knows to use makima CLI
let contract_plan = format!(
"{}{}",
CONTRACT_INTEGRATION_PROMPT,
full_plan
);
(Some(env), contract_plan, system_prompt)
}
} else {
(extra_env, full_plan, system_prompt)
};
// Add directive environment if task has directive_id
let (extra_env, full_plan, system_prompt) = if let Some(did) = directive_id {
tracing::info!(task_id = %task_id, directive_id = %did, "Setting up directive integration");
let mut env = extra_env.unwrap_or_default();
env.insert("MAKIMA_DIRECTIVE_ID".to_string(), did.to_string());
// If not already an orchestrator/supervisor, we need API access for makima CLI
if !is_orchestrator && !is_supervisor && !env.contains_key("MAKIMA_API_KEY") {
let tool_key = generate_tool_key();
tracing::info!(task_id = %task_id, "Generated tool key for directive access");
let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
if self.ws_tx.send(register_msg).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to register directive tool key");
}
env.insert("MAKIMA_API_URL".to_string(), self.api_url.clone());
env.insert("MAKIMA_API_KEY".to_string(), tool_key);
env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
}
let msg = DaemonMessage::task_output(
task_id,
"Directive integration ready (makima CLI available)\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
(Some(env), full_plan, system_prompt)
} else {
(extra_env, full_plan, system_prompt)
};
// Spawn Claude process
let plan_bytes = full_plan.len();
let plan_chars = full_plan.chars().count();
// Rough token estimate: ~4 chars per token for English
let estimated_tokens = plan_chars / 4;
tracing::info!(
task_id = %task_id,
working_dir = %working_dir.display(),
is_orchestrator = is_orchestrator,
plan_bytes = plan_bytes,
plan_chars = plan_chars,
estimated_tokens = estimated_tokens,
"Spawning Claude process"
);
// Warn if plan is very large (Claude's context is typically 100k-200k tokens)
if estimated_tokens > 50_000 {
tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!");
let msg = DaemonMessage::task_output(
task_id,
format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens),
false,
);
let _ = self.ws_tx.send(msg).await;
}
let msg = DaemonMessage::task_output(
task_id,
if is_orchestrator {
format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens)
} else {
format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens)
},
false,
);
let _ = self.ws_tx.send(msg).await;
// Clone extra_env for use in autonomous loop iterations
let extra_env_for_loop = extra_env.clone();
tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), resume_session = resume_session, "Calling process_manager.spawn()...");
let mut process = if resume_session {
// Use --continue flag to resume from previous session
// Build continuation prompt based on whether worktree exists
let continuation_prompt = if has_existing_worktree {
// Worktree exists: Claude's session state should work
format!(
"Resuming previous session. Continue from where you left off.\n\n{}",
full_plan
)
} else if let Some(ref history) = conversation_history {
// Worktree missing: inject conversation history as context
let history_str = serde_json::to_string_pretty(history).unwrap_or_default();
format!(
"Resuming previous session. Here is the conversation history from the previous session:\n\n\
<previous_conversation>\n{}\n</previous_conversation>\n\n\
Continue from where you left off with this task:\n\n{}",
history_str,
full_plan
)
} else {
// No history available: just the plan
format!("Resuming with plan:\n\n{}", full_plan)
};
let resume_msg = if has_existing_worktree {
"Using --continue to resume previous conversation...\n"
} else if conversation_history.is_some() {
"Worktree not found. Resuming with injected conversation history...\n"
} else {
"Resuming without conversation history (worktree not found)...\n"
};
let msg = DaemonMessage::task_output(
task_id,
resume_msg.to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
self.process_manager
.spawn_continue(&working_dir, &continuation_prompt, extra_env, system_prompt.as_deref())
.await
.map_err(|e| {
tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process with --continue");
DaemonError::Task(TaskError::SetupFailed(e.to_string()))
})?
} else {
self.process_manager
.spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
.await
.map_err(|e| {
tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
DaemonError::Task(TaskError::SetupFailed(e.to_string()))
})?
};
// Register the process PID for graceful shutdown tracking
if let Some(pid) = process.id() {
self.active_pids.write().await.insert(task_id, pid);
tracing::info!(task_id = %task_id, pid = pid, "Claude process spawned successfully, PID registered");
} else {
tracing::info!(task_id = %task_id, "Claude process spawned successfully (no PID available)");
}
// Set up input channel for this task so we can send messages to its stdin
tracing::debug!(task_id = %task_id, "Setting up input channel...");
let (input_tx, mut input_rx) = mpsc::channel::<String>(100);
tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock...");
self.task_inputs.write().await.insert(task_id, input_tx);
tracing::debug!(task_id = %task_id, "Input channel registered");
// Get stdin handle for input forwarding and completion signaling
let stdin_handle = process.stdin_handle();
let mut stdin_handle_for_completion = stdin_handle.clone();
tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
tokio::spawn(async move {
tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages...");
while let Some(msg) = input_rx.recv().await {
tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel");
// Format as JSON user message for stream-json input protocol
let json_msg = ClaudeInputMessage::user(&msg);
let json_line = match json_msg.to_json_line() {
Ok(line) => line,
Err(e) => {
tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message");
continue;
}
};
tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin");
let mut stdin_guard = stdin_handle.lock().await;
if let Some(ref mut stdin) = *stdin_guard {
tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing...");
if stdin.write_all(json_line.as_bytes()).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking");
break;
}
if stdin.flush().await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking");
break;
}
tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin");
} else {
tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message");
break;
}
}
tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)");
});
// Update state to Running
{
tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update");
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.state = TaskState::Running;
task.started_at = Some(Instant::now());
}
tracing::debug!(task_id = %task_id, "Released tasks write lock");
}
tracing::info!(task_id = %task_id, "Updating state: Starting -> Running");
self.send_status_change(task_id, "starting", "running").await;
tracing::debug!(task_id = %task_id, "Sent status change notification");
// Stream output with startup timeout check
tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output...");
tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server");
let ws_tx = self.ws_tx.clone();
// For auth error detection
let claude_command = self.process_manager.claude_command().to_string();
let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok());
let mut auth_error_handled = false;
// For autonomous loop mode: track accumulated output for COMPLETION_GATE detection
let mut accumulated_output = String::new();
let mut circuit_breaker = CircuitBreaker::new();
let mut iteration_count = 0u32;
let mut final_exit_code: i64 = -1; // Track the final exit code across iterations
// Autonomous loop: we may run multiple iterations
'autonomous_loop: loop {
iteration_count += 1;
if autonomous_loop && iteration_count > 1 {
tracing::info!(
task_id = %task_id,
iteration = iteration_count,
"Starting autonomous loop iteration"
);
let msg = DaemonMessage::task_output(
task_id,
format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count),
false,
);
let _ = self.ws_tx.send(msg).await;
// For subsequent iterations, spawn with --continue flag
let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true.";
process = self.process_manager
.spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref())
.await
.map_err(|e| {
tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation");
DaemonError::Task(TaskError::SetupFailed(e.to_string()))
})?;
// Register the new process PID
if let Some(pid) = process.id() {
self.active_pids.write().await.insert(task_id, pid);
tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned");
}
// Reset stdin handle for the new process
stdin_handle_for_completion = process.stdin_handle();
}
// Clear output for this iteration (we'll check for COMPLETION_GATE in the new output)
let mut iteration_output = String::new();
let mut output_count = 0u64;
let mut output_bytes = 0usize;
let startup_timeout = tokio::time::Duration::from_secs(30);
let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let startup_deadline = tokio::time::Instant::now() + startup_timeout;
// Heartbeat commit interval (only active if configured and we have a git repo)
let heartbeat_enabled = self.heartbeat_commit_interval_secs > 0 && repo_source.is_some();
let mut heartbeat_interval = tokio::time::interval(
tokio::time::Duration::from_secs(
if heartbeat_enabled { self.heartbeat_commit_interval_secs } else { u64::MAX }
)
);
heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// Skip the first immediate tick
heartbeat_interval.tick().await;
loop {
tokio::select! {
maybe_line = process.next_output() => {
match maybe_line {
Some(line) => {
output_count += 1;
output_bytes += line.content.len();
// Accumulate output for COMPLETION_GATE detection in autonomous loop mode
if autonomous_loop {
iteration_output.push_str(&line.content);
iteration_output.push('\n');
}
if output_count == 1 {
tracing::info!(task_id = %task_id, "Received first output line from Claude");
}
if output_count % 100 == 0 {
tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
}
// Log output details for debugging
tracing::trace!(
task_id = %task_id,
line_num = output_count,
content_len = line.content.len(),
is_stdout = line.is_stdout,
json_type = ?line.json_type,
"Forwarding output to WebSocket"
);
// Check if this is a "result" message indicating task completion
// With --input-format=stream-json, Claude waits for more input after completion
if line.json_type.as_deref() == Some("result") {
if autonomous_loop || directive_id.is_some() {
// Autonomous loop: close stdin so we can spawn the next iteration
// Directive tasks: close stdin so the process exits and the step completes
tracing::info!(task_id = %task_id, directive_id = ?directive_id, "Received result message, closing stdin to signal completion");
let mut stdin_guard = stdin_handle_for_completion.lock().await;
if let Some(mut stdin) = stdin_guard.take() {
let _ = stdin.shutdown().await;
}
} else {
// Interactive mode (mesh, contracts): keep stdin open so the user
// can send follow-up messages. Claude will stay alive waiting for input.
tracing::info!(task_id = %task_id, "Received result message, keeping stdin open for interactive input");
}
}
// Check for OAuth auth error before sending output
let content_for_auth_check = line.content.clone();
let json_type_for_auth_check = line.json_type.clone();
let is_stdout_for_auth_check = line.is_stdout;
let msg = DaemonMessage::task_output(task_id, line.content, false);
if ws_tx.send(msg).await.is_err() {
tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
break;
}
// Detect OAuth token expiration - log warning and let the task fail normally.
// Users can reauthorize via the Daemons page instead.
if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
auth_error_handled = true;
tracing::warn!(task_id = %task_id, "OAuth authentication error detected - task will fail. Reauthorize via Daemons page.");
let error_msg = DaemonMessage::task_output(
task_id,
format!("⚠ Authentication expired on daemon{}. Go to Daemons page to reauthorize, then retry this task.\n",
daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
false,
);
let _ = ws_tx.send(error_msg).await;
}
}
None => {
tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
break;
}
}
}
_ = startup_check.tick(), if output_count == 0 => {
// Check if process is still alive
match process.try_wait() {
Ok(Some(exit_code)) => {
tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
let msg = DaemonMessage::task_output(
task_id,
format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
false,
);
let _ = ws_tx.send(msg).await;
break;
}
Ok(None) => {
// Still running but no output
if tokio::time::Instant::now() > startup_deadline {
tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
let msg = DaemonMessage::task_output(
task_id,
"Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
false,
);
let _ = ws_tx.send(msg).await;
} else {
tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
}
}
Err(e) => {
tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
}
}
_ = heartbeat_interval.tick(), if heartbeat_enabled => {
// Create periodic ephemeral patch to preserve work-in-progress
match self.create_ephemeral_patch(task_id, &working_dir).await {
Ok(files_count) => {
let msg = DaemonMessage::task_output(
task_id,
format!("[Heartbeat] Patch saved ({} files)\n", files_count),
false,
);
let _ = ws_tx.send(msg).await;
}
Err(e) => {
// No changes to patch or error - this is fine, just log at debug level
tracing::debug!(task_id = %task_id, error = %e, "Heartbeat patch skipped");
}
}
}
}
}
// Wait for process to exit
let exit_code = process.wait().await.unwrap_or(-1);
final_exit_code = exit_code; // Store for use after the loop
// Unregister the process PID (process has exited)
self.active_pids.write().await.remove(&task_id);
tracing::debug!(task_id = %task_id, "Unregistered process PID");
// Clean up input channel for this task
self.task_inputs.write().await.remove(&task_id);
tracing::debug!(task_id = %task_id, "Removed task input channel");
// Accumulate this iteration's output
accumulated_output.push_str(&iteration_output);
// === AUTONOMOUS LOOP LOGIC ===
// Check if we should continue or complete
if autonomous_loop && exit_code == 0 {
// Check for COMPLETION_GATE in the output
let completion_gate = CompletionGate::parse_last(&iteration_output);
match completion_gate {
Some(gate) if gate.ready => {
tracing::info!(
task_id = %task_id,
iteration = iteration_count,
reason = ?gate.reason,
"COMPLETION_GATE ready=true detected, task complete"
);
let msg = DaemonMessage::task_output(
task_id,
format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n",
iteration_count,
gate.reason.unwrap_or_else(|| "Task complete".to_string())
),
false,
);
let _ = self.ws_tx.send(msg).await;
break 'autonomous_loop;
}
Some(gate) => {
// COMPLETION_GATE found but not ready
tracing::info!(
task_id = %task_id,
iteration = iteration_count,
reason = ?gate.reason,
blockers = ?gate.blockers,
"COMPLETION_GATE ready=false, will continue"
);
// Check circuit breaker
// For now, we consider output_bytes > 0 as "progress"
let had_progress = output_bytes > 0;
let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str());
if !circuit_breaker.record_iteration(had_progress, error) {
// Circuit breaker tripped
tracing::warn!(
task_id = %task_id,
reason = ?circuit_breaker.open_reason,
"Circuit breaker tripped, stopping autonomous loop"
);
let msg = DaemonMessage::task_output(
task_id,
format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
),
false,
);
let _ = self.ws_tx.send(msg).await;
break 'autonomous_loop;
}
let msg = DaemonMessage::task_output(
task_id,
format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n",
gate.reason.unwrap_or_else(|| "Not complete".to_string())
),
false,
);
let _ = self.ws_tx.send(msg).await;
// Continue to next iteration
continue 'autonomous_loop;
}
None => {
// No COMPLETION_GATE found - check circuit breaker and continue
tracing::info!(
task_id = %task_id,
iteration = iteration_count,
"No COMPLETION_GATE found, will restart with continuation prompt"
);
let had_progress = output_bytes > 0;
if !circuit_breaker.record_iteration(had_progress, None) {
tracing::warn!(
task_id = %task_id,
reason = ?circuit_breaker.open_reason,
"Circuit breaker tripped (no COMPLETION_GATE), stopping"
);
let msg = DaemonMessage::task_output(
task_id,
format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
),
false,
);
let _ = self.ws_tx.send(msg).await;
break 'autonomous_loop;
}
let msg = DaemonMessage::task_output(
task_id,
"\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
continue 'autonomous_loop;
}
}
} else {
// Not in autonomous loop mode or process failed - exit normally
break 'autonomous_loop;
}
} // end 'autonomous_loop
// Update state based on exit code
let success = final_exit_code == 0;
let new_state = if success {
TaskState::Completed
} else {
TaskState::Failed
};
tracing::info!(
task_id = %task_id,
exit_code = final_exit_code,
success = success,
new_state = ?new_state,
"Claude process exited, updating task state"
);
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.state = new_state;
task.completed_at = Some(Instant::now());
if !success {
task.error = Some(format!("Process exited with code {}", final_exit_code));
}
}
}
// Execute completion action if task succeeded (skip in local_only mode unless auto_merge_local is enabled)
let completion_result = if success {
if local_only {
if auto_merge_local {
// In local_only mode with auto_merge_local enabled, merge locally
tracing::info!(
task_id = %task_id,
"Local-only mode with auto_merge_local - executing local merge"
);
self.execute_completion_action(
task_id,
&task_name,
&working_dir,
"merge", // Use merge action (not pr)
target_repo_path.as_deref(),
target_branch.as_deref(),
).await
} else {
tracing::info!(
task_id = %task_id,
"Skipping completion action - contract is in local_only mode"
);
Ok(None)
}
} else if let Some(ref action) = completion_action {
if action != "none" {
self.execute_completion_action(
task_id,
&task_name,
&working_dir,
action,
target_repo_path.as_deref(),
target_branch.as_deref(),
).await
} else {
Ok(None)
}
} else {
Ok(None)
}
} else {
Ok(None)
};
// Log completion action result
match &completion_result {
Ok(Some(pr_url)) => {
tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR");
}
Ok(None) => {}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)");
}
}
// If this task needs to merge to supervisor (cross-daemon case), generate and send patch
let merge_to_supervisor = {
let tasks = self.tasks.read().await;
tasks.get(&task_id).and_then(|t| t.merge_to_supervisor_task_id)
};
if let Some(supervisor_task_id) = merge_to_supervisor {
if success {
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
"Task completed on cross-daemon, generating patch to merge to supervisor"
);
// Get base SHA from the worktree's initial commit or parent
match crate::daemon::storage::get_parent_sha(&working_dir).await {
Ok(base_sha) => {
// Generate patch
match crate::daemon::storage::create_patch(&working_dir, &base_sha).await {
Ok((patch_bytes, files_count)) => {
// Base64 encode the patch
let patch_data = base64::Engine::encode(
&base64::engine::general_purpose::STANDARD,
&patch_bytes,
);
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
files_count = files_count,
patch_size = patch_bytes.len(),
"Sending patch to supervisor"
);
// Send MergePatchToSupervisor message to server
let msg = DaemonMessage::MergePatchToSupervisor {
task_id,
supervisor_task_id,
patch_data,
base_sha,
};
let _ = self.ws_tx.send(msg).await;
let output_msg = DaemonMessage::task_output(
task_id,
format!("Sent {} file(s) to supervisor for merge\n", files_count),
false,
);
let _ = self.ws_tx.send(output_msg).await;
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to create patch for supervisor merge"
);
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to get base SHA for supervisor merge"
);
}
}
}
}
// Notify server - but NOT for supervisors which should never complete
if is_supervisor {
tracing::info!(
task_id = %task_id,
exit_code = final_exit_code,
"Supervisor Claude process exited - NOT marking as complete"
);
// Update local state to reflect it's paused/waiting for input
{
let mut tasks = self.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.state = TaskState::Running; // Keep it as running, not completed
task.completed_at = None;
}
}
// Send a status message to let the frontend know supervisor is ready for more input
let msg = DaemonMessage::task_output(
task_id,
"\n[Supervisor ready for next instruction]\n".to_string(),
false,
);
let _ = self.ws_tx.send(msg).await;
} else {
// Create completion patch before notifying server
if let Err(e) = self.create_completion_patch(task_id, &working_dir).await {
tracing::debug!(task_id = %task_id, error = %e, "No completion patch created");
}
let error = if success {
None
} else {
Some(format!("Exit code: {}", final_exit_code))
};
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
let _ = self.ws_tx.send(msg).await;
// Remove completed task from local database (no longer needs crash recovery)
self.remove_task_from_local_db(task_id);
}
// Note: Worktrees are kept until explicitly deleted (per user preference)
// This allows inspection, PR creation, etc.
tracing::info!(task_id = %task_id, "=== RUN_TASK END ===");
Ok(())
}
/// Execute the completion action for a task.
async fn execute_completion_action(
&self,
task_id: Uuid,
task_name: &str,
worktree_path: &std::path::Path,
action: &str,
target_repo_path: Option<&str>,
target_branch: Option<&str>,
) -> Result<Option<String>, String> {
// For PR action, we can use the worktree's origin directly if target_repo_path is not set
let target_repo = match target_repo_path {
Some(path) => Some(crate::daemon::worktree::expand_tilde(path)),
None => {
if action == "pr" || action == "branch" {
// For PR/branch action without target_repo, use origin directly
None
} else {
tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action");
return Ok(None);
}
}
};
// Validate target_repo exists if provided
if let Some(ref repo) = target_repo {
if !repo.exists() {
return Err(format!("Target repo not found: {} (expanded from {:?})", repo.display(), target_repo_path));
}
}
// Get the branch name: makima/{task-name-with-dashes}-{short-id}
let branch_name = format!(
"makima/{}-{}",
crate::daemon::worktree::sanitize_name(task_name),
crate::daemon::worktree::short_uuid(task_id)
);
// Determine target branch - use provided value or detect default branch
let target_branch = match target_branch {
Some(branch) => branch.to_string(),
None => {
// Detect default branch from target_repo if available, otherwise from worktree
let detect_path = target_repo.as_ref().map(|p| p.as_path()).unwrap_or(worktree_path);
self.worktree_manager
.detect_default_branch(detect_path)
.await
.unwrap_or_else(|_| "master".to_string())
}
};
let msg = DaemonMessage::task_output(
task_id,
format!("Executing completion action: {}...\n", action),
false,
);
let _ = self.ws_tx.send(msg).await;
match action {
"branch" => {
match target_repo {
Some(target_repo) => {
// Push branch to local target repo
self.worktree_manager
.push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name)
.await
.map_err(|e| e.to_string())?;
let msg = DaemonMessage::task_output(
task_id,
format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()),
false,
);
let _ = self.ws_tx.send(msg).await;
}
None => {
// Push branch to origin (GitHub)
self.worktree_manager
.push_branch_to_origin(worktree_path, &branch_name, task_name)
.await
.map_err(|e| e.to_string())?;
let msg = DaemonMessage::task_output(
task_id,
format!("Branch '{}' pushed to origin\n", branch_name),
false,
);
let _ = self.ws_tx.send(msg).await;
}
}
Ok(None)
}
"merge" => {
let target_repo = target_repo.ok_or_else(|| "No target_repo_path configured for merge action".to_string())?;
// Push and merge into target branch
let commit_sha = self.worktree_manager
.merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name)
.await
.map_err(|e| e.to_string())?;
let msg = DaemonMessage::task_output(
task_id,
format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha),
false,
);
let _ = self.ws_tx.send(msg).await;
Ok(None)
}
"pr" => {
// Push and create PR
// For PR, we can use target_repo if provided, or create PR directly from worktree
let title = task_name.to_string();
let body = format!(
"Automated PR from makima task.\n\nTask ID: `{}`",
task_id
);
let pr_url = self.worktree_manager
.create_pull_request(
worktree_path,
target_repo.as_deref(),
&branch_name,
&target_branch,
&title,
&body,
)
.await
.map_err(|e| e.to_string())?;
let msg = DaemonMessage::task_output(
task_id,
format!("Pull request created: {}\n", pr_url),
false,
);
let _ = self.ws_tx.send(msg).await;
Ok(Some(pr_url))
}
_ => {
tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action");
Ok(None)
}
}
}
/// Resolve the worktree directory that belongs to `task_id`.
///
/// The in-memory task table is consulted first. If the task is unknown there (or has
/// no worktree recorded), the worktree base directory is scanned for an entry whose
/// name starts with the first 8 characters of the task ID and which contains `.git`.
/// Returns an error message when neither lookup finds a worktree.
async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> {
    // Fast path: the task is still tracked in memory.
    let tracked_path = {
        let tasks = self.tasks.read().await;
        tasks
            .get(&task_id)
            .and_then(|task| task.worktree.as_ref())
            .map(|worktree| worktree.path.clone())
    };
    if let Some(path) = tracked_path {
        return Ok(path);
    }

    // Slow path: look for a directory named after the short task ID.
    let id_text = task_id.to_string();
    let short_id = &id_text[..8];
    let worktrees_dir = self.worktree_manager.base_dir();
    if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
        while let Ok(Some(entry)) = entries.next_entry().await {
            if !entry.file_name().to_string_lossy().starts_with(short_id) {
                continue;
            }
            let candidate = entry.path();
            // Only accept directories that look like a git checkout.
            if !candidate.join(".git").exists() {
                continue;
            }
            tracing::info!(
                task_id = %task_id,
                worktree_path = %candidate.display(),
                "Found worktree by scanning directory"
            );
            return Ok(candidate);
        }
    }

    Err(format!(
        "No worktree found for task {}. The worktree may have been cleaned up.",
        task_id
    ))
}
/// Set the in-memory state of `task_id` to `state`; unknown task IDs are ignored.
async fn update_state(&self, task_id: Uuid, state: TaskState) {
    if let Some(entry) = self.tasks.write().await.get_mut(&task_id) {
        entry.state = state;
    }
}
/// Send a status-change message for `task_id` over the WebSocket channel.
/// A failed send is ignored.
async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
    let _ = self
        .ws_tx
        .send(DaemonMessage::task_status_change(task_id, old_status, new_status))
        .await;
}
/// Record `task_id` as failed with the given error text.
///
/// Updates the in-memory task (if present), sends a failed `task_complete` message,
/// and removes the task from the local database.
async fn mark_failed(&self, task_id: Uuid, error: &str) {
    let reason = error.to_string();

    // The write guard is a temporary of this `if let`, so it is released
    // before the message is sent below.
    if let Some(task) = self.tasks.write().await.get_mut(&task_id) {
        task.state = TaskState::Failed;
        task.error = Some(reason.clone());
        task.completed_at = Some(Instant::now());
    }

    // Notify server
    let _ = self
        .ws_tx
        .send(DaemonMessage::task_complete(task_id, false, Some(reason)))
        .await;

    // Remove failed task from local database
    self.remove_task_from_local_db(task_id);
}
/// Write the inherited `user.email` / `user.name` values into the git config of
/// the worktree at `worktree_path`.
///
/// Each value is applied only when it is set. Failures are logged as warnings and
/// never propagated.
async fn apply_git_config(&self, worktree_path: &std::path::Path) {
    let inherited_email = self.git_user_email.read().await.clone();
    let inherited_name = self.git_user_name.read().await.clone();

    if let Some(email) = inherited_email {
        let outcome = tokio::process::Command::new("git")
            .current_dir(worktree_path)
            .args(["config", "user.email", &email])
            .output()
            .await;
        match outcome {
            Err(e) => {
                tracing::warn!(error = %e, "Failed to run git config user.email");
            }
            Ok(output) => {
                if output.status.success() {
                    tracing::debug!(email = %email, path = ?worktree_path, "Applied git user.email to worktree");
                } else {
                    tracing::warn!(
                        path = ?worktree_path,
                        stderr = %String::from_utf8_lossy(&output.stderr),
                        "Failed to set git user.email in worktree"
                    );
                }
            }
        }
    }

    if let Some(name) = inherited_name {
        let outcome = tokio::process::Command::new("git")
            .current_dir(worktree_path)
            .args(["config", "user.name", &name])
            .output()
            .await;
        match outcome {
            Err(e) => {
                tracing::warn!(error = %e, "Failed to run git config user.name");
            }
            Ok(output) => {
                if output.status.success() {
                    tracing::debug!(name = %name, path = ?worktree_path, "Applied git user.name to worktree");
                } else {
                    tracing::warn!(
                        path = ?worktree_path,
                        stderr = %String::from_utf8_lossy(&output.stderr),
                        "Failed to set git user.name in worktree"
                    );
                }
            }
        }
    }
}
/// Create an ephemeral patch of all changes (committed + uncommitted) since the
/// merge-base with main/master and send to the server.
/// Stages and commits any uncommitted changes, then diffs against the merge-base.
/// Returns the number of files changed on success, or an error message if nothing to patch.
async fn create_ephemeral_patch(
&self,
task_id: Uuid,
worktree_path: &std::path::Path,
) -> Result<i32, String> {
if !self.checkpoint_patches.enabled {
return Err("Checkpoint patches disabled".into());
}
// 1. Find merge-base with main/master (the fork point)
let base_sha = storage::get_merge_base_sha(worktree_path)
.await
.map_err(|e| format!("Failed to get merge-base: {}", e))?;
// 2. Stage and commit any uncommitted changes so they're included in the diff
let _ = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["add", "-A"])
.output()
.await;
// Check if there are staged changes to commit
let staged_check = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["diff", "--cached", "--quiet"])
.output()
.await;
if let Ok(output) = staged_check {
if !output.status.success() {
// There are staged changes - commit them
let _ = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["commit", "-m", "WIP: heartbeat checkpoint"])
.output()
.await;
}
}
// 3. Create patch (diff merge-base..HEAD captures all work)
match storage::create_patch(worktree_path, &base_sha).await {
Ok((compressed_patch, patch_files_count)) => {
// Check size limit
if compressed_patch.len() > self.checkpoint_patches.max_patch_size_bytes {
tracing::warn!(
task_id = %task_id,
patch_size = compressed_patch.len(),
max_size = self.checkpoint_patches.max_patch_size_bytes,
"Patch exceeds size limit"
);
return Err("Patch exceeds size limit".into());
}
// Encode as base64 for JSON transport
let patch_data = base64::engine::general_purpose::STANDARD.encode(&compressed_patch);
tracing::debug!(
task_id = %task_id,
base_sha = %base_sha,
patch_size = compressed_patch.len(),
files_count = patch_files_count,
"Created ephemeral patch"
);
// Send CheckpointCreated message to server (patch-only, no commit)
let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: true,
commit_sha: None,
branch_name: None,
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: None,
message: format!("Ephemeral patch - {}", timestamp),
patch_data: Some(patch_data),
patch_base_sha: Some(base_sha),
patch_files_count: Some(patch_files_count as i32),
};
let _ = self.ws_tx.send(msg).await;
Ok(patch_files_count as i32)
}
Err(e) => {
Err(format!("Failed to create patch: {}", e))
}
}
}
/// Create a completion patch capturing all changes (committed + uncommitted) since
/// the merge-base with main/master. Sent before TaskComplete so the server always
/// has a recoverable patch. All errors are non-fatal (logged, not propagated).
async fn create_completion_patch(
&self,
task_id: Uuid,
worktree_path: &std::path::Path,
) -> Result<(), String> {
// 1. Stage all changes
let _ = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["add", "-A"])
.output()
.await;
// 2. Commit any staged changes so HEAD contains everything
let staged_check = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["diff", "--cached", "--quiet"])
.output()
.await;
if let Ok(output) = staged_check {
if !output.status.success() {
let _ = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["commit", "-m", "WIP: task completion checkpoint"])
.output()
.await;
}
}
// 3. Find merge-base with main/master
let base_sha = storage::get_merge_base_sha(worktree_path)
.await
.map_err(|e| format!("Failed to get merge-base: {}", e))?;
// 4. Create patch (diff merge-base..HEAD)
let (compressed_patch, patch_files_count) = storage::create_patch(worktree_path, &base_sha)
.await
.map_err(|e| format!("Failed to create patch: {}", e))?;
// 5. Check size limit
if compressed_patch.len() > self.checkpoint_patches.max_patch_size_bytes {
return Err(format!(
"Patch too large: {} bytes (max: {})",
compressed_patch.len(),
self.checkpoint_patches.max_patch_size_bytes
));
}
// 6. Send to server
let patch_data = base64::engine::general_purpose::STANDARD.encode(&compressed_patch);
let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: true,
commit_sha: None,
branch_name: None,
checkpoint_number: None,
files_changed: None,
lines_added: None,
lines_removed: None,
error: None,
message: format!("Completion patch - {}", timestamp),
patch_data: Some(patch_data),
patch_base_sha: Some(base_sha),
patch_files_count: Some(patch_files_count as i32),
};
let _ = self.ws_tx.send(msg).await;
tracing::info!(
task_id = %task_id,
files_count = patch_files_count,
"Created completion patch"
);
Ok(())
}
}
impl Clone for TaskManagerInner {
    /// Build a new `TaskManagerInner` by cloning every field of `self`.
    ///
    /// NOTE(review): fields such as `tasks`, `task_inputs` and `active_pids` are
    /// presumably shared handles, so the clone would observe the same state —
    /// confirm against the struct definition.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct without handling
        // it here becomes a compile error.
        let Self {
            worktree_manager,
            process_manager,
            temp_manager,
            tasks,
            ws_tx,
            task_inputs,
            active_pids,
            git_user_email,
            git_user_name,
            api_url,
            api_key,
            heartbeat_commit_interval_secs,
            contract_task_counts,
            checkpoint_patches,
            local_db,
        } = self;

        Self {
            worktree_manager: worktree_manager.clone(),
            process_manager: process_manager.clone(),
            temp_manager: temp_manager.clone(),
            tasks: tasks.clone(),
            ws_tx: ws_tx.clone(),
            task_inputs: task_inputs.clone(),
            active_pids: active_pids.clone(),
            git_user_email: git_user_email.clone(),
            git_user_name: git_user_name.clone(),
            api_url: api_url.clone(),
            api_key: api_key.clone(),
            heartbeat_commit_interval_secs: *heartbeat_commit_interval_secs,
            contract_task_counts: contract_task_counts.clone(),
            checkpoint_patches: checkpoint_patches.clone(),
            local_db: local_db.clone(),
        }
    }
}