diff options
| author | soryu <soryu@soryu.co> | 2026-01-19 02:26:09 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-19 02:26:09 +0000 |
| commit | 786510379bed060db2b3742b7dfca671552d2c34 (patch) | |
| tree | 432d0b575a64ef0fb2cb10a86cd916b5fbc16909 | |
| parent | b64eddc8c2f250cdcbacae18cce107bf4c86f9f4 (diff) | |
| download | soryu-786510379bed060db2b3742b7dfca671552d2c34.tar.gz soryu-786510379bed060db2b3742b7dfca671552d2c34.zip | |
Make sure tasks can continue
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 108 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 9 | ||||
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 2 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 10 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 2 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 82 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 6 |
7 files changed, 202 insertions, 17 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 029d026..80b7039 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1171,6 +1171,8 @@ impl TaskManager { contract_id, is_supervisor, autonomous_loop, + resume_session, + conversation_history, } => { tracing::info!( task_id = %task_id, @@ -1183,6 +1185,7 @@ impl TaskManager { is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, autonomous_loop = autonomous_loop, + resume_session = resume_session, target_repo_path = ?target_repo_path, completion_action = ?completion_action, continue_from_task_id = ?continue_from_task_id, @@ -1195,7 +1198,8 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, - copy_files, contract_id, autonomous_loop + copy_files, contract_id, autonomous_loop, resume_session, + conversation_history, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1275,6 +1279,8 @@ impl TaskManager { None, // copy_files contract_id, false, // autonomous_loop - supervisors don't use this + false, // resume_session - respawning from scratch + None, // conversation_history - not needed for fresh respawn ).await { tracing::error!( task_id = %task_id, @@ -1490,6 +1496,8 @@ impl TaskManager { copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, autonomous_loop: bool, + resume_session: bool, + conversation_history: Option<serde_json::Value>, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); @@ -1566,7 +1574,8 @@ impl TaskManager { if let Err(e) = inner.run_task( task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, - continue_from_task_id, copy_files, contract_id, autonomous_loop + continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, + conversation_history, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -2909,11 +2918,39 @@ impl TaskManagerInner { copy_files: Option<Vec<String>>, contract_id: Option<Uuid>, autonomous_loop: bool, + resume_session: bool, + conversation_history: Option<serde_json::Value>, ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ==="); + + // If resuming session, try to find existing worktree first + let existing_worktree = if resume_session { + match self.find_worktree_for_task(task_id).await { + Ok(path) => { + tracing::info!(task_id = %task_id, path = %path.display(), "Found existing worktree for session resume"); + Some(path) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "No existing worktree found for resume, will create new"); + None + } + } + } else { + None + }; // Determine working directory - let working_dir = if let Some(ref source) = repo_source { + let has_existing_worktree = existing_worktree.is_some(); + let working_dir = if let Some(existing) = existing_worktree { + // Reuse existing worktree for session resume + let msg = DaemonMessage::task_output( + task_id, + format!("Resuming session in existing worktree: {}\n", existing.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + existing + } else if let Some(ref source) = repo_source { if is_new_repo_request(source) { // Explicit new repo request: new:// or new://project-name tracing::info!( @@ -3388,14 +3425,61 @@ impl TaskManagerInner { // Clone extra_env for use in autonomous loop iterations let extra_env_for_loop = extra_env.clone(); - tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); - let mut process = self.process_manager - .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) - .await - .map_err(|e| { - tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); - DaemonError::Task(TaskError::SetupFailed(e.to_string())) - })?; + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), resume_session = resume_session, "Calling process_manager.spawn()..."); + let mut process = if resume_session { + // Use --continue flag to resume from previous session + // Build continuation prompt based on whether worktree exists + let continuation_prompt = if has_existing_worktree { + // Worktree exists: Claude's session state should work + format!( + "Resuming previous session. Continue from where you left off.\n\n{}", + full_plan + ) + } else if let Some(ref history) = conversation_history { + // Worktree missing: inject conversation history as context + let history_str = serde_json::to_string_pretty(history).unwrap_or_default(); + format!( + "Resuming previous session. Here is the conversation history from the previous session:\n\n\ + <previous_conversation>\n{}\n</previous_conversation>\n\n\ + Continue from where you left off with this task:\n\n{}", + history_str, + full_plan + ) + } else { + // No history available: just the plan + format!("Resuming with plan:\n\n{}", full_plan) + }; + + let resume_msg = if has_existing_worktree { + "Using --continue to resume previous conversation...\n" + } else if conversation_history.is_some() { + "Worktree not found. Resuming with injected conversation history...\n" + } else { + "Resuming without conversation history (worktree not found)...\n" + }; + let msg = DaemonMessage::task_output( + task_id, + resume_msg.to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + self.process_manager + .spawn_continue(&working_dir, &continuation_prompt, extra_env, system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process with --continue"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })? + } else { + self.process_manager + .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })? + }; // Register the process PID for graceful shutdown tracking if let Some(pid) = process.id() { diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 5c9004b..d0bcc19 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -378,6 +378,15 @@ pub enum DaemonCommand { /// without a COMPLETION_GATE indicating ready: true. #[serde(rename = "autonomousLoop", default)] autonomous_loop: bool, + /// Whether to resume from a previous session using --continue flag. + /// When enabled, the daemon will reuse the existing worktree and call + /// Claude with --continue to maintain conversation history. + #[serde(rename = "resumeSession", default)] + resume_session: bool, + /// Conversation history for fallback when worktree doesn't exist. + /// Used to inject previous conversation context into the prompt. + #[serde(rename = "conversationHistory", default)] + conversation_history: Option<serde_json::Value>, }, /// Pause a running task. diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index 29ec620..7e7b476 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1589,6 +1589,8 @@ async fn handle_contract_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; if let Err(e) = command_sender.send(command).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index e9a421c..ad3ec79 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -719,6 +719,8 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( @@ -766,6 +768,8 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { @@ -1165,6 +1169,8 @@ pub async fn send_message( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, + resume_session: false, + conversation_history: None, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { @@ -2299,6 +2305,8 @@ pub async fn reassign_task( copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( @@ -2621,6 +2629,8 @@ pub async fn continue_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index f6b861a..c468446 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1146,6 +1146,8 @@ async fn handle_mesh_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; match state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index a8af3fb..df8f77c 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -351,6 +351,8 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -646,6 +648,8 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1845,22 +1849,90 @@ pub async fn resume_supervisor( .map(|arr| arr.len() as i32) .unwrap_or(0); + // Find a connected daemon for this owner + let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { + Some(id) => id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot resume supervisor.", + )), + ) + .into_response(); + } + }; + + // Track response values (may be updated by resume modes) + let mut response_daemon_id = supervisor_task.daemon_id; + let mut response_status = "pending".to_string(); + // Based on resume mode, handle differently match req.resume_mode.as_str() { "continue" => { - // Mark task for reassignment with existing conversation context - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + // Update task status to starting and assign daemon + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") + .bind(target_daemon_id) .bind(supervisor_state.task_id) .execute(pool) .await { - tracing::error!("Failed to update task status: {}", e); + tracing::error!("Failed to update task for resume: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } + + // Send SpawnTask with resume_session=true to use Claude's --continue + // Include conversation_history as fallback if worktree doesn't exist on target daemon + let command = DaemonCommand::SpawnTask { + task_id: supervisor_state.task_id, + task_name: supervisor_task.name.clone(), + plan: supervisor_task.plan.clone(), + repo_url: supervisor_task.repository_url.clone(), + base_branch: supervisor_task.base_branch.clone(), + target_branch: supervisor_task.target_branch.clone(), + parent_task_id: supervisor_task.parent_task_id, + depth: supervisor_task.depth, + is_orchestrator: false, + target_repo_path: supervisor_task.target_repo_path.clone(), + completion_action: supervisor_task.completion_action.clone(), + continue_from_task_id: supervisor_task.continue_from_task_id, + copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: supervisor_task.contract_id, + is_supervisor: true, + resume_session: true, // Use --continue to preserve conversation + conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + // Rollback status on failure + let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await; + tracing::error!("Failed to send SpawnTask to daemon: {}", e); + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + daemon_id = %target_daemon_id, + message_count = message_count, + "Supervisor resumed with --continue (resume_session=true)" + ); + + // Update response values for successful resume + response_daemon_id = Some(target_daemon_id); + response_status = "starting".to_string(); } "restart_phase" => { // Clear conversation but keep phase progress @@ -1925,13 +1997,13 @@ pub async fn resume_supervisor( Json(ResumeSupervisorResponse { supervisor_task_id: supervisor_state.task_id, - daemon_id: supervisor_task.daemon_id, + daemon_id: response_daemon_id, resumed_from: ResumedFromInfo { phase: contract.phase, last_activity: supervisor_state.last_activity, message_count, }, - status: "pending".to_string(), + status: response_status, }) .into_response() } diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 86f38c8..c5736af 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -206,6 +206,12 @@ pub enum DaemonCommand { /// Whether this task is a supervisor (long-running contract orchestrator) #[serde(rename = "isSupervisor")] is_supervisor: bool, + /// Whether to resume from a previous session using --continue flag + #[serde(rename = "resumeSession", default)] + resume_session: bool, + /// Conversation history for fallback when worktree doesn't exist + #[serde(rename = "conversationHistory", default)] + conversation_history: Option<serde_json::Value>, }, /// Pause a running task PauseTask { |
