diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 19:50:27 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 75d9644d44ba998a32ed14c072e883a75145ab72 (patch) | |
| tree | b82dee94632fd40764a92a9b11da24ef21600ed5 /makima/src/db | |
| parent | 6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff) | |
| download | soryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip | |
Add autopilot panel and retry system
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 19 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 92 |
2 files changed, 110 insertions, 1 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 0e1303c..72ba6f2 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -6,6 +6,11 @@ use sqlx::FromRow; use utoipa::ToSchema; use uuid::Uuid; +/// Default max retries for task daemon failover (3 attempts) +fn default_max_retries() -> i32 { + 3 +} + /// Flexible datetime deserialization module. /// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats. pub mod flexible_datetime { @@ -500,6 +505,20 @@ pub struct Task { /// Files to copy from parent task's worktree when starting. #[serde(skip_serializing_if = "Option::is_none")] pub copy_files: Option<serde_json::Value>, + + // Retry tracking for daemon failover + /// Number of times this task has been retried after daemon failure + #[serde(default)] + pub retry_count: i32, + /// Maximum retry attempts before marking as permanently failed + #[serde(default = "default_max_retries")] + pub max_retries: i32, + /// Array of daemon IDs that have failed this task (excluded from retry) + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_daemon_ids: Option<Vec<Uuid>>, + /// When the task was last interrupted due to daemon disconnect + #[serde(skip_serializing_if = "Option::is_none")] + pub interrupted_at: Option<DateTime<Utc>>, } impl Task { diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2b069d5..43b8e3a 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract( } /// Get pending tasks for a contract (non-supervisor tasks only). +/// Includes tasks that were interrupted (retry candidates). +/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. pub async fn get_pending_tasks_for_contract( pool: &PgPool, contract_id: Uuid, @@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract( WHERE contract_id = $1 AND owner_id = $2 AND status = 'pending' AND is_supervisor = false - ORDER BY priority DESC, created_at ASC + AND retry_count < max_retries + ORDER BY + interrupted_at DESC NULLS LAST, + priority DESC, + created_at ASC "#, ) .bind(contract_id) @@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract( .await } +/// Mark a task as pending for retry after daemon failure. +/// Increments retry count and adds the failed daemon to exclusion list. +pub async fn mark_task_for_retry( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + interrupted_at = NOW(), + error_message = 'Daemon disconnected, awaiting retry', + updated_at = NOW() + WHERE id = $1 + AND retry_count < max_retries + RETURNING * + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .fetch_optional(pool) + .await +} + +/// Mark a task as permanently failed (exceeded retry limit). +pub async fn mark_task_permanently_failed( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE tasks + SET status = 'failed', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + error_message = 'Task failed: exceeded maximum retry attempts', + updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .execute(pool) + .await?; + Ok(()) +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -3008,6 +3069,35 @@ pub async fn get_available_daemons( .await } +/// Get daemons with capacity info for selection, excluding specified daemon IDs. +/// Used for task retry to avoid reassigning to daemons that have already failed. +pub async fn get_available_daemons_excluding( + pool: &PgPool, + owner_id: Uuid, + exclude_daemon_ids: &[Uuid], +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 + AND status = 'connected' + AND id != ALL($2) + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .bind(exclude_daemon_ids) + .fetch_all(pool) + .await +} + /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, |
