summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 19:50:27 +0000
committersoryu <soryu@soryu.co>2026-01-17 05:38:07 +0000
commit75d9644d44ba998a32ed14c072e883a75145ab72 (patch)
treeb82dee94632fd40764a92a9b11da24ef21600ed5 /makima/src/db
parent6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff)
downloadsoryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz
soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip
Add autopilot panel and retry system
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs19
-rw-r--r--makima/src/db/repository.rs92
2 files changed, 110 insertions, 1 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 0e1303c..72ba6f2 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -6,6 +6,11 @@ use sqlx::FromRow;
use utoipa::ToSchema;
use uuid::Uuid;
+/// Default max retries for task daemon failover (3 attempts)
+fn default_max_retries() -> i32 {
+ 3
+}
+
/// Flexible datetime deserialization module.
/// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats.
pub mod flexible_datetime {
@@ -500,6 +505,20 @@ pub struct Task {
/// Files to copy from parent task's worktree when starting.
#[serde(skip_serializing_if = "Option::is_none")]
pub copy_files: Option<serde_json::Value>,
+
+ // Retry tracking for daemon failover
+ /// Number of times this task has been retried after daemon failure
+ #[serde(default)]
+ pub retry_count: i32,
+ /// Maximum retry attempts before marking as permanently failed
+ #[serde(default = "default_max_retries")]
+ pub max_retries: i32,
+ /// Array of daemon IDs that have failed this task (excluded from retry)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub failed_daemon_ids: Option<Vec<Uuid>>,
+ /// When the task was last interrupted due to daemon disconnect
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub interrupted_at: Option<DateTime<Utc>>,
}
impl Task {
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 2b069d5..43b8e3a 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract(
}
/// Get pending tasks for a contract (non-supervisor tasks only).
+/// Includes tasks that were interrupted (retry candidates).
+/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
pub async fn get_pending_tasks_for_contract(
pool: &PgPool,
contract_id: Uuid,
@@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract(
WHERE contract_id = $1 AND owner_id = $2
AND status = 'pending'
AND is_supervisor = false
- ORDER BY priority DESC, created_at ASC
+ AND retry_count < max_retries
+ ORDER BY
+ interrupted_at DESC NULLS LAST,
+ priority DESC,
+ created_at ASC
"#,
)
.bind(contract_id)
@@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract(
.await
}
+/// Mark a task as pending for retry after daemon failure.
+/// Increments retry count and adds the failed daemon to exclusion list.
+pub async fn mark_task_for_retry(
+ pool: &PgPool,
+ task_id: Uuid,
+ failed_daemon_id: Uuid,
+) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = 'pending',
+ daemon_id = NULL,
+ retry_count = retry_count + 1,
+ failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
+ last_active_daemon_id = $2,
+ interrupted_at = NOW(),
+ error_message = 'Daemon disconnected, awaiting retry',
+ updated_at = NOW()
+ WHERE id = $1
+ AND retry_count < max_retries
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(failed_daemon_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Mark a task as permanently failed (exceeded retry limit).
+pub async fn mark_task_permanently_failed(
+ pool: &PgPool,
+ task_id: Uuid,
+ failed_daemon_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE tasks
+ SET status = 'failed',
+ daemon_id = NULL,
+ retry_count = retry_count + 1,
+ failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
+ last_active_daemon_id = $2,
+ error_message = 'Task failed: exceeded maximum retry attempts',
+ updated_at = NOW()
+ WHERE id = $1
+ "#,
+ )
+ .bind(task_id)
+ .bind(failed_daemon_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
/// Update a task by ID with optimistic locking.
pub async fn update_task(
pool: &PgPool,
@@ -3008,6 +3069,35 @@ pub async fn get_available_daemons(
.await
}
+/// Get daemons with capacity info for selection, excluding specified daemon IDs.
+/// Used for task retry to avoid reassigning to daemons that have already failed.
+pub async fn get_available_daemons_excluding(
+ pool: &PgPool,
+ owner_id: Uuid,
+ exclude_daemon_ids: &[Uuid],
+) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
+ sqlx::query_as::<_, DaemonWithCapacity>(
+ r#"
+ SELECT id, owner_id, connection_id, hostname, machine_id,
+ max_concurrent_tasks, current_task_count,
+ capacity_score, task_queue_length, supports_migration,
+ status, last_heartbeat_at, connected_at
+ FROM daemons
+ WHERE owner_id = $1
+ AND status = 'connected'
+ AND id != ALL($2)
+ ORDER BY
+ COALESCE(capacity_score, 100) DESC,
+ (max_concurrent_tasks - current_task_count) DESC,
+ COALESCE(task_queue_length, 0) ASC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(exclude_daemon_ids)
+ .fetch_all(pool)
+ .await
+}
+
/// Create a daemon task assignment.
pub async fn create_daemon_task_assignment(
pool: &PgPool,