summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 11:57:43 +0000
committersoryu <soryu@soryu.co>2026-01-15 17:12:04 +0000
commit3efdab36ca61a6795454668881d5b925abe22bd3 (patch)
tree0fd96e527f45a3da31dfc073b07cd55ba284e550 /makima/src
parent63b2e347b2ecadc6a48062e10e0a7e19b6102631 (diff)
downloadsoryu-3efdab36ca61a6795454668881d5b925abe22bd3.tar.gz
soryu-3efdab36ca61a6795454668881d5b925abe22bd3.zip
Fixup: Add cleanup and isolation features to makima
Add comprehensive CLI documentation - Create makima/docs/CLI.md with complete command reference for: - makima server: HTTP/WebSocket server options - makima daemon: Worker daemon configuration - makima supervisor: Contract orchestration commands - makima contract: Task-contract interaction commands - Include configuration file examples and environment variables - Add usage workflows for common scenarios - Update makima/README.md with CLI overview and link to docs Add GitHub Actions release workflow for v0.1.0 Creates automated release workflow that: - Triggers on v* tag pushes - Builds binaries for Linux x86_64, macOS x86_64, and macOS ARM64 - Uses Rust nightly toolchain (required for edition 2024) - Packages binaries as .tar.gz archives - Creates GitHub release with installation instructions fix(ci): update macOS runner for x86_64 builds Replace deprecated macos-13 runner with macos-15-intel for x86_64-apple-darwin target. The macos-13 runner has been retired by GitHub Actions. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Add dismissing notifications and fix CLI task ID arg Add worktree cleanup when contracts complete or are deleted (#21) - Add CleanupWorktree daemon command variant - Handle CleanupWorktree in daemon task manager - Add cleanup_contract_worktrees helper function - Trigger cleanup when contract status becomes 'completed' - Trigger cleanup before contract deletion Add Autonomous Loop Mode for persistent task completion (#20) Implements the "Autonomous Loop Mode" feature inspired by Ralph for Claude Code. This enables tasks to automatically restart and continue working until they explicitly signal completion via a COMPLETION_GATE block. Key features: - Exit confirmation via COMPLETION_GATE: Tasks must output a <COMPLETION_GATE> block with `ready: true` to signal completion. Without this, the task auto-restarts using `claude --continue` to resume the conversation. - Circuit breaker: Prevents infinite loops by detecting: * Maximum iteration limit (default: 10) * No progress for N consecutive iterations (default: 3) * Same error repeated N times (default: 5) - spawn_continue: New ProcessManager method to spawn Claude with the `--continue` flag, resuming from the previous session state. Toggle: Enable via `autonomous_loop` flag on contracts. When set, all tasks spawned for that contract will run in autonomous loop mode. Files changed: - completion_gate.rs: COMPLETION_GATE parser and CircuitBreaker logic - claude.rs: spawn_continue() for --continue mode spawning - manager.rs: Autonomous loop iteration logic in run_task() - protocol.rs: autonomousLoop field in DaemonCommand::SpawnTask - models.rs/repository.rs: autonomous_loop column on contracts/tasks - Migration: Adds autonomous_loop columns to contracts and tasks tables Add get-task and output commands to supervisor CLI (#24) Add two new supervisor subcommands: - `makima supervisor task <task_id>` - Get individual task details - `makima supervisor output <task_id>` - Get task output/claude log This allows supervisors to fetch task details and claude output directly from the CLI instead of using curl to call the task API. Add optional bubblewrap sandboxing for Claude processes (#23) Add --bubblewrap flag and process.bubblewrap config section to enable running Claude Code in a bubblewrap sandbox for process isolation. When enabled, claude processes run with filesystem restrictions: - Root filesystem mounted read-only - Working directory (worktree) mounted read-write - Fresh /dev, /proc, /tmp - Network access preserved for API calls Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/bin/makima.rs21
-rw-r--r--makima/src/daemon/api/supervisor.rs11
-rw-r--r--makima/src/daemon/cli/daemon.rs5
-rw-r--r--makima/src/daemon/cli/mod.rs6
-rw-r--r--makima/src/daemon/cli/supervisor.rs26
-rw-r--r--makima/src/daemon/config.rs43
-rw-r--r--makima/src/daemon/process/claude.rs373
-rw-r--r--makima/src/daemon/task/completion_gate.rs402
-rw-r--r--makima/src/daemon/task/manager.rs500
-rw-r--r--makima/src/daemon/task/mod.rs2
-rw-r--r--makima/src/daemon/task/state.rs2
-rw-r--r--makima/src/daemon/ws/protocol.rs22
-rw-r--r--makima/src/db/models.rs13
-rw-r--r--makima/src/db/repository.rs42
-rw-r--r--makima/src/server/handlers/contract_chat.rs1
-rw-r--r--makima/src/server/handlers/contracts.rs116
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs39
-rw-r--r--makima/src/server/handlers/transcript_analysis.rs1
-rw-r--r--makima/src/server/state.rs9
19 files changed, 1491 insertions, 143 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 4fc331c..f430701 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -78,6 +78,7 @@ async fn run_daemon(
api_key: args.api_key,
max_tasks: args.max_tasks,
log_level: args.log_level,
+ bubblewrap: args.bubblewrap,
};
// Load configuration with CLI overrides
@@ -162,6 +163,11 @@ async fn run_daemon(
let ws_tx = ws_client.sender();
// Create task configuration
+ let bubblewrap_config = if config.process.bubblewrap.enabled {
+ Some(config.process.bubblewrap.clone())
+ } else {
+ None
+ };
let task_config = TaskConfig {
max_concurrent_tasks: config.process.max_concurrent_tasks,
worktree_base_dir: config.worktree.base_dir.clone(),
@@ -171,6 +177,7 @@ async fn run_daemon(
claude_pre_args: config.process.claude_pre_args.clone(),
enable_permissions: config.process.enable_permissions,
disable_verbose: config.process.disable_verbose,
+ bubblewrap: bubblewrap_config,
};
// Create task manager
@@ -309,7 +316,7 @@ async fn run_supervisor(
let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
let task_id = args
.common
- .task_id
+ .self_task_id
.ok_or("MAKIMA_TASK_ID is required for checkpoint")?;
let result = client
.supervisor_checkpoint(task_id, &args.message)
@@ -318,7 +325,7 @@ async fn run_supervisor(
}
SupervisorCommand::Checkpoints(args) => {
let client = ApiClient::new(args.api_url, args.api_key)?;
- let task_id = args.task_id.ok_or("MAKIMA_TASK_ID is required")?;
+ let task_id = args.self_task_id.ok_or("MAKIMA_TASK_ID is required")?;
let result = client.supervisor_checkpoints(task_id).await?;
println!("{}", serde_json::to_string(&result.0)?);
}
@@ -347,6 +354,16 @@ async fn run_supervisor(
.await?;
println!("{}", serde_json::to_string(&result.0)?);
}
+ SupervisorCommand::Task(args) => {
+ let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
+ let result = client.supervisor_get_task(args.target_task_id).await?;
+ println!("{}", serde_json::to_string(&result.0)?);
+ }
+ SupervisorCommand::Output(args) => {
+ let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
+ let result = client.supervisor_get_task_output(args.target_task_id).await?;
+ println!("{}", serde_json::to_string(&result.0)?);
+ }
}
Ok(())
diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs
index 0a68980..8b3d480 100644
--- a/makima/src/daemon/api/supervisor.rs
+++ b/makima/src/daemon/api/supervisor.rs
@@ -228,4 +228,15 @@ impl ApiClient {
self.post(&format!("/api/v1/contracts/{}/phase", contract_id), &req)
.await
}
+
+ /// Get individual task details.
+ pub async fn supervisor_get_task(&self, task_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/tasks/{}", task_id)).await
+ }
+
+ /// Get task output/claude log.
+ pub async fn supervisor_get_task_output(&self, task_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/tasks/{}/output", task_id))
+ .await
+ }
}
diff --git a/makima/src/daemon/cli/daemon.rs b/makima/src/daemon/cli/daemon.rs
index de4cff4..c779d64 100644
--- a/makima/src/daemon/cli/daemon.rs
+++ b/makima/src/daemon/cli/daemon.rs
@@ -33,4 +33,9 @@ pub struct DaemonArgs {
/// Log level (trace, debug, info, warn, error)
#[arg(short, long, default_value = "info")]
pub log_level: String,
+
+ /// Enable bubblewrap sandbox for Claude processes.
+ /// Requires bwrap to be installed on the system.
+ #[arg(long, env = "MAKIMA_DAEMON_BUBBLEWRAP")]
+ pub bubblewrap: bool,
}
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
index 1a49399..da71b0d 100644
--- a/makima/src/daemon/cli/mod.rs
+++ b/makima/src/daemon/cli/mod.rs
@@ -82,6 +82,12 @@ pub enum SupervisorCommand {
/// Ask a question and wait for user feedback
Ask(supervisor::AskArgs),
+
+ /// Get individual task details
+ Task(supervisor::GetTaskArgs),
+
+ /// Get task output/claude log
+ Output(supervisor::GetTaskOutputArgs),
}
/// Contract subcommands for task-contract interaction.
diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs
index dc534b5..2bc4c89 100644
--- a/makima/src/daemon/cli/supervisor.rs
+++ b/makima/src/daemon/cli/supervisor.rs
@@ -14,9 +14,9 @@ pub struct SupervisorArgs {
#[arg(long, env = "MAKIMA_API_KEY")]
pub api_key: String,
- /// Current task ID (optional)
+ /// Current task ID (optional) - the supervisor's own task ID
#[arg(long, env = "MAKIMA_TASK_ID")]
- pub task_id: Option<Uuid>,
+ pub self_task_id: Option<Uuid>,
/// Contract ID
#[arg(long, env = "MAKIMA_CONTRACT_ID")]
@@ -199,3 +199,25 @@ pub struct AdvancePhaseArgs {
#[arg(index = 1)]
pub phase: String,
}
+
+/// Arguments for task command (get individual task details).
+#[derive(Args, Debug)]
+pub struct GetTaskArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to get details for
+ #[arg(index = 1, id = "target_task_id")]
+ pub target_task_id: Uuid,
+}
+
+/// Arguments for output command (get task output/claude log).
+#[derive(Args, Debug)]
+pub struct GetTaskOutputArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to get output for
+ #[arg(index = 1, id = "target_task_id")]
+ pub target_task_id: Uuid,
+}
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index 866ee70..512b822 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -5,6 +5,38 @@ use serde::Deserialize;
use std::collections::HashMap;
use std::path::PathBuf;
+/// Bubblewrap sandbox configuration for Claude processes.
+#[derive(Debug, Clone, Deserialize, Default)]
+pub struct BubblewrapConfig {
+ /// Enable bubblewrap sandboxing.
+ #[serde(default)]
+ pub enabled: bool,
+
+ /// Path to bwrap binary (default: 'bwrap').
+ #[serde(default = "default_bwrap_command")]
+ pub bwrap_command: String,
+
+ /// Allow network access inside sandbox (default: true).
+ #[serde(default = "default_true")]
+ pub network: bool,
+
+ /// Additional paths to bind read-only.
+ #[serde(default)]
+ pub ro_bind: Vec<PathBuf>,
+
+ /// Additional paths to bind read-write.
+ #[serde(default)]
+ pub rw_bind: Vec<PathBuf>,
+}
+
+fn default_bwrap_command() -> String {
+ "bwrap".to_string()
+}
+
+fn default_true() -> bool {
+ true
+}
+
/// Root daemon configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct DaemonConfig {
@@ -177,6 +209,10 @@ pub struct ProcessConfig {
/// Additional environment variables to pass to Claude Code.
#[serde(default, alias = "envvars")]
pub env_vars: HashMap<String, String>,
+
+ /// Bubblewrap sandbox configuration.
+ #[serde(default)]
+ pub bubblewrap: BubblewrapConfig,
}
fn default_claude_command() -> String {
@@ -198,6 +234,7 @@ impl Default for ProcessConfig {
max_concurrent_tasks: default_max_tasks(),
default_timeout_secs: 0,
env_vars: HashMap::new(),
+ bubblewrap: BubblewrapConfig::default(),
}
}
}
@@ -478,6 +515,11 @@ impl DaemonConfig {
// Log level is always set (has default)
config.logging.level = args.log_level.clone();
+ // Enable bubblewrap if --bubblewrap flag is set
+ if args.bubblewrap {
+ config.process.bubblewrap.enabled = true;
+ }
+
// Validate required fields after all sources are merged
config.validate()?;
@@ -511,6 +553,7 @@ impl DaemonConfig {
max_concurrent_tasks: 2,
default_timeout_secs: 0,
env_vars: HashMap::new(),
+ bubblewrap: BubblewrapConfig::default(),
},
local_db: LocalDbConfig {
path: PathBuf::from("/tmp/makima-daemon-test/state.db"),
diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs
index 536d883..f3aa421 100644
--- a/makima/src/daemon/process/claude.rs
+++ b/makima/src/daemon/process/claude.rs
@@ -1,7 +1,7 @@
//! Claude Code process management.
use std::collections::HashMap;
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
@@ -11,6 +11,7 @@ use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{mpsc, Mutex};
use super::claude_protocol::ClaudeInputMessage;
+use crate::daemon::config::BubblewrapConfig;
/// Errors that can occur during Claude process management.
#[derive(Debug, thiserror::Error)]
@@ -26,6 +27,9 @@ pub enum ClaudeProcessError {
#[error("Failed to read output: {0}")]
OutputRead(String),
+
+ #[error("Bubblewrap (bwrap) not found. Install bubblewrap or disable the --bubblewrap flag.")]
+ BubblewrapNotFound,
}
/// A line of output from Claude Code.
@@ -234,6 +238,8 @@ pub struct ProcessManager {
disable_verbose: bool,
/// Default environment variables to pass.
default_env: HashMap<String, String>,
+ /// Bubblewrap sandbox configuration.
+ bubblewrap: Option<BubblewrapConfig>,
}
impl Default for ProcessManager {
@@ -252,6 +258,7 @@ impl ProcessManager {
enable_permissions: false,
disable_verbose: false,
default_env: HashMap::new(),
+ bubblewrap: None,
}
}
@@ -264,6 +271,7 @@ impl ProcessManager {
enable_permissions: false,
disable_verbose: false,
default_env: HashMap::new(),
+ bubblewrap: None,
}
}
@@ -297,11 +305,147 @@ impl ProcessManager {
self
}
+ /// Configure bubblewrap sandboxing.
+ ///
+ /// When enabled, Claude processes will be spawned inside a bubblewrap sandbox
+ /// with filesystem isolation.
+ pub fn with_bubblewrap(mut self, config: Option<BubblewrapConfig>) -> Self {
+ self.bubblewrap = config;
+ self
+ }
+
/// Get the claude command path.
pub fn claude_command(&self) -> &str {
&self.claude_command
}
+ /// Check if bubblewrap (bwrap) is available on the system.
+ ///
+ /// Returns the bwrap version string if available.
+ pub async fn check_bwrap_available(&self) -> Result<String, ClaudeProcessError> {
+ let bwrap_cmd = self
+ .bubblewrap
+ .as_ref()
+ .map(|c| c.bwrap_command.as_str())
+ .unwrap_or("bwrap");
+
+ let output = Command::new(bwrap_cmd)
+ .arg("--version")
+ .output()
+ .await
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::BubblewrapNotFound
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ if output.status.success() {
+ Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
+ } else {
+ Err(ClaudeProcessError::BubblewrapNotFound)
+ }
+ }
+
+ /// Build bwrap command arguments for sandboxing.
+ ///
+ /// Returns a tuple of (bwrap_command, bwrap_args) where bwrap_args includes
+ /// all the sandbox flags followed by "--" and the actual command to run.
+ fn build_bwrap_args(
+ &self,
+ working_dir: &Path,
+ claude_command: &str,
+ claude_args: &[String],
+ config: &BubblewrapConfig,
+ ) -> (String, Vec<String>) {
+ let mut args = Vec::new();
+
+ // Unshare all namespaces except user (for unprivileged use)
+ args.push("--unshare-all".to_string());
+
+ // Share network if enabled (needed for API calls)
+ if config.network {
+ args.push("--share-net".to_string());
+ }
+
+ // Safety flags
+ args.push("--die-with-parent".to_string());
+ args.push("--new-session".to_string());
+
+ // Bind root filesystem read-only
+ args.push("--ro-bind".to_string());
+ args.push("/".to_string());
+ args.push("/".to_string());
+
+ // Mount fresh /dev
+ args.push("--dev".to_string());
+ args.push("/dev".to_string());
+
+ // Mount fresh /proc
+ args.push("--proc".to_string());
+ args.push("/proc".to_string());
+
+ // Fresh /tmp
+ args.push("--tmpfs".to_string());
+ args.push("/tmp".to_string());
+
+ // Bind working directory (worktree) read-write
+ let working_dir_str = working_dir.to_string_lossy().to_string();
+ args.push("--bind".to_string());
+ args.push(working_dir_str.clone());
+ args.push(working_dir_str);
+
+ // Bind ~/.claude read-write if it exists (for Claude config)
+ if let Ok(home) = std::env::var("HOME") {
+ let claude_config_dir = PathBuf::from(&home).join(".claude");
+ if claude_config_dir.exists() {
+ let claude_config_str = claude_config_dir.to_string_lossy().to_string();
+ args.push("--bind".to_string());
+ args.push(claude_config_str.clone());
+ args.push(claude_config_str);
+ }
+
+ // Also bind ~/.config/claude if it exists (alternative config location)
+ let claude_config_alt = PathBuf::from(&home).join(".config").join("claude");
+ if claude_config_alt.exists() {
+ let config_str = claude_config_alt.to_string_lossy().to_string();
+ args.push("--bind".to_string());
+ args.push(config_str.clone());
+ args.push(config_str);
+ }
+ }
+
+ // Additional read-only binds from config
+ for path in &config.ro_bind {
+ if path.exists() {
+ let path_str = path.to_string_lossy().to_string();
+ args.push("--ro-bind".to_string());
+ args.push(path_str.clone());
+ args.push(path_str);
+ }
+ }
+
+ // Additional read-write binds from config
+ for path in &config.rw_bind {
+ if path.exists() {
+ let path_str = path.to_string_lossy().to_string();
+ args.push("--bind".to_string());
+ args.push(path_str.clone());
+ args.push(path_str);
+ }
+ }
+
+ // Separator before the actual command
+ args.push("--".to_string());
+
+ // Add the claude command and its arguments
+ args.push(claude_command.to_string());
+ args.extend(claude_args.iter().cloned());
+
+ (config.bwrap_command.clone(), args)
+ }
+
/// Spawn a Claude Code process to execute a plan.
///
/// The process runs in the specified working directory with stream-json output format.
@@ -327,11 +471,25 @@ impl ProcessManager {
extra_env: Option<HashMap<String, String>>,
system_prompt: Option<&str>,
) -> Result<ClaudeProcess, ClaudeProcessError> {
+ // Check if bubblewrap is enabled and available
+ let use_bubblewrap = if let Some(ref bwrap_config) = self.bubblewrap {
+ if bwrap_config.enabled {
+ // Verify bwrap is available before proceeding
+ self.check_bwrap_available().await?;
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ };
+
tracing::info!(
working_dir = %working_dir.display(),
plan_len = plan.len(),
plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
has_system_prompt = system_prompt.is_some(),
+ bubblewrap_enabled = use_bubblewrap,
"Spawning Claude Code process"
);
@@ -350,37 +508,52 @@ impl ProcessManager {
env.extend(extra);
}
- // Build arguments list
- let mut args = Vec::new();
+ // Build Claude arguments list
+ let mut claude_args = Vec::new();
// Pre-args (before defaults)
- args.extend(self.claude_pre_args.clone());
+ claude_args.extend(self.claude_pre_args.clone());
// Required arguments for stream-json protocol
- args.push("--output-format=stream-json".to_string());
- args.push("--input-format=stream-json".to_string());
+ claude_args.push("--output-format=stream-json".to_string());
+ claude_args.push("--input-format=stream-json".to_string());
// Optional default arguments
if !self.disable_verbose {
- args.push("--verbose".to_string());
+ claude_args.push("--verbose".to_string());
}
if !self.enable_permissions {
- args.push("--dangerously-skip-permissions".to_string());
+ claude_args.push("--dangerously-skip-permissions".to_string());
}
// System prompt - passed via --system-prompt flag for system-level constraints
if let Some(prompt) = system_prompt {
- args.push("--system-prompt".to_string());
- args.push(prompt.to_string());
+ claude_args.push("--system-prompt".to_string());
+ claude_args.push(prompt.to_string());
}
// Additional user-configured arguments
- args.extend(self.claude_args.clone());
-
- tracing::debug!(args = ?args, "Claude command arguments");
+ claude_args.extend(self.claude_args.clone());
+
+ // Determine the actual command and arguments to spawn
+ let (command, args) = if use_bubblewrap {
+ let bwrap_config = self.bubblewrap.as_ref().unwrap();
+ let (bwrap_cmd, bwrap_args) =
+ self.build_bwrap_args(working_dir, &self.claude_command, &claude_args, bwrap_config);
+ tracing::info!(
+ bwrap_command = %bwrap_cmd,
+ bwrap_args_count = bwrap_args.len(),
+ "Running Claude in bubblewrap sandbox"
+ );
+ tracing::debug!(bwrap_args = ?bwrap_args, "Bubblewrap arguments");
+ (bwrap_cmd, bwrap_args)
+ } else {
+ tracing::debug!(args = ?claude_args, "Claude command arguments");
+ (self.claude_command.clone(), claude_args)
+ };
// Spawn the process
- let mut child = Command::new(&self.claude_command)
+ let mut child = Command::new(&command)
.args(&args)
.current_dir(working_dir)
.envs(env)
@@ -391,7 +564,11 @@ impl ProcessManager {
.spawn()
.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
- ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ if use_bubblewrap {
+ ClaudeProcessError::BubblewrapNotFound
+ } else {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ }
} else {
ClaudeProcessError::SpawnFailed(e)
}
@@ -487,6 +664,172 @@ impl ProcessManager {
Ok(process)
}
+ /// Spawn a Claude Code process in continuation mode.
+ ///
+ /// This is used for the autonomous loop feature where we need to continue
+ /// a previous conversation. The --continue flag tells Claude to resume
+ /// from the previous session state.
+ pub async fn spawn_continue(
+ &self,
+ working_dir: &Path,
+ continuation_prompt: &str,
+ extra_env: Option<HashMap<String, String>>,
+ system_prompt: Option<&str>,
+ ) -> Result<ClaudeProcess, ClaudeProcessError> {
+ tracing::info!(
+ working_dir = %working_dir.display(),
+ prompt_len = continuation_prompt.len(),
+ has_system_prompt = system_prompt.is_some(),
+ "Spawning Claude Code process in continuation mode"
+ );
+
+ // Verify working directory exists
+ if !working_dir.exists() {
+ tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
+ return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("Working directory does not exist: {}", working_dir.display()),
+ )));
+ }
+
+ // Build environment
+ let mut env = self.default_env.clone();
+ if let Some(extra) = extra_env {
+ env.extend(extra);
+ }
+
+ // Build arguments list
+ let mut args = Vec::new();
+
+ // Pre-args (before defaults)
+ args.extend(self.claude_pre_args.clone());
+
+ // Required arguments for stream-json protocol
+ args.push("--output-format=stream-json".to_string());
+ args.push("--input-format=stream-json".to_string());
+
+ // The key flag for continuation mode
+ args.push("--continue".to_string());
+
+ // Optional default arguments
+ if !self.disable_verbose {
+ args.push("--verbose".to_string());
+ }
+ if !self.enable_permissions {
+ args.push("--dangerously-skip-permissions".to_string());
+ }
+
+ // System prompt - passed via --system-prompt flag for system-level constraints
+ if let Some(prompt) = system_prompt {
+ args.push("--system-prompt".to_string());
+ args.push(prompt.to_string());
+ }
+
+ // Additional user-configured arguments
+ args.extend(self.claude_args.clone());
+
+ tracing::debug!(args = ?args, "Claude continue command arguments");
+
+ // Spawn the process
+ let mut child = Command::new(&self.claude_command)
+ .args(&args)
+ .current_dir(working_dir)
+ .envs(env)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ // Create output channel
+ let (tx, rx) = mpsc::channel(1000);
+
+ // Take stdout, stderr, and stdin
+ let stdin = child.stdin.take();
+ let stdin = Arc::new(Mutex::new(stdin));
+
+ let stdout = child.stdout.take().expect("stdout should be piped");
+ let stderr = child.stderr.take().expect("stderr should be piped");
+
+ // Spawn task to read stdout
+ let tx_stdout = tx.clone();
+ tokio::spawn(async move {
+ use tokio::io::AsyncReadExt;
+ let mut reader = BufReader::new(stdout);
+ let mut buffer = vec![0u8; 4096];
+ let mut line_buffer = String::new();
+
+ loop {
+ match tokio::time::timeout(
+ tokio::time::Duration::from_secs(5),
+ reader.read(&mut buffer)
+ ).await {
+ Ok(Ok(0)) => {
+ tracing::debug!("Claude stdout EOF (continue mode)");
+ if !line_buffer.is_empty() {
+ let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
+ }
+ break;
+ }
+ Ok(Ok(n)) => {
+ let chunk = String::from_utf8_lossy(&buffer[..n]);
+ line_buffer.push_str(&chunk);
+ while let Some(newline_pos) = line_buffer.find('\n') {
+ let line = line_buffer[..newline_pos].to_string();
+ line_buffer = line_buffer[newline_pos + 1..].to_string();
+ if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
+ return;
+ }
+ }
+ }
+ Ok(Err(e)) => {
+ tracing::error!(error = %e, "Error reading Claude stdout (continue mode)");
+ break;
+ }
+ Err(_) => {
+ tracing::warn!("No stdout data from Claude for 5 seconds (continue mode)");
+ }
+ }
+ }
+ tracing::debug!("Claude stdout reader task ended (continue mode)");
+ });
+
+ // Spawn task to read stderr
+ let tx_stderr = tx;
+ tokio::spawn(async move {
+ let reader = BufReader::new(stderr);
+ let mut lines = reader.lines();
+ while let Ok(Some(line)) = lines.next_line().await {
+ tracing::debug!(line = %line, "Claude stderr (continue mode)");
+ if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
+ break;
+ }
+ }
+ tracing::debug!("Claude stderr reader task ended (continue mode)");
+ });
+
+ tracing::info!("Claude Code process spawned successfully in continue mode");
+
+ let process = ClaudeProcess {
+ child,
+ output_rx: rx,
+ stdin,
+ };
+
+ // Send the continuation prompt as a user message
+ tracing::info!(prompt_len = continuation_prompt.len(), "Sending continuation prompt to Claude via stdin");
+ process.send_user_message(continuation_prompt).await?;
+
+ Ok(process)
+ }
+
/// Check if the claude command is available.
pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
let output = Command::new(&self.claude_command)
diff --git a/makima/src/daemon/task/completion_gate.rs b/makima/src/daemon/task/completion_gate.rs
new file mode 100644
index 0000000..69b7c6a
--- /dev/null
+++ b/makima/src/daemon/task/completion_gate.rs
@@ -0,0 +1,402 @@
+//! Completion gate parsing for autonomous loop mode.
+//!
+//! This module parses COMPLETION_GATE blocks from Claude's output to determine
+//! if the task is truly complete. The format is inspired by Ralph's autonomous
+//! development framework.
+//!
+//! Format:
+//! ```
+//! <COMPLETION_GATE>
+//! ready: true|false
+//! reason: "explanation of completion status"
+//! progress: "summary of what was accomplished"
+//! blockers: ["list", "of", "blockers"] (optional, only when ready: false)
+//! </COMPLETION_GATE>
+//! ```
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Represents a parsed COMPLETION_GATE block from Claude's output.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct CompletionGate {
+ /// Whether the task is ready to complete.
+ pub ready: bool,
+ /// Explanation of the completion status.
+ pub reason: Option<String>,
+ /// Summary of what was accomplished.
+ pub progress: Option<String>,
+ /// List of blockers if not ready.
+ pub blockers: Option<Vec<String>>,
+ /// Any additional fields that were parsed.
+ #[serde(flatten)]
+ pub extra: HashMap<String, serde_json::Value>,
+}
+
+impl CompletionGate {
+ /// Parse a COMPLETION_GATE block from text output.
+ ///
+ /// Returns None if no valid COMPLETION_GATE is found.
+ pub fn parse(text: &str) -> Option<Self> {
+ // Find the COMPLETION_GATE block
+ let start_tag = "<COMPLETION_GATE>";
+ let end_tag = "</COMPLETION_GATE>";
+
+ let start_idx = text.find(start_tag)?;
+ let end_idx = text.find(end_tag)?;
+
+ if end_idx <= start_idx {
+ return None;
+ }
+
+ let content = &text[start_idx + start_tag.len()..end_idx];
+ let content = content.trim();
+
+ // Try to parse as JSON first
+ if content.starts_with('{') {
+ if let Ok(gate) = serde_json::from_str::<CompletionGate>(content) {
+ return Some(gate);
+ }
+ }
+
+ // Fall back to YAML-like parsing
+ Self::parse_yaml_like(content)
+ }
+
+ /// Parse a YAML-like format (key: value lines).
+ fn parse_yaml_like(content: &str) -> Option<Self> {
+ let mut gate = CompletionGate::default();
+
+ for line in content.lines() {
+ let line = line.trim();
+ if line.is_empty() {
+ continue;
+ }
+
+ if let Some((key, value)) = line.split_once(':') {
+ let key = key.trim().to_lowercase();
+ let value = value.trim();
+
+ match key.as_str() {
+ "ready" => {
+ gate.ready = value.to_lowercase() == "true"
+ || value == "yes"
+ || value == "1";
+ }
+ "reason" => {
+ gate.reason = Some(Self::unquote(value));
+ }
+ "progress" => {
+ gate.progress = Some(Self::unquote(value));
+ }
+ "blockers" => {
+ // Try to parse as JSON array
+ if let Ok(blockers) = serde_json::from_str::<Vec<String>>(value) {
+ gate.blockers = Some(blockers);
+ } else {
+ // Single blocker as string
+ gate.blockers = Some(vec![Self::unquote(value)]);
+ }
+ }
+ _ => {
+ // Store unknown fields
+ if let Ok(json_val) = serde_json::from_str(value) {
+ gate.extra.insert(key, json_val);
+ } else {
+ gate.extra.insert(
+ key,
+ serde_json::Value::String(Self::unquote(value)),
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Some(gate)
+ }
+
+ /// Remove surrounding quotes from a string value.
+ fn unquote(s: &str) -> String {
+ let s = s.trim();
+ if (s.starts_with('"') && s.ends_with('"'))
+ || (s.starts_with('\'') && s.ends_with('\''))
+ {
+ s[1..s.len() - 1].to_string()
+ } else {
+ s.to_string()
+ }
+ }
+
+ /// Find all COMPLETION_GATE blocks in the output and return the last one.
+ ///
+ /// This is useful when Claude produces multiple completion gates during
+ /// a long-running task, and we want to use the final status.
+ pub fn parse_last(text: &str) -> Option<Self> {
+ let end_tag = "</COMPLETION_GATE>";
+ let mut last_gate = None;
+ let mut search_start = 0;
+
+ while let Some(end_idx) = text[search_start..].find(end_tag) {
+ let absolute_end = search_start + end_idx + end_tag.len();
+ if let Some(gate) = Self::parse(&text[..absolute_end]) {
+ last_gate = Some(gate);
+ }
+ search_start = absolute_end;
+ }
+
+ last_gate
+ }
+}
+
+/// State tracking for the circuit breaker in autonomous loop mode.
+#[derive(Debug, Clone, Default)]
+pub struct CircuitBreaker {
+ /// Number of consecutive runs without file changes.
+ pub runs_without_changes: u32,
+ /// Threshold for opening circuit due to no changes (default: 3).
+ pub no_change_threshold: u32,
+ /// Number of consecutive runs with the same error.
+ pub same_error_count: u32,
+ /// Threshold for opening circuit due to same error (default: 5).
+ pub same_error_threshold: u32,
+ /// Last error message seen.
+ pub last_error: Option<String>,
+ /// Total number of loop iterations.
+ pub iteration_count: u32,
+ /// Maximum allowed iterations (default: 10).
+ pub max_iterations: u32,
+ /// Whether the circuit is open (task should stop).
+ pub is_open: bool,
+ /// Reason why circuit was opened.
+ pub open_reason: Option<String>,
+}
+
+impl CircuitBreaker {
+ /// Create a new circuit breaker with default thresholds.
+ pub fn new() -> Self {
+ Self {
+ no_change_threshold: 3,
+ same_error_threshold: 5,
+ max_iterations: 10,
+ ..Default::default()
+ }
+ }
+
+ /// Create with custom thresholds.
+ pub fn with_thresholds(no_change: u32, same_error: u32, max_iterations: u32) -> Self {
+ Self {
+ no_change_threshold: no_change,
+ same_error_threshold: same_error,
+ max_iterations,
+ ..Default::default()
+ }
+ }
+
+ /// Record a new iteration. Returns true if circuit should remain closed.
+ pub fn record_iteration(&mut self, had_changes: bool, error: Option<&str>) -> bool {
+ self.iteration_count += 1;
+
+ // Check max iterations
+ if self.iteration_count >= self.max_iterations {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Maximum iterations ({}) reached",
+ self.max_iterations
+ ));
+ return false;
+ }
+
+ // Track file changes
+ if had_changes {
+ self.runs_without_changes = 0;
+ } else {
+ self.runs_without_changes += 1;
+ if self.runs_without_changes >= self.no_change_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "No file changes for {} consecutive runs",
+ self.runs_without_changes
+ ));
+ return false;
+ }
+ }
+
+ // Track errors
+ match (error, &self.last_error) {
+ (Some(err), Some(last)) if err == last => {
+ self.same_error_count += 1;
+ if self.same_error_count >= self.same_error_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Same error repeated {} times: {}",
+ self.same_error_count, err
+ ));
+ return false;
+ }
+ }
+ (Some(err), _) => {
+ self.last_error = Some(err.to_string());
+ self.same_error_count = 1;
+ }
+ (None, _) => {
+ self.same_error_count = 0;
+ self.last_error = None;
+ }
+ }
+
+ true // Circuit remains closed
+ }
+
+ /// Check if the circuit breaker is open.
+ pub fn should_stop(&self) -> bool {
+ self.is_open
+ }
+
+ /// Reset the circuit breaker.
+ pub fn reset(&mut self) {
+ *self = Self::with_thresholds(
+ self.no_change_threshold,
+ self.same_error_threshold,
+ self.max_iterations,
+ );
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_yaml_format() {
+ let text = r#"
+Some output before
+<COMPLETION_GATE>
+ready: true
+reason: "All tests pass"
+progress: "Implemented feature X"
+</COMPLETION_GATE>
+More output after
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("All tests pass"));
+ assert_eq!(gate.progress.as_deref(), Some("Implemented feature X"));
+ }
+
+ #[test]
+ fn test_parse_not_ready() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Tests are failing"
+blockers: ["Fix test_foo", "Fix test_bar"]
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(!gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Tests are failing"));
+ assert_eq!(
+ gate.blockers,
+ Some(vec!["Fix test_foo".to_string(), "Fix test_bar".to_string()])
+ );
+ }
+
+ #[test]
+ fn test_parse_json_format() {
+ let text = r#"
+<COMPLETION_GATE>
+{
+ "ready": true,
+ "reason": "Done",
+ "progress": "All good"
+}
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Done"));
+ }
+
+ #[test]
+ fn test_parse_last_gate() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Still working"
+</COMPLETION_GATE>
+Some more work...
+<COMPLETION_GATE>
+ready: true
+reason: "Finally done"
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse_last(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Finally done"));
+ }
+
+ #[test]
+ fn test_no_gate() {
+ let text = "No completion gate here";
+ assert!(CompletionGate::parse(text).is_none());
+ }
+
+ #[test]
+ fn test_circuit_breaker_max_iterations() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 5);
+ for _ in 0..4 {
+ assert!(cb.record_iteration(true, None));
+ }
+ assert!(!cb.record_iteration(true, None)); // 5th iteration should trip
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Maximum iterations"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_no_changes() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1st no change
+ assert!(cb.record_iteration(false, None)); // 2nd no change
+ assert!(!cb.record_iteration(false, None)); // 3rd no change - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("No file changes"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_same_error() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ let err = "Test failed";
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(!cb.record_iteration(true, Some(err))); // 3rd same error - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Same error"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_different_errors_ok() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ assert!(cb.record_iteration(true, Some("error 1")));
+ assert!(cb.record_iteration(true, Some("error 2")));
+ assert!(cb.record_iteration(true, Some("error 3")));
+ // Different errors don't trip the circuit
+ assert!(!cb.is_open);
+ }
+
+ #[test]
+ fn test_circuit_breaker_changes_reset() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1 no change
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ assert!(cb.record_iteration(true, None)); // has changes - resets
+ assert!(cb.record_iteration(false, None)); // 1 no change again
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ // Still shouldn't trip because we had a change in between
+ assert!(!cb.is_open);
+ }
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 4ccedb2..75c884b 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -12,6 +12,7 @@ use uuid::Uuid;
use std::collections::HashSet;
+use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
@@ -944,6 +945,8 @@ pub struct ManagedTask {
pub copy_files: Option<Vec<String>>,
/// Contract ID if this task is associated with a contract.
pub contract_id: Option<Uuid>,
+ /// Whether to run in autonomous loop mode.
+ pub autonomous_loop: bool,
/// Time task was created.
pub created_at: Instant,
/// Time task started running.
@@ -973,6 +976,8 @@ pub struct TaskConfig {
pub enable_permissions: bool,
/// Disable verbose output.
pub disable_verbose: bool,
+ /// Bubblewrap sandbox configuration.
+ pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>,
}
impl Default for TaskConfig {
@@ -986,6 +991,7 @@ impl Default for TaskConfig {
claude_pre_args: Vec::new(),
enable_permissions: false,
disable_verbose: false,
+ bubblewrap: None,
}
}
}
@@ -1027,7 +1033,8 @@ impl TaskManager {
.with_pre_args(config.claude_pre_args.clone())
.with_permissions_enabled(config.enable_permissions)
.with_verbose_disabled(config.disable_verbose)
- .with_env(config.env_vars.clone()),
+ .with_env(config.env_vars.clone())
+ .with_bubblewrap(config.bubblewrap.clone()),
);
let temp_manager = Arc::new(TempManager::new());
@@ -1150,6 +1157,7 @@ impl TaskManager {
copy_files,
contract_id,
is_supervisor,
+ autonomous_loop,
} => {
tracing::info!(
task_id = %task_id,
@@ -1161,6 +1169,7 @@ impl TaskManager {
depth = depth,
is_orchestrator = is_orchestrator,
is_supervisor = is_supervisor,
+ autonomous_loop = autonomous_loop,
target_repo_path = ?target_repo_path,
completion_action = ?completion_action,
continue_from_task_id = ?continue_from_task_id,
@@ -1173,7 +1182,7 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
- copy_files, contract_id
+ copy_files, contract_id, autonomous_loop
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1252,6 +1261,7 @@ impl TaskManager {
None, // continue_from_task_id
None, // copy_files
contract_id,
+ false, // autonomous_loop - supervisors don't use this
).await {
tracing::error!(
task_id = %task_id,
@@ -1421,6 +1431,17 @@ impl TaskManager {
tracing::info!(task_id = %task_id, "Getting task diff");
self.handle_get_task_diff(task_id).await?;
}
+ DaemonCommand::CleanupWorktree {
+ task_id,
+ delete_branch,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ delete_branch = delete_branch,
+ "Cleaning up worktree"
+ );
+ self.handle_cleanup_worktree(task_id, delete_branch).await?;
+ }
}
Ok(())
}
@@ -1444,6 +1465,7 @@ impl TaskManager {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
@@ -1496,6 +1518,7 @@ impl TaskManager {
continue_from_task_id,
copy_files: copy_files.clone(),
contract_id,
+ autonomous_loop,
created_at: Instant::now(),
started_at: None,
completed_at: None,
@@ -1519,7 +1542,7 @@ impl TaskManager {
if let Err(e) = inner.run_task(
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
- continue_from_task_id, copy_files, contract_id
+ continue_from_task_id, copy_files, contract_id, autonomous_loop
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -2046,6 +2069,76 @@ impl TaskManager {
Ok(())
}
+ /// Handle CleanupWorktree command.
+ ///
+ /// Removes a task's worktree and optionally its branch.
+ /// Used when a contract is completed or deleted to clean up associated task worktrees.
+ async fn handle_cleanup_worktree(
+ &self,
+ task_id: Uuid,
+ delete_branch: bool,
+ ) -> Result<(), DaemonError> {
+ // Try to get the worktree path, but don't fail if not found
+ let worktree_result = self.get_task_worktree_path(task_id).await;
+
+ let (success, message) = match worktree_result {
+ Ok(worktree_path) => {
+ // Remove the worktree
+ match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await {
+ Ok(()) => {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ delete_branch = delete_branch,
+ "Worktree cleaned up successfully"
+ );
+
+ // Also remove task from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, format!("Worktree cleaned up: {}", worktree_path.display()))
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ error = %e,
+ "Failed to remove worktree"
+ );
+ (false, format!("Failed to remove worktree: {}", e))
+ }
+ }
+ }
+ Err(_) => {
+ // Worktree not found - this is OK, it may have already been cleaned up
+ tracing::debug!(
+ task_id = %task_id,
+ "No worktree found for task, may have already been cleaned up"
+ );
+
+ // Still remove from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, "No worktree found, task tracking cleaned up".to_string())
+ }
+ };
+
+ // Send result back to server
+ let msg = DaemonMessage::CleanupWorktreeResult {
+ task_id,
+ success,
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
/// Handle ReadRepoFile command.
///
/// Reads a file from a repository on the daemon's filesystem and sends
@@ -2436,6 +2529,7 @@ impl TaskManagerInner {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> Result<(), DaemonError> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ===");
@@ -2908,6 +3002,9 @@ impl TaskManagerInner {
);
let _ = self.ws_tx.send(msg).await;
+ // Clone extra_env for use in autonomous loop iterations
+ let extra_env_for_loop = extra_env.clone();
+
tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()...");
let mut process = self.process_manager
.spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
@@ -2934,7 +3031,7 @@ impl TaskManagerInner {
// Get stdin handle for input forwarding and completion signaling
let stdin_handle = process.stdin_handle();
- let stdin_handle_for_completion = stdin_handle.clone();
+ let mut stdin_handle_for_completion = stdin_handle.clone();
tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
tokio::spawn(async move {
@@ -2998,142 +3095,311 @@ impl TaskManagerInner {
let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok());
let mut auth_error_handled = false;
- let mut output_count = 0u64;
- let mut output_bytes = 0usize;
- let startup_timeout = tokio::time::Duration::from_secs(30);
- let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
- startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- let startup_deadline = tokio::time::Instant::now() + startup_timeout;
+ // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection
+ let mut accumulated_output = String::new();
+ let mut circuit_breaker = CircuitBreaker::new();
+ let mut iteration_count = 0u32;
+ let mut final_exit_code: i64 = -1; // Track the final exit code across iterations
- loop {
- tokio::select! {
- maybe_line = process.next_output() => {
- match maybe_line {
- Some(line) => {
- output_count += 1;
- output_bytes += line.content.len();
-
- if output_count == 1 {
- tracing::info!(task_id = %task_id, "Received first output line from Claude");
- }
- if output_count % 100 == 0 {
- tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
- }
+ // Autonomous loop: we may run multiple iterations
+ 'autonomous_loop: loop {
+ iteration_count += 1;
- // Log output details for debugging
- tracing::trace!(
- task_id = %task_id,
- line_num = output_count,
- content_len = line.content.len(),
- is_stdout = line.is_stdout,
- json_type = ?line.json_type,
- "Forwarding output to WebSocket"
- );
+ if autonomous_loop && iteration_count > 1 {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "Starting autonomous loop iteration"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // For subsequent iterations, spawn with --continue flag
+ let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true.";
+
+ process = self.process_manager
+ .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?;
+
+ // Register the new process PID
+ if let Some(pid) = process.id() {
+ self.active_pids.write().await.insert(task_id, pid);
+ tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned");
+ }
+
+ // Reset stdin handle for the new process
+ stdin_handle_for_completion = process.stdin_handle();
+ }
+
+ // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output)
+ let mut iteration_output = String::new();
+
+ let mut output_count = 0u64;
+ let mut output_bytes = 0usize;
+ let startup_timeout = tokio::time::Duration::from_secs(30);
+ let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let startup_deadline = tokio::time::Instant::now() + startup_timeout;
- // Check if this is a "result" message indicating task completion
- // With --input-format=stream-json, Claude waits for more input after completion
- // We close stdin to signal EOF and let the process exit
- if line.json_type.as_deref() == Some("result") {
- tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
- let mut stdin_guard = stdin_handle_for_completion.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
+ loop {
+ tokio::select! {
+ maybe_line = process.next_output() => {
+ match maybe_line {
+ Some(line) => {
+ output_count += 1;
+ output_bytes += line.content.len();
+
+ // Accumulate output for COMPLETION_GATE detection in autonomous loop mode
+ if autonomous_loop {
+ iteration_output.push_str(&line.content);
+ iteration_output.push('\n');
}
- }
- // Check for OAuth auth error before sending output
- let content_for_auth_check = line.content.clone();
- let json_type_for_auth_check = line.json_type.clone();
- let is_stdout_for_auth_check = line.is_stdout;
+ if output_count == 1 {
+ tracing::info!(task_id = %task_id, "Received first output line from Claude");
+ }
+ if output_count % 100 == 0 {
+ tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
+ }
- let msg = DaemonMessage::task_output(task_id, line.content, false);
- if ws_tx.send(msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
- break;
- }
+ // Log output details for debugging
+ tracing::trace!(
+ task_id = %task_id,
+ line_num = output_count,
+ content_len = line.content.len(),
+ is_stdout = line.is_stdout,
+ json_type = ?line.json_type,
+ "Forwarding output to WebSocket"
+ );
- // Detect OAuth token expiration and trigger remote login flow
- if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
- auth_error_handled = true;
- tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
-
- // Spawn claude setup-token to get login URL
- if let Some(login_url) = get_oauth_login_url(&claude_command).await {
- tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
- let auth_msg = DaemonMessage::AuthenticationRequired {
- task_id: Some(task_id),
- login_url,
- hostname: daemon_hostname.clone(),
- };
- if ws_tx.send(auth_msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ // Check if this is a "result" message indicating task completion
+ // With --input-format=stream-json, Claude waits for more input after completion
+ // We close stdin to signal EOF and let the process exit
+ if line.json_type.as_deref() == Some("result") {
+ tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
+ let mut stdin_guard = stdin_handle_for_completion.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ }
+
+ // Check for OAuth auth error before sending output
+ let content_for_auth_check = line.content.clone();
+ let json_type_for_auth_check = line.json_type.clone();
+ let is_stdout_for_auth_check = line.is_stdout;
+
+ let msg = DaemonMessage::task_output(task_id, line.content, false);
+ if ws_tx.send(msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
+ break;
+ }
+
+ // Detect OAuth token expiration and trigger remote login flow
+ if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
+ auth_error_handled = true;
+ tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
+
+ // Spawn claude setup-token to get login URL
+ if let Some(login_url) = get_oauth_login_url(&claude_command).await {
+ tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
+ let auth_msg = DaemonMessage::AuthenticationRequired {
+ task_id: Some(task_id),
+ login_url,
+ hostname: daemon_hostname.clone(),
+ };
+ if ws_tx.send(auth_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ }
+ } else {
+ tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
+ let fallback_msg = DaemonMessage::task_output(
+ task_id,
+ format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
+ daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
+ false,
+ );
+ let _ = ws_tx.send(fallback_msg).await;
}
- } else {
- tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
- let fallback_msg = DaemonMessage::task_output(
- task_id,
- format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
- daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
- false,
- );
- let _ = ws_tx.send(fallback_msg).await;
}
}
- }
- None => {
- tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
- break;
+ None => {
+ tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
+ break;
+ }
}
}
- }
- _ = startup_check.tick(), if output_count == 0 => {
- // Check if process is still alive
- match process.try_wait() {
- Ok(Some(exit_code)) => {
- tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
- false,
- );
- let _ = ws_tx.send(msg).await;
- break;
- }
- Ok(None) => {
- // Still running but no output
- if tokio::time::Instant::now() > startup_deadline {
- tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ _ = startup_check.tick(), if output_count == 0 => {
+ // Check if process is still alive
+ match process.try_wait() {
+ Ok(Some(exit_code)) => {
+ tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
let msg = DaemonMessage::task_output(
task_id,
- "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
false,
);
let _ = ws_tx.send(msg).await;
- } else {
- tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ break;
+ }
+ Ok(None) => {
+ // Still running but no output
+ if tokio::time::Instant::now() > startup_deadline {
+ tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ } else {
+ tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ }
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
}
}
}
- }
- // Wait for process to exit
- let exit_code = process.wait().await.unwrap_or(-1);
+ // Wait for process to exit
+ let exit_code = process.wait().await.unwrap_or(-1);
+ final_exit_code = exit_code; // Store for use after the loop
+
+ // Unregister the process PID (process has exited)
+ self.active_pids.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Unregistered process PID");
+
+ // Clean up input channel for this task
+ self.task_inputs.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Removed task input channel");
- // Unregister the process PID (process has exited)
- self.active_pids.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Unregistered process PID");
+ // Accumulate this iteration's output
+ accumulated_output.push_str(&iteration_output);
- // Clean up input channel for this task
- self.task_inputs.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Removed task input channel");
+ // === AUTONOMOUS LOOP LOGIC ===
+ // Check if we should continue or complete
+ if autonomous_loop && exit_code == 0 {
+ // Check for COMPLETION_GATE in the output
+ let completion_gate = CompletionGate::parse_last(&iteration_output);
+
+ match completion_gate {
+ Some(gate) if gate.ready => {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ "COMPLETION_GATE ready=true detected, task complete"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n",
+ iteration_count,
+ gate.reason.unwrap_or_else(|| "Task complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+ Some(gate) => {
+ // COMPLETION_GATE found but not ready
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ blockers = ?gate.blockers,
+ "COMPLETION_GATE ready=false, will continue"
+ );
+
+ // Check circuit breaker
+ // For now, we consider output_bytes > 0 as "progress"
+ let had_progress = output_bytes > 0;
+ let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str());
+
+ if !circuit_breaker.record_iteration(had_progress, error) {
+ // Circuit breaker tripped
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped, stopping autonomous loop"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n",
+ gate.reason.unwrap_or_else(|| "Not complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Continue to next iteration
+ continue 'autonomous_loop;
+ }
+ None => {
+ // No COMPLETION_GATE found - check circuit breaker and continue
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "No COMPLETION_GATE found, will restart with continuation prompt"
+ );
+
+ let had_progress = output_bytes > 0;
+ if !circuit_breaker.record_iteration(had_progress, None) {
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped (no COMPLETION_GATE), stopping"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ continue 'autonomous_loop;
+ }
+ }
+ } else {
+ // Not in autonomous loop mode or process failed - exit normally
+ break 'autonomous_loop;
+ }
+ } // end 'autonomous_loop
// Update state based on exit code
- let success = exit_code == 0;
+ let success = final_exit_code == 0;
let new_state = if success {
TaskState::Completed
} else {
@@ -3142,7 +3408,7 @@ impl TaskManagerInner {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
success = success,
new_state = ?new_state,
"Claude process exited, updating task state"
@@ -3154,7 +3420,7 @@ impl TaskManagerInner {
task.state = new_state;
task.completed_at = Some(Instant::now());
if !success {
- task.error = Some(format!("Process exited with code {}", exit_code));
+ task.error = Some(format!("Process exited with code {}", final_exit_code));
}
}
}
@@ -3196,7 +3462,7 @@ impl TaskManagerInner {
if is_supervisor {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
"Supervisor Claude process exited - NOT marking as complete"
);
// Update local state to reflect it's paused/waiting for input
@@ -3218,7 +3484,7 @@ impl TaskManagerInner {
let error = if success {
None
} else {
- Some(format!("Exit code: {}", exit_code))
+ Some(format!("Exit code: {}", final_exit_code))
};
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs
index 29c261e..3830e1d 100644
--- a/makima/src/daemon/task/mod.rs
+++ b/makima/src/daemon/task/mod.rs
@@ -1,7 +1,9 @@
//! Task management and execution.
+pub mod completion_gate;
pub mod manager;
pub mod state;
+pub use completion_gate::CompletionGate;
pub use manager::{ManagedTask, TaskConfig, TaskManager};
pub use state::TaskState;
diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs
index ca5fc01..7b59b62 100644
--- a/makima/src/daemon/task/state.rs
+++ b/makima/src/daemon/task/state.rs
@@ -124,7 +124,9 @@ impl Default for TaskState {
#[cfg(test)]
mod tests {
+ #[allow(unused_imports)]
use crate::daemon::*;
+ use super::TaskState;
#[test]
fn test_valid_transitions() {
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index e86a577..714c0f9 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -250,6 +250,14 @@ pub enum DaemonMessage {
diff: Option<String>,
error: Option<String>,
},
+
+ /// Response to CleanupWorktree command.
+ CleanupWorktreeResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ },
}
/// Information about a branch (used in BranchList message).
@@ -323,6 +331,11 @@ pub enum DaemonCommand {
/// Whether this task is a supervisor (long-running contract orchestrator).
#[serde(rename = "isSupervisor", default)]
is_supervisor: bool,
+ /// Whether to run in autonomous loop mode.
+ /// When enabled, task will automatically restart with --continue if it exits
+ /// without a COMPLETION_GATE indicating ready: true.
+ #[serde(rename = "autonomousLoop", default)]
+ autonomous_loop: bool,
},
/// Pause a running task.
@@ -530,6 +543,15 @@ pub enum DaemonCommand {
task_id: Uuid,
},
+ /// Clean up a task's worktree (used when contract is completed/deleted).
+ CleanupWorktree {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Whether to delete the associated branch.
+ #[serde(rename = "deleteBranch")]
+ delete_branch: bool,
+ },
+
/// Error response.
Error {
code: String,
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 8ab3a10..40d4109 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -1194,6 +1194,11 @@ pub struct Contract {
/// The long-running supervisor task that orchestrates this contract
#[serde(skip_serializing_if = "Option::is_none")]
pub supervisor_task_id: Option<Uuid>,
+ /// Whether tasks for this contract should run in autonomous loop mode.
+ /// When enabled, tasks will automatically restart with --continue if they exit
+ /// without a COMPLETION_GATE indicating ready: true.
+ #[serde(default)]
+ pub autonomous_loop: bool,
pub version: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
@@ -1314,6 +1319,11 @@ pub struct CreateContractRequest {
/// - specification: defaults to "research"
#[serde(default)]
pub initial_phase: Option<String>,
+ /// Enable autonomous loop mode for tasks in this contract.
+ /// When enabled, tasks automatically restart with --continue if they exit
+ /// without a COMPLETION_GATE indicating ready: true.
+ #[serde(default)]
+ pub autonomous_loop: Option<bool>,
}
/// Request payload for updating a contract
@@ -1327,6 +1337,9 @@ pub struct UpdateContractRequest {
/// Supervisor task ID for contract orchestration
#[serde(skip_serializing_if = "Option::is_none")]
pub supervisor_task_id: Option<Uuid>,
+ /// Enable or disable autonomous loop mode for tasks in this contract.
+ #[serde(default)]
+ pub autonomous_loop: Option<bool>,
/// Version for optimistic locking
pub version: Option<i32>,
}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 0e85be1..92b2048 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -2052,10 +2052,12 @@ pub async fn create_contract_for_owner(
)));
}
+ let autonomous_loop = req.autonomous_loop.unwrap_or(false);
+
sqlx::query_as::<_, Contract>(
r#"
- INSERT INTO contracts (owner_id, name, description, contract_type, phase)
- VALUES ($1, $2, $3, $4, $5)
+ INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop)
+ VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#,
)
@@ -2064,6 +2066,7 @@ pub async fn create_contract_for_owner(
.bind(&req.description)
.bind(contract_type)
.bind(phase)
+ .bind(autonomous_loop)
.fetch_one(pool)
.await
}
@@ -2162,14 +2165,15 @@ pub async fn update_contract_for_owner(
let phase = req.phase.unwrap_or(existing.phase);
let status = req.status.unwrap_or(existing.status);
let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
+ let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop);
let result = if req.version.is_some() {
sqlx::query_as::<_, Contract>(
r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $8
+ supervisor_task_id = $7, autonomous_loop = $8, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $9
RETURNING *
"#,
)
@@ -2180,6 +2184,7 @@ pub async fn update_contract_for_owner(
.bind(&phase)
.bind(&status)
.bind(supervisor_task_id)
+ .bind(autonomous_loop)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
@@ -2188,7 +2193,7 @@ pub async fn update_contract_for_owner(
r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, version = version + 1, updated_at = NOW()
+ supervisor_task_id = $7, autonomous_loop = $8, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#,
@@ -2200,6 +2205,7 @@ pub async fn update_contract_for_owner(
.bind(&phase)
.bind(&status)
.bind(supervisor_task_id)
+ .bind(autonomous_loop)
.fetch_optional(pool)
.await?
};
@@ -2591,6 +2597,32 @@ pub async fn list_tasks_in_contract(
.await
}
+/// Minimal task info for worktree cleanup operations.
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct TaskWorktreeInfo {
+ pub id: Uuid,
+ pub daemon_id: Option<Uuid>,
+ pub overlay_path: Option<String>,
+}
+
+/// List tasks in a contract with their daemon/worktree info.
+/// Used for cleaning up worktrees when a contract is completed or deleted.
+pub async fn list_contract_tasks_with_worktree_info(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> {
+ sqlx::query_as::<_, TaskWorktreeInfo>(
+ r#"
+ SELECT id, daemon_id, overlay_path
+ FROM tasks
+ WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL)
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_all(pool)
+ .await
+}
+
// =============================================================================
// Contract Events
// =============================================================================
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index e2bd10e..101b257 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -2376,6 +2376,7 @@ async fn handle_contract_request(
description: contract_description,
contract_type: Some("specification".to_string()),
initial_phase: Some("research".to_string()),
+ autonomous_loop: None,
};
let contract = match repository::create_contract_for_owner(pool, owner_id, contract_req).await {
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index 3ce29e1..09f78e6 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -425,7 +425,7 @@ pub async fn update_contract(
match repository::update_contract_for_owner(pool, id, auth.owner_id, req).await {
Ok(Some(contract)) => {
- // If contract is completed, stop the supervisor task
+ // If contract is completed, stop the supervisor task and clean up worktrees
if contract.status == "completed" {
if let Some(supervisor_task_id) = contract.supervisor_task_id {
// Get the supervisor task to find its daemon
@@ -456,6 +456,14 @@ pub async fn update_contract(
}
}
}
+
+ // Clean up all task worktrees for this contract
+ let pool_clone = pool.clone();
+ let state_clone = state.clone();
+ let contract_id = id;
+ tokio::spawn(async move {
+ cleanup_contract_worktrees(&pool_clone, &state_clone, contract_id).await;
+ });
}
// Get summary with counts
@@ -548,6 +556,30 @@ pub async fn delete_contract(
.into_response();
};
+ // First, verify contract exists and belongs to owner
+ match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(_)) => {}
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+
+ // Clean up all task worktrees BEFORE deleting the contract
+ // (because CASCADE delete will remove tasks from DB)
+ cleanup_contract_worktrees(pool, &state, id).await;
+
match repository::delete_contract_for_owner(pool, id, auth.owner_id).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
@@ -1318,3 +1350,85 @@ pub async fn get_events(
}
}
}
+
+// =============================================================================
+// Internal Helper Functions
+// =============================================================================
+
+/// Clean up all worktrees for tasks in a contract.
+///
+/// This is called when a contract is completed or deleted to remove
+/// all associated task worktrees from connected daemons.
+async fn cleanup_contract_worktrees(
+ pool: &sqlx::PgPool,
+ state: &SharedState,
+ contract_id: Uuid,
+) {
+ tracing::info!(
+ contract_id = %contract_id,
+ "Cleaning up worktrees for contract tasks"
+ );
+
+ // Get all tasks with worktree info for this contract
+ let tasks = match repository::list_contract_tasks_with_worktree_info(pool, contract_id).await {
+ Ok(tasks) => tasks,
+ Err(e) => {
+ tracing::error!(
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to list tasks for worktree cleanup"
+ );
+ return;
+ }
+ };
+
+ if tasks.is_empty() {
+ tracing::debug!(
+ contract_id = %contract_id,
+ "No tasks with worktrees to clean up"
+ );
+ return;
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ task_count = tasks.len(),
+ "Found tasks with worktrees to clean up"
+ );
+
+ // Send cleanup command to each task's daemon
+ for task in tasks {
+ if let Some(daemon_id) = task.daemon_id {
+ let cmd = crate::server::state::DaemonCommand::CleanupWorktree {
+ task_id: task.id,
+ delete_branch: true, // Delete the branch when contract is done
+ };
+
+ match state.send_daemon_command(daemon_id, cmd).await {
+ Ok(()) => {
+ tracing::info!(
+ task_id = %task.id,
+ daemon_id = %daemon_id,
+ contract_id = %contract_id,
+ "Sent worktree cleanup command"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task.id,
+ daemon_id = %daemon_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to send worktree cleanup command (daemon may be offline)"
+ );
+ }
+ }
+ } else {
+ tracing::debug!(
+ task_id = %task.id,
+ contract_id = %contract_id,
+ "Task has no daemon assigned, skipping worktree cleanup"
+ );
+ }
+ }
+}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index d0fa4d1..3add89f 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -18,7 +18,7 @@ use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
-use crate::server::state::{DaemonCommand, SharedState};
+use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification};
// =============================================================================
// Request/Response Types
@@ -1311,6 +1311,43 @@ pub async fn ask_question(
request.context.clone(),
);
+ // Broadcast question as task output entry for the task's chat
+ let question_data = serde_json::json!({
+ "question_id": question_id.to_string(),
+ "choices": request.choices,
+ "context": request.context,
+ });
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id: supervisor_id,
+ owner_id: Some(owner_id),
+ message_type: "supervisor_question".to_string(),
+ content: request.question.clone(),
+ tool_name: None,
+ tool_input: Some(question_data.clone()),
+ is_error: None,
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+
+ // Persist to database so it appears when reloading the page
+ // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format
+ if let Some(pool) = state.db_pool.as_ref() {
+ let event_data = serde_json::json!({
+ "messageType": "supervisor_question",
+ "content": request.question,
+ "toolInput": question_data,
+ });
+ let _ = repository::create_task_event(
+ pool,
+ supervisor_id,
+ "output",
+ None,
+ None,
+ Some(event_data),
+ ).await;
+ }
+
// Poll for response with timeout
let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64);
let start = std::time::Instant::now();
diff --git a/makima/src/server/handlers/transcript_analysis.rs b/makima/src/server/handlers/transcript_analysis.rs
index 2c38eea..275905e 100644
--- a/makima/src/server/handlers/transcript_analysis.rs
+++ b/makima/src/server/handlers/transcript_analysis.rs
@@ -276,6 +276,7 @@ pub async fn create_contract_from_analysis(
description: contract_description,
contract_type: Some("specification".to_string()),
initial_phase: Some("research".to_string()),
+ autonomous_loop: None,
};
let contract = match repository::create_contract_for_owner(pool, auth.owner_id, contract_req).await {
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 495fc15..2a45d88 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -396,6 +396,15 @@ pub enum DaemonCommand {
task_id: Uuid,
},
+ /// Clean up a task's worktree (used when contract is completed/deleted)
+ CleanupWorktree {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Whether to delete the associated branch
+ #[serde(rename = "deleteBranch")]
+ delete_branch: bool,
+ },
+
/// Error response
Error { code: String, message: String },
}