summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-01 00:20:55 +0000
committersoryu <soryu@soryu.co>2026-02-01 00:25:43 +0000
commitbb14010db99b40792372bfcb4348cf4984f30b3f (patch)
treed5c12af5ce8e87430daad3f80a979157233e8644 /makima/src/db/repository.rs
parent7567153e6281b94e39e52be5d060b381ed69597d (diff)
downloadsoryu-bb14010db99b40792372bfcb4348cf4984f30b3f.tar.gz
soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.zip
feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure
Task 3.1: Enhanced Supervisor State Enum - Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser, WaitingForTasks, Blocked, Completed, Failed, Interrupted - Implement Display, FromStr, Default, and serde serialization - Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs Task 3.2: Heartbeat Infrastructure - Create supervisor_heartbeats migration with proper indexes and constraints - Add heartbeat storage functions to repository.rs: - create_supervisor_heartbeat - get_latest_supervisor_heartbeat - get_supervisor_heartbeats - get_contract_supervisor_heartbeats - cleanup_old_heartbeats (24 hour TTL support) - find_stale_supervisors (for dead supervisor detection) - Add SupervisorHeartbeat message to protocol.rs with enhanced fields - Update mesh_daemon.rs to process and store supervisor heartbeats - Add unit tests for SupervisorStateEnum and heartbeat serialization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs160
1 files changed, 157 insertions, 3 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index b7c5af1..1ac188c 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -12,9 +12,9 @@ use super::models::{
CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment,
DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent,
HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult,
- PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint,
- TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest,
- UpdateTemplateRequest,
+ PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest,
+ UpdateTaskRequest, UpdateTemplateRequest,
};
/// Repository error types.
@@ -3405,6 +3405,160 @@ pub async fn update_supervisor_pending_tasks(
}
// ============================================================================
+// Supervisor Heartbeats
+// ============================================================================
+
+/// Record a supervisor heartbeat.
+/// This creates a historical record for monitoring and dead supervisor detection.
+pub async fn create_supervisor_heartbeat(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+ contract_id: Uuid,
+ state: &str,
+ phase: &str,
+ current_activity: Option<&str>,
+ progress: i32,
+ pending_task_ids: &[Uuid],
+) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ INSERT INTO supervisor_heartbeats (
+ supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
+ RETURNING *
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .bind(contract_id)
+ .bind(state)
+ .bind(phase)
+ .bind(current_activity)
+ .bind(progress)
+ .bind(pending_task_ids)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get the latest heartbeat for a supervisor task.
+pub async fn get_latest_supervisor_heartbeat(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE supervisor_task_id = $1
+ ORDER BY timestamp DESC
+ LIMIT 1
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get recent heartbeats for a supervisor task.
+pub async fn get_supervisor_heartbeats(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+ limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE supervisor_task_id = $1
+ ORDER BY timestamp DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get recent heartbeats for a contract.
+pub async fn get_contract_supervisor_heartbeats(
+ pool: &PgPool,
+ contract_id: Uuid,
+ limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE contract_id = $1
+ ORDER BY timestamp DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Delete old heartbeats beyond the TTL (24 hours by default).
+/// Returns the number of deleted records.
+pub async fn cleanup_old_heartbeats(
+ pool: &PgPool,
+ ttl_hours: i64,
+) -> Result<u64, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM supervisor_heartbeats
+ WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
+ "#,
+ )
+ .bind(ttl_hours.to_string())
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
+/// Find supervisors that have not sent a heartbeat within the timeout period.
+/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
+pub async fn find_stale_supervisors(
+ pool: &PgPool,
+ timeout_seconds: i64,
+) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
+ let rows = sqlx::query(
+ r#"
+ WITH latest_heartbeats AS (
+ SELECT DISTINCT ON (supervisor_task_id)
+ supervisor_task_id,
+ contract_id,
+ timestamp
+ FROM supervisor_heartbeats
+ ORDER BY supervisor_task_id, timestamp DESC
+ )
+ SELECT
+ lh.supervisor_task_id,
+ lh.contract_id,
+ lh.timestamp
+ FROM latest_heartbeats lh
+ JOIN tasks t ON t.id = lh.supervisor_task_id
+ WHERE t.status = 'running'
+ AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
+ "#,
+ )
+ .bind(timeout_seconds.to_string())
+ .fetch_all(pool)
+ .await?;
+
+ let mut result = Vec::new();
+ for row in rows {
+ use sqlx::Row;
+ let supervisor_task_id: Uuid = row.get("supervisor_task_id");
+ let contract_id: Uuid = row.get("contract_id");
+ let timestamp: chrono::DateTime<Utc> = row.get("timestamp");
+ result.push((supervisor_task_id, contract_id, timestamp));
+ }
+ Ok(result)
+}
+
+// ============================================================================
// Contract Supervisor
// ============================================================================