summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_supervisor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs63
1 files changed, 47 insertions, 16 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 3fc7dd7..4dc0f0d 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
-use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
+use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
@@ -468,38 +468,69 @@ pub async fn spawn_task(
// Start task on a daemon
// Find a daemon that belongs to this owner
+ let mut updated_task = task;
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
if daemon.owner_id == owner_id {
- // Send spawn command to first available daemon
+ // IMPORTANT: Update database FIRST to assign daemon_id before sending command
+ // This prevents race conditions where the task starts but daemon_id is not set
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(updated_task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await {
+ Ok(Some(t)) => {
+ updated_task = t;
+ }
+ Ok(None) => {
+ tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id");
+ break;
+ }
+ Err(e) => {
+ tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id");
+ break;
+ }
+ }
+
+ // Send spawn command to daemon
let cmd = DaemonCommand::SpawnTask {
- task_id: task.id,
- task_name: task.name.clone(),
- plan: task.plan.clone(),
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
repo_url: repo_url.clone(),
- base_branch: task.base_branch.clone(),
- target_branch: task.target_branch.clone(),
- parent_task_id: task.parent_task_id,
- depth: task.depth,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
is_orchestrator: false,
- target_repo_path: task.target_repo_path.clone(),
- completion_action: task.completion_action.clone(),
- continue_from_task_id: task.continue_from_task_id,
- copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
is_supervisor: false,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
+ // Rollback: clear daemon_id and reset status since command failed
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await;
} else {
- tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
+ tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
}
break;
}
}
- (StatusCode::CREATED, Json(task)).into_response()
+ (StatusCode::CREATED, Json(updated_task)).into_response()
}
// =============================================================================