summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 16:57:09 +0000
committersoryu <soryu@soryu.co>2026-01-16 16:57:09 +0000
commitdcec90d2c233671e64e412a9f7b883d8db6783ec (patch)
treef3c529be277b606501fb1a3232b83343d95fdb0f
parent205ab8a223ddf6591a3e8bfc9108506502977c11 (diff)
downloadsoryu-dcec90d2c233671e64e412a9f7b883d8db6783ec.tar.gz
soryu-dcec90d2c233671e64e412a9f7b883d8db6783ec.zip
Fixup: set daemon ID on spawned tasks by supervisor
-rw-r--r--makima/src/db/models.rs1
-rw-r--r--makima/src/server/handlers/mesh_chat.rs74
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs63
3 files changed, 98 insertions, 40 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index d792f2c..6accb48 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -1560,7 +1560,6 @@ pub struct RepositoryHistoryListResponse {
/// Request for getting repository suggestions
#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
pub struct RepositorySuggestionsQuery {
/// Filter by source type: 'remote' or 'local'
pub source_type: Option<String>,
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index 9936ffc..f6b861a 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1102,6 +1102,33 @@ async fn handle_mesh_request(
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
+ // IMPORTANT: Update database FIRST to assign daemon_id before sending command
+ // This prevents race conditions where the task starts but daemon_id is not set
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Failed to update task: {}", e),
+ data: None,
+ }
+ }
+ };
+
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id,
@@ -1123,23 +1150,14 @@ async fn handle_mesh_request(
match state.send_daemon_command(target_daemon_id, command).await {
Ok(()) => {
- // Update task status to running
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("running".to_string()),
- version: Some(task.version),
- ..Default::default()
- };
-
- if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: "running".to_string(),
- updated_fields: vec!["status".to_string()],
- updated_by: "system".to_string(),
- });
- }
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "system".to_string(),
+ });
MeshRequestResult {
success: true,
@@ -1147,15 +1165,25 @@ async fn handle_mesh_request(
data: Some(json!({
"taskId": task_id,
"daemonId": target_daemon_id,
- "status": "running",
+ "status": "starting",
})),
}
}
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to start task: {}", e),
- data: None,
- },
+ Err(e) => {
+ // Rollback: clear daemon_id and reset status since command failed
+ let rollback_req = crate::db::models::UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await;
+
+ MeshRequestResult {
+ success: false,
+ message: format!("Failed to start task: {}", e),
+ data: None,
+ }
+ }
}
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 3fc7dd7..4dc0f0d 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
-use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
+use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
@@ -468,38 +468,69 @@ pub async fn spawn_task(
// Start task on a daemon
// Find a daemon that belongs to this owner
+ let mut updated_task = task;
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
if daemon.owner_id == owner_id {
- // Send spawn command to first available daemon
+ // IMPORTANT: Update database FIRST to assign daemon_id before sending command
+ // This prevents race conditions where the task starts but daemon_id is not set
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(updated_task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await {
+ Ok(Some(t)) => {
+ updated_task = t;
+ }
+ Ok(None) => {
+ tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id");
+ break;
+ }
+ Err(e) => {
+ tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id");
+ break;
+ }
+ }
+
+ // Send spawn command to daemon
let cmd = DaemonCommand::SpawnTask {
- task_id: task.id,
- task_name: task.name.clone(),
- plan: task.plan.clone(),
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
repo_url: repo_url.clone(),
- base_branch: task.base_branch.clone(),
- target_branch: task.target_branch.clone(),
- parent_task_id: task.parent_task_id,
- depth: task.depth,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
is_orchestrator: false,
- target_repo_path: task.target_repo_path.clone(),
- completion_action: task.completion_action.clone(),
- continue_from_task_id: task.continue_from_task_id,
- copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
is_supervisor: false,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
+ // Rollback: clear daemon_id and reset status since command failed
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await;
} else {
- tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
+ tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
}
break;
}
}
- (StatusCode::CREATED, Json(task)).into_response()
+ (StatusCode::CREATED, Json(updated_task)).into_response()
}
// =============================================================================