From 786510379bed060db2b3742b7dfca671552d2c34 Mon Sep 17 00:00:00 2001 From: soryu Date: Mon, 19 Jan 2026 02:26:09 +0000 Subject: Make sure tasks can continue --- makima/src/server/handlers/mesh_supervisor.rs | 82 +++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 5 deletions(-) (limited to 'makima/src/server/handlers/mesh_supervisor.rs') diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index a8af3fb..df8f77c 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -351,6 +351,8 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -646,6 +648,8 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1845,22 +1849,90 @@ pub async fn resume_supervisor( .map(|arr| arr.len() as i32) .unwrap_or(0); + // Find a connected daemon for this owner + let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { + Some(id) => id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot resume supervisor.", + )), + ) + .into_response(); + } + }; + + // Track response values (may be updated by resume modes) + let mut response_daemon_id = supervisor_task.daemon_id; + let mut response_status = "pending".to_string(); + // Based on resume mode, handle differently match req.resume_mode.as_str() { "continue" => { - // Mark task for reassignment with existing conversation context - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + // Update task status to starting and assign daemon + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") + .bind(target_daemon_id) .bind(supervisor_state.task_id) .execute(pool) .await { - tracing::error!("Failed to update task status: {}", e); + tracing::error!("Failed to update task for resume: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } + + // Send SpawnTask with resume_session=true to use Claude's --continue + // Include conversation_history as fallback if worktree doesn't exist on target daemon + let command = DaemonCommand::SpawnTask { + task_id: supervisor_state.task_id, + task_name: supervisor_task.name.clone(), + plan: supervisor_task.plan.clone(), + repo_url: supervisor_task.repository_url.clone(), + base_branch: supervisor_task.base_branch.clone(), + target_branch: supervisor_task.target_branch.clone(), + parent_task_id: supervisor_task.parent_task_id, + depth: supervisor_task.depth, + is_orchestrator: false, + target_repo_path: supervisor_task.target_repo_path.clone(), + completion_action: supervisor_task.completion_action.clone(), + continue_from_task_id: supervisor_task.continue_from_task_id, + copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: supervisor_task.contract_id, + is_supervisor: true, + resume_session: true, // Use --continue to preserve conversation + conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + // Rollback status on failure + let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await; + tracing::error!("Failed to send SpawnTask to daemon: {}", e); + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + daemon_id = %target_daemon_id, + message_count = message_count, + "Supervisor resumed with --continue (resume_session=true)" + ); + + // Update response values for successful resume + response_daemon_id = Some(target_daemon_id); + response_status = "starting".to_string(); } "restart_phase" => { // Clear conversation but keep phase progress @@ -1925,13 +1997,13 @@ pub async fn resume_supervisor( Json(ResumeSupervisorResponse { supervisor_task_id: supervisor_state.task_id, - daemon_id: supervisor_task.daemon_id, + daemon_id: response_daemon_id, resumed_from: ResumedFromInfo { phase: contract.phase, last_activity: supervisor_state.last_activity, message_count, }, - status: "pending".to_string(), + status: response_status, }) .into_response() } -- cgit v1.2.3