diff options
Diffstat (limited to 'makima/src/daemon')
| -rw-r--r-- | makima/src/daemon/api/supervisor.rs | 11 | ||||
| -rw-r--r-- | makima/src/daemon/cli/daemon.rs | 5 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 6 | ||||
| -rw-r--r-- | makima/src/daemon/cli/supervisor.rs | 26 | ||||
| -rw-r--r-- | makima/src/daemon/config.rs | 43 | ||||
| -rw-r--r-- | makima/src/daemon/process/claude.rs | 373 | ||||
| -rw-r--r-- | makima/src/daemon/task/completion_gate.rs | 402 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 500 | ||||
| -rw-r--r-- | makima/src/daemon/task/mod.rs | 2 | ||||
| -rw-r--r-- | makima/src/daemon/task/state.rs | 2 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 22 |
11 files changed, 1258 insertions, 134 deletions
diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs index 0a68980..8b3d480 100644 --- a/makima/src/daemon/api/supervisor.rs +++ b/makima/src/daemon/api/supervisor.rs @@ -228,4 +228,15 @@ impl ApiClient { self.post(&format!("/api/v1/contracts/{}/phase", contract_id), &req) .await } + + /// Get individual task details. + pub async fn supervisor_get_task(&self, task_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/tasks/{}", task_id)).await + } + + /// Get task output/claude log. + pub async fn supervisor_get_task_output(&self, task_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/tasks/{}/output", task_id)) + .await + } } diff --git a/makima/src/daemon/cli/daemon.rs b/makima/src/daemon/cli/daemon.rs index de4cff4..c779d64 100644 --- a/makima/src/daemon/cli/daemon.rs +++ b/makima/src/daemon/cli/daemon.rs @@ -33,4 +33,9 @@ pub struct DaemonArgs { /// Log level (trace, debug, info, warn, error) #[arg(short, long, default_value = "info")] pub log_level: String, + + /// Enable bubblewrap sandbox for Claude processes. + /// Requires bwrap to be installed on the system. + #[arg(long, env = "MAKIMA_DAEMON_BUBBLEWRAP")] + pub bubblewrap: bool, } diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index 1a49399..da71b0d 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -82,6 +82,12 @@ pub enum SupervisorCommand { /// Ask a question and wait for user feedback Ask(supervisor::AskArgs), + + /// Get individual task details + Task(supervisor::GetTaskArgs), + + /// Get task output/claude log + Output(supervisor::GetTaskOutputArgs), } /// Contract subcommands for task-contract interaction. diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index dc534b5..2bc4c89 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -14,9 +14,9 @@ pub struct SupervisorArgs { #[arg(long, env = "MAKIMA_API_KEY")] pub api_key: String, - /// Current task ID (optional) + /// Current task ID (optional) - the supervisor's own task ID #[arg(long, env = "MAKIMA_TASK_ID")] - pub task_id: Option<Uuid>, + pub self_task_id: Option<Uuid>, /// Contract ID #[arg(long, env = "MAKIMA_CONTRACT_ID")] @@ -199,3 +199,25 @@ pub struct AdvancePhaseArgs { #[arg(index = 1)] pub phase: String, } + +/// Arguments for task command (get individual task details). +#[derive(Args, Debug)] +pub struct GetTaskArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to get details for + #[arg(index = 1, id = "target_task_id")] + pub target_task_id: Uuid, +} + +/// Arguments for output command (get task output/claude log). +#[derive(Args, Debug)] +pub struct GetTaskOutputArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to get output for + #[arg(index = 1, id = "target_task_id")] + pub target_task_id: Uuid, +} diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index 866ee70..512b822 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -5,6 +5,38 @@ use serde::Deserialize; use std::collections::HashMap; use std::path::PathBuf; +/// Bubblewrap sandbox configuration for Claude processes. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct BubblewrapConfig { + /// Enable bubblewrap sandboxing. + #[serde(default)] + pub enabled: bool, + + /// Path to bwrap binary (default: 'bwrap'). + #[serde(default = "default_bwrap_command")] + pub bwrap_command: String, + + /// Allow network access inside sandbox (default: true). + #[serde(default = "default_true")] + pub network: bool, + + /// Additional paths to bind read-only. + #[serde(default)] + pub ro_bind: Vec<PathBuf>, + + /// Additional paths to bind read-write. + #[serde(default)] + pub rw_bind: Vec<PathBuf>, +} + +fn default_bwrap_command() -> String { + "bwrap".to_string() +} + +fn default_true() -> bool { + true +} + /// Root daemon configuration. #[derive(Debug, Clone, Deserialize)] pub struct DaemonConfig { @@ -177,6 +209,10 @@ pub struct ProcessConfig { /// Additional environment variables to pass to Claude Code. #[serde(default, alias = "envvars")] pub env_vars: HashMap<String, String>, + + /// Bubblewrap sandbox configuration. + #[serde(default)] + pub bubblewrap: BubblewrapConfig, } fn default_claude_command() -> String { @@ -198,6 +234,7 @@ impl Default for ProcessConfig { max_concurrent_tasks: default_max_tasks(), default_timeout_secs: 0, env_vars: HashMap::new(), + bubblewrap: BubblewrapConfig::default(), } } } @@ -478,6 +515,11 @@ impl DaemonConfig { // Log level is always set (has default) config.logging.level = args.log_level.clone(); + // Enable bubblewrap if --bubblewrap flag is set + if args.bubblewrap { + config.process.bubblewrap.enabled = true; + } + // Validate required fields after all sources are merged config.validate()?; @@ -511,6 +553,7 @@ impl DaemonConfig { max_concurrent_tasks: 2, default_timeout_secs: 0, env_vars: HashMap::new(), + bubblewrap: BubblewrapConfig::default(), }, local_db: LocalDbConfig { path: PathBuf::from("/tmp/makima-daemon-test/state.db"), diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs index 536d883..f3aa421 100644 --- a/makima/src/daemon/process/claude.rs +++ b/makima/src/daemon/process/claude.rs @@ -1,7 +1,7 @@ //! Claude Code process management. use std::collections::HashMap; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::Arc; @@ -11,6 +11,7 @@ use tokio::process::{Child, ChildStdin, Command}; use tokio::sync::{mpsc, Mutex}; use super::claude_protocol::ClaudeInputMessage; +use crate::daemon::config::BubblewrapConfig; /// Errors that can occur during Claude process management. #[derive(Debug, thiserror::Error)] @@ -26,6 +27,9 @@ pub enum ClaudeProcessError { #[error("Failed to read output: {0}")] OutputRead(String), + + #[error("Bubblewrap (bwrap) not found. Install bubblewrap or disable the --bubblewrap flag.")] + BubblewrapNotFound, } /// A line of output from Claude Code. @@ -234,6 +238,8 @@ pub struct ProcessManager { disable_verbose: bool, /// Default environment variables to pass. default_env: HashMap<String, String>, + /// Bubblewrap sandbox configuration. + bubblewrap: Option<BubblewrapConfig>, } impl Default for ProcessManager { @@ -252,6 +258,7 @@ impl ProcessManager { enable_permissions: false, disable_verbose: false, default_env: HashMap::new(), + bubblewrap: None, } } @@ -264,6 +271,7 @@ impl ProcessManager { enable_permissions: false, disable_verbose: false, default_env: HashMap::new(), + bubblewrap: None, } } @@ -297,11 +305,147 @@ impl ProcessManager { self } + /// Configure bubblewrap sandboxing. + /// + /// When enabled, Claude processes will be spawned inside a bubblewrap sandbox + /// with filesystem isolation. + pub fn with_bubblewrap(mut self, config: Option<BubblewrapConfig>) -> Self { + self.bubblewrap = config; + self + } + /// Get the claude command path. pub fn claude_command(&self) -> &str { &self.claude_command } + /// Check if bubblewrap (bwrap) is available on the system. + /// + /// Returns the bwrap version string if available. + pub async fn check_bwrap_available(&self) -> Result<String, ClaudeProcessError> { + let bwrap_cmd = self + .bubblewrap + .as_ref() + .map(|c| c.bwrap_command.as_str()) + .unwrap_or("bwrap"); + + let output = Command::new(bwrap_cmd) + .arg("--version") + .output() + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::BubblewrapNotFound + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + Err(ClaudeProcessError::BubblewrapNotFound) + } + } + + /// Build bwrap command arguments for sandboxing. + /// + /// Returns a tuple of (bwrap_command, bwrap_args) where bwrap_args includes + /// all the sandbox flags followed by "--" and the actual command to run. + fn build_bwrap_args( + &self, + working_dir: &Path, + claude_command: &str, + claude_args: &[String], + config: &BubblewrapConfig, + ) -> (String, Vec<String>) { + let mut args = Vec::new(); + + // Unshare all namespaces except user (for unprivileged use) + args.push("--unshare-all".to_string()); + + // Share network if enabled (needed for API calls) + if config.network { + args.push("--share-net".to_string()); + } + + // Safety flags + args.push("--die-with-parent".to_string()); + args.push("--new-session".to_string()); + + // Bind root filesystem read-only + args.push("--ro-bind".to_string()); + args.push("/".to_string()); + args.push("/".to_string()); + + // Mount fresh /dev + args.push("--dev".to_string()); + args.push("/dev".to_string()); + + // Mount fresh /proc + args.push("--proc".to_string()); + args.push("/proc".to_string()); + + // Fresh /tmp + args.push("--tmpfs".to_string()); + args.push("/tmp".to_string()); + + // Bind working directory (worktree) read-write + let working_dir_str = working_dir.to_string_lossy().to_string(); + args.push("--bind".to_string()); + args.push(working_dir_str.clone()); + args.push(working_dir_str); + + // Bind ~/.claude read-write if it exists (for Claude config) + if let Ok(home) = std::env::var("HOME") { + let claude_config_dir = PathBuf::from(&home).join(".claude"); + if claude_config_dir.exists() { + let claude_config_str = claude_config_dir.to_string_lossy().to_string(); + args.push("--bind".to_string()); + args.push(claude_config_str.clone()); + args.push(claude_config_str); + } + + // Also bind ~/.config/claude if it exists (alternative config location) + let claude_config_alt = PathBuf::from(&home).join(".config").join("claude"); + if claude_config_alt.exists() { + let config_str = claude_config_alt.to_string_lossy().to_string(); + args.push("--bind".to_string()); + args.push(config_str.clone()); + args.push(config_str); + } + } + + // Additional read-only binds from config + for path in &config.ro_bind { + if path.exists() { + let path_str = path.to_string_lossy().to_string(); + args.push("--ro-bind".to_string()); + args.push(path_str.clone()); + args.push(path_str); + } + } + + // Additional read-write binds from config + for path in &config.rw_bind { + if path.exists() { + let path_str = path.to_string_lossy().to_string(); + args.push("--bind".to_string()); + args.push(path_str.clone()); + args.push(path_str); + } + } + + // Separator before the actual command + args.push("--".to_string()); + + // Add the claude command and its arguments + args.push(claude_command.to_string()); + args.extend(claude_args.iter().cloned()); + + (config.bwrap_command.clone(), args) + } + /// Spawn a Claude Code process to execute a plan. /// /// The process runs in the specified working directory with stream-json output format. @@ -327,11 +471,25 @@ impl ProcessManager { extra_env: Option<HashMap<String, String>>, system_prompt: Option<&str>, ) -> Result<ClaudeProcess, ClaudeProcessError> { + // Check if bubblewrap is enabled and available + let use_bubblewrap = if let Some(ref bwrap_config) = self.bubblewrap { + if bwrap_config.enabled { + // Verify bwrap is available before proceeding + self.check_bwrap_available().await?; + true + } else { + false + } + } else { + false + }; + tracing::info!( working_dir = %working_dir.display(), plan_len = plan.len(), plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, has_system_prompt = system_prompt.is_some(), + bubblewrap_enabled = use_bubblewrap, "Spawning Claude Code process" ); @@ -350,37 +508,52 @@ impl ProcessManager { env.extend(extra); } - // Build arguments list - let mut args = Vec::new(); + // Build Claude arguments list + let mut claude_args = Vec::new(); // Pre-args (before defaults) - args.extend(self.claude_pre_args.clone()); + claude_args.extend(self.claude_pre_args.clone()); // Required arguments for stream-json protocol - args.push("--output-format=stream-json".to_string()); - args.push("--input-format=stream-json".to_string()); + claude_args.push("--output-format=stream-json".to_string()); + claude_args.push("--input-format=stream-json".to_string()); // Optional default arguments if !self.disable_verbose { - args.push("--verbose".to_string()); + claude_args.push("--verbose".to_string()); } if !self.enable_permissions { - args.push("--dangerously-skip-permissions".to_string()); + claude_args.push("--dangerously-skip-permissions".to_string()); } // System prompt - passed via --system-prompt flag for system-level constraints if let Some(prompt) = system_prompt { - args.push("--system-prompt".to_string()); - args.push(prompt.to_string()); + claude_args.push("--system-prompt".to_string()); + claude_args.push(prompt.to_string()); } // Additional user-configured arguments - args.extend(self.claude_args.clone()); - - tracing::debug!(args = ?args, "Claude command arguments"); + claude_args.extend(self.claude_args.clone()); + + // Determine the actual command and arguments to spawn + let (command, args) = if use_bubblewrap { + let bwrap_config = self.bubblewrap.as_ref().unwrap(); + let (bwrap_cmd, bwrap_args) = + self.build_bwrap_args(working_dir, &self.claude_command, &claude_args, bwrap_config); + tracing::info!( + bwrap_command = %bwrap_cmd, + bwrap_args_count = bwrap_args.len(), + "Running Claude in bubblewrap sandbox" + ); + tracing::debug!(bwrap_args = ?bwrap_args, "Bubblewrap arguments"); + (bwrap_cmd, bwrap_args) + } else { + tracing::debug!(args = ?claude_args, "Claude command arguments"); + (self.claude_command.clone(), claude_args) + }; // Spawn the process - let mut child = Command::new(&self.claude_command) + let mut child = Command::new(&command) .args(&args) .current_dir(working_dir) .envs(env) @@ -391,7 +564,11 @@ impl ProcessManager { .spawn() .map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { - ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + if use_bubblewrap { + ClaudeProcessError::BubblewrapNotFound + } else { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } } else { ClaudeProcessError::SpawnFailed(e) } @@ -487,6 +664,172 @@ impl ProcessManager { Ok(process) } + /// Spawn a Claude Code process in continuation mode. + /// + /// This is used for the autonomous loop feature where we need to continue + /// a previous conversation. The --continue flag tells Claude to resume + /// from the previous session state. + pub async fn spawn_continue( + &self, + working_dir: &Path, + continuation_prompt: &str, + extra_env: Option<HashMap<String, String>>, + system_prompt: Option<&str>, + ) -> Result<ClaudeProcess, ClaudeProcessError> { + tracing::info!( + working_dir = %working_dir.display(), + prompt_len = continuation_prompt.len(), + has_system_prompt = system_prompt.is_some(), + "Spawning Claude Code process in continuation mode" + ); + + // Verify working directory exists + if !working_dir.exists() { + tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); + return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Working directory does not exist: {}", working_dir.display()), + ))); + } + + // Build environment + let mut env = self.default_env.clone(); + if let Some(extra) = extra_env { + env.extend(extra); + } + + // Build arguments list + let mut args = Vec::new(); + + // Pre-args (before defaults) + args.extend(self.claude_pre_args.clone()); + + // Required arguments for stream-json protocol + args.push("--output-format=stream-json".to_string()); + args.push("--input-format=stream-json".to_string()); + + // The key flag for continuation mode + args.push("--continue".to_string()); + + // Optional default arguments + if !self.disable_verbose { + args.push("--verbose".to_string()); + } + if !self.enable_permissions { + args.push("--dangerously-skip-permissions".to_string()); + } + + // System prompt - passed via --system-prompt flag for system-level constraints + if let Some(prompt) = system_prompt { + args.push("--system-prompt".to_string()); + args.push(prompt.to_string()); + } + + // Additional user-configured arguments + args.extend(self.claude_args.clone()); + + tracing::debug!(args = ?args, "Claude continue command arguments"); + + // Spawn the process + let mut child = Command::new(&self.claude_command) + .args(&args) + .current_dir(working_dir) + .envs(env) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + // Create output channel + let (tx, rx) = mpsc::channel(1000); + + // Take stdout, stderr, and stdin + let stdin = child.stdin.take(); + let stdin = Arc::new(Mutex::new(stdin)); + + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); + + // Spawn task to read stdout + let tx_stdout = tx.clone(); + tokio::spawn(async move { + use tokio::io::AsyncReadExt; + let mut reader = BufReader::new(stdout); + let mut buffer = vec![0u8; 4096]; + let mut line_buffer = String::new(); + + loop { + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + reader.read(&mut buffer) + ).await { + Ok(Ok(0)) => { + tracing::debug!("Claude stdout EOF (continue mode)"); + if !line_buffer.is_empty() { + let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; + } + break; + } + Ok(Ok(n)) => { + let chunk = String::from_utf8_lossy(&buffer[..n]); + line_buffer.push_str(&chunk); + while let Some(newline_pos) = line_buffer.find('\n') { + let line = line_buffer[..newline_pos].to_string(); + line_buffer = line_buffer[newline_pos + 1..].to_string(); + if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { + return; + } + } + } + Ok(Err(e)) => { + tracing::error!(error = %e, "Error reading Claude stdout (continue mode)"); + break; + } + Err(_) => { + tracing::warn!("No stdout data from Claude for 5 seconds (continue mode)"); + } + } + } + tracing::debug!("Claude stdout reader task ended (continue mode)"); + }); + + // Spawn task to read stderr + let tx_stderr = tx; + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!(line = %line, "Claude stderr (continue mode)"); + if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { + break; + } + } + tracing::debug!("Claude stderr reader task ended (continue mode)"); + }); + + tracing::info!("Claude Code process spawned successfully in continue mode"); + + let process = ClaudeProcess { + child, + output_rx: rx, + stdin, + }; + + // Send the continuation prompt as a user message + tracing::info!(prompt_len = continuation_prompt.len(), "Sending continuation prompt to Claude via stdin"); + process.send_user_message(continuation_prompt).await?; + + Ok(process) + } + /// Check if the claude command is available. pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> { let output = Command::new(&self.claude_command) diff --git a/makima/src/daemon/task/completion_gate.rs b/makima/src/daemon/task/completion_gate.rs new file mode 100644 index 0000000..69b7c6a --- /dev/null +++ b/makima/src/daemon/task/completion_gate.rs @@ -0,0 +1,402 @@ +//! Completion gate parsing for autonomous loop mode. +//! +//! This module parses COMPLETION_GATE blocks from Claude's output to determine +//! if the task is truly complete. The format is inspired by Ralph's autonomous +//! development framework. +//! +//! Format: +//! ``` +//! <COMPLETION_GATE> +//! ready: true|false +//! reason: "explanation of completion status" +//! progress: "summary of what was accomplished" +//! blockers: ["list", "of", "blockers"] (optional, only when ready: false) +//! </COMPLETION_GATE> +//! ``` + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Represents a parsed COMPLETION_GATE block from Claude's output. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct CompletionGate { + /// Whether the task is ready to complete. + pub ready: bool, + /// Explanation of the completion status. + pub reason: Option<String>, + /// Summary of what was accomplished. + pub progress: Option<String>, + /// List of blockers if not ready. + pub blockers: Option<Vec<String>>, + /// Any additional fields that were parsed. + #[serde(flatten)] + pub extra: HashMap<String, serde_json::Value>, +} + +impl CompletionGate { + /// Parse a COMPLETION_GATE block from text output. + /// + /// Returns None if no valid COMPLETION_GATE is found. + pub fn parse(text: &str) -> Option<Self> { + // Find the COMPLETION_GATE block + let start_tag = "<COMPLETION_GATE>"; + let end_tag = "</COMPLETION_GATE>"; + + let start_idx = text.find(start_tag)?; + let end_idx = text.find(end_tag)?; + + if end_idx <= start_idx { + return None; + } + + let content = &text[start_idx + start_tag.len()..end_idx]; + let content = content.trim(); + + // Try to parse as JSON first + if content.starts_with('{') { + if let Ok(gate) = serde_json::from_str::<CompletionGate>(content) { + return Some(gate); + } + } + + // Fall back to YAML-like parsing + Self::parse_yaml_like(content) + } + + /// Parse a YAML-like format (key: value lines). + fn parse_yaml_like(content: &str) -> Option<Self> { + let mut gate = CompletionGate::default(); + + for line in content.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + + if let Some((key, value)) = line.split_once(':') { + let key = key.trim().to_lowercase(); + let value = value.trim(); + + match key.as_str() { + "ready" => { + gate.ready = value.to_lowercase() == "true" + || value == "yes" + || value == "1"; + } + "reason" => { + gate.reason = Some(Self::unquote(value)); + } + "progress" => { + gate.progress = Some(Self::unquote(value)); + } + "blockers" => { + // Try to parse as JSON array + if let Ok(blockers) = serde_json::from_str::<Vec<String>>(value) { + gate.blockers = Some(blockers); + } else { + // Single blocker as string + gate.blockers = Some(vec![Self::unquote(value)]); + } + } + _ => { + // Store unknown fields + if let Ok(json_val) = serde_json::from_str(value) { + gate.extra.insert(key, json_val); + } else { + gate.extra.insert( + key, + serde_json::Value::String(Self::unquote(value)), + ); + } + } + } + } + } + + Some(gate) + } + + /// Remove surrounding quotes from a string value. + fn unquote(s: &str) -> String { + let s = s.trim(); + if (s.starts_with('"') && s.ends_with('"')) + || (s.starts_with('\'') && s.ends_with('\'')) + { + s[1..s.len() - 1].to_string() + } else { + s.to_string() + } + } + + /// Find all COMPLETION_GATE blocks in the output and return the last one. + /// + /// This is useful when Claude produces multiple completion gates during + /// a long-running task, and we want to use the final status. + pub fn parse_last(text: &str) -> Option<Self> { + let end_tag = "</COMPLETION_GATE>"; + let mut last_gate = None; + let mut search_start = 0; + + while let Some(end_idx) = text[search_start..].find(end_tag) { + let absolute_end = search_start + end_idx + end_tag.len(); + if let Some(gate) = Self::parse(&text[..absolute_end]) { + last_gate = Some(gate); + } + search_start = absolute_end; + } + + last_gate + } +} + +/// State tracking for the circuit breaker in autonomous loop mode. +#[derive(Debug, Clone, Default)] +pub struct CircuitBreaker { + /// Number of consecutive runs without file changes. + pub runs_without_changes: u32, + /// Threshold for opening circuit due to no changes (default: 3). + pub no_change_threshold: u32, + /// Number of consecutive runs with the same error. + pub same_error_count: u32, + /// Threshold for opening circuit due to same error (default: 5). + pub same_error_threshold: u32, + /// Last error message seen. + pub last_error: Option<String>, + /// Total number of loop iterations. + pub iteration_count: u32, + /// Maximum allowed iterations (default: 10). + pub max_iterations: u32, + /// Whether the circuit is open (task should stop). + pub is_open: bool, + /// Reason why circuit was opened. + pub open_reason: Option<String>, +} + +impl CircuitBreaker { + /// Create a new circuit breaker with default thresholds. + pub fn new() -> Self { + Self { + no_change_threshold: 3, + same_error_threshold: 5, + max_iterations: 10, + ..Default::default() + } + } + + /// Create with custom thresholds. + pub fn with_thresholds(no_change: u32, same_error: u32, max_iterations: u32) -> Self { + Self { + no_change_threshold: no_change, + same_error_threshold: same_error, + max_iterations, + ..Default::default() + } + } + + /// Record a new iteration. Returns true if circuit should remain closed. + pub fn record_iteration(&mut self, had_changes: bool, error: Option<&str>) -> bool { + self.iteration_count += 1; + + // Check max iterations + if self.iteration_count >= self.max_iterations { + self.is_open = true; + self.open_reason = Some(format!( + "Maximum iterations ({}) reached", + self.max_iterations + )); + return false; + } + + // Track file changes + if had_changes { + self.runs_without_changes = 0; + } else { + self.runs_without_changes += 1; + if self.runs_without_changes >= self.no_change_threshold { + self.is_open = true; + self.open_reason = Some(format!( + "No file changes for {} consecutive runs", + self.runs_without_changes + )); + return false; + } + } + + // Track errors + match (error, &self.last_error) { + (Some(err), Some(last)) if err == last => { + self.same_error_count += 1; + if self.same_error_count >= self.same_error_threshold { + self.is_open = true; + self.open_reason = Some(format!( + "Same error repeated {} times: {}", + self.same_error_count, err + )); + return false; + } + } + (Some(err), _) => { + self.last_error = Some(err.to_string()); + self.same_error_count = 1; + } + (None, _) => { + self.same_error_count = 0; + self.last_error = None; + } + } + + true // Circuit remains closed + } + + /// Check if the circuit breaker is open. + pub fn should_stop(&self) -> bool { + self.is_open + } + + /// Reset the circuit breaker. + pub fn reset(&mut self) { + *self = Self::with_thresholds( + self.no_change_threshold, + self.same_error_threshold, + self.max_iterations, + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_yaml_format() { + let text = r#" +Some output before +<COMPLETION_GATE> +ready: true +reason: "All tests pass" +progress: "Implemented feature X" +</COMPLETION_GATE> +More output after +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("All tests pass")); + assert_eq!(gate.progress.as_deref(), Some("Implemented feature X")); + } + + #[test] + fn test_parse_not_ready() { + let text = r#" +<COMPLETION_GATE> +ready: false +reason: "Tests are failing" +blockers: ["Fix test_foo", "Fix test_bar"] +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(!gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Tests are failing")); + assert_eq!( + gate.blockers, + Some(vec!["Fix test_foo".to_string(), "Fix test_bar".to_string()]) + ); + } + + #[test] + fn test_parse_json_format() { + let text = r#" +<COMPLETION_GATE> +{ + "ready": true, + "reason": "Done", + "progress": "All good" +} +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Done")); + } + + #[test] + fn test_parse_last_gate() { + let text = r#" +<COMPLETION_GATE> +ready: false +reason: "Still working" +</COMPLETION_GATE> +Some more work... +<COMPLETION_GATE> +ready: true +reason: "Finally done" +</COMPLETION_GATE> +"#; + + let gate = CompletionGate::parse_last(text).unwrap(); + assert!(gate.ready); + assert_eq!(gate.reason.as_deref(), Some("Finally done")); + } + + #[test] + fn test_no_gate() { + let text = "No completion gate here"; + assert!(CompletionGate::parse(text).is_none()); + } + + #[test] + fn test_circuit_breaker_max_iterations() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 5); + for _ in 0..4 { + assert!(cb.record_iteration(true, None)); + } + assert!(!cb.record_iteration(true, None)); // 5th iteration should trip + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("Maximum iterations")); + } + + #[test] + fn test_circuit_breaker_no_changes() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 10); + assert!(cb.record_iteration(false, None)); // 1st no change + assert!(cb.record_iteration(false, None)); // 2nd no change + assert!(!cb.record_iteration(false, None)); // 3rd no change - trips + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("No file changes")); + } + + #[test] + fn test_circuit_breaker_same_error() { + let mut cb = CircuitBreaker::with_thresholds(10, 3, 10); + let err = "Test failed"; + assert!(cb.record_iteration(true, Some(err))); + assert!(cb.record_iteration(true, Some(err))); + assert!(!cb.record_iteration(true, Some(err))); // 3rd same error - trips + assert!(cb.is_open); + assert!(cb.open_reason.as_ref().unwrap().contains("Same error")); + } + + #[test] + fn test_circuit_breaker_different_errors_ok() { + let mut cb = CircuitBreaker::with_thresholds(10, 3, 10); + assert!(cb.record_iteration(true, Some("error 1"))); + assert!(cb.record_iteration(true, Some("error 2"))); + assert!(cb.record_iteration(true, Some("error 3"))); + // Different errors don't trip the circuit + assert!(!cb.is_open); + } + + #[test] + fn test_circuit_breaker_changes_reset() { + let mut cb = CircuitBreaker::with_thresholds(3, 5, 10); + assert!(cb.record_iteration(false, None)); // 1 no change + assert!(cb.record_iteration(false, None)); // 2 no changes + assert!(cb.record_iteration(true, None)); // has changes - resets + assert!(cb.record_iteration(false, None)); // 1 no change again + assert!(cb.record_iteration(false, None)); // 2 no changes + // Still shouldn't trip because we had a change in between + assert!(!cb.is_open); + } +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 4ccedb2..75c884b 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -12,6 +12,7 @@ use uuid::Uuid; use std::collections::HashSet; +use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; @@ -944,6 +945,8 @@ pub struct ManagedTask { pub copy_files: Option<Vec<String>>, /// Contract ID if this task is associated with a contract. pub contract_id: Option<Uuid>, + /// Whether to run in autonomous loop mode. + pub autonomous_loop: bool, /// Time task was created. pub created_at: Instant, /// Time task started running. @@ -973,6 +976,8 @@ pub struct TaskConfig { pub enable_permissions: bool, /// Disable verbose output. pub disable_verbose: bool, + /// Bubblewrap sandbox configuration. + pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>, } impl Default for TaskConfig { @@ -986,6 +991,7 @@ impl Default for TaskConfig { claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, + bubblewrap: None, } } } @@ -1027,7 +1033,8 @@ impl TaskManager { .with_pre_args(config.claude_pre_args.clone()) .with_permissions_enabled(config.enable_permissions) .with_verbose_disabled(config.disable_verbose) - .with_env(config.env_vars.clone()), + .with_env(config.env_vars.clone()) + .with_bubblewrap(config.bubblewrap.clone()), ); let temp_manager = Arc::new(TempManager::new()); @@ -1150,6 +1157,7 @@ impl TaskManager { copy_files, contract_id, is_supervisor, + autonomous_loop, } => { tracing::info!( task_id = %task_id, @@ -1161,6 +1169,7 @@ impl TaskManager { depth = depth, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, + autonomous_loop = autonomous_loop, target_repo_path = ?target_repo_path, completion_action = ?completion_action, continue_from_task_id = ?continue_from_task_id, @@ -1173,7 +1182,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, - copy_files, contract_id + copy_files, contract_id, autonomous_loop ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1252,6 +1261,7 @@ impl TaskManager { None, // continue_from_task_id None, // copy_files contract_id, + false, // autonomous_loop - supervisors don't use this ).await { tracing::error!( task_id = %task_id, @@ -1421,6 +1431,17 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } + DaemonCommand::CleanupWorktree { + task_id, + delete_branch, + } => { + tracing::info!( + task_id = %task_id, + delete_branch = delete_branch, + "Cleaning up worktree" + ); + self.handle_cleanup_worktree(task_id, delete_branch).await?; + } } Ok(()) } @@ -1444,6 +1465,7 @@ impl TaskManager { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); @@ -1496,6 +1518,7 @@ impl TaskManager { continue_from_task_id, copy_files: copy_files.clone(), contract_id, + autonomous_loop, created_at: Instant::now(), started_at: None, completed_at: None, @@ -1519,7 +1542,7 @@ impl TaskManager { if let Err(e) = inner.run_task( task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, - continue_from_task_id, copy_files, contract_id + continue_from_task_id, copy_files, contract_id, autonomous_loop ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -2046,6 +2069,76 @@ impl TaskManager { Ok(()) } + /// Handle CleanupWorktree command. + /// + /// Removes a task's worktree and optionally its branch. + /// Used when a contract is completed or deleted to clean up associated task worktrees. + async fn handle_cleanup_worktree( + &self, + task_id: Uuid, + delete_branch: bool, + ) -> Result<(), DaemonError> { + // Try to get the worktree path, but don't fail if not found + let worktree_result = self.get_task_worktree_path(task_id).await; + + let (success, message) = match worktree_result { + Ok(worktree_path) => { + // Remove the worktree + match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await { + Ok(()) => { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + delete_branch = delete_branch, + "Worktree cleaned up successfully" + ); + + // Also remove task from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, format!("Worktree cleaned up: {}", worktree_path.display())) + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + error = %e, + "Failed to remove worktree" + ); + (false, format!("Failed to remove worktree: {}", e)) + } + } + } + Err(_) => { + // Worktree not found - this is OK, it may have already been cleaned up + tracing::debug!( + task_id = %task_id, + "No worktree found for task, may have already been cleaned up" + ); + + // Still remove from in-memory tracking + self.tasks.write().await.remove(&task_id); + self.task_inputs.write().await.remove(&task_id); + self.merge_trackers.write().await.remove(&task_id); + self.active_pids.write().await.remove(&task_id); + + (true, "No worktree found, task tracking cleaned up".to_string()) + } + }; + + // Send result back to server + let msg = DaemonMessage::CleanupWorktreeResult { + task_id, + success, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + /// Handle ReadRepoFile command. /// /// Reads a file from a repository on the daemon's filesystem and sends @@ -2436,6 +2529,7 @@ impl TaskManagerInner { continue_from_task_id: Option<Uuid>, copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, + autonomous_loop: bool, ) -> Result<(), DaemonError> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); @@ -2908,6 +3002,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; + // Clone extra_env for use in autonomous loop iterations + let extra_env_for_loop = extra_env.clone(); + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); let mut process = self.process_manager .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) @@ -2934,7 +3031,7 @@ impl TaskManagerInner { // Get stdin handle for input forwarding and completion signaling let stdin_handle = process.stdin_handle(); - let stdin_handle_for_completion = stdin_handle.clone(); + let mut stdin_handle_for_completion = stdin_handle.clone(); tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); tokio::spawn(async move { @@ -2998,142 +3095,311 @@ impl TaskManagerInner { let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); let mut auth_error_handled = false; - let mut output_count = 0u64; - let mut output_bytes = 0usize; - let startup_timeout = tokio::time::Duration::from_secs(30); - let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); - startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let startup_deadline = tokio::time::Instant::now() + startup_timeout; + // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection + let mut accumulated_output = String::new(); + let mut circuit_breaker = CircuitBreaker::new(); + let mut iteration_count = 0u32; + let mut final_exit_code: i64 = -1; // Track the final exit code across iterations - loop { - tokio::select! { - maybe_line = process.next_output() => { - match maybe_line { - Some(line) => { - output_count += 1; - output_bytes += line.content.len(); - - if output_count == 1 { - tracing::info!(task_id = %task_id, "Received first output line from Claude"); - } - if output_count % 100 == 0 { - tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); - } + // Autonomous loop: we may run multiple iterations + 'autonomous_loop: loop { + iteration_count += 1; - // Log output details for debugging - tracing::trace!( - task_id = %task_id, - line_num = output_count, - content_len = line.content.len(), - is_stdout = line.is_stdout, - json_type = ?line.json_type, - "Forwarding output to WebSocket" - ); + if autonomous_loop && iteration_count > 1 { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "Starting autonomous loop iteration" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // For subsequent iterations, spawn with --continue flag + let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true."; + + process = self.process_manager + .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + + // Register the new process PID + if let Some(pid) = process.id() { + self.active_pids.write().await.insert(task_id, pid); + tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned"); + } + + // Reset stdin handle for the new process + stdin_handle_for_completion = process.stdin_handle(); + } + + // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output) + let mut iteration_output = String::new(); + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; - // Check if this is a "result" message indicating task completion - // With --input-format=stream-json, Claude waits for more input after completion - // We close stdin to signal EOF and let the process exit - if line.json_type.as_deref() == Some("result") { - tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); - let mut stdin_guard = stdin_handle_for_completion.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + // Accumulate output for COMPLETION_GATE detection in autonomous loop mode + if autonomous_loop { + iteration_output.push_str(&line.content); + iteration_output.push('\n'); } - } - // Check for OAuth auth error before sending output - let content_for_auth_check = line.content.clone(); - let json_type_for_auth_check = line.json_type.clone(); - let is_stdout_for_auth_check = line.is_stdout; + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } - let msg = DaemonMessage::task_output(task_id, line.content, false); - if ws_tx.send(msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); - break; - } + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); - // Detect OAuth token expiration and trigger remote login flow - if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { - auth_error_handled = true; - tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); - - // Spawn claude setup-token to get login URL - if let Some(login_url) = get_oauth_login_url(&claude_command).await { - tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); - let auth_msg = DaemonMessage::AuthenticationRequired { - task_id: Some(task_id), - login_url, - hostname: daemon_hostname.clone(), - }; - if ws_tx.send(auth_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + // Check for OAuth auth error before sending output + let content_for_auth_check = line.content.clone(); + let json_type_for_auth_check = line.json_type.clone(); + let is_stdout_for_auth_check = line.is_stdout; + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + + // Detect OAuth token expiration and trigger remote login flow + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { + auth_error_handled = true; + tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); + + // Spawn claude setup-token to get login URL + if let Some(login_url) = get_oauth_login_url(&claude_command).await { + tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); + let auth_msg = DaemonMessage::AuthenticationRequired { + task_id: Some(task_id), + login_url, + hostname: daemon_hostname.clone(), + }; + if ws_tx.send(auth_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + } + } else { + tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); + let fallback_msg = DaemonMessage::task_output( + task_id, + format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(fallback_msg).await; } - } else { - tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); - let fallback_msg = DaemonMessage::task_output( - task_id, - format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", - daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), - false, - ); - let _ = ws_tx.send(fallback_msg).await; } } - } - None => { - tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); - break; + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } } } - } - _ = startup_check.tick(), if output_count == 0 => { - // Check if process is still alive - match process.try_wait() { - Ok(Some(exit_code)) => { - tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), - false, - ); - let _ = ws_tx.send(msg).await; - break; - } - Ok(None) => { - // Still running but no output - if tokio::time::Instant::now() > startup_deadline { - tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); let msg = DaemonMessage::task_output( task_id, - "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), false, ); let _ = ws_tx.send(msg).await; - } else { - tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); } } } } - } - // Wait for process to exit - let exit_code = process.wait().await.unwrap_or(-1); + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + final_exit_code = exit_code; // Store for use after the loop + + // Unregister the process PID (process has exited) + self.active_pids.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Unregistered process PID"); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); - // Unregister the process PID (process has exited) - self.active_pids.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Unregistered process PID"); + // Accumulate this iteration's output + accumulated_output.push_str(&iteration_output); - // Clean up input channel for this task - self.task_inputs.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Removed task input channel"); + // === AUTONOMOUS LOOP LOGIC === + // Check if we should continue or complete + if autonomous_loop && exit_code == 0 { + // Check for COMPLETION_GATE in the output + let completion_gate = CompletionGate::parse_last(&iteration_output); + + match completion_gate { + Some(gate) if gate.ready => { + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + "COMPLETION_GATE ready=true detected, task complete" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n", + iteration_count, + gate.reason.unwrap_or_else(|| "Task complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + Some(gate) => { + // COMPLETION_GATE found but not ready + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + reason = ?gate.reason, + blockers = ?gate.blockers, + "COMPLETION_GATE ready=false, will continue" + ); + + // Check circuit breaker + // For now, we consider output_bytes > 0 as "progress" + let had_progress = output_bytes > 0; + let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str()); + + if !circuit_breaker.record_iteration(had_progress, error) { + // Circuit breaker tripped + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped, stopping autonomous loop" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n", + gate.reason.unwrap_or_else(|| "Not complete".to_string()) + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Continue to next iteration + continue 'autonomous_loop; + } + None => { + // No COMPLETION_GATE found - check circuit breaker and continue + tracing::info!( + task_id = %task_id, + iteration = iteration_count, + "No COMPLETION_GATE found, will restart with continuation prompt" + ); + + let had_progress = output_bytes > 0; + if !circuit_breaker.record_iteration(had_progress, None) { + tracing::warn!( + task_id = %task_id, + reason = ?circuit_breaker.open_reason, + "Circuit breaker tripped (no COMPLETION_GATE), stopping" + ); + let msg = DaemonMessage::task_output( + task_id, + format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n", + circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason") + ), + false, + ); + let _ = self.ws_tx.send(msg).await; + break 'autonomous_loop; + } + + let msg = DaemonMessage::task_output( + task_id, + "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + continue 'autonomous_loop; + } + } + } else { + // Not in autonomous loop mode or process failed - exit normally + break 'autonomous_loop; + } + } // end 'autonomous_loop // Update state based on exit code - let success = exit_code == 0; + let success = final_exit_code == 0; let new_state = if success { TaskState::Completed } else { @@ -3142,7 +3408,7 @@ impl TaskManagerInner { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, success = success, new_state = ?new_state, "Claude process exited, updating task state" @@ -3154,7 +3420,7 @@ impl TaskManagerInner { task.state = new_state; task.completed_at = Some(Instant::now()); if !success { - task.error = Some(format!("Process exited with code {}", exit_code)); + task.error = Some(format!("Process exited with code {}", final_exit_code)); } } } @@ -3196,7 +3462,7 @@ impl TaskManagerInner { if is_supervisor { tracing::info!( task_id = %task_id, - exit_code = exit_code, + exit_code = final_exit_code, "Supervisor Claude process exited - NOT marking as complete" ); // Update local state to reflect it's paused/waiting for input @@ -3218,7 +3484,7 @@ impl TaskManagerInner { let error = if success { None } else { - Some(format!("Exit code: {}", exit_code)) + Some(format!("Exit code: {}", final_exit_code)) }; tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs index 29c261e..3830e1d 100644 --- a/makima/src/daemon/task/mod.rs +++ b/makima/src/daemon/task/mod.rs @@ -1,7 +1,9 @@ //! Task management and execution. +pub mod completion_gate; pub mod manager; pub mod state; +pub use completion_gate::CompletionGate; pub use manager::{ManagedTask, TaskConfig, TaskManager}; pub use state::TaskState; diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs index ca5fc01..7b59b62 100644 --- a/makima/src/daemon/task/state.rs +++ b/makima/src/daemon/task/state.rs @@ -124,7 +124,9 @@ impl Default for TaskState { #[cfg(test)] mod tests { + #[allow(unused_imports)] use crate::daemon::*; + use super::TaskState; #[test] fn test_valid_transitions() { diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index e86a577..714c0f9 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -250,6 +250,14 @@ pub enum DaemonMessage { diff: Option<String>, error: Option<String>, }, + + /// Response to CleanupWorktree command. + CleanupWorktreeResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + }, } /// Information about a branch (used in BranchList message). @@ -323,6 +331,11 @@ pub enum DaemonCommand { /// Whether this task is a supervisor (long-running contract orchestrator). #[serde(rename = "isSupervisor", default)] is_supervisor: bool, + /// Whether to run in autonomous loop mode. + /// When enabled, task will automatically restart with --continue if it exits + /// without a COMPLETION_GATE indicating ready: true. + #[serde(rename = "autonomousLoop", default)] + autonomous_loop: bool, }, /// Pause a running task. @@ -530,6 +543,15 @@ pub enum DaemonCommand { task_id: Uuid, }, + /// Clean up a task's worktree (used when contract is completed/deleted). + CleanupWorktree { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Whether to delete the associated branch. + #[serde(rename = "deleteBranch")] + delete_branch: bool, + }, + /// Error response. Error { code: String, |
