diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 16:57:09 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-16 16:57:09 +0000 |
| commit | dcec90d2c233671e64e412a9f7b883d8db6783ec (patch) | |
| tree | f3c529be277b606501fb1a3232b83343d95fdb0f | |
| parent | 205ab8a223ddf6591a3e8bfc9108506502977c11 (diff) | |
| download | soryu-dcec90d2c233671e64e412a9f7b883d8db6783ec.tar.gz soryu-dcec90d2c233671e64e412a9f7b883d8db6783ec.zip | |
Fixup: set daemon ID on spawned tasks by supervisor
| -rw-r--r-- | makima/src/db/models.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 74 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 63 |
3 files changed, 98 insertions, 40 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index d792f2c..6accb48 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1560,7 +1560,6 @@ pub struct RepositoryHistoryListResponse { /// Request for getting repository suggestions #[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] pub struct RepositorySuggestionsQuery { /// Filter by source type: 'remote' or 'local' pub source_type: Option<String>, diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 9936ffc..f6b861a 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1102,6 +1102,33 @@ async fn handle_mesh_request( }; let is_orchestrator = task.depth == 0 && subtask_count > 0; + // IMPORTANT: Update database FIRST to assign daemon_id before sending command + // This prevents race conditions where the task starts but daemon_id is not set + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(task.version), + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Failed to update task: {}", e), + data: None, + } + } + }; + // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id, @@ -1123,23 +1150,14 @@ async fn handle_mesh_request( match state.send_daemon_command(target_daemon_id, command).await { Ok(()) => { - // Update task status to running - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("running".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "running".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "system".to_string(), + }); MeshRequestResult { success: true, @@ -1147,15 +1165,25 @@ async fn handle_mesh_request( data: Some(json!({ "taskId": task_id, "daemonId": target_daemon_id, - "status": "running", + "status": "starting", })), } } - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to start task: {}", e), - data: None, - }, + Err(e) => { + // Rollback: clear daemon_id and reset status since command failed + let rollback_req = crate::db::models::UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; + + MeshRequestResult { + success: false, + message: format!("Failed to start task: {}", e), + data: None, + } + } } } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 3fc7dd7..4dc0f0d 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; +use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; @@ -468,38 +468,69 @@ pub async fn spawn_task( // Start task on a daemon // Find a daemon that belongs to this owner + let mut updated_task = task; for entry in state.daemon_connections.iter() { let daemon = entry.value(); if daemon.owner_id == owner_id { - // Send spawn command to first available daemon + // IMPORTANT: Update database FIRST to assign daemon_id before sending command + // This prevents race conditions where the task starts but daemon_id is not set + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(updated_task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await { + Ok(Some(t)) => { + updated_task = t; + } + Ok(None) => { + tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id"); + break; + } + Err(e) => { + tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id"); + break; + } + } + + // Send spawn command to daemon let cmd = DaemonCommand::SpawnTask { - task_id: task.id, - task_name: task.name.clone(), - plan: task.plan.clone(), + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), repo_url: repo_url.clone(), - base_branch: task.base_branch.clone(), - target_branch: task.target_branch.clone(), - parent_task_id: task.parent_task_id, - depth: task.depth, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, is_orchestrator: false, - target_repo_path: task.target_repo_path.clone(), - completion_action: task.completion_action.clone(), - continue_from_task_id: task.continue_from_task_id, - copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, is_supervisor: false, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); + // Rollback: clear daemon_id and reset status since command failed + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await; } else { - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); + tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); } break; } } - (StatusCode::CREATED, Json(task)).into_response() + (StatusCode::CREATED, Json(updated_task)).into_response() } // ============================================================================= |
