diff options
| author | soryu <soryu@soryu.co> | 2026-01-18 18:35:44 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-18 18:35:44 +0000 |
| commit | 273da072fa0573c935798dc723ed79fd71ab037a (patch) | |
| tree | 02972b8d5e65a90b763995895bdb8f92a175c0d0 | |
| parent | e0da93a20a965125ba4cbb46e3e0e179f06c2a08 (diff) | |
| download | soryu-273da072fa0573c935798dc723ed79fd71ab037a.tar.gz soryu-273da072fa0573c935798dc723ed79fd71ab037a.zip | |
Fix: Find alternate daemon
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 161 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 23 |
2 files changed, 173 insertions, 11 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index f8df69f..f710be1 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -629,12 +629,12 @@ pub async fn start_task( .into_response(); } - // Find an available daemon belonging to this owner - let target_daemon_id = match state.daemon_connections - .iter() - .find(|d| d.value().owner_id == auth.owner_id) - { - Some(d) => d.value().id, + // Get list of daemons that have previously failed this task + let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); + + // Find an available daemon belonging to this owner, excluding failed ones + let target_daemon_id = match state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) { + Some(id) => id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, @@ -729,18 +729,79 @@ pub async fn start_task( "Sending SpawnTask command to daemon" ); - if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { - tracing::error!("Failed to send SpawnTask command: {}", e); - // Rollback: clear daemon_id and reset status since command failed + if let Err(e) = state.send_daemon_command(target_daemon_id, command.clone()).await { + tracing::warn!( + task_id = %id, + daemon_id = %target_daemon_id, + error = %e, + "Failed to send SpawnTask command, trying alternative daemon" + ); + + // Add this daemon to exclude list and try another + exclude_daemon_ids.push(target_daemon_id); + + // Try to find an alternative daemon + if let Some(alt_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) { + // Update task with new daemon + let alt_update_req = UpdateTaskRequest { + daemon_id: Some(alt_daemon_id), + ..Default::default() + }; + + if let Ok(Some(alt_updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, alt_update_req).await { + // Recreate command with same data but try new daemon + let alt_command = DaemonCommand::SpawnTask { + task_id: id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { + tracing::info!( + task_id = %id, + old_daemon_id = %target_daemon_id, + new_daemon_id = %alt_daemon_id, + "Task started on alternative daemon after first daemon failed" + ); + + // Broadcast task update notification + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: alt_updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "system".to_string(), + }); + + return Json(alt_updated_task).into_response(); + } + } + } + + // All daemons failed - rollback and return error + tracing::error!("Failed to start task on any daemon: {}", e); let rollback_req = UpdateTaskRequest { status: Some("pending".to_string()), - clear_daemon_id: true, // Explicitly clear daemon_id + clear_daemon_id: true, ..Default::default() }; let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DAEMON_ERROR", e)), + Json(ApiError::new("DAEMON_ERROR", "Failed to start task on any available daemon")), ) .into_response(); } @@ -1060,6 +1121,84 @@ pub async fn send_message( .into_response(); }; + // Check if daemon is connected before trying to send + if !state.is_daemon_connected(target_daemon_id) { + tracing::warn!( + task_id = %id, + daemon_id = %target_daemon_id, + "Daemon not connected, attempting to reallocate task" + ); + + // Get list of failed daemons for this task + let mut exclude_daemons = task.failed_daemon_ids.clone().unwrap_or_default(); + exclude_daemons.push(target_daemon_id); + + // Try to find an alternative daemon + if let Some(new_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemons) { + // Mark the task for retry and update with new daemon + if let Some(ref pool) = state.db_pool { + // Mark the old daemon as failed for this task + let _ = repository::mark_task_for_retry(pool, id, target_daemon_id).await; + + // Update task with new daemon and restart + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(new_daemon_id), + ..Default::default() + }; + + if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + // Send spawn command to new daemon + let spawn_cmd = DaemonCommand::SpawnTask { + task_id: id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url: updated_task.repository_url.clone(), + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, + is_supervisor: updated_task.is_supervisor, + }; + + if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { + tracing::info!( + task_id = %id, + old_daemon_id = %target_daemon_id, + new_daemon_id = %new_daemon_id, + "Task reallocated to new daemon, will restart" + ); + + return ( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "success": true, + "reallocated": true, + "taskId": id, + "message": "Task was reallocated to a new daemon and is restarting. Please retry your message shortly." + })), + ).into_response(); + } + } + } + } + + // Could not reallocate - return error with helpful message + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "DAEMON_DISCONNECTED", + "The daemon running this task has disconnected and no alternative daemon is available. Please start a daemon.", + )), + ).into_response(); + } + // Send SendMessage command to daemon let command = DaemonCommand::SendMessage { task_id: id, diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 479eadf..d022834 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -884,6 +884,29 @@ impl AppState { .collect() } + /// Find an alternative daemon for a task, excluding specified daemon IDs. + /// Returns the daemon ID and connection info if found. + pub fn find_alternative_daemon( + &self, + owner_id: Uuid, + exclude_daemon_ids: &[Uuid], + ) -> Option<Uuid> { + self.daemon_connections + .iter() + .find(|entry| { + let daemon = entry.value(); + daemon.owner_id == owner_id && !exclude_daemon_ids.contains(&daemon.id) + }) + .map(|entry| entry.value().id) + } + + /// Check if a specific daemon is connected. + pub fn is_daemon_connected(&self, daemon_id: Uuid) -> bool { + self.daemon_connections + .iter() + .any(|entry| entry.value().id == daemon_id) + } + // ========================================================================= // Tool Key Management // ========================================================================= |
