diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 19:50:27 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 75d9644d44ba998a32ed14c072e883a75145ab72 (patch) | |
| tree | b82dee94632fd40764a92a9b11da24ef21600ed5 /makima/src/db/repository.rs | |
| parent | 6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff) | |
| download | soryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip | |
Add autopilot panel and retry system
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 92 |
1 file changed, 91 insertions, 1 deletion
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2b069d5..43b8e3a 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract( } /// Get pending tasks for a contract (non-supervisor tasks only). +/// Includes tasks that were interrupted (retry candidates). +/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. pub async fn get_pending_tasks_for_contract( pool: &PgPool, contract_id: Uuid, @@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract( WHERE contract_id = $1 AND owner_id = $2 AND status = 'pending' AND is_supervisor = false - ORDER BY priority DESC, created_at ASC + AND retry_count < max_retries + ORDER BY + interrupted_at DESC NULLS LAST, + priority DESC, + created_at ASC "#, ) .bind(contract_id) @@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract( .await } +/// Mark a task as pending for retry after daemon failure. +/// Increments retry count and adds the failed daemon to exclusion list. +pub async fn mark_task_for_retry( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + interrupted_at = NOW(), + error_message = 'Daemon disconnected, awaiting retry', + updated_at = NOW() + WHERE id = $1 + AND retry_count < max_retries + RETURNING * + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .fetch_optional(pool) + .await +} + +/// Mark a task as permanently failed (exceeded retry limit). 
+pub async fn mark_task_permanently_failed( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE tasks + SET status = 'failed', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + error_message = 'Task failed: exceeded maximum retry attempts', + updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .execute(pool) + .await?; + Ok(()) +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -3008,6 +3069,35 @@ pub async fn get_available_daemons( .await } +/// Get daemons with capacity info for selection, excluding specified daemon IDs. +/// Used for task retry to avoid reassigning to daemons that have already failed. +pub async fn get_available_daemons_excluding( + pool: &PgPool, + owner_id: Uuid, + exclude_daemon_ids: &[Uuid], +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 + AND status = 'connected' + AND id != ALL($2) + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .bind(exclude_daemon_ids) + .fetch_all(pool) + .await +} + /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, |
