diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 83 |
1 files changed, 64 insertions, 19 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 4bcb5cd..beb676e 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -20,6 +20,7 @@ use sqlx::Row; use tokio::sync::mpsc; use uuid::Uuid; +use crate::db::models::Task; use crate::db::repository; use crate::server::auth::{hash_api_key, API_KEY_HEADER}; use crate::server::messages::ApiError; @@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } - // Find tasks assigned to this daemon that are still active - if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await { + // Find tasks assigned to this daemon and mark for retry or fail permanently + if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await { tracing::error!( daemon_id = %daemon_uuid, error = %e, - "Failed to clear daemon from tasks on disconnect" + "Failed to handle daemon disconnect for tasks" ); } }); } } -/// Clear daemon_id from tasks when daemon disconnects -async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { - // Update tasks that were running on this daemon to failed state - let result = sqlx::query( +/// Handle tasks when daemon disconnects - mark for retry or fail permanently. +async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { + // Get all active tasks on this daemon + let active_tasks: Vec<Task> = sqlx::query_as( r#" - UPDATE tasks - SET daemon_id = NULL, - status = 'failed', - error_message = 'Daemon disconnected', - updated_at = NOW() + SELECT * FROM tasks WHERE daemon_id = $1 AND status IN ('starting', 'running', 'initializing') "#, ) .bind(daemon_id) - .execute(pool) + .fetch_all(pool) .await?; - if result.rows_affected() > 0 { - tracing::warn!( - daemon_id = %daemon_id, - tasks_affected = result.rows_affected(), - "Marked tasks as failed due to daemon disconnect" - ); + if active_tasks.is_empty() { + return Ok(()); + } + + tracing::info!( + daemon_id = %daemon_id, + task_count = active_tasks.len(), + "Processing tasks for disconnected daemon" + ); + + for task in active_tasks { + if task.retry_count < task.max_retries { + // Mark for retry + match repository::mark_task_for_retry(pool, task.id, daemon_id).await { + Ok(Some(updated_task)) => { + tracing::info!( + task_id = %task.id, + task_name = %task.name, + retry_count = updated_task.retry_count, + max_retries = updated_task.max_retries, + "Task marked for retry after daemon disconnect" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task.id, + "Task not found or already at max retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } else { + // Exceeded retries, mark as permanently failed + if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task as permanently failed" + ); + } else { + tracing::warn!( + task_id = %task.id, + task_name = %task.name, + retry_count = task.retry_count + 1, + "Task permanently failed: exceeded maximum retries" + ); + } + } } Ok(()) |
