diff options
Diffstat (limited to 'makima/src/daemon/task/manager.rs')
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 500 |
1 files changed, 383 insertions, 117 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 4ccedb2..75c884b 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -12,6 +12,7 @@ use uuid::Uuid; use std::collections::HashSet; +use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; @@ -944,6 +945,8 @@ pub struct ManagedTask { pub copy_files: Option<Vec<String>>, /// Contract ID if this task is associated with a contract. pub contract_id: Option<Uuid>, + /// Whether to run in autonomous loop mode. + pub autonomous_loop: bool, /// Time task was created. pub created_at: Instant, /// Time task started running. @@ -973,6 +976,8 @@ pub struct TaskConfig { pub enable_permissions: bool, /// Disable verbose output. pub disable_verbose: bool, + /// Bubblewrap sandbox configuration. + pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>, } impl Default for TaskConfig { @@ -986,6 +991,7 @@ impl Default for TaskConfig { claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, + bubblewrap: None, } } } @@ -1027,7 +1033,8 @@ impl TaskManager { .with_pre_args(config.claude_pre_args.clone()) .with_permissions_enabled(config.enable_permissions) .with_verbose_disabled(config.disable_verbose) - .with_env(config.env_vars.clone()), + .with_env(config.env_vars.clone()) + .with_bubblewrap(config.bubblewrap.clone()), ); let temp_manager = Arc::new(TempManager::new()); @@ -1150,6 +1157,7 @@ impl TaskManager { copy_files, contract_id, is_supervisor, + autonomous_loop, } => { tracing::info!( task_id = %task_id, @@ -1161,6 +1169,7 @@ impl TaskManager { depth = depth, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, + autonomous_loop = autonomous_loop, target_repo_path = ?target_repo_path, completion_action = ?completion_action, continue_from_task_id = ?continue_from_task_id, @@ -1173,7 +1182,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, - copy_files, contract_id + copy_files, contract_id, autonomous_loop ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1252,6 +1261,7 @@ impl TaskManager { None, // continue_from_task_id None, // copy_files contract_id, + false, // autonomous_loop - supervisors don't use this ).await { tracing::error!( task_id = %task_id, @@ -1421,6 +1431,17 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } + DaemonCommand::CleanupWorktree { + task_id, + delete_branch, + } => { + tracing::info!( + task_id = %task_id, + delete_branch = delete_branch, + "Cleaning up worktree" + ); + self.handle_cleanup_worktree(task_id, delete_branch).await?; + } } Ok(()) } @@ -1444,6 +1465,7 @@ impl TaskManager { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); @@ -1496,6 +1518,7 @@ impl TaskManager { continue_from_task_id, copy_files: copy_files.clone(), contract_id, + autonomous_loop, created_at: Instant::now(), started_at: None, completed_at: None, @@ -1519,7 +1542,7 @@ impl TaskManager { if let Err(e) = inner.run_task( task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, - continue_from_task_id, copy_files, contract_id + continue_from_task_id, copy_files, contract_id, autonomous_loop ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -2046,6 +2069,76 @@ impl TaskManager { Ok(()) } + /// Handle CleanupWorktree command. + /// + /// Removes a task's worktree and optionally its branch. + /// Used when a contract is completed or deleted to clean up associated task worktrees. + async fn handle_cleanup_worktree( + &self, + task_id: Uuid, + delete_branch: bool, + ) -> Result<(), DaemonError> { + // Try to get the worktree path, but don't fail if not found + let worktree_result = self.get_task_worktree_path(task_id).await; + + let (success, message) = match worktree_result { + Ok(worktree_path) => { + // Remove the worktree + match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await { + Ok(()) => { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + delete_branch = delete_branch, + "Worktree cleaned up successfully" + ); + + // Also remove task from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, format!("Worktree cleaned up: {}", worktree_path.display())) + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + error = %e, + "Failed to remove worktree" + ); + (false, format!("Failed to remove worktree: {}", e)) + } + } + } + Err(_) => { + // Worktree not found - this is OK, it may have already been cleaned up + tracing::debug!( + task_id = %task_id, + "No worktree found for task, may have already been cleaned up" + ); + + // Still remove from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, "No worktree found, task tracking cleaned up".to_string()) + } + }; + + // Send result back to server + let msg = DaemonMessage::CleanupWorktreeResult { + task_id, + success, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + /// Handle ReadRepoFile command. /// /// Reads a file from a repository on the daemon's filesystem and sends @@ -2436,6 +2529,7 @@ impl TaskManagerInner { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> Result<(), DaemonError> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); @@ -2908,6 +3002,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; + // Clone extra_env for use in autonomous loop iterations + let extra_env_for_loop = extra_env.clone(); + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); let mut process = self.process_manager .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) @@ -2934,7 +3031,7 @@ impl TaskManagerInner { // Get stdin handle for input forwarding and completion signaling let stdin_handle = process.stdin_handle(); - let stdin_handle_for_completion = stdin_handle.clone(); + let mut stdin_handle_for_completion = stdin_handle.clone(); tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); tokio::spawn(async move { @@ -2998,142 +3095,311 @@ impl TaskManagerInner { let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); let mut auth_error_handled = false; - let mut output_count = 0u64; - let mut output_bytes = 0usize; - let startup_timeout = tokio::time::Duration::from_secs(30); - let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); - startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let startup_deadline = tokio::time::Instant::now() + startup_timeout; + // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection + let mut accumulated_output = String::new(); + let mut circuit_breaker = CircuitBreaker::new(); + let mut iteration_count = 0u32; + let mut final_exit_code: i64 = -1; // Track the final exit code across iterations - loop { - tokio::select! { - maybe_line = process.next_output() => { - match maybe_line { - Some(line) => { - output_count += 1; - output_bytes += line.content.len(); - - if output_count == 1 { - tracing::info!(task_id = %task_id, "Received first output line from Claude"); - } - if output_count % 100 == 0 { - tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); - } + // Autonomous loop: we may run multiple iterations + 'autonomous_loop: loop { + iteration_count += 1; - // Log output details for debugging - tracing::trace!( - task_id = %task_id, - line_num = output_count, - content_len = line.content.len(), - is_stdout = line.is_stdout, - json_type = ?line.json_type, - "Forwarding output to WebSocket" - ); + if autonomous_loop && iteration_count > 1 { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "Starting autonomous loop iteration" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // For subsequent iterations, spawn with --continue flag + let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true."; + + process = self.process_manager + .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + + // Register the new process PID + if let Some(pid) = process.id() { + self.active_pids.write().await.insert(task_id, pid); + tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned"); + } + + // Reset stdin handle for the new process + stdin_handle_for_completion = process.stdin_handle(); + } + + // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output) + let mut iteration_output = String::new(); + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; - // Check if this is a "result" message indicating task completion - // With --input-format=stream-json, Claude waits for more input after completion - // We close stdin to signal EOF and let the process exit - if line.json_type.as_deref() == Some("result") { - tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); - let mut stdin_guard = stdin_handle_for_completion.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + // Accumulate output for COMPLETION_GATE detection in autonomous loop mode + if autonomous_loop { + iteration_output.push_str(&line.content); + iteration_output.push('\n'); } - } - // Check for OAuth auth error before sending output - let content_for_auth_check = line.content.clone(); - let json_type_for_auth_check = line.json_type.clone(); - let is_stdout_for_auth_check = line.is_stdout; + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } - let msg = DaemonMessage::task_output(task_id, line.content, false); - if ws_tx.send(msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); - break; - } + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); - // Detect OAuth token expiration and trigger remote login flow - if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { - auth_error_handled = true; - tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); - - // Spawn claude setup-token to get login URL - if let Some(login_url) = get_oauth_login_url(&claude_command).await { - tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); - let auth_msg = DaemonMessage::AuthenticationRequired { - task_id: Some(task_id), - login_url, - hostname: daemon_hostname.clone(), - }; - if ws_tx.send(auth_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + // Check for OAuth auth error before sending output + let content_for_auth_check = line.content.clone(); + let json_type_for_auth_check = line.json_type.clone(); + let is_stdout_for_auth_check = line.is_stdout; + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + + // Detect OAuth token expiration and trigger remote login flow + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { + auth_error_handled = true; + tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); + + // Spawn claude setup-token to get login URL + if let Some(login_url) = get_oauth_login_url(&claude_command).await { + tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); + let auth_msg = DaemonMessage::AuthenticationRequired { + task_id: Some(task_id), + login_url, + hostname: daemon_hostname.clone(), + }; + if ws_tx.send(auth_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + } + } else { + tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); + let fallback_msg = DaemonMessage::task_output( + task_id, + format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(fallback_msg).await; } - } else { - tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); - let fallback_msg = DaemonMessage::task_output( - task_id, - format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", - daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), - false, - ); - let _ = ws_tx.send(fallback_msg).await; } } - } - None => { - tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); - break; + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } } } - } - _ = startup_check.tick(), if output_count == 0 => { - // Check if process is still alive - match process.try_wait() { - Ok(Some(exit_code)) => { - tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), - false, - ); - let _ = ws_tx.send(msg).await; - break; - } - Ok(None) => { - // Still running but no output - if tokio::time::Instant::now() > startup_deadline { - tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); let msg = DaemonMessage::task_output( task_id, - "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), false, ); let _ = ws_tx.send(msg).await; - } else { - tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } } } } - } - // Wait for process to exit - let exit_code = process.wait().await.unwrap_or(-1); + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + final_exit_code = exit_code; // Store for use after the loop + + // Unregister the process PID (process has exited) + self.active_pids.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Unregistered process PID"); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); - // Unregister the process PID (process has exited) - self.active_pids.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Unregistered process PID"); + // Accumulate this iteration's output + accumulated_output.push_str(&iteration_output); - // Clean up input channel for this task - self.task_inputs.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Removed task input channel"); + // === AUTONOMOUS LOOP LOGIC === + // Check if we should continue or complete + if autonomous_loop && exit_code == 0 { + // Check for COMPLETION_GATE in the output + let completion_gate = CompletionGate::parse_last(&iteration_output); + + match completion_gate { + Some(gate) if gate.ready => { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + "COMPLETION_GATE ready=true detected, task complete" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n", + iteration_count, + gate.reason.unwrap_or_else(|| "Task complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + Some(gate) => { + // COMPLETION_GATE found but not ready + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + blockers = ?gate.blockers, + "COMPLETION_GATE ready=false, will continue" + ); + + // Check circuit breaker + // For now, we consider output_bytes > 0 as "progress" + let had_progress = output_bytes > 0; + let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str()); + + if !circuit_breaker.record_iteration(had_progress, error) { + // Circuit breaker tripped + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped, stopping autonomous loop" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n", + gate.reason.unwrap_or_else(|| "Not complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Continue to next iteration + continue 'autonomous_loop; + } + None => { + // No COMPLETION_GATE found - check circuit breaker and continue + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "No COMPLETION_GATE found, will restart with continuation prompt" + ); + + let had_progress = output_bytes > 0; + if !circuit_breaker.record_iteration(had_progress, None) { + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped (no COMPLETION_GATE), stopping" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + continue 'autonomous_loop; + } + } + } else { + // Not in autonomous loop mode or process failed - exit normally + break 'autonomous_loop; + } + } // end 'autonomous_loop // Update state based on exit code - let success = exit_code == 0; + let success = final_exit_code == 0; let new_state = if success { TaskState::Completed } else { @@ -3142,7 +3408,7 @@ impl TaskManagerInner { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, success = success, new_state = ?new_state, "Claude process exited, updating task state" @@ -3154,7 +3420,7 @@ impl TaskManagerInner { task.state = new_state; task.completed_at = Some(Instant::now()); if !success { - task.error = Some(format!("Process exited with code {}", exit_code)); + task.error = Some(format!("Process exited with code {}", final_exit_code)); } } } @@ -3196,7 +3462,7 @@ impl TaskManagerInner { if is_supervisor { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, "Supervisor Claude process exited - NOT marking as complete" ); // Update local state to reflect it's paused/waiting for input @@ -3218,7 +3484,7 @@ impl TaskManagerInner { let error = if success { None } else { - Some(format!("Exit code: {}", exit_code)) + Some(format!("Exit code: {}", final_exit_code)) }; tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); |
