summary | refs | log | tree | commit | diff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r-- makima/src/server/handlers/mesh_daemon.rs 83
1 files changed, 64 insertions, 19 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 4bcb5cd..beb676e 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -20,6 +20,7 @@ use sqlx::Row;
use tokio::sync::mpsc;
use uuid::Uuid;
+use crate::db::models::Task;
use crate::db::repository;
use crate::server::auth::{hash_api_key, API_KEY_HEADER};
use crate::server::messages::ApiError;
@@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
- // Find tasks assigned to this daemon that are still active
- if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await {
+ // Find tasks assigned to this daemon and mark for retry or fail permanently
+ if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await {
tracing::error!(
daemon_id = %daemon_uuid,
error = %e,
- "Failed to clear daemon from tasks on disconnect"
+ "Failed to handle daemon disconnect for tasks"
);
}
});
}
}
-/// Clear daemon_id from tasks when daemon disconnects
-async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
- // Update tasks that were running on this daemon to failed state
- let result = sqlx::query(
+/// Handle tasks when daemon disconnects - mark for retry or fail permanently.
+async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
+ // Get all active tasks on this daemon
+ let active_tasks: Vec<Task> = sqlx::query_as(
r#"
- UPDATE tasks
- SET daemon_id = NULL,
- status = 'failed',
- error_message = 'Daemon disconnected',
- updated_at = NOW()
+ SELECT * FROM tasks
WHERE daemon_id = $1
AND status IN ('starting', 'running', 'initializing')
"#,
)
.bind(daemon_id)
- .execute(pool)
+ .fetch_all(pool)
.await?;
- if result.rows_affected() > 0 {
- tracing::warn!(
- daemon_id = %daemon_id,
- tasks_affected = result.rows_affected(),
- "Marked tasks as failed due to daemon disconnect"
- );
+ if active_tasks.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!(
+ daemon_id = %daemon_id,
+ task_count = active_tasks.len(),
+ "Processing tasks for disconnected daemon"
+ );
+
+ for task in active_tasks {
+ if task.retry_count < task.max_retries {
+ // Mark for retry
+ match repository::mark_task_for_retry(pool, task.id, daemon_id).await {
+ Ok(Some(updated_task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = updated_task.retry_count,
+ max_retries = updated_task.max_retries,
+ "Task marked for retry after daemon disconnect"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task.id,
+ "Task not found or already at max retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ } else {
+ // Exceeded retries, mark as permanently failed
+ if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task as permanently failed"
+ );
+ } else {
+ tracing::warn!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = task.retry_count + 1,
+ "Task permanently failed: exceeded maximum retries"
+ );
+ }
+ }
}
Ok(())