diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 160 |
1 files changed, 157 insertions, 3 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index b7c5af1..1ac188c 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -12,9 +12,9 @@ use super::models::{ CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, - PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, - UpdateTemplateRequest, + PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState, + Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, + UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. @@ -3405,6 +3405,160 @@ pub async fn update_supervisor_pending_tasks( } // ============================================================================ +// Supervisor Heartbeats +// ============================================================================ + +/// Record a supervisor heartbeat. +/// This creates a historical record for monitoring and dead supervisor detection. +pub async fn create_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, + contract_id: Uuid, + state: &str, + phase: &str, + current_activity: Option<&str>, + progress: i32, + pending_task_ids: &[Uuid], +) -> Result<SupervisorHeartbeatRecord, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + INSERT INTO supervisor_heartbeats ( + supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + RETURNING * + "#, + ) + .bind(supervisor_task_id) + .bind(contract_id) + .bind(state) + .bind(phase) + .bind(current_activity) + .bind(progress) + .bind(pending_task_ids) + .fetch_one(pool) + .await +} + +/// Get the latest heartbeat for a supervisor task. +pub async fn get_latest_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, +) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT 1 + "#, + ) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await +} + +/// Get recent heartbeats for a supervisor task. +pub async fn get_supervisor_heartbeats( + pool: &PgPool, + supervisor_task_id: Uuid, + limit: i64, +) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(supervisor_task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Get recent heartbeats for a contract. +pub async fn get_contract_supervisor_heartbeats( + pool: &PgPool, + contract_id: Uuid, + limit: i64, +) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE contract_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(contract_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Delete old heartbeats beyond the TTL (24 hours by default). +/// Returns the number of deleted records. +pub async fn cleanup_old_heartbeats( + pool: &PgPool, + ttl_hours: i64, +) -> Result<u64, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM supervisor_heartbeats + WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL + "#, + ) + .bind(ttl_hours.to_string()) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Find supervisors that have not sent a heartbeat within the timeout period. +/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp). +pub async fn find_stale_supervisors( + pool: &PgPool, + timeout_seconds: i64, +) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> { + let rows = sqlx::query( + r#" + WITH latest_heartbeats AS ( + SELECT DISTINCT ON (supervisor_task_id) + supervisor_task_id, + contract_id, + timestamp + FROM supervisor_heartbeats + ORDER BY supervisor_task_id, timestamp DESC + ) + SELECT + lh.supervisor_task_id, + lh.contract_id, + lh.timestamp + FROM latest_heartbeats lh + JOIN tasks t ON t.id = lh.supervisor_task_id + WHERE t.status = 'running' + AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL + "#, + ) + .bind(timeout_seconds.to_string()) + .fetch_all(pool) + .await?; + + let mut result = Vec::new(); + for row in rows { + use sqlx::Row; + let supervisor_task_id: Uuid = row.get("supervisor_task_id"); + let contract_id: Uuid = row.get("contract_id"); + let timestamp: chrono::DateTime<Utc> = row.get("timestamp"); + result.push((supervisor_task_id, contract_id, timestamp)); + } + Ok(result) +} + +// ============================================================================ // Contract Supervisor // ============================================================================ |
