diff options
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 83 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 155 |
2 files changed, 145 insertions, 93 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 4bcb5cd..beb676e 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -20,6 +20,7 @@ use sqlx::Row; use tokio::sync::mpsc; use uuid::Uuid; +use crate::db::models::Task; use crate::db::repository; use crate::server::auth::{hash_api_key, API_KEY_HEADER}; use crate::server::messages::ApiError; @@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } - // Find tasks assigned to this daemon that are still active - if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await { + // Find tasks assigned to this daemon and mark for retry or fail permanently + if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await { tracing::error!( daemon_id = %daemon_uuid, error = %e, - "Failed to clear daemon from tasks on disconnect" + "Failed to handle daemon disconnect for tasks" ); } }); } } -/// Clear daemon_id from tasks when daemon disconnects -async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { - // Update tasks that were running on this daemon to failed state - let result = sqlx::query( +/// Handle tasks when daemon disconnects - mark for retry or fail permanently. +async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { + // Get all active tasks on this daemon + let active_tasks: Vec<Task> = sqlx::query_as( r#" - UPDATE tasks - SET daemon_id = NULL, - status = 'failed', - error_message = 'Daemon disconnected', - updated_at = NOW() + SELECT * FROM tasks WHERE daemon_id = $1 AND status IN ('starting', 'running', 'initializing') "#, ) .bind(daemon_id) - .execute(pool) + .fetch_all(pool) .await?; - if result.rows_affected() > 0 { - tracing::warn!( - daemon_id = %daemon_id, - tasks_affected = result.rows_affected(), - "Marked tasks as failed due to daemon disconnect" - ); + if active_tasks.is_empty() { + return Ok(()); + } + + tracing::info!( + daemon_id = %daemon_id, + task_count = active_tasks.len(), + "Processing tasks for disconnected daemon" + ); + + for task in active_tasks { + if task.retry_count < task.max_retries { + // Mark for retry + match repository::mark_task_for_retry(pool, task.id, daemon_id).await { + Ok(Some(updated_task)) => { + tracing::info!( + task_id = %task.id, + task_name = %task.name, + retry_count = updated_task.retry_count, + max_retries = updated_task.max_retries, + "Task marked for retry after daemon disconnect" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task.id, + "Task not found or already at max retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } else { + // Exceeded retries, mark as permanently failed + if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task as permanently failed" + ); + } else { + tracing::warn!( + task_id = %task.id, + task_name = %task.name, + retry_count = task.retry_count + 1, + "Task permanently failed: exceeded maximum retries" + ); + } + } } Ok(()) diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 1014fdc..754d086 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -256,6 +256,7 @@ async fn verify_supervisor_auth( /// Try to start a pending task on an available daemon. /// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. +/// For retried tasks, excludes daemons that previously failed the task. async fn try_start_pending_task( state: &SharedState, contract_id: Uuid, @@ -263,7 +264,7 @@ async fn try_start_pending_task( ) -> Result<Option<Task>, String> { let pool = state.db_pool.as_ref().ok_or("Database not configured")?; - // Get pending tasks for this contract + // Get pending tasks for this contract (includes interrupted tasks awaiting retry) let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get pending tasks: {}", e))?; @@ -272,89 +273,95 @@ async fn try_start_pending_task( return Ok(None); } - // Get available daemons with capacity - let daemons = repository::get_available_daemons(pool, owner_id) - .await - .map_err(|e| format!("Failed to get available daemons: {}", e))?; - - // Find a daemon with capacity - let available_daemon = daemons.iter().find(|d| { - d.current_task_count < d.max_concurrent_tasks - && state.daemon_connections.contains_key(&d.connection_id) - }); + // Try each pending task until we find one we can start + for task in &pending_tasks { + // Get excluded daemon IDs for this task (daemons that have already failed it) + let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); - let daemon = match available_daemon { - Some(d) => d, - None => return Ok(None), // No daemon with capacity - }; + // Get available daemons excluding failed ones for this task + let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids) + .await + .map_err(|e| format!("Failed to get available daemons: {}", e))?; - // Try to start the first pending task - let task = &pending_tasks[0]; + // Find a daemon with capacity + let available_daemon = daemons.iter().find(|d| { + d.current_task_count < d.max_concurrent_tasks + && state.daemon_connections.contains_key(&d.connection_id) + }); - // Get repo URL from task or contract - let repo_url = if let Some(url) = &task.repository_url { - Some(url.clone()) - } else { - match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos - .iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), - Err(_) => None, - } - }; + let daemon = match available_daemon { + Some(d) => d, + None => continue, // Try next task + }; - // Update task with daemon assignment - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(task.version), - ..Default::default() - }; + // Get repo URL from task or contract + let repo_url = if let Some(url) = &task.repository_url { + Some(url.clone()) + } else { + match repository::list_contract_repositories(pool, contract_id).await { + Ok(repos) => repos + .iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), + Err(_) => None, + } + }; - let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => return Ok(None), - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); - return Ok(None); - } - }; + // Update task with daemon assignment + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(task.version), + ..Default::default() + }; - // Send spawn command - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url, - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: false, - }; + let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => continue, // Task was modified concurrently, try next + Err(e) => { + tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); + continue; // Try next task + } + }; - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); - // Rollback - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() + // Send spawn command + let cmd = DaemonCommand::SpawnTask { + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, + is_supervisor: false, }; - let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; - return Ok(None); + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; + continue; // Try next task + } + + tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); + return Ok(Some(updated_task)); } - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); - Ok(Some(updated_task)) + // No tasks could be started + Ok(None) } // ============================================================================= |
