From dcec90d2c233671e64e412a9f7b883d8db6783ec Mon Sep 17 00:00:00 2001 From: soryu Date: Fri, 16 Jan 2026 16:57:09 +0000 Subject: Fixup: set daemon ID on spawned tasks by supervisor --- makima/src/server/handlers/mesh_supervisor.rs | 63 ++++++++++++++++++++------- 1 file changed, 47 insertions(+), 16 deletions(-) (limited to 'makima/src/server/handlers/mesh_supervisor.rs') diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 3fc7dd7..4dc0f0d 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; +use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; @@ -468,38 +468,69 @@ pub async fn spawn_task( // Start task on a daemon // Find a daemon that belongs to this owner + let mut updated_task = task; for entry in state.daemon_connections.iter() { let daemon = entry.value(); if daemon.owner_id == owner_id { - // Send spawn command to first available daemon + // IMPORTANT: Update database FIRST to assign daemon_id before sending command + // This prevents race conditions where the task starts but daemon_id is not set + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(updated_task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await { + Ok(Some(t)) => { + updated_task = t; + } + Ok(None) => { + tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id"); + break; + } + Err(e) => { + tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id"); + break; + } + } + + // Send spawn command to daemon let cmd = DaemonCommand::SpawnTask { - task_id: task.id, - task_name: task.name.clone(), - plan: task.plan.clone(), + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), repo_url: repo_url.clone(), - base_branch: task.base_branch.clone(), - target_branch: task.target_branch.clone(), - parent_task_id: task.parent_task_id, - depth: task.depth, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, is_orchestrator: false, - target_repo_path: task.target_repo_path.clone(), - completion_action: task.completion_action.clone(), - continue_from_task_id: task.continue_from_task_id, - copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, is_supervisor: false, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); + // Rollback: clear daemon_id and reset status since command failed + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await; } else { - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); + tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); } break; } } - (StatusCode::CREATED, Json(task)).into_response() + (StatusCode::CREATED, Json(updated_task)).into_response() } // ============================================================================= -- cgit v1.2.3