summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: soryu <soryu@soryu.co> 2026-01-18 18:35:44 +0000
committer: soryu <soryu@soryu.co> 2026-01-18 18:35:44 +0000
commit: 273da072fa0573c935798dc723ed79fd71ab037a (patch)
tree: 02972b8d5e65a90b763995895bdb8f92a175c0d0
parent: e0da93a20a965125ba4cbb46e3e0e179f06c2a08 (diff)
download: soryu-273da072fa0573c935798dc723ed79fd71ab037a.tar.gz
download: soryu-273da072fa0573c935798dc723ed79fd71ab037a.zip
Fix: Find alternate daemon
-rw-r--r-- makima/src/server/handlers/mesh.rs 161
-rw-r--r-- makima/src/server/state.rs 23
2 files changed, 173 insertions, 11 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index f8df69f..f710be1 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -629,12 +629,12 @@ pub async fn start_task(
.into_response();
}
- // Find an available daemon belonging to this owner
- let target_daemon_id = match state.daemon_connections
- .iter()
- .find(|d| d.value().owner_id == auth.owner_id)
- {
- Some(d) => d.value().id,
+ // Get list of daemons that have previously failed this task
+ let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
+
+ // Find an available daemon belonging to this owner, excluding failed ones
+ let target_daemon_id = match state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
+ Some(id) => id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
@@ -729,18 +729,79 @@ pub async fn start_task(
"Sending SpawnTask command to daemon"
);
- if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
- tracing::error!("Failed to send SpawnTask command: {}", e);
- // Rollback: clear daemon_id and reset status since command failed
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command.clone()).await {
+ tracing::warn!(
+ task_id = %id,
+ daemon_id = %target_daemon_id,
+ error = %e,
+ "Failed to send SpawnTask command, trying alternative daemon"
+ );
+
+ // Add this daemon to exclude list and try another
+ exclude_daemon_ids.push(target_daemon_id);
+
+ // Try to find an alternative daemon
+ if let Some(alt_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
+ // Update task with new daemon
+ let alt_update_req = UpdateTaskRequest {
+ daemon_id: Some(alt_daemon_id),
+ ..Default::default()
+ };
+
+ if let Ok(Some(alt_updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, alt_update_req).await {
+ // Recreate command with same data but try new daemon
+ let alt_command = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: task.name.clone(),
+ plan: task.plan.clone(),
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
+ tracing::info!(
+ task_id = %id,
+ old_daemon_id = %target_daemon_id,
+ new_daemon_id = %alt_daemon_id,
+ "Task started on alternative daemon after first daemon failed"
+ );
+
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: alt_updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "system".to_string(),
+ });
+
+ return Json(alt_updated_task).into_response();
+ }
+ }
+ }
+
+ // All daemons failed - rollback and return error
+ tracing::error!("Failed to start task on any daemon: {}", e);
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
- clear_daemon_id: true, // Explicitly clear daemon_id
+ clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DAEMON_ERROR", e)),
+ Json(ApiError::new("DAEMON_ERROR", "Failed to start task on any available daemon")),
)
.into_response();
}
@@ -1060,6 +1121,84 @@ pub async fn send_message(
.into_response();
};
+ // Check if daemon is connected before trying to send
+ if !state.is_daemon_connected(target_daemon_id) {
+ tracing::warn!(
+ task_id = %id,
+ daemon_id = %target_daemon_id,
+ "Daemon not connected, attempting to reallocate task"
+ );
+
+ // Get list of failed daemons for this task
+ let mut exclude_daemons = task.failed_daemon_ids.clone().unwrap_or_default();
+ exclude_daemons.push(target_daemon_id);
+
+ // Try to find an alternative daemon
+ if let Some(new_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemons) {
+ // Mark the task for retry and update with new daemon
+ if let Some(ref pool) = state.db_pool {
+ // Mark the old daemon as failed for this task
+ let _ = repository::mark_task_for_retry(pool, id, target_daemon_id).await;
+
+ // Update task with new daemon and restart
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(new_daemon_id),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ // Send spawn command to new daemon
+ let spawn_cmd = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url: updated_task.repository_url.clone(),
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
+ is_supervisor: updated_task.is_supervisor,
+ };
+
+ if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
+ tracing::info!(
+ task_id = %id,
+ old_daemon_id = %target_daemon_id,
+ new_daemon_id = %new_daemon_id,
+ "Task reallocated to new daemon, will restart"
+ );
+
+ return (
+ StatusCode::ACCEPTED,
+ Json(serde_json::json!({
+ "success": true,
+ "reallocated": true,
+ "taskId": id,
+ "message": "Task was reallocated to a new daemon and is restarting. Please retry your message shortly."
+ })),
+ ).into_response();
+ }
+ }
+ }
+ }
+
+ // Could not reallocate - return error with helpful message
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "DAEMON_DISCONNECTED",
+ "The daemon running this task has disconnected and no alternative daemon is available. Please start a daemon.",
+ )),
+ ).into_response();
+ }
+
// Send SendMessage command to daemon
let command = DaemonCommand::SendMessage {
task_id: id,
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 479eadf..d022834 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -884,6 +884,29 @@ impl AppState {
.collect()
}
+ /// Find a daemon in `daemon_connections` owned by `owner_id` whose ID is not in `exclude_daemon_ids`.
+ /// Returns the ID of the first matching entry encountered, or `None` if no entry matches.
+ pub fn find_alternative_daemon(
+ &self,
+ owner_id: Uuid,
+ exclude_daemon_ids: &[Uuid],
+ ) -> Option<Uuid> {
+ self.daemon_connections
+ .iter()
+ .find(|entry| {
+ let daemon = entry.value();
+ daemon.owner_id == owner_id && !exclude_daemon_ids.contains(&daemon.id)
+ })
+ .map(|entry| entry.value().id)
+ }
+
+ /// Check whether any entry in `daemon_connections` has an `id` equal to `daemon_id`.
+ pub fn is_daemon_connected(&self, daemon_id: Uuid) -> bool {
+ self.daemon_connections
+ .iter()
+ .any(|entry| entry.value().id == daemon_id)
+ }
+
// =========================================================================
// Tool Key Management
// =========================================================================