diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 76 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 135 |
2 files changed, 211 insertions, 0 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index fcbd044..abdcce6 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2603,6 +2603,82 @@ impl std::str::FromStr for NotificationSeverity { } } +// ============================================================================ +// Supervisor Status API Types +// ============================================================================ + +/// Response for supervisor status endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorStatusResponse { + /// The supervisor task ID + pub task_id: Uuid, + /// Current supervisor state (from supervisor_states table) + pub state: String, + /// Current contract phase + pub phase: String, + /// Description of current activity (from task progress_summary) + pub current_activity: Option<String>, + /// Progress percentage (0-100) + pub progress: Option<u8>, + /// When the supervisor last updated its state + pub last_heartbeat: DateTime<Utc>, + /// Task IDs the supervisor is currently waiting on + pub pending_task_ids: Vec<Uuid>, + /// Whether the supervisor is currently running + pub is_running: bool, +} + +/// Individual heartbeat entry for history +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatEntry { + /// Timestamp of this heartbeat + pub timestamp: DateTime<Utc>, + /// Supervisor state at this time + pub state: String, + /// Activity description at this time + pub activity: Option<String>, + /// Progress at this time + pub progress: Option<u8>, + /// Contract phase at this time + pub phase: String, + /// Pending task IDs at this time + pub pending_task_ids: Vec<Uuid>, +} + +/// Response for supervisor heartbeat history endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatHistoryResponse { + /// List of heartbeat entries + pub heartbeats: Vec<SupervisorHeartbeatEntry>, + /// Total count of heartbeats (for pagination) + pub total: i64, +} + +/// Response for supervisor sync endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorSyncResponse { + /// Whether the sync was successful + pub synced: bool, + /// Current supervisor state after sync + pub state: String, + /// Optional message about the sync result + pub message: Option<String>, +} + +/// Query parameters for heartbeat history endpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option<i32>, + /// Offset for pagination (default: 0) + pub offset: Option<i32>, +} + // ============================================================================= // Unit Tests // ============================================================================= diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index d1ec3ef..e308df7 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4860,3 +4860,138 @@ pub async fn get_notification_count_for_task( .map_err(RepositoryError::Database)?; Ok(result.0) } + +// ============================================================================= +// Supervisor Status API Helpers +// ============================================================================= + +/// Get supervisor status for a contract. +/// Returns combined information from supervisor_states and tasks tables. +pub async fn get_supervisor_status( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> { + // Query to get supervisor status by joining supervisor_states with tasks + sqlx::query_as::<_, SupervisorStatusInfo>( + r#" + SELECT + ss.task_id, + COALESCE(t.status, 'unknown') as supervisor_state, + ss.phase, + t.progress_summary as current_activity, + ss.pending_task_ids, + ss.last_activity as last_heartbeat, + t.status as task_status, + t.daemon_id IS NOT NULL as is_running + FROM supervisor_states ss + JOIN tasks t ON t.id = ss.task_id + WHERE ss.contract_id = $1 + AND t.owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Internal struct to hold supervisor status query result +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorStatusInfo { + pub task_id: Uuid, + pub supervisor_state: String, + pub phase: String, + pub current_activity: Option<String>, + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, + pub last_heartbeat: chrono::DateTime<chrono::Utc>, + pub task_status: String, + pub is_running: bool, +} + +/// Get supervisor activity history from history_events table. +/// This provides a timeline of supervisor activities that can serve as "heartbeats". +pub async fn get_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, + limit: i32, + offset: i32, +) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> { + sqlx::query_as::<_, SupervisorActivityEntry>( + r#" + SELECT + created_at as timestamp, + COALESCE(event_subtype, 'activity') as state, + event_data->>'activity' as activity, + (event_data->>'progress')::INTEGER as progress, + COALESCE(phase, 'unknown') as phase, + CASE + WHEN event_data->'pending_task_ids' IS NOT NULL + THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] + ELSE ARRAY[]::UUID[] + END as pending_task_ids + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(contract_id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await +} + +/// Internal struct to hold supervisor activity entry +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorActivityEntry { + pub timestamp: chrono::DateTime<chrono::Utc>, + pub state: String, + pub activity: Option<String>, + pub progress: Option<i32>, + pub phase: String, + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, +} + +/// Count total supervisor activity history entries for a contract. +pub async fn count_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, +) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + "#, + ) + .bind(contract_id) + .fetch_one(pool) + .await?; + Ok(result.0) +} + +/// Update supervisor state last_activity timestamp. +/// This acts as a "sync" operation to refresh the supervisor's heartbeat. +pub async fn sync_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} |
