From 75d9644d44ba998a32ed14c072e883a75145ab72 Mon Sep 17 00:00:00 2001 From: soryu Date: Fri, 16 Jan 2026 19:50:27 +0000 Subject: Add autopilot panel and retry system --- makima/src/db/models.rs | 19 ++++++++++ makima/src/db/repository.rs | 92 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) (limited to 'makima/src/db') diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 0e1303c..72ba6f2 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -6,6 +6,11 @@ use sqlx::FromRow; use utoipa::ToSchema; use uuid::Uuid; +/// Default max retries for task daemon failover (3 attempts) +fn default_max_retries() -> i32 { + 3 +} + /// Flexible datetime deserialization module. /// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats. pub mod flexible_datetime { @@ -500,6 +505,20 @@ pub struct Task { /// Files to copy from parent task's worktree when starting. #[serde(skip_serializing_if = "Option::is_none")] pub copy_files: Option, + + // Retry tracking for daemon failover + /// Number of times this task has been retried after daemon failure + #[serde(default)] + pub retry_count: i32, + /// Maximum retry attempts before marking as permanently failed + #[serde(default = "default_max_retries")] + pub max_retries: i32, + /// Array of daemon IDs that have failed this task (excluded from retry) + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_daemon_ids: Option>, + /// When the task was last interrupted due to daemon disconnect + #[serde(skip_serializing_if = "Option::is_none")] + pub interrupted_at: Option>, } impl Task { diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2b069d5..43b8e3a 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract( } /// Get pending tasks for a contract (non-supervisor tasks only). 
+/// Includes tasks that were interrupted (retry candidates). +/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. pub async fn get_pending_tasks_for_contract( pool: &PgPool, contract_id: Uuid, @@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract( WHERE contract_id = $1 AND owner_id = $2 AND status = 'pending' AND is_supervisor = false - ORDER BY priority DESC, created_at ASC + AND retry_count < max_retries + ORDER BY + interrupted_at DESC NULLS LAST, + priority DESC, + created_at ASC "#, ) .bind(contract_id) @@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract( .await } +/// Mark a task as pending for retry after daemon failure. +/// Increments retry count and adds the failed daemon to exclusion list. +pub async fn mark_task_for_retry( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + interrupted_at = NOW(), + error_message = 'Daemon disconnected, awaiting retry', + updated_at = NOW() + WHERE id = $1 + AND retry_count < max_retries + RETURNING * + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .fetch_optional(pool) + .await +} + +/// Mark a task as permanently failed (exceeded retry limit). 
+pub async fn mark_task_permanently_failed( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE tasks + SET status = 'failed', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + error_message = 'Task failed: exceeded maximum retry attempts', + updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .execute(pool) + .await?; + Ok(()) +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -3008,6 +3069,35 @@ pub async fn get_available_daemons( .await } +/// Get daemons with capacity info for selection, excluding specified daemon IDs. +/// Used for task retry to avoid reassigning to daemons that have already failed. +pub async fn get_available_daemons_excluding( + pool: &PgPool, + owner_id: Uuid, + exclude_daemon_ids: &[Uuid], +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 + AND status = 'connected' + AND id != ALL($2) + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .bind(exclude_daemon_ids) + .fetch_all(pool) + .await +} + /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, -- cgit v1.2.3