diff options
| author | soryu <soryu@soryu.co> | 2026-01-19 02:26:09 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-19 02:26:09 +0000 |
| commit | 786510379bed060db2b3742b7dfca671552d2c34 (patch) | |
| tree | 432d0b575a64ef0fb2cb10a86cd916b5fbc16909 /makima/src/server/handlers | |
| parent | b64eddc8c2f250cdcbacae18cce107bf4c86f9f4 (diff) | |
| download | soryu-786510379bed060db2b3742b7dfca671552d2c34.tar.gz soryu-786510379bed060db2b3742b7dfca671552d2c34.zip | |
Make sure tasks can continue
Diffstat (limited to 'makima/src/server/handlers')
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 2 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 10 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 2 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 82 |
4 files changed, 91 insertions, 5 deletions
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index 29ec620..7e7b476 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1589,6 +1589,8 @@ async fn handle_contract_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; if let Err(e) = command_sender.send(command).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index e9a421c..ad3ec79 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -719,6 +719,8 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( @@ -766,6 +768,8 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { @@ -1165,6 +1169,8 @@ pub async fn send_message( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, + resume_session: false, + conversation_history: None, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { @@ -2299,6 +2305,8 @@ pub async fn reassign_task( copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( @@ -2621,6 +2629,8 @@ pub async fn continue_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; tracing::info!( diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index f6b861a..c468446 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1146,6 +1146,8 @@ async fn handle_mesh_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + resume_session: false, + conversation_history: None, }; match state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index a8af3fb..df8f77c 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -351,6 +351,8 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -646,6 +648,8 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + resume_session: false, + conversation_history: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1845,22 +1849,90 @@ pub async fn resume_supervisor( .map(|arr| arr.len() as i32) .unwrap_or(0); + // Find a connected daemon for this owner + let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { + Some(id) => id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot resume supervisor.", + )), + ) + .into_response(); + } + }; + + // Track response values (may be updated by resume modes) + let mut response_daemon_id = supervisor_task.daemon_id; + let mut response_status = "pending".to_string(); + // Based on resume mode, handle differently match req.resume_mode.as_str() { "continue" => { - // Mark task for reassignment with existing conversation context - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + // Update task status to starting and assign daemon + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") + .bind(target_daemon_id) .bind(supervisor_state.task_id) .execute(pool) .await { - tracing::error!("Failed to update task status: {}", e); + tracing::error!("Failed to update task for resume: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } + + // Send SpawnTask with resume_session=true to use Claude's --continue + // Include conversation_history as fallback if worktree doesn't exist on target daemon + let command = DaemonCommand::SpawnTask { + task_id: supervisor_state.task_id, + task_name: supervisor_task.name.clone(), + plan: supervisor_task.plan.clone(), + repo_url: supervisor_task.repository_url.clone(), + base_branch: supervisor_task.base_branch.clone(), + target_branch: supervisor_task.target_branch.clone(), + parent_task_id: supervisor_task.parent_task_id, + depth: supervisor_task.depth, + is_orchestrator: false, + target_repo_path: supervisor_task.target_repo_path.clone(), + completion_action: supervisor_task.completion_action.clone(), + continue_from_task_id: supervisor_task.continue_from_task_id, + copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: supervisor_task.contract_id, + is_supervisor: true, + resume_session: true, // Use --continue to preserve conversation + conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + // Rollback status on failure + let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await; + tracing::error!("Failed to send SpawnTask to daemon: {}", e); + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + daemon_id = %target_daemon_id, + message_count = message_count, + "Supervisor resumed with --continue (resume_session=true)" + ); + + // Update response values for successful resume + response_daemon_id = Some(target_daemon_id); + response_status = "starting".to_string(); } "restart_phase" => { // Clear conversation but keep phase progress @@ -1925,13 +1997,13 @@ pub async fn resume_supervisor( Json(ResumeSupervisorResponse { supervisor_task_id: supervisor_state.task_id, - daemon_id: supervisor_task.daemon_id, + daemon_id: response_daemon_id, resumed_from: ResumedFromInfo { phase: contract.phase, last_activity: supervisor_state.last_activity, message_count, }, - status: "pending".to_string(), + status: response_status, }) .into_response() } |
