diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_chat.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 74 |
1 files changed, 51 insertions, 23 deletions
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 9936ffc..f6b861a 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1102,6 +1102,33 @@ async fn handle_mesh_request( }; let is_orchestrator = task.depth == 0 && subtask_count > 0; + // IMPORTANT: Update database FIRST to assign daemon_id before sending command + // This prevents race conditions where the task starts but daemon_id is not set + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(task.version), + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Failed to update task: {}", e), + data: None, + } + } + }; + // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id, @@ -1123,23 +1150,14 @@ async fn handle_mesh_request( match state.send_daemon_command(target_daemon_id, command).await { Ok(()) => { - // Update task status to running - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("running".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "running".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "system".to_string(), + }); MeshRequestResult { success: true, @@ -1147,15 +1165,25 @@ async fn handle_mesh_request( data: Some(json!({ "taskId": task_id, "daemonId": target_daemon_id, - "status": "running", + "status": "starting", })), } } - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to start task: {}", e), - data: None, - }, + Err(e) => { + // Rollback: clear daemon_id and reset status since command failed + let rollback_req = crate::db::models::UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; + + MeshRequestResult { + success: false, + message: format!("Failed to start task: {}", e), + data: None, + } + } } } |
