summaryrefslogtreecommitdiff
path: root/makima/src/daemon/task
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 11:57:43 +0000
committersoryu <soryu@soryu.co>2026-01-15 17:12:04 +0000
commit3efdab36ca61a6795454668881d5b925abe22bd3 (patch)
tree0fd96e527f45a3da31dfc073b07cd55ba284e550 /makima/src/daemon/task
parent63b2e347b2ecadc6a48062e10e0a7e19b6102631 (diff)
downloadsoryu-3efdab36ca61a6795454668881d5b925abe22bd3.tar.gz
soryu-3efdab36ca61a6795454668881d5b925abe22bd3.zip
Fixup: Add cleanup and isolation features to makima
Add comprehensive CLI documentation - Create makima/docs/CLI.md with complete command reference for: - makima server: HTTP/WebSocket server options - makima daemon: Worker daemon configuration - makima supervisor: Contract orchestration commands - makima contract: Task-contract interaction commands - Include configuration file examples and environment variables - Add usage workflows for common scenarios - Update makima/README.md with CLI overview and link to docs Add GitHub Actions release workflow for v0.1.0 Creates automated release workflow that: - Triggers on v* tag pushes - Builds binaries for Linux x86_64, macOS x86_64, and macOS ARM64 - Uses Rust nightly toolchain (required for edition 2024) - Packages binaries as .tar.gz archives - Creates GitHub release with installation instructions fix(ci): update macOS runner for x86_64 builds Replace deprecated macos-13 runner with macos-15-intel for x86_64-apple-darwin target. The macos-13 runner has been retired by GitHub Actions. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Add dismissing notifications and fix CLI task ID arg Add worktree cleanup when contracts complete or are deleted (#21) - Add CleanupWorktree daemon command variant - Handle CleanupWorktree in daemon task manager - Add cleanup_contract_worktrees helper function - Trigger cleanup when contract status becomes 'completed' - Trigger cleanup before contract deletion Add Autonomous Loop Mode for persistent task completion (#20) Implements the "Autonomous Loop Mode" feature inspired by Ralph for Claude Code. This enables tasks to automatically restart and continue working until they explicitly signal completion via a COMPLETION_GATE block. Key features: - Exit confirmation via COMPLETION_GATE: Tasks must output a <COMPLETION_GATE> block with `ready: true` to signal completion. Without this, the task auto-restarts using `claude --continue` to resume the conversation. - Circuit breaker: Prevents infinite loops by detecting: * Maximum iteration limit (default: 10) * No progress for N consecutive iterations (default: 3) * Same error repeated N times (default: 5) - spawn_continue: New ProcessManager method to spawn Claude with the `--continue` flag, resuming from the previous session state. Toggle: Enable via `autonomous_loop` flag on contracts. When set, all tasks spawned for that contract will run in autonomous loop mode. Files changed: - completion_gate.rs: COMPLETION_GATE parser and CircuitBreaker logic - claude.rs: spawn_continue() for --continue mode spawning - manager.rs: Autonomous loop iteration logic in run_task() - protocol.rs: autonomousLoop field in DaemonCommand::SpawnTask - models.rs/repository.rs: autonomous_loop column on contracts/tasks - Migration: Adds autonomous_loop columns to contracts and tasks tables Add get-task and output commands to supervisor CLI (#24) Add two new supervisor subcommands: - `makima supervisor task <task_id>` - Get individual task details - `makima supervisor output <task_id>` - Get task output/claude log This allows supervisors to fetch task details and claude output directly from the CLI instead of using curl to call the task API. Add optional bubblewrap sandboxing for Claude processes (#23) Add --bubblewrap flag and process.bubblewrap config section to enable running Claude Code in a bubblewrap sandbox for process isolation. When enabled, claude processes run with filesystem restrictions: - Root filesystem mounted read-only - Working directory (worktree) mounted read-write - Fresh /dev, /proc, /tmp - Network access preserved for API calls Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/daemon/task')
-rw-r--r--makima/src/daemon/task/completion_gate.rs402
-rw-r--r--makima/src/daemon/task/manager.rs500
-rw-r--r--makima/src/daemon/task/mod.rs2
-rw-r--r--makima/src/daemon/task/state.rs2
4 files changed, 789 insertions, 117 deletions
diff --git a/makima/src/daemon/task/completion_gate.rs b/makima/src/daemon/task/completion_gate.rs
new file mode 100644
index 0000000..69b7c6a
--- /dev/null
+++ b/makima/src/daemon/task/completion_gate.rs
@@ -0,0 +1,402 @@
+//! Completion gate parsing for autonomous loop mode.
+//!
+//! This module parses COMPLETION_GATE blocks from Claude's output to determine
+//! if the task is truly complete. The format is inspired by Ralph's autonomous
+//! development framework.
+//!
+//! Format:
+//! ```
+//! <COMPLETION_GATE>
+//! ready: true|false
+//! reason: "explanation of completion status"
+//! progress: "summary of what was accomplished"
+//! blockers: ["list", "of", "blockers"] (optional, only when ready: false)
+//! </COMPLETION_GATE>
+//! ```
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Represents a parsed COMPLETION_GATE block from Claude's output.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct CompletionGate {
+ /// Whether the task is ready to complete.
+ pub ready: bool,
+ /// Explanation of the completion status.
+ pub reason: Option<String>,
+ /// Summary of what was accomplished.
+ pub progress: Option<String>,
+ /// List of blockers if not ready.
+ pub blockers: Option<Vec<String>>,
+ /// Any additional fields that were parsed.
+ #[serde(flatten)]
+ pub extra: HashMap<String, serde_json::Value>,
+}
+
+impl CompletionGate {
+ /// Parse a COMPLETION_GATE block from text output.
+ ///
+ /// Returns None if no valid COMPLETION_GATE is found.
+ pub fn parse(text: &str) -> Option<Self> {
+ // Find the COMPLETION_GATE block
+ let start_tag = "<COMPLETION_GATE>";
+ let end_tag = "</COMPLETION_GATE>";
+
+ let start_idx = text.find(start_tag)?;
+ let end_idx = text.find(end_tag)?;
+
+ if end_idx <= start_idx {
+ return None;
+ }
+
+ let content = &text[start_idx + start_tag.len()..end_idx];
+ let content = content.trim();
+
+ // Try to parse as JSON first
+ if content.starts_with('{') {
+ if let Ok(gate) = serde_json::from_str::<CompletionGate>(content) {
+ return Some(gate);
+ }
+ }
+
+ // Fall back to YAML-like parsing
+ Self::parse_yaml_like(content)
+ }
+
+ /// Parse a YAML-like format (key: value lines).
+ fn parse_yaml_like(content: &str) -> Option<Self> {
+ let mut gate = CompletionGate::default();
+
+ for line in content.lines() {
+ let line = line.trim();
+ if line.is_empty() {
+ continue;
+ }
+
+ if let Some((key, value)) = line.split_once(':') {
+ let key = key.trim().to_lowercase();
+ let value = value.trim();
+
+ match key.as_str() {
+ "ready" => {
+ gate.ready = value.to_lowercase() == "true"
+ || value == "yes"
+ || value == "1";
+ }
+ "reason" => {
+ gate.reason = Some(Self::unquote(value));
+ }
+ "progress" => {
+ gate.progress = Some(Self::unquote(value));
+ }
+ "blockers" => {
+ // Try to parse as JSON array
+ if let Ok(blockers) = serde_json::from_str::<Vec<String>>(value) {
+ gate.blockers = Some(blockers);
+ } else {
+ // Single blocker as string
+ gate.blockers = Some(vec![Self::unquote(value)]);
+ }
+ }
+ _ => {
+ // Store unknown fields
+ if let Ok(json_val) = serde_json::from_str(value) {
+ gate.extra.insert(key, json_val);
+ } else {
+ gate.extra.insert(
+ key,
+ serde_json::Value::String(Self::unquote(value)),
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Some(gate)
+ }
+
+ /// Remove surrounding quotes from a string value.
+ fn unquote(s: &str) -> String {
+ let s = s.trim();
+ if (s.starts_with('"') && s.ends_with('"'))
+ || (s.starts_with('\'') && s.ends_with('\''))
+ {
+ s[1..s.len() - 1].to_string()
+ } else {
+ s.to_string()
+ }
+ }
+
+ /// Find all COMPLETION_GATE blocks in the output and return the last one.
+ ///
+ /// This is useful when Claude produces multiple completion gates during
+ /// a long-running task, and we want to use the final status.
+ pub fn parse_last(text: &str) -> Option<Self> {
+ let end_tag = "</COMPLETION_GATE>";
+ let mut last_gate = None;
+ let mut search_start = 0;
+
+ while let Some(end_idx) = text[search_start..].find(end_tag) {
+ let absolute_end = search_start + end_idx + end_tag.len();
+ if let Some(gate) = Self::parse(&text[..absolute_end]) {
+ last_gate = Some(gate);
+ }
+ search_start = absolute_end;
+ }
+
+ last_gate
+ }
+}
+
+/// State tracking for the circuit breaker in autonomous loop mode.
+#[derive(Debug, Clone, Default)]
+pub struct CircuitBreaker {
+ /// Number of consecutive runs without file changes.
+ pub runs_without_changes: u32,
+ /// Threshold for opening circuit due to no changes (default: 3).
+ pub no_change_threshold: u32,
+ /// Number of consecutive runs with the same error.
+ pub same_error_count: u32,
+ /// Threshold for opening circuit due to same error (default: 5).
+ pub same_error_threshold: u32,
+ /// Last error message seen.
+ pub last_error: Option<String>,
+ /// Total number of loop iterations.
+ pub iteration_count: u32,
+ /// Maximum allowed iterations (default: 10).
+ pub max_iterations: u32,
+ /// Whether the circuit is open (task should stop).
+ pub is_open: bool,
+ /// Reason why circuit was opened.
+ pub open_reason: Option<String>,
+}
+
+impl CircuitBreaker {
+ /// Create a new circuit breaker with default thresholds.
+ pub fn new() -> Self {
+ Self {
+ no_change_threshold: 3,
+ same_error_threshold: 5,
+ max_iterations: 10,
+ ..Default::default()
+ }
+ }
+
+ /// Create with custom thresholds.
+ pub fn with_thresholds(no_change: u32, same_error: u32, max_iterations: u32) -> Self {
+ Self {
+ no_change_threshold: no_change,
+ same_error_threshold: same_error,
+ max_iterations,
+ ..Default::default()
+ }
+ }
+
+ /// Record a new iteration. Returns true if circuit should remain closed.
+ pub fn record_iteration(&mut self, had_changes: bool, error: Option<&str>) -> bool {
+ self.iteration_count += 1;
+
+ // Check max iterations
+ if self.iteration_count >= self.max_iterations {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Maximum iterations ({}) reached",
+ self.max_iterations
+ ));
+ return false;
+ }
+
+ // Track file changes
+ if had_changes {
+ self.runs_without_changes = 0;
+ } else {
+ self.runs_without_changes += 1;
+ if self.runs_without_changes >= self.no_change_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "No file changes for {} consecutive runs",
+ self.runs_without_changes
+ ));
+ return false;
+ }
+ }
+
+ // Track errors
+ match (error, &self.last_error) {
+ (Some(err), Some(last)) if err == last => {
+ self.same_error_count += 1;
+ if self.same_error_count >= self.same_error_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Same error repeated {} times: {}",
+ self.same_error_count, err
+ ));
+ return false;
+ }
+ }
+ (Some(err), _) => {
+ self.last_error = Some(err.to_string());
+ self.same_error_count = 1;
+ }
+ (None, _) => {
+ self.same_error_count = 0;
+ self.last_error = None;
+ }
+ }
+
+ true // Circuit remains closed
+ }
+
+ /// Check if the circuit breaker is open.
+ pub fn should_stop(&self) -> bool {
+ self.is_open
+ }
+
+ /// Reset the circuit breaker.
+ pub fn reset(&mut self) {
+ *self = Self::with_thresholds(
+ self.no_change_threshold,
+ self.same_error_threshold,
+ self.max_iterations,
+ );
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_yaml_format() {
+ let text = r#"
+Some output before
+<COMPLETION_GATE>
+ready: true
+reason: "All tests pass"
+progress: "Implemented feature X"
+</COMPLETION_GATE>
+More output after
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("All tests pass"));
+ assert_eq!(gate.progress.as_deref(), Some("Implemented feature X"));
+ }
+
+ #[test]
+ fn test_parse_not_ready() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Tests are failing"
+blockers: ["Fix test_foo", "Fix test_bar"]
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(!gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Tests are failing"));
+ assert_eq!(
+ gate.blockers,
+ Some(vec!["Fix test_foo".to_string(), "Fix test_bar".to_string()])
+ );
+ }
+
+ #[test]
+ fn test_parse_json_format() {
+ let text = r#"
+<COMPLETION_GATE>
+{
+ "ready": true,
+ "reason": "Done",
+ "progress": "All good"
+}
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Done"));
+ }
+
+ #[test]
+ fn test_parse_last_gate() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Still working"
+</COMPLETION_GATE>
+Some more work...
+<COMPLETION_GATE>
+ready: true
+reason: "Finally done"
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse_last(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Finally done"));
+ }
+
+ #[test]
+ fn test_no_gate() {
+ let text = "No completion gate here";
+ assert!(CompletionGate::parse(text).is_none());
+ }
+
+ #[test]
+ fn test_circuit_breaker_max_iterations() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 5);
+ for _ in 0..4 {
+ assert!(cb.record_iteration(true, None));
+ }
+ assert!(!cb.record_iteration(true, None)); // 5th iteration should trip
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Maximum iterations"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_no_changes() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1st no change
+ assert!(cb.record_iteration(false, None)); // 2nd no change
+ assert!(!cb.record_iteration(false, None)); // 3rd no change - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("No file changes"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_same_error() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ let err = "Test failed";
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(!cb.record_iteration(true, Some(err))); // 3rd same error - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Same error"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_different_errors_ok() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ assert!(cb.record_iteration(true, Some("error 1")));
+ assert!(cb.record_iteration(true, Some("error 2")));
+ assert!(cb.record_iteration(true, Some("error 3")));
+ // Different errors don't trip the circuit
+ assert!(!cb.is_open);
+ }
+
+ #[test]
+ fn test_circuit_breaker_changes_reset() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1 no change
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ assert!(cb.record_iteration(true, None)); // has changes - resets
+ assert!(cb.record_iteration(false, None)); // 1 no change again
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ // Still shouldn't trip because we had a change in between
+ assert!(!cb.is_open);
+ }
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 4ccedb2..75c884b 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -12,6 +12,7 @@ use uuid::Uuid;
use std::collections::HashSet;
+use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
@@ -944,6 +945,8 @@ pub struct ManagedTask {
pub copy_files: Option<Vec<String>>,
/// Contract ID if this task is associated with a contract.
pub contract_id: Option<Uuid>,
+ /// Whether to run in autonomous loop mode.
+ pub autonomous_loop: bool,
/// Time task was created.
pub created_at: Instant,
/// Time task started running.
@@ -973,6 +976,8 @@ pub struct TaskConfig {
pub enable_permissions: bool,
/// Disable verbose output.
pub disable_verbose: bool,
+ /// Bubblewrap sandbox configuration.
+ pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>,
}
impl Default for TaskConfig {
@@ -986,6 +991,7 @@ impl Default for TaskConfig {
claude_pre_args: Vec::new(),
enable_permissions: false,
disable_verbose: false,
+ bubblewrap: None,
}
}
}
@@ -1027,7 +1033,8 @@ impl TaskManager {
.with_pre_args(config.claude_pre_args.clone())
.with_permissions_enabled(config.enable_permissions)
.with_verbose_disabled(config.disable_verbose)
- .with_env(config.env_vars.clone()),
+ .with_env(config.env_vars.clone())
+ .with_bubblewrap(config.bubblewrap.clone()),
);
let temp_manager = Arc::new(TempManager::new());
@@ -1150,6 +1157,7 @@ impl TaskManager {
copy_files,
contract_id,
is_supervisor,
+ autonomous_loop,
} => {
tracing::info!(
task_id = %task_id,
@@ -1161,6 +1169,7 @@ impl TaskManager {
depth = depth,
is_orchestrator = is_orchestrator,
is_supervisor = is_supervisor,
+ autonomous_loop = autonomous_loop,
target_repo_path = ?target_repo_path,
completion_action = ?completion_action,
continue_from_task_id = ?continue_from_task_id,
@@ -1173,7 +1182,7 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
- copy_files, contract_id
+ copy_files, contract_id, autonomous_loop
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1252,6 +1261,7 @@ impl TaskManager {
None, // continue_from_task_id
None, // copy_files
contract_id,
+ false, // autonomous_loop - supervisors don't use this
).await {
tracing::error!(
task_id = %task_id,
@@ -1421,6 +1431,17 @@ impl TaskManager {
tracing::info!(task_id = %task_id, "Getting task diff");
self.handle_get_task_diff(task_id).await?;
}
+ DaemonCommand::CleanupWorktree {
+ task_id,
+ delete_branch,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ delete_branch = delete_branch,
+ "Cleaning up worktree"
+ );
+ self.handle_cleanup_worktree(task_id, delete_branch).await?;
+ }
}
Ok(())
}
@@ -1444,6 +1465,7 @@ impl TaskManager {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
@@ -1496,6 +1518,7 @@ impl TaskManager {
continue_from_task_id,
copy_files: copy_files.clone(),
contract_id,
+ autonomous_loop,
created_at: Instant::now(),
started_at: None,
completed_at: None,
@@ -1519,7 +1542,7 @@ impl TaskManager {
if let Err(e) = inner.run_task(
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
- continue_from_task_id, copy_files, contract_id
+ continue_from_task_id, copy_files, contract_id, autonomous_loop
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -2046,6 +2069,76 @@ impl TaskManager {
Ok(())
}
+ /// Handle CleanupWorktree command.
+ ///
+ /// Removes a task's worktree and optionally its branch.
+ /// Used when a contract is completed or deleted to clean up associated task worktrees.
+ async fn handle_cleanup_worktree(
+ &self,
+ task_id: Uuid,
+ delete_branch: bool,
+ ) -> Result<(), DaemonError> {
+ // Try to get the worktree path, but don't fail if not found
+ let worktree_result = self.get_task_worktree_path(task_id).await;
+
+ let (success, message) = match worktree_result {
+ Ok(worktree_path) => {
+ // Remove the worktree
+ match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await {
+ Ok(()) => {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ delete_branch = delete_branch,
+ "Worktree cleaned up successfully"
+ );
+
+ // Also remove task from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, format!("Worktree cleaned up: {}", worktree_path.display()))
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ error = %e,
+ "Failed to remove worktree"
+ );
+ (false, format!("Failed to remove worktree: {}", e))
+ }
+ }
+ }
+ Err(_) => {
+ // Worktree not found - this is OK, it may have already been cleaned up
+ tracing::debug!(
+ task_id = %task_id,
+ "No worktree found for task, may have already been cleaned up"
+ );
+
+ // Still remove from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, "No worktree found, task tracking cleaned up".to_string())
+ }
+ };
+
+ // Send result back to server
+ let msg = DaemonMessage::CleanupWorktreeResult {
+ task_id,
+ success,
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
/// Handle ReadRepoFile command.
///
/// Reads a file from a repository on the daemon's filesystem and sends
@@ -2436,6 +2529,7 @@ impl TaskManagerInner {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> Result<(), DaemonError> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ===");
@@ -2908,6 +3002,9 @@ impl TaskManagerInner {
);
let _ = self.ws_tx.send(msg).await;
+ // Clone extra_env for use in autonomous loop iterations
+ let extra_env_for_loop = extra_env.clone();
+
tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()...");
let mut process = self.process_manager
.spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
@@ -2934,7 +3031,7 @@ impl TaskManagerInner {
// Get stdin handle for input forwarding and completion signaling
let stdin_handle = process.stdin_handle();
- let stdin_handle_for_completion = stdin_handle.clone();
+ let mut stdin_handle_for_completion = stdin_handle.clone();
tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
tokio::spawn(async move {
@@ -2998,142 +3095,311 @@ impl TaskManagerInner {
let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok());
let mut auth_error_handled = false;
- let mut output_count = 0u64;
- let mut output_bytes = 0usize;
- let startup_timeout = tokio::time::Duration::from_secs(30);
- let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
- startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- let startup_deadline = tokio::time::Instant::now() + startup_timeout;
+ // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection
+ let mut accumulated_output = String::new();
+ let mut circuit_breaker = CircuitBreaker::new();
+ let mut iteration_count = 0u32;
+ let mut final_exit_code: i64 = -1; // Track the final exit code across iterations
- loop {
- tokio::select! {
- maybe_line = process.next_output() => {
- match maybe_line {
- Some(line) => {
- output_count += 1;
- output_bytes += line.content.len();
-
- if output_count == 1 {
- tracing::info!(task_id = %task_id, "Received first output line from Claude");
- }
- if output_count % 100 == 0 {
- tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
- }
+ // Autonomous loop: we may run multiple iterations
+ 'autonomous_loop: loop {
+ iteration_count += 1;
- // Log output details for debugging
- tracing::trace!(
- task_id = %task_id,
- line_num = output_count,
- content_len = line.content.len(),
- is_stdout = line.is_stdout,
- json_type = ?line.json_type,
- "Forwarding output to WebSocket"
- );
+ if autonomous_loop && iteration_count > 1 {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "Starting autonomous loop iteration"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // For subsequent iterations, spawn with --continue flag
+ let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true.";
+
+ process = self.process_manager
+ .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?;
+
+ // Register the new process PID
+ if let Some(pid) = process.id() {
+ self.active_pids.write().await.insert(task_id, pid);
+ tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned");
+ }
+
+ // Reset stdin handle for the new process
+ stdin_handle_for_completion = process.stdin_handle();
+ }
+
+ // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output)
+ let mut iteration_output = String::new();
+
+ let mut output_count = 0u64;
+ let mut output_bytes = 0usize;
+ let startup_timeout = tokio::time::Duration::from_secs(30);
+ let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let startup_deadline = tokio::time::Instant::now() + startup_timeout;
- // Check if this is a "result" message indicating task completion
- // With --input-format=stream-json, Claude waits for more input after completion
- // We close stdin to signal EOF and let the process exit
- if line.json_type.as_deref() == Some("result") {
- tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
- let mut stdin_guard = stdin_handle_for_completion.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
+ loop {
+ tokio::select! {
+ maybe_line = process.next_output() => {
+ match maybe_line {
+ Some(line) => {
+ output_count += 1;
+ output_bytes += line.content.len();
+
+ // Accumulate output for COMPLETION_GATE detection in autonomous loop mode
+ if autonomous_loop {
+ iteration_output.push_str(&line.content);
+ iteration_output.push('\n');
}
- }
- // Check for OAuth auth error before sending output
- let content_for_auth_check = line.content.clone();
- let json_type_for_auth_check = line.json_type.clone();
- let is_stdout_for_auth_check = line.is_stdout;
+ if output_count == 1 {
+ tracing::info!(task_id = %task_id, "Received first output line from Claude");
+ }
+ if output_count % 100 == 0 {
+ tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
+ }
- let msg = DaemonMessage::task_output(task_id, line.content, false);
- if ws_tx.send(msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
- break;
- }
+ // Log output details for debugging
+ tracing::trace!(
+ task_id = %task_id,
+ line_num = output_count,
+ content_len = line.content.len(),
+ is_stdout = line.is_stdout,
+ json_type = ?line.json_type,
+ "Forwarding output to WebSocket"
+ );
- // Detect OAuth token expiration and trigger remote login flow
- if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
- auth_error_handled = true;
- tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
-
- // Spawn claude setup-token to get login URL
- if let Some(login_url) = get_oauth_login_url(&claude_command).await {
- tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
- let auth_msg = DaemonMessage::AuthenticationRequired {
- task_id: Some(task_id),
- login_url,
- hostname: daemon_hostname.clone(),
- };
- if ws_tx.send(auth_msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ // Check if this is a "result" message indicating task completion
+ // With --input-format=stream-json, Claude waits for more input after completion
+ // We close stdin to signal EOF and let the process exit
+ if line.json_type.as_deref() == Some("result") {
+ tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
+ let mut stdin_guard = stdin_handle_for_completion.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ }
+
+ // Check for OAuth auth error before sending output
+ let content_for_auth_check = line.content.clone();
+ let json_type_for_auth_check = line.json_type.clone();
+ let is_stdout_for_auth_check = line.is_stdout;
+
+ let msg = DaemonMessage::task_output(task_id, line.content, false);
+ if ws_tx.send(msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
+ break;
+ }
+
+ // Detect OAuth token expiration and trigger remote login flow
+ if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
+ auth_error_handled = true;
+ tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
+
+ // Spawn claude setup-token to get login URL
+ if let Some(login_url) = get_oauth_login_url(&claude_command).await {
+ tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
+ let auth_msg = DaemonMessage::AuthenticationRequired {
+ task_id: Some(task_id),
+ login_url,
+ hostname: daemon_hostname.clone(),
+ };
+ if ws_tx.send(auth_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ }
+ } else {
+ tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
+ let fallback_msg = DaemonMessage::task_output(
+ task_id,
+ format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
+ daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
+ false,
+ );
+ let _ = ws_tx.send(fallback_msg).await;
}
- } else {
- tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
- let fallback_msg = DaemonMessage::task_output(
- task_id,
- format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
- daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
- false,
- );
- let _ = ws_tx.send(fallback_msg).await;
}
}
- }
- None => {
- tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
- break;
+ None => {
+ tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
+ break;
+ }
}
}
- }
- _ = startup_check.tick(), if output_count == 0 => {
- // Check if process is still alive
- match process.try_wait() {
- Ok(Some(exit_code)) => {
- tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
- false,
- );
- let _ = ws_tx.send(msg).await;
- break;
- }
- Ok(None) => {
- // Still running but no output
- if tokio::time::Instant::now() > startup_deadline {
- tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ _ = startup_check.tick(), if output_count == 0 => {
+ // Check if process is still alive
+ match process.try_wait() {
+ Ok(Some(exit_code)) => {
+ tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
let msg = DaemonMessage::task_output(
task_id,
- "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
false,
);
let _ = ws_tx.send(msg).await;
- } else {
- tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ break;
+ }
+ Ok(None) => {
+ // Still running but no output
+ if tokio::time::Instant::now() > startup_deadline {
+ tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ } else {
+ tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ }
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
}
}
}
- }
- // Wait for process to exit
- let exit_code = process.wait().await.unwrap_or(-1);
+ // Wait for process to exit
+ let exit_code = process.wait().await.unwrap_or(-1);
+ final_exit_code = exit_code; // Store for use after the loop
+
+ // Unregister the process PID (process has exited)
+ self.active_pids.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Unregistered process PID");
+
+ // Clean up input channel for this task
+ self.task_inputs.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Removed task input channel");
- // Unregister the process PID (process has exited)
- self.active_pids.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Unregistered process PID");
+ // Accumulate this iteration's output
+ accumulated_output.push_str(&iteration_output);
- // Clean up input channel for this task
- self.task_inputs.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Removed task input channel");
+ // === AUTONOMOUS LOOP LOGIC ===
+ // Check if we should continue or complete
+ if autonomous_loop && exit_code == 0 {
+ // Check for COMPLETION_GATE in the output
+ let completion_gate = CompletionGate::parse_last(&iteration_output);
+
+ match completion_gate {
+ Some(gate) if gate.ready => {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ "COMPLETION_GATE ready=true detected, task complete"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n",
+ iteration_count,
+ gate.reason.unwrap_or_else(|| "Task complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+ Some(gate) => {
+ // COMPLETION_GATE found but not ready
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ blockers = ?gate.blockers,
+ "COMPLETION_GATE ready=false, will continue"
+ );
+
+ // Check circuit breaker
+ // For now, we consider output_bytes > 0 as "progress"
+ let had_progress = output_bytes > 0;
+ let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str());
+
+ if !circuit_breaker.record_iteration(had_progress, error) {
+ // Circuit breaker tripped
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped, stopping autonomous loop"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n",
+ gate.reason.unwrap_or_else(|| "Not complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Continue to next iteration
+ continue 'autonomous_loop;
+ }
+ None => {
+ // No COMPLETION_GATE found - check circuit breaker and continue
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "No COMPLETION_GATE found, will restart with continuation prompt"
+ );
+
+ let had_progress = output_bytes > 0;
+ if !circuit_breaker.record_iteration(had_progress, None) {
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped (no COMPLETION_GATE), stopping"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ continue 'autonomous_loop;
+ }
+ }
+ } else {
+ // Not in autonomous loop mode or process failed - exit normally
+ break 'autonomous_loop;
+ }
+ } // end 'autonomous_loop
// Update state based on exit code
- let success = exit_code == 0;
+ let success = final_exit_code == 0;
let new_state = if success {
TaskState::Completed
} else {
@@ -3142,7 +3408,7 @@ impl TaskManagerInner {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
success = success,
new_state = ?new_state,
"Claude process exited, updating task state"
@@ -3154,7 +3420,7 @@ impl TaskManagerInner {
task.state = new_state;
task.completed_at = Some(Instant::now());
if !success {
- task.error = Some(format!("Process exited with code {}", exit_code));
+ task.error = Some(format!("Process exited with code {}", final_exit_code));
}
}
}
@@ -3196,7 +3462,7 @@ impl TaskManagerInner {
if is_supervisor {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
"Supervisor Claude process exited - NOT marking as complete"
);
// Update local state to reflect it's paused/waiting for input
@@ -3218,7 +3484,7 @@ impl TaskManagerInner {
let error = if success {
None
} else {
- Some(format!("Exit code: {}", exit_code))
+ Some(format!("Exit code: {}", final_exit_code))
};
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs
index 29c261e..3830e1d 100644
--- a/makima/src/daemon/task/mod.rs
+++ b/makima/src/daemon/task/mod.rs
@@ -1,7 +1,9 @@
//! Task management and execution.
+pub mod completion_gate;
pub mod manager;
pub mod state;
+pub use completion_gate::CompletionGate;
pub use manager::{ManagedTask, TaskConfig, TaskManager};
pub use state::TaskState;
diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs
index ca5fc01..7b59b62 100644
--- a/makima/src/daemon/task/state.rs
+++ b/makima/src/daemon/task/state.rs
@@ -124,7 +124,9 @@ impl Default for TaskState {
#[cfg(test)]
mod tests {
+ #[allow(unused_imports)]
use crate::daemon::*;
+ use super::TaskState;
#[test]
fn test_valid_transitions() {