diff options
Diffstat (limited to 'makima/src/daemon/task')
| -rw-r--r-- | makima/src/daemon/task/completion_gate.rs | 402 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 500 | ||||
| -rw-r--r-- | makima/src/daemon/task/mod.rs | 2 | ||||
| -rw-r--r-- | makima/src/daemon/task/state.rs | 2 |
4 files changed, 789 insertions, 117 deletions
diff --git a/makima/src/daemon/task/completion_gate.rs b/makima/src/daemon/task/completion_gate.rs new file mode 100644 index 0000000..69b7c6a --- /dev/null +++ b/makima/src/daemon/task/completion_gate.rs @@ -0,0 +1,402 @@ +//! Completion gate parsing for autonomous loop mode. +//! +//! This module parses COMPLETION_GATE blocks from Claude's output to determine +//! if the task is truly complete. The format is inspired by Ralph's autonomous +//! development framework. +//! +//! Format: +//! ``` +//! <COMPLETION_GATE> +//! ready: true|false +//! reason: "explanation of completion status" +//! progress: "summary of what was accomplished" +//! blockers: ["list", "of", "blockers"] (optional, only when ready: false) +//! </COMPLETION_GATE> +//! ``` + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Represents a parsed COMPLETION_GATE block from Claude's output. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct CompletionGate { + /// Whether the task is ready to complete. + pub ready: bool, + /// Explanation of the completion status. + pub reason: Option<String>, + /// Summary of what was accomplished. + pub progress: Option<String>, + /// List of blockers if not ready. + pub blockers: Option<Vec<String>>, + /// Any additional fields that were parsed. + #[serde(flatten)] + pub extra: HashMap<String, serde_json::Value>, +} + +impl CompletionGate { + /// Parse a COMPLETION_GATE block from text output. + /// + /// Returns None if no valid COMPLETION_GATE is found. + pub fn parse(text: &str) -> Option<Self> { + // Find the COMPLETION_GATE block + let start_tag = "<COMPLETION_GATE>"; + let end_tag = "</COMPLETION_GATE>"; + + let start_idx = text.find(start_tag)?; + let end_idx = text.find(end_tag)?; + + if end_idx <= start_idx { + return None; + } + + let content = &text[start_idx + start_tag.len()..end_idx]; + let content = content.trim(); + + // Try to parse as JSON first + if content.starts_with('{') { + if let Ok(gate) = serde_json::from_str::<CompletionGate>(content) { + return Some(gate); + } + } + + // Fall back to YAML-like parsing + Self::parse_yaml_like(content) + } + + /// Parse a YAML-like format (key: value lines). + fn parse_yaml_like(content: &str) -> Option<Self> { + let mut gate = CompletionGate::default(); + + for line in content.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + + if let Some((key, value)) = line.split_once(':') { + let key = key.trim().to_lowercase(); + let value = value.trim(); + + match key.as_str() { + "ready" => { + gate.ready = value.to_lowercase() == "true" + || value == "yes" + || value == "1"; + } + "reason" => { + gate.reason = Some(Self::unquote(value)); + } + "progress" => { + gate.progress = Some(Self::unquote(value)); + } + "blockers" => { + // Try to parse as JSON array + if let Ok(blockers) = serde_json::from_str::<Vec<String>>(value) { + gate.blockers = Some(blockers); + } else { + // Single blocker as string + gate.blockers = Some(vec![Self::unquote(value)]); + } + } + _ => { + // Store unknown fields + if let Ok(json_val) = serde_json::from_str(value) { + gate.extra.insert(key, json_val); + } else { + gate.extra.insert( + key, + serde_json::Value::String(Self::unquote(value)), + ); + } + } + } + } + } + + Some(gate) + } + + /// Remove surrounding quotes from a string value. + fn unquote(s: &str) -> String { + let s = s.trim(); + if (s.starts_with('"') && s.ends_with('"')) + || (s.starts_with('\'') && s.ends_with('\'')) + { + s[1..s.len() - 1].to_string() + } else { + s.to_string() + } + } + + /// Find all COMPLETION_GATE blocks in the output and return the last one. + /// + /// This is useful when Claude produces multiple completion gates during + /// a long-running task, and we want to use the final status. + pub fn parse_last(text: &str) -> Option<Self> { + let end_tag = "</COMPLETION_GATE>"; + let mut last_gate = None; + let mut search_start = 0; + + while let Some(end_idx) = text[search_start..].find(end_tag) { + let absolute_end = search_start + end_idx + end_tag.len(); + if let Some(gate) = Self::parse(&text[..absolute_end]) { + last_gate = Some(gate); + } + search_start = absolute_end; + } + + last_gate + } +} + +/// State tracking for the circuit breaker in autonomous loop mode. +#[derive(Debug, Clone, Default)] +pub struct CircuitBreaker { + /// Number of consecutive runs without file changes. + pub runs_without_changes: u32, + /// Threshold for opening circuit due to no changes (default: 3). + pub no_change_threshold: u32, + /// Number of consecutive runs with the same error. + pub same_error_count: u32, + /// Threshold for opening circuit due to same error (default: 5). + pub same_error_threshold: u32, + /// Last error message seen. + pub last_error: Option<String>, + /// Total number of loop iterations. + pub iteration_count: u32, + /// Maximum allowed iterations (default: 10). + pub max_iterations: u32, + /// Whether the circuit is open (task should stop). + pub is_open: bool, + /// Reason why circuit was opened. + pub open_reason: Option<String>, +} + +impl CircuitBreaker { + /// Create a new circuit breaker with default thresholds. + pub fn new() -> Self { + Self { + no_change_threshold: 3, + same_error_threshold: 5, + max_iterations: 10, + ..Default::default() + } + } + + /// Create with custom thresholds. + pub fn with_thresholds(no_change: u32, same_error: u32, max_iterations: u32) -> Self { + Self { + no_change_threshold: no_change, + same_error_threshold: same_error, + max_iterations, + ..Default::default() + } + } + + /// Record a new iteration. Returns true if circuit should remain closed. + pub fn record_iteration(&mut self, had_changes: bool, error: Option<&str>) -> bool { + self.iteration_count += 1; + + // Check max iterations + if self.iteration_count >= self.max_iterations { + self.is_open = true; + self.open_reason = Some(format!( + "Maximum iterations ({}) reached", + self.max_iterations + )); + return false; + } + + // Track file changes + if had_changes { + self.runs_without_changes = 0; + } else { + self.runs_without_changes += 1; + if self.runs_without_changes >= self.no_change_threshold { + self.is_open = true; + self.open_reason = Some(format!( + "No file changes for {} consecutive runs", + self.runs_without_changes + )); + return false; + } + } + + // Track errors + match (error, &self.last_error) { + (Some(err), Some(last)) if err == last => { + self.same_error_count += 1; + if self.same_error_count >= self.same_error_threshold { + self.is_open = true; + self.open_reason = Some(format!( + "Same error repeated {} times: {}", + self.same_error_count, err + )); + return false; + } + } + (Some(err), _) => { + self.last_error = Some(err.to_string()); + self.same_error_count = 1; + } + (None, _) => { + self.same_error_count = 0; + self.last_error = None; + } + } + + true // Circuit remains closed + } + + /// Check if the circuit breaker is open. + pub fn should_stop(&self) -> bool { + self.is_open + } + + /// Reset the circuit breaker. + pub fn reset(&mut self) { + *self = Self::with_thresholds( + self.no_change_threshold, + self.same_error_threshold, + self.max_iterations, + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_yaml_format() { + let text = r#" +Some output before +<COMPLETION_GATE> +ready: true +reason: "All tests pass" +progress: "Implemented feature X" +</COMPLETION_GATE> +More output after +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("All tests pass")); + assert_eq!(gate.progress.as_deref(), Some("Implemented feature X")); + } + + #[test] + fn test_parse_not_ready() { + let text = r#" +<COMPLETION_GATE> +ready: false +reason: "Tests are failing" +blockers: ["Fix test_foo", "Fix test_bar"] +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(!gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Tests are failing")); + assert_eq!( + gate.blockers, + Some(vec!["Fix test_foo".to_string(), "Fix test_bar".to_string()]) + ); + } + + #[test] + fn test_parse_json_format() { + let text = r#" +<COMPLETION_GATE> +{ + "ready": true, + "reason": "Done", + "progress": "All good" +} +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Done")); + } + + #[test] + fn test_parse_last_gate() { + let text = r#" +<COMPLETION_GATE> +ready: false +reason: "Still working" +</COMPLETION_GATE> +Some more work... +<COMPLETION_GATE> +ready: true +reason: "Finally done" +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse_last(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Finally done")); + } + + #[test] + fn test_no_gate() { + let text = "No completion gate here"; + assert!(CompletionGate::parse(text).is_none()); + } + + #[test] + fn test_circuit_breaker_max_iterations() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 5); + for _ in 0..4 { + assert!(cb.record_iteration(true, None)); + } + assert!(!cb.record_iteration(true, None)); // 5th iteration should trip + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("Maximum iterations")); + } + + #[test] + fn test_circuit_breaker_no_changes() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 10); + assert!(cb.record_iteration(false, None)); // 1st no change + assert!(cb.record_iteration(false, None)); // 2nd no change + assert!(!cb.record_iteration(false, None)); // 3rd no change - trips + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("No file changes")); + } + + #[test] + fn test_circuit_breaker_same_error() { + let mut cb = CircuitBreaker::with_thresholds(10, 3, 10); + let err = "Test failed"; + assert!(cb.record_iteration(true, Some(err))); + assert!(cb.record_iteration(true, Some(err))); + assert!(!cb.record_iteration(true, Some(err))); // 3rd same error - trips + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("Same error")); + } + + #[test] + fn test_circuit_breaker_different_errors_ok() { + let mut cb = CircuitBreaker::with_thresholds(10, 3, 10); + assert!(cb.record_iteration(true, Some("error 1"))); + assert!(cb.record_iteration(true, Some("error 2"))); + assert!(cb.record_iteration(true, Some("error 3"))); + // Different errors don't trip the circuit + assert!(!cb.is_open); + } + + #[test] + fn test_circuit_breaker_changes_reset() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 10); + assert!(cb.record_iteration(false, None)); // 1 no change + assert!(cb.record_iteration(false, None)); // 2 no changes + assert!(cb.record_iteration(true, None)); // has changes - resets + assert!(cb.record_iteration(false, None)); // 1 no change again + assert!(cb.record_iteration(false, None)); // 2 no changes + // Still shouldn't trip because we had a change in between + assert!(!cb.is_open); + } +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 4ccedb2..75c884b 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -12,6 +12,7 @@ use uuid::Uuid; use std::collections::HashSet; +use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; @@ -944,6 +945,8 @@ pub struct ManagedTask { pub copy_files: Option<Vec<String>>, /// Contract ID if this task is associated with a contract. pub contract_id: Option<Uuid>, + /// Whether to run in autonomous loop mode. + pub autonomous_loop: bool, /// Time task was created. pub created_at: Instant, /// Time task started running. @@ -973,6 +976,8 @@ pub struct TaskConfig { pub enable_permissions: bool, /// Disable verbose output. pub disable_verbose: bool, + /// Bubblewrap sandbox configuration. + pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>, } impl Default for TaskConfig { @@ -986,6 +991,7 @@ impl Default for TaskConfig { claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, + bubblewrap: None, } } } @@ -1027,7 +1033,8 @@ impl TaskManager { .with_pre_args(config.claude_pre_args.clone()) .with_permissions_enabled(config.enable_permissions) .with_verbose_disabled(config.disable_verbose) - .with_env(config.env_vars.clone()), + .with_env(config.env_vars.clone()) + .with_bubblewrap(config.bubblewrap.clone()), ); let temp_manager = Arc::new(TempManager::new()); @@ -1150,6 +1157,7 @@ impl TaskManager { copy_files, contract_id, is_supervisor, + autonomous_loop, } => { tracing::info!( task_id = %task_id, @@ -1161,6 +1169,7 @@ impl TaskManager { depth = depth, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, + autonomous_loop = autonomous_loop, target_repo_path = ?target_repo_path, completion_action = ?completion_action, continue_from_task_id = ?continue_from_task_id, @@ -1173,7 +1182,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, - copy_files, contract_id + copy_files, contract_id, autonomous_loop ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1252,6 +1261,7 @@ impl TaskManager { None, // continue_from_task_id None, // copy_files contract_id, + false, // autonomous_loop - supervisors don't use this ).await { tracing::error!( task_id = %task_id, @@ -1421,6 +1431,17 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } + DaemonCommand::CleanupWorktree { + task_id, + delete_branch, + } => { + tracing::info!( + task_id = %task_id, + delete_branch = delete_branch, + "Cleaning up worktree" + ); + self.handle_cleanup_worktree(task_id, delete_branch).await?; + } } Ok(()) } @@ -1444,6 +1465,7 @@ impl TaskManager { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); @@ -1496,6 +1518,7 @@ impl TaskManager { continue_from_task_id, copy_files: copy_files.clone(), contract_id, + autonomous_loop, created_at: Instant::now(), started_at: None, completed_at: None, @@ -1519,7 +1542,7 @@ impl TaskManager { if let Err(e) = inner.run_task( task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, - continue_from_task_id, copy_files, contract_id + continue_from_task_id, copy_files, contract_id, autonomous_loop ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -2046,6 +2069,76 @@ impl TaskManager { Ok(()) } + /// Handle CleanupWorktree command. + /// + /// Removes a task's worktree and optionally its branch. + /// Used when a contract is completed or deleted to clean up associated task worktrees. + async fn handle_cleanup_worktree( + &self, + task_id: Uuid, + delete_branch: bool, + ) -> Result<(), DaemonError> { + // Try to get the worktree path, but don't fail if not found + let worktree_result = self.get_task_worktree_path(task_id).await; + + let (success, message) = match worktree_result { + Ok(worktree_path) => { + // Remove the worktree + match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await { + Ok(()) => { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + delete_branch = delete_branch, + "Worktree cleaned up successfully" + ); + + // Also remove task from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, format!("Worktree cleaned up: {}", worktree_path.display())) + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + error = %e, + "Failed to remove worktree" + ); + (false, format!("Failed to remove worktree: {}", e)) + } + } + } + Err(_) => { + // Worktree not found - this is OK, it may have already been cleaned up + tracing::debug!( + task_id = %task_id, + "No worktree found for task, may have already been cleaned up" + ); + + // Still remove from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, "No worktree found, task tracking cleaned up".to_string()) + } + }; + + // Send result back to server + let msg = DaemonMessage::CleanupWorktreeResult { + task_id, + success, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + /// Handle ReadRepoFile command. /// /// Reads a file from a repository on the daemon's filesystem and sends @@ -2436,6 +2529,7 @@ impl TaskManagerInner { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> Result<(), DaemonError> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); @@ -2908,6 +3002,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; + // Clone extra_env for use in autonomous loop iterations + let extra_env_for_loop = extra_env.clone(); + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); let mut process = self.process_manager .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) @@ -2934,7 +3031,7 @@ impl TaskManagerInner { // Get stdin handle for input forwarding and completion signaling let stdin_handle = process.stdin_handle(); - let stdin_handle_for_completion = stdin_handle.clone(); + let mut stdin_handle_for_completion = stdin_handle.clone(); tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); tokio::spawn(async move { @@ -2998,142 +3095,311 @@ impl TaskManagerInner { let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); let mut auth_error_handled = false; - let mut output_count = 0u64; - let mut output_bytes = 0usize; - let startup_timeout = tokio::time::Duration::from_secs(30); - let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); - startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let startup_deadline = tokio::time::Instant::now() + startup_timeout; + // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection + let mut accumulated_output = String::new(); + let mut circuit_breaker = CircuitBreaker::new(); + let mut iteration_count = 0u32; + let mut final_exit_code: i64 = -1; // Track the final exit code across iterations - loop { - tokio::select! { - maybe_line = process.next_output() => { - match maybe_line { - Some(line) => { - output_count += 1; - output_bytes += line.content.len(); - - if output_count == 1 { - tracing::info!(task_id = %task_id, "Received first output line from Claude"); - } - if output_count % 100 == 0 { - tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); - } + // Autonomous loop: we may run multiple iterations + 'autonomous_loop: loop { + iteration_count += 1; - // Log output details for debugging - tracing::trace!( - task_id = %task_id, - line_num = output_count, - content_len = line.content.len(), - is_stdout = line.is_stdout, - json_type = ?line.json_type, - "Forwarding output to WebSocket" - ); + if autonomous_loop && iteration_count > 1 { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "Starting autonomous loop iteration" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // For subsequent iterations, spawn with --continue flag + let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true."; + + process = self.process_manager + .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + + // Register the new process PID + if let Some(pid) = process.id() { + self.active_pids.write().await.insert(task_id, pid); + tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned"); + } + + // Reset stdin handle for the new process + stdin_handle_for_completion = process.stdin_handle(); + } + + // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output) + let mut iteration_output = String::new(); + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; - // Check if this is a "result" message indicating task completion - // With --input-format=stream-json, Claude waits for more input after completion - // We close stdin to signal EOF and let the process exit - if line.json_type.as_deref() == Some("result") { - tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); - let mut stdin_guard = stdin_handle_for_completion.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + // Accumulate output for COMPLETION_GATE detection in autonomous loop mode + if autonomous_loop { + iteration_output.push_str(&line.content); + iteration_output.push('\n'); } - } - // Check for OAuth auth error before sending output - let content_for_auth_check = line.content.clone(); - let json_type_for_auth_check = line.json_type.clone(); - let is_stdout_for_auth_check = line.is_stdout; + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } - let msg = DaemonMessage::task_output(task_id, line.content, false); - if ws_tx.send(msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); - break; - } + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); - // Detect OAuth token expiration and trigger remote login flow - if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { - auth_error_handled = true; - tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); - - // Spawn claude setup-token to get login URL - if let Some(login_url) = get_oauth_login_url(&claude_command).await { - tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); - let auth_msg = DaemonMessage::AuthenticationRequired { - task_id: Some(task_id), - login_url, - hostname: daemon_hostname.clone(), - }; - if ws_tx.send(auth_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + // Check for OAuth auth error before sending output + let content_for_auth_check = line.content.clone(); + let json_type_for_auth_check = line.json_type.clone(); + let is_stdout_for_auth_check = line.is_stdout; + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + + // Detect OAuth token expiration and trigger remote login flow + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { + auth_error_handled = true; + tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); + + // Spawn claude setup-token to get login URL + if let Some(login_url) = get_oauth_login_url(&claude_command).await { + tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); + let auth_msg = DaemonMessage::AuthenticationRequired { + task_id: Some(task_id), + login_url, + hostname: daemon_hostname.clone(), + }; + if ws_tx.send(auth_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + } + } else { + tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); + let fallback_msg = DaemonMessage::task_output( + task_id, + format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(fallback_msg).await; } - } else { - tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); - let fallback_msg = DaemonMessage::task_output( - task_id, - format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", - daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), - false, - ); - let _ = ws_tx.send(fallback_msg).await; } } - } - None => { - tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); - break; + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } } } - } - _ = startup_check.tick(), if output_count == 0 => { - // Check if process is still alive - match process.try_wait() { - Ok(Some(exit_code)) => { - tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), - false, - ); - let _ = ws_tx.send(msg).await; - break; - } - Ok(None) => { - // Still running but no output - if tokio::time::Instant::now() > startup_deadline { - tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); let msg = DaemonMessage::task_output( task_id, - "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), false, ); let _ = ws_tx.send(msg).await; - } else { - tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } } } } - } - // Wait for process to exit - let exit_code = process.wait().await.unwrap_or(-1); + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + final_exit_code = exit_code; // Store for use after the loop + + // Unregister the process PID (process has exited) + self.active_pids.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Unregistered process PID"); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); - // Unregister the process PID (process has exited) - self.active_pids.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Unregistered process PID"); + // Accumulate this iteration's output + accumulated_output.push_str(&iteration_output); - // Clean up input channel for this task - self.task_inputs.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Removed task input channel"); + // === AUTONOMOUS LOOP LOGIC === + // Check if we should continue or complete + if autonomous_loop && exit_code == 0 { + // Check for COMPLETION_GATE in the output + let completion_gate = CompletionGate::parse_last(&iteration_output); + + match completion_gate { + Some(gate) if gate.ready => { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + "COMPLETION_GATE ready=true detected, task complete" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n", + iteration_count, + gate.reason.unwrap_or_else(|| "Task complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + Some(gate) => { + // COMPLETION_GATE found but not ready + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + blockers = ?gate.blockers, + "COMPLETION_GATE ready=false, will continue" + ); + + // Check circuit breaker + // For now, we consider output_bytes > 0 as "progress" + let had_progress = output_bytes > 0; + let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str()); + + if !circuit_breaker.record_iteration(had_progress, error) { + // Circuit breaker tripped + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped, stopping autonomous loop" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n", + gate.reason.unwrap_or_else(|| "Not complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Continue to next iteration + continue 'autonomous_loop; + } + None => { + // No COMPLETION_GATE found - check circuit breaker and continue + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "No COMPLETION_GATE found, will restart with continuation prompt" + ); + + let had_progress = output_bytes > 0; + if !circuit_breaker.record_iteration(had_progress, None) { + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped (no COMPLETION_GATE), stopping" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + continue 'autonomous_loop; + } + } + } else { + // Not in autonomous loop mode or process failed - exit normally + break 'autonomous_loop; + } + } // end 'autonomous_loop // Update state based on exit code - let success = exit_code == 0; + let success = final_exit_code == 0; let new_state = if success { TaskState::Completed } else { @@ -3142,7 +3408,7 @@ impl TaskManagerInner { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, success = success, new_state = ?new_state, "Claude process exited, updating task state" @@ -3154,7 +3420,7 @@ impl TaskManagerInner { task.state = new_state; task.completed_at = Some(Instant::now()); if !success { - task.error = Some(format!("Process exited with code {}", exit_code)); + task.error = Some(format!("Process exited with code {}", final_exit_code)); } } } @@ -3196,7 +3462,7 @@ impl TaskManagerInner { if is_supervisor { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, "Supervisor Claude process exited - NOT marking as complete" ); // Update local state to reflect it's paused/waiting for input @@ -3218,7 +3484,7 @@ impl TaskManagerInner { let error = if success { None } else { - Some(format!("Exit code: {}", exit_code)) + Some(format!("Exit code: {}", final_exit_code)) }; tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs index 29c261e..3830e1d 100644 --- a/makima/src/daemon/task/mod.rs +++ b/makima/src/daemon/task/mod.rs @@ -1,7 +1,9 @@ //! Task management and execution. +pub mod completion_gate; pub mod manager; pub mod state; +pub use completion_gate::CompletionGate; pub use manager::{ManagedTask, TaskConfig, TaskManager}; pub use state::TaskState; diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs index ca5fc01..7b59b62 100644 --- a/makima/src/daemon/task/state.rs +++ b/makima/src/daemon/task/state.rs @@ -124,7 +124,9 @@ impl Default for TaskState { #[cfg(test)] mod tests { + #[allow(unused_imports)] use crate::daemon::*; + use super::TaskState; #[test] fn test_valid_transitions() { |
