summaryrefslogtreecommitdiff
path: root/makima/src/daemon/task
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/daemon/task')
-rw-r--r--makima/src/daemon/task/completion_gate.rs402
-rw-r--r--makima/src/daemon/task/manager.rs500
-rw-r--r--makima/src/daemon/task/mod.rs2
-rw-r--r--makima/src/daemon/task/state.rs2
4 files changed, 789 insertions, 117 deletions
diff --git a/makima/src/daemon/task/completion_gate.rs b/makima/src/daemon/task/completion_gate.rs
new file mode 100644
index 0000000..69b7c6a
--- /dev/null
+++ b/makima/src/daemon/task/completion_gate.rs
@@ -0,0 +1,402 @@
+//! Completion gate parsing for autonomous loop mode.
+//!
+//! This module parses COMPLETION_GATE blocks from Claude's output to determine
+//! if the task is truly complete. The format is inspired by Ralph's autonomous
+//! development framework.
+//!
+//! Format:
+//! ```
+//! <COMPLETION_GATE>
+//! ready: true|false
+//! reason: "explanation of completion status"
+//! progress: "summary of what was accomplished"
+//! blockers: ["list", "of", "blockers"] (optional, only when ready: false)
+//! </COMPLETION_GATE>
+//! ```
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Represents a parsed COMPLETION_GATE block from Claude's output.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct CompletionGate {
+ /// Whether the task is ready to complete.
+ pub ready: bool,
+ /// Explanation of the completion status.
+ pub reason: Option<String>,
+ /// Summary of what was accomplished.
+ pub progress: Option<String>,
+ /// List of blockers if not ready.
+ pub blockers: Option<Vec<String>>,
+ /// Any additional fields that were parsed.
+ #[serde(flatten)]
+ pub extra: HashMap<String, serde_json::Value>,
+}
+
+impl CompletionGate {
+ /// Parse a COMPLETION_GATE block from text output.
+ ///
+ /// Returns None if no valid COMPLETION_GATE is found.
+ pub fn parse(text: &str) -> Option<Self> {
+ // Find the COMPLETION_GATE block
+ let start_tag = "<COMPLETION_GATE>";
+ let end_tag = "</COMPLETION_GATE>";
+
+ let start_idx = text.find(start_tag)?;
+ let end_idx = text.find(end_tag)?;
+
+ if end_idx <= start_idx {
+ return None;
+ }
+
+ let content = &text[start_idx + start_tag.len()..end_idx];
+ let content = content.trim();
+
+ // Try to parse as JSON first
+ if content.starts_with('{') {
+ if let Ok(gate) = serde_json::from_str::<CompletionGate>(content) {
+ return Some(gate);
+ }
+ }
+
+ // Fall back to YAML-like parsing
+ Self::parse_yaml_like(content)
+ }
+
+ /// Parse a YAML-like format (key: value lines).
+ fn parse_yaml_like(content: &str) -> Option<Self> {
+ let mut gate = CompletionGate::default();
+
+ for line in content.lines() {
+ let line = line.trim();
+ if line.is_empty() {
+ continue;
+ }
+
+ if let Some((key, value)) = line.split_once(':') {
+ let key = key.trim().to_lowercase();
+ let value = value.trim();
+
+ match key.as_str() {
+ "ready" => {
+ gate.ready = value.to_lowercase() == "true"
+ || value == "yes"
+ || value == "1";
+ }
+ "reason" => {
+ gate.reason = Some(Self::unquote(value));
+ }
+ "progress" => {
+ gate.progress = Some(Self::unquote(value));
+ }
+ "blockers" => {
+ // Try to parse as JSON array
+ if let Ok(blockers) = serde_json::from_str::<Vec<String>>(value) {
+ gate.blockers = Some(blockers);
+ } else {
+ // Single blocker as string
+ gate.blockers = Some(vec![Self::unquote(value)]);
+ }
+ }
+ _ => {
+ // Store unknown fields
+ if let Ok(json_val) = serde_json::from_str(value) {
+ gate.extra.insert(key, json_val);
+ } else {
+ gate.extra.insert(
+ key,
+ serde_json::Value::String(Self::unquote(value)),
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Some(gate)
+ }
+
+ /// Remove surrounding quotes from a string value.
+ fn unquote(s: &str) -> String {
+ let s = s.trim();
+ if (s.starts_with('"') && s.ends_with('"'))
+ || (s.starts_with('\'') && s.ends_with('\''))
+ {
+ s[1..s.len() - 1].to_string()
+ } else {
+ s.to_string()
+ }
+ }
+
+ /// Find all COMPLETION_GATE blocks in the output and return the last one.
+ ///
+ /// This is useful when Claude produces multiple completion gates during
+ /// a long-running task, and we want to use the final status.
+ pub fn parse_last(text: &str) -> Option<Self> {
+ let end_tag = "</COMPLETION_GATE>";
+ let mut last_gate = None;
+ let mut search_start = 0;
+
+ while let Some(end_idx) = text[search_start..].find(end_tag) {
+ let absolute_end = search_start + end_idx + end_tag.len();
+ if let Some(gate) = Self::parse(&text[..absolute_end]) {
+ last_gate = Some(gate);
+ }
+ search_start = absolute_end;
+ }
+
+ last_gate
+ }
+}
+
+/// State tracking for the circuit breaker in autonomous loop mode.
+#[derive(Debug, Clone, Default)]
+pub struct CircuitBreaker {
+ /// Number of consecutive runs without file changes.
+ pub runs_without_changes: u32,
+ /// Threshold for opening circuit due to no changes (default: 3).
+ pub no_change_threshold: u32,
+ /// Number of consecutive runs with the same error.
+ pub same_error_count: u32,
+ /// Threshold for opening circuit due to same error (default: 5).
+ pub same_error_threshold: u32,
+ /// Last error message seen.
+ pub last_error: Option<String>,
+ /// Total number of loop iterations.
+ pub iteration_count: u32,
+ /// Maximum allowed iterations (default: 10).
+ pub max_iterations: u32,
+ /// Whether the circuit is open (task should stop).
+ pub is_open: bool,
+ /// Reason why circuit was opened.
+ pub open_reason: Option<String>,
+}
+
+impl CircuitBreaker {
+ /// Create a new circuit breaker with default thresholds.
+ pub fn new() -> Self {
+ Self {
+ no_change_threshold: 3,
+ same_error_threshold: 5,
+ max_iterations: 10,
+ ..Default::default()
+ }
+ }
+
+ /// Create with custom thresholds.
+ pub fn with_thresholds(no_change: u32, same_error: u32, max_iterations: u32) -> Self {
+ Self {
+ no_change_threshold: no_change,
+ same_error_threshold: same_error,
+ max_iterations,
+ ..Default::default()
+ }
+ }
+
+ /// Record a new iteration. Returns true if circuit should remain closed.
+ pub fn record_iteration(&mut self, had_changes: bool, error: Option<&str>) -> bool {
+ self.iteration_count += 1;
+
+ // Check max iterations
+ if self.iteration_count >= self.max_iterations {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Maximum iterations ({}) reached",
+ self.max_iterations
+ ));
+ return false;
+ }
+
+ // Track file changes
+ if had_changes {
+ self.runs_without_changes = 0;
+ } else {
+ self.runs_without_changes += 1;
+ if self.runs_without_changes >= self.no_change_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "No file changes for {} consecutive runs",
+ self.runs_without_changes
+ ));
+ return false;
+ }
+ }
+
+ // Track errors
+ match (error, &self.last_error) {
+ (Some(err), Some(last)) if err == last => {
+ self.same_error_count += 1;
+ if self.same_error_count >= self.same_error_threshold {
+ self.is_open = true;
+ self.open_reason = Some(format!(
+ "Same error repeated {} times: {}",
+ self.same_error_count, err
+ ));
+ return false;
+ }
+ }
+ (Some(err), _) => {
+ self.last_error = Some(err.to_string());
+ self.same_error_count = 1;
+ }
+ (None, _) => {
+ self.same_error_count = 0;
+ self.last_error = None;
+ }
+ }
+
+ true // Circuit remains closed
+ }
+
+ /// Check if the circuit breaker is open.
+ pub fn should_stop(&self) -> bool {
+ self.is_open
+ }
+
+ /// Reset the circuit breaker.
+ pub fn reset(&mut self) {
+ *self = Self::with_thresholds(
+ self.no_change_threshold,
+ self.same_error_threshold,
+ self.max_iterations,
+ );
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_yaml_format() {
+ let text = r#"
+Some output before
+<COMPLETION_GATE>
+ready: true
+reason: "All tests pass"
+progress: "Implemented feature X"
+</COMPLETION_GATE>
+More output after
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("All tests pass"));
+ assert_eq!(gate.progress.as_deref(), Some("Implemented feature X"));
+ }
+
+ #[test]
+ fn test_parse_not_ready() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Tests are failing"
+blockers: ["Fix test_foo", "Fix test_bar"]
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(!gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Tests are failing"));
+ assert_eq!(
+ gate.blockers,
+ Some(vec!["Fix test_foo".to_string(), "Fix test_bar".to_string()])
+ );
+ }
+
+ #[test]
+ fn test_parse_json_format() {
+ let text = r#"
+<COMPLETION_GATE>
+{
+ "ready": true,
+ "reason": "Done",
+ "progress": "All good"
+}
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Done"));
+ }
+
+ #[test]
+ fn test_parse_last_gate() {
+ let text = r#"
+<COMPLETION_GATE>
+ready: false
+reason: "Still working"
+</COMPLETION_GATE>
+Some more work...
+<COMPLETION_GATE>
+ready: true
+reason: "Finally done"
+</COMPLETION_GATE>
+"#;
+
+ let gate = CompletionGate::parse_last(text).unwrap();
+ assert!(gate.ready);
+ assert_eq!(gate.reason.as_deref(), Some("Finally done"));
+ }
+
+ #[test]
+ fn test_no_gate() {
+ let text = "No completion gate here";
+ assert!(CompletionGate::parse(text).is_none());
+ }
+
+ #[test]
+ fn test_circuit_breaker_max_iterations() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 5);
+ for _ in 0..4 {
+ assert!(cb.record_iteration(true, None));
+ }
+ assert!(!cb.record_iteration(true, None)); // 5th iteration should trip
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Maximum iterations"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_no_changes() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1st no change
+ assert!(cb.record_iteration(false, None)); // 2nd no change
+ assert!(!cb.record_iteration(false, None)); // 3rd no change - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("No file changes"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_same_error() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ let err = "Test failed";
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(cb.record_iteration(true, Some(err)));
+ assert!(!cb.record_iteration(true, Some(err))); // 3rd same error - trips
+ assert!(cb.is_open);
+ assert!(cb.open_reason.as_ref().unwrap().contains("Same error"));
+ }
+
+ #[test]
+ fn test_circuit_breaker_different_errors_ok() {
+ let mut cb = CircuitBreaker::with_thresholds(10, 3, 10);
+ assert!(cb.record_iteration(true, Some("error 1")));
+ assert!(cb.record_iteration(true, Some("error 2")));
+ assert!(cb.record_iteration(true, Some("error 3")));
+ // Different errors don't trip the circuit
+ assert!(!cb.is_open);
+ }
+
+ #[test]
+ fn test_circuit_breaker_changes_reset() {
+ let mut cb = CircuitBreaker::with_thresholds(3, 5, 10);
+ assert!(cb.record_iteration(false, None)); // 1 no change
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ assert!(cb.record_iteration(true, None)); // has changes - resets
+ assert!(cb.record_iteration(false, None)); // 1 no change again
+ assert!(cb.record_iteration(false, None)); // 2 no changes
+ // Still shouldn't trip because we had a change in between
+ assert!(!cb.is_open);
+ }
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 4ccedb2..75c884b 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -12,6 +12,7 @@ use uuid::Uuid;
use std::collections::HashSet;
+use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
@@ -944,6 +945,8 @@ pub struct ManagedTask {
pub copy_files: Option<Vec<String>>,
/// Contract ID if this task is associated with a contract.
pub contract_id: Option<Uuid>,
+ /// Whether to run in autonomous loop mode.
+ pub autonomous_loop: bool,
/// Time task was created.
pub created_at: Instant,
/// Time task started running.
@@ -973,6 +976,8 @@ pub struct TaskConfig {
pub enable_permissions: bool,
/// Disable verbose output.
pub disable_verbose: bool,
+ /// Bubblewrap sandbox configuration.
+ pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>,
}
impl Default for TaskConfig {
@@ -986,6 +991,7 @@ impl Default for TaskConfig {
claude_pre_args: Vec::new(),
enable_permissions: false,
disable_verbose: false,
+ bubblewrap: None,
}
}
}
@@ -1027,7 +1033,8 @@ impl TaskManager {
.with_pre_args(config.claude_pre_args.clone())
.with_permissions_enabled(config.enable_permissions)
.with_verbose_disabled(config.disable_verbose)
- .with_env(config.env_vars.clone()),
+ .with_env(config.env_vars.clone())
+ .with_bubblewrap(config.bubblewrap.clone()),
);
let temp_manager = Arc::new(TempManager::new());
@@ -1150,6 +1157,7 @@ impl TaskManager {
copy_files,
contract_id,
is_supervisor,
+ autonomous_loop,
} => {
tracing::info!(
task_id = %task_id,
@@ -1161,6 +1169,7 @@ impl TaskManager {
depth = depth,
is_orchestrator = is_orchestrator,
is_supervisor = is_supervisor,
+ autonomous_loop = autonomous_loop,
target_repo_path = ?target_repo_path,
completion_action = ?completion_action,
continue_from_task_id = ?continue_from_task_id,
@@ -1173,7 +1182,7 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
- copy_files, contract_id
+ copy_files, contract_id, autonomous_loop
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1252,6 +1261,7 @@ impl TaskManager {
None, // continue_from_task_id
None, // copy_files
contract_id,
+ false, // autonomous_loop - supervisors don't use this
).await {
tracing::error!(
task_id = %task_id,
@@ -1421,6 +1431,17 @@ impl TaskManager {
tracing::info!(task_id = %task_id, "Getting task diff");
self.handle_get_task_diff(task_id).await?;
}
+ DaemonCommand::CleanupWorktree {
+ task_id,
+ delete_branch,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ delete_branch = delete_branch,
+ "Cleaning up worktree"
+ );
+ self.handle_cleanup_worktree(task_id, delete_branch).await?;
+ }
}
Ok(())
}
@@ -1444,6 +1465,7 @@ impl TaskManager {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
@@ -1496,6 +1518,7 @@ impl TaskManager {
continue_from_task_id,
copy_files: copy_files.clone(),
contract_id,
+ autonomous_loop,
created_at: Instant::now(),
started_at: None,
completed_at: None,
@@ -1519,7 +1542,7 @@ impl TaskManager {
if let Err(e) = inner.run_task(
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
- continue_from_task_id, copy_files, contract_id
+ continue_from_task_id, copy_files, contract_id, autonomous_loop
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -2046,6 +2069,76 @@ impl TaskManager {
Ok(())
}
+ /// Handle CleanupWorktree command.
+ ///
+ /// Removes a task's worktree and optionally its branch.
+ /// Used when a contract is completed or deleted to clean up associated task worktrees.
+ async fn handle_cleanup_worktree(
+ &self,
+ task_id: Uuid,
+ delete_branch: bool,
+ ) -> Result<(), DaemonError> {
+ // Try to get the worktree path, but don't fail if not found
+ let worktree_result = self.get_task_worktree_path(task_id).await;
+
+ let (success, message) = match worktree_result {
+ Ok(worktree_path) => {
+ // Remove the worktree
+ match self.worktree_manager.remove_worktree(&worktree_path, delete_branch).await {
+ Ok(()) => {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ delete_branch = delete_branch,
+ "Worktree cleaned up successfully"
+ );
+
+ // Also remove task from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, format!("Worktree cleaned up: {}", worktree_path.display()))
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ error = %e,
+ "Failed to remove worktree"
+ );
+ (false, format!("Failed to remove worktree: {}", e))
+ }
+ }
+ }
+ Err(_) => {
+ // Worktree not found - this is OK, it may have already been cleaned up
+ tracing::debug!(
+ task_id = %task_id,
+ "No worktree found for task, may have already been cleaned up"
+ );
+
+ // Still remove from in-memory tracking
+ self.tasks.write().await.remove(&task_id);
+ self.task_inputs.write().await.remove(&task_id);
+ self.merge_trackers.write().await.remove(&task_id);
+ self.active_pids.write().await.remove(&task_id);
+
+ (true, "No worktree found, task tracking cleaned up".to_string())
+ }
+ };
+
+ // Send result back to server
+ let msg = DaemonMessage::CleanupWorktreeResult {
+ task_id,
+ success,
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
/// Handle ReadRepoFile command.
///
/// Reads a file from a repository on the daemon's filesystem and sends
@@ -2436,6 +2529,7 @@ impl TaskManagerInner {
continue_from_task_id: Option<Uuid>,
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
+ autonomous_loop: bool,
) -> Result<(), DaemonError> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ===");
@@ -2908,6 +3002,9 @@ impl TaskManagerInner {
);
let _ = self.ws_tx.send(msg).await;
+ // Clone extra_env for use in autonomous loop iterations
+ let extra_env_for_loop = extra_env.clone();
+
tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()...");
let mut process = self.process_manager
.spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
@@ -2934,7 +3031,7 @@ impl TaskManagerInner {
// Get stdin handle for input forwarding and completion signaling
let stdin_handle = process.stdin_handle();
- let stdin_handle_for_completion = stdin_handle.clone();
+ let mut stdin_handle_for_completion = stdin_handle.clone();
tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
tokio::spawn(async move {
@@ -2998,142 +3095,311 @@ impl TaskManagerInner {
let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok());
let mut auth_error_handled = false;
- let mut output_count = 0u64;
- let mut output_bytes = 0usize;
- let startup_timeout = tokio::time::Duration::from_secs(30);
- let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
- startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- let startup_deadline = tokio::time::Instant::now() + startup_timeout;
+ // For autonomous loop mode: track accumulated output for COMPLETION_GATE detection
+ let mut accumulated_output = String::new();
+ let mut circuit_breaker = CircuitBreaker::new();
+ let mut iteration_count = 0u32;
+ let mut final_exit_code: i64 = -1; // Track the final exit code across iterations
- loop {
- tokio::select! {
- maybe_line = process.next_output() => {
- match maybe_line {
- Some(line) => {
- output_count += 1;
- output_bytes += line.content.len();
-
- if output_count == 1 {
- tracing::info!(task_id = %task_id, "Received first output line from Claude");
- }
- if output_count % 100 == 0 {
- tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
- }
+ // Autonomous loop: we may run multiple iterations
+ 'autonomous_loop: loop {
+ iteration_count += 1;
- // Log output details for debugging
- tracing::trace!(
- task_id = %task_id,
- line_num = output_count,
- content_len = line.content.len(),
- is_stdout = line.is_stdout,
- json_type = ?line.json_type,
- "Forwarding output to WebSocket"
- );
+ if autonomous_loop && iteration_count > 1 {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "Starting autonomous loop iteration"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Starting iteration {} (--continue mode)\n", iteration_count),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // For subsequent iterations, spawn with --continue flag
+ let continuation_prompt = "Continue working on the task. Review your previous output and progress. When you are completely done, output a COMPLETION_GATE block with ready: true.";
+
+ process = self.process_manager
+ .spawn_continue(&working_dir, continuation_prompt, extra_env_for_loop.clone(), system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process for continuation");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?;
+
+ // Register the new process PID
+ if let Some(pid) = process.id() {
+ self.active_pids.write().await.insert(task_id, pid);
+ tracing::info!(task_id = %task_id, pid = pid, iteration = iteration_count, "Claude continue process spawned");
+ }
+
+ // Reset stdin handle for the new process
+ stdin_handle_for_completion = process.stdin_handle();
+ }
+
+ // Clear output for this iteration (we'll check for COMPLETION_GATE in the new output)
+ let mut iteration_output = String::new();
+
+ let mut output_count = 0u64;
+ let mut output_bytes = 0usize;
+ let startup_timeout = tokio::time::Duration::from_secs(30);
+ let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let startup_deadline = tokio::time::Instant::now() + startup_timeout;
- // Check if this is a "result" message indicating task completion
- // With --input-format=stream-json, Claude waits for more input after completion
- // We close stdin to signal EOF and let the process exit
- if line.json_type.as_deref() == Some("result") {
- tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
- let mut stdin_guard = stdin_handle_for_completion.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
+ loop {
+ tokio::select! {
+ maybe_line = process.next_output() => {
+ match maybe_line {
+ Some(line) => {
+ output_count += 1;
+ output_bytes += line.content.len();
+
+ // Accumulate output for COMPLETION_GATE detection in autonomous loop mode
+ if autonomous_loop {
+ iteration_output.push_str(&line.content);
+ iteration_output.push('\n');
}
- }
- // Check for OAuth auth error before sending output
- let content_for_auth_check = line.content.clone();
- let json_type_for_auth_check = line.json_type.clone();
- let is_stdout_for_auth_check = line.is_stdout;
+ if output_count == 1 {
+ tracing::info!(task_id = %task_id, "Received first output line from Claude");
+ }
+ if output_count % 100 == 0 {
+ tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
+ }
- let msg = DaemonMessage::task_output(task_id, line.content, false);
- if ws_tx.send(msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
- break;
- }
+ // Log output details for debugging
+ tracing::trace!(
+ task_id = %task_id,
+ line_num = output_count,
+ content_len = line.content.len(),
+ is_stdout = line.is_stdout,
+ json_type = ?line.json_type,
+ "Forwarding output to WebSocket"
+ );
- // Detect OAuth token expiration and trigger remote login flow
- if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
- auth_error_handled = true;
- tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
-
- // Spawn claude setup-token to get login URL
- if let Some(login_url) = get_oauth_login_url(&claude_command).await {
- tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
- let auth_msg = DaemonMessage::AuthenticationRequired {
- task_id: Some(task_id),
- login_url,
- hostname: daemon_hostname.clone(),
- };
- if ws_tx.send(auth_msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ // Check if this is a "result" message indicating task completion
+ // With --input-format=stream-json, Claude waits for more input after completion
+ // We close stdin to signal EOF and let the process exit
+ if line.json_type.as_deref() == Some("result") {
+ tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
+ let mut stdin_guard = stdin_handle_for_completion.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ }
+
+ // Check for OAuth auth error before sending output
+ let content_for_auth_check = line.content.clone();
+ let json_type_for_auth_check = line.json_type.clone();
+ let is_stdout_for_auth_check = line.is_stdout;
+
+ let msg = DaemonMessage::task_output(task_id, line.content, false);
+ if ws_tx.send(msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
+ break;
+ }
+
+ // Detect OAuth token expiration and trigger remote login flow
+ if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
+ auth_error_handled = true;
+ tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
+
+ // Spawn claude setup-token to get login URL
+ if let Some(login_url) = get_oauth_login_url(&claude_command).await {
+ tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
+ let auth_msg = DaemonMessage::AuthenticationRequired {
+ task_id: Some(task_id),
+ login_url,
+ hostname: daemon_hostname.clone(),
+ };
+ if ws_tx.send(auth_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ }
+ } else {
+ tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
+ let fallback_msg = DaemonMessage::task_output(
+ task_id,
+ format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
+ daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
+ false,
+ );
+ let _ = ws_tx.send(fallback_msg).await;
}
- } else {
- tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
- let fallback_msg = DaemonMessage::task_output(
- task_id,
- format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
- daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
- false,
- );
- let _ = ws_tx.send(fallback_msg).await;
}
}
- }
- None => {
- tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
- break;
+ None => {
+ tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
+ break;
+ }
}
}
- }
- _ = startup_check.tick(), if output_count == 0 => {
- // Check if process is still alive
- match process.try_wait() {
- Ok(Some(exit_code)) => {
- tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
- false,
- );
- let _ = ws_tx.send(msg).await;
- break;
- }
- Ok(None) => {
- // Still running but no output
- if tokio::time::Instant::now() > startup_deadline {
- tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ _ = startup_check.tick(), if output_count == 0 => {
+ // Check if process is still alive
+ match process.try_wait() {
+ Ok(Some(exit_code)) => {
+ tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
let msg = DaemonMessage::task_output(
task_id,
- "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
false,
);
let _ = ws_tx.send(msg).await;
- } else {
- tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ break;
+ }
+ Ok(None) => {
+ // Still running but no output
+ if tokio::time::Instant::now() > startup_deadline {
+ tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ } else {
+ tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ }
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
}
}
}
}
- }
- // Wait for process to exit
- let exit_code = process.wait().await.unwrap_or(-1);
+ // Wait for process to exit
+ let exit_code = process.wait().await.unwrap_or(-1);
+ final_exit_code = exit_code; // Store for use after the loop
+
+ // Unregister the process PID (process has exited)
+ self.active_pids.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Unregistered process PID");
+
+ // Clean up input channel for this task
+ self.task_inputs.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Removed task input channel");
- // Unregister the process PID (process has exited)
- self.active_pids.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Unregistered process PID");
+ // Accumulate this iteration's output
+ accumulated_output.push_str(&iteration_output);
- // Clean up input channel for this task
- self.task_inputs.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Removed task input channel");
+ // === AUTONOMOUS LOOP LOGIC ===
+ // Check if we should continue or complete
+ if autonomous_loop && exit_code == 0 {
+ // Check for COMPLETION_GATE in the output
+ let completion_gate = CompletionGate::parse_last(&iteration_output);
+
+ match completion_gate {
+ Some(gate) if gate.ready => {
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ "COMPLETION_GATE ready=true detected, task complete"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Task completed after {} iteration(s). Reason: {}\n",
+ iteration_count,
+ gate.reason.unwrap_or_else(|| "Task complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+ Some(gate) => {
+ // COMPLETION_GATE found but not ready
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ reason = ?gate.reason,
+ blockers = ?gate.blockers,
+ "COMPLETION_GATE ready=false, will continue"
+ );
+
+ // Check circuit breaker
+ // For now, we consider output_bytes > 0 as "progress"
+ let had_progress = output_bytes > 0;
+ let error = gate.blockers.as_ref().and_then(|b| b.first()).map(|s| s.as_str());
+
+ if !circuit_breaker.record_iteration(had_progress, error) {
+ // Circuit breaker tripped
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped, stopping autonomous loop"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] COMPLETION_GATE ready=false. Reason: {}. Restarting...\n",
+ gate.reason.unwrap_or_else(|| "Not complete".to_string())
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Continue to next iteration
+ continue 'autonomous_loop;
+ }
+ None => {
+ // No COMPLETION_GATE found - check circuit breaker and continue
+ tracing::info!(
+ task_id = %task_id,
+ iteration = iteration_count,
+ "No COMPLETION_GATE found, will restart with continuation prompt"
+ );
+
+ let had_progress = output_bytes > 0;
+ if !circuit_breaker.record_iteration(had_progress, None) {
+ tracing::warn!(
+ task_id = %task_id,
+ reason = ?circuit_breaker.open_reason,
+ "Circuit breaker tripped (no COMPLETION_GATE), stopping"
+ );
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("\n[Autonomous Loop] Circuit breaker tripped: {}\n",
+ circuit_breaker.open_reason.as_deref().unwrap_or("Unknown reason")
+ ),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ break 'autonomous_loop;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "\n[Autonomous Loop] No COMPLETION_GATE found. Restarting with --continue...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ continue 'autonomous_loop;
+ }
+ }
+ } else {
+ // Not in autonomous loop mode or process failed - exit normally
+ break 'autonomous_loop;
+ }
+ } // end 'autonomous_loop
// Update state based on exit code
- let success = exit_code == 0;
+ let success = final_exit_code == 0;
let new_state = if success {
TaskState::Completed
} else {
@@ -3142,7 +3408,7 @@ impl TaskManagerInner {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
success = success,
new_state = ?new_state,
"Claude process exited, updating task state"
@@ -3154,7 +3420,7 @@ impl TaskManagerInner {
task.state = new_state;
task.completed_at = Some(Instant::now());
if !success {
- task.error = Some(format!("Process exited with code {}", exit_code));
+ task.error = Some(format!("Process exited with code {}", final_exit_code));
}
}
}
@@ -3196,7 +3462,7 @@ impl TaskManagerInner {
if is_supervisor {
tracing::info!(
task_id = %task_id,
- exit_code = exit_code,
+ exit_code = final_exit_code,
"Supervisor Claude process exited - NOT marking as complete"
);
// Update local state to reflect it's paused/waiting for input
@@ -3218,7 +3484,7 @@ impl TaskManagerInner {
let error = if success {
None
} else {
- Some(format!("Exit code: {}", exit_code))
+ Some(format!("Exit code: {}", final_exit_code))
};
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs
index 29c261e..3830e1d 100644
--- a/makima/src/daemon/task/mod.rs
+++ b/makima/src/daemon/task/mod.rs
@@ -1,7 +1,9 @@
//! Task management and execution.
+pub mod completion_gate;
pub mod manager;
pub mod state;
+pub use completion_gate::CompletionGate;
pub use manager::{ManagedTask, TaskConfig, TaskManager};
pub use state::TaskState;
diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs
index ca5fc01..7b59b62 100644
--- a/makima/src/daemon/task/state.rs
+++ b/makima/src/daemon/task/state.rs
@@ -124,7 +124,9 @@ impl Default for TaskState {
#[cfg(test)]
mod tests {
+ #[allow(unused_imports)]
use crate::daemon::*;
+ use super::TaskState;
#[test]
fn test_valid_transitions() {