From 8f144f3a811ab40e26514fe60fafbbdd35bad23d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 01:07:13 +0000 Subject: feat: Add Supervisor Status API endpoints (Phase 3 Task 3.5) Implement REST API endpoints for querying supervisor status: - GET /api/v1/contracts/{id}/supervisor/status Returns current supervisor status including task_id, state, phase, current_activity, progress, last_heartbeat, and pending_task_ids - GET /api/v1/contracts/{id}/supervisor/heartbeats?limit=10 Returns paginated supervisor activity history from history_events - POST /api/v1/contracts/{id}/supervisor/sync Triggers a sync to refresh the supervisor's last_activity timestamp New types added: - SupervisorStatusResponse - Status endpoint response - SupervisorHeartbeatEntry - Individual heartbeat history entry - SupervisorHeartbeatHistoryResponse - Heartbeat history with pagination - SupervisorSyncResponse - Sync endpoint response - HeartbeatHistoryQuery - Query params for heartbeats endpoint Repository helpers: - get_supervisor_status() - Combined info from supervisor_states and tasks - get_supervisor_activity_history() - Activity timeline from history_events - count_supervisor_activity_history() - Total count for pagination - sync_supervisor_state() - Refresh last_activity timestamp Error handling: - 404 for contract not found (CONTRACT_NOT_FOUND) - 404 for no supervisor (SUPERVISOR_NOT_FOUND) - Proper fallback when supervisor_state record doesn't exist but task does Co-Authored-By: Claude Opus 4.5 --- makima/src/db/models.rs | 76 +++++++++++++++++++++++++ makima/src/db/repository.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) (limited to 'makima/src/db') diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 636d81a..f1e0be0 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2395,3 +2395,79 @@ impl std::str::FromStr for NotificationSeverity { } } } + +// ============================================================================ +// Supervisor Status API Types +// ============================================================================ + +/// Response for supervisor status endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorStatusResponse { + /// The supervisor task ID + pub task_id: Uuid, + /// Current supervisor state (from supervisor_states table) + pub state: String, + /// Current contract phase + pub phase: String, + /// Description of current activity (from task progress_summary) + pub current_activity: Option, + /// Progress percentage (0-100) + pub progress: Option, + /// When the supervisor last updated its state + pub last_heartbeat: DateTime, + /// Task IDs the supervisor is currently waiting on + pub pending_task_ids: Vec, + /// Whether the supervisor is currently running + pub is_running: bool, +} + +/// Individual heartbeat entry for history +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatEntry { + /// Timestamp of this heartbeat + pub timestamp: DateTime, + /// Supervisor state at this time + pub state: String, + /// Activity description at this time + pub activity: Option, + /// Progress at this time + pub progress: Option, + /// Contract phase at this time + pub phase: String, + /// Pending task IDs at this time + pub pending_task_ids: Vec, +} + +/// Response for supervisor heartbeat history endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatHistoryResponse { + /// List of heartbeat entries + pub heartbeats: Vec, + /// Total count of heartbeats (for pagination) + pub total: i64, +} + +/// Response for supervisor sync endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorSyncResponse { + /// Whether the sync was successful + pub synced: bool, + /// Current supervisor state after sync + pub state: String, + /// Optional message about the sync result + pub message: Option, +} + +/// Query parameters for heartbeat history endpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option, + /// Offset for pagination (default: 0) + pub offset: Option, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index b7c5af1..8055488 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4402,3 +4402,138 @@ pub async fn get_notification_count_for_task( .map_err(RepositoryError::Database)?; Ok(result.0) } + +// ============================================================================= +// Supervisor Status API Helpers +// ============================================================================= + +/// Get supervisor status for a contract. +/// Returns combined information from supervisor_states and tasks tables. +pub async fn get_supervisor_status( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result, sqlx::Error> { + // Query to get supervisor status by joining supervisor_states with tasks + sqlx::query_as::<_, SupervisorStatusInfo>( + r#" + SELECT + ss.task_id, + COALESCE(t.status, 'unknown') as supervisor_state, + ss.phase, + t.progress_summary as current_activity, + ss.pending_task_ids, + ss.last_activity as last_heartbeat, + t.status as task_status, + t.daemon_id IS NOT NULL as is_running + FROM supervisor_states ss + JOIN tasks t ON t.id = ss.task_id + WHERE ss.contract_id = $1 + AND t.owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Internal struct to hold supervisor status query result +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorStatusInfo { + pub task_id: Uuid, + pub supervisor_state: String, + pub phase: String, + pub current_activity: Option, + #[sqlx(try_from = "Vec")] + pub pending_task_ids: Vec, + pub last_heartbeat: chrono::DateTime, + pub task_status: String, + pub is_running: bool, +} + +/// Get supervisor activity history from history_events table. +/// This provides a timeline of supervisor activities that can serve as "heartbeats". +pub async fn get_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, + limit: i32, + offset: i32, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorActivityEntry>( + r#" + SELECT + created_at as timestamp, + COALESCE(event_subtype, 'activity') as state, + event_data->>'activity' as activity, + (event_data->>'progress')::INTEGER as progress, + COALESCE(phase, 'unknown') as phase, + CASE + WHEN event_data->'pending_task_ids' IS NOT NULL + THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] + ELSE ARRAY[]::UUID[] + END as pending_task_ids + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(contract_id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await +} + +/// Internal struct to hold supervisor activity entry +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorActivityEntry { + pub timestamp: chrono::DateTime, + pub state: String, + pub activity: Option, + pub progress: Option, + pub phase: String, + #[sqlx(try_from = "Vec")] + pub pending_task_ids: Vec, +} + +/// Count total supervisor activity history entries for a contract. +pub async fn count_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, +) -> Result { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + "#, + ) + .bind(contract_id) + .fetch_one(pool) + .await?; + Ok(result.0) +} + +/// Update supervisor state last_activity timestamp. +/// This acts as a "sync" operation to refresh the supervisor's heartbeat. +pub async fn sync_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} -- cgit v1.2.3