summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-01 01:10:26 +0000
committersoryu <soryu@soryu.co>2026-02-01 01:10:26 +0000
commit5055b3f06d8027870b64abd84d9d3875070372e0 (patch)
tree529cf092a656d736d049adeaa9463c14c8db9b8a /makima/src/db
parent96ad3af6051af69e2e8b34b35e8b40926bdd13a1 (diff)
parent11db455af392bc6c86a85a2e453fbe947530852f (diff)
downloadsoryu-makima/contract-management-phase3.tar.gz
soryu-makima/contract-management-phase3.zip
feat: Implement Phase 3.5 - Supervisor Status APImakima/contract-management-phase3
- Add SupervisorStatusResponse for status endpoint - Add SupervisorHeartbeatEntry and history response types - Add SupervisorSyncResponse for sync endpoint - Add HeartbeatHistoryQuery for pagination - Resolve merge conflict keeping both API types and unit tests Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs76
-rw-r--r--makima/src/db/repository.rs135
2 files changed, 211 insertions, 0 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index fcbd044..abdcce6 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -2603,6 +2603,82 @@ impl std::str::FromStr for NotificationSeverity {
}
}
+// ============================================================================
+// Supervisor Status API Types
+// ============================================================================
+
+/// Response for supervisor status endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorStatusResponse {
+ /// The supervisor task ID
+ pub task_id: Uuid,
+ /// Current supervisor state (from supervisor_states table)
+ pub state: String,
+ /// Current contract phase
+ pub phase: String,
+ /// Description of current activity (from task progress_summary)
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ pub progress: Option<u8>,
+ /// When the supervisor last updated its state
+ pub last_heartbeat: DateTime<Utc>,
+ /// Task IDs the supervisor is currently waiting on
+ pub pending_task_ids: Vec<Uuid>,
+ /// Whether the supervisor is currently running
+ pub is_running: bool,
+}
+
+/// Individual heartbeat entry for history
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatEntry {
+ /// Timestamp of this heartbeat
+ pub timestamp: DateTime<Utc>,
+ /// Supervisor state at this time
+ pub state: String,
+ /// Activity description at this time
+ pub activity: Option<String>,
+ /// Progress at this time
+ pub progress: Option<u8>,
+ /// Contract phase at this time
+ pub phase: String,
+ /// Pending task IDs at this time
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Response for supervisor heartbeat history endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatHistoryResponse {
+ /// List of heartbeat entries
+ pub heartbeats: Vec<SupervisorHeartbeatEntry>,
+ /// Total count of heartbeats (for pagination)
+ pub total: i64,
+}
+
+/// Response for supervisor sync endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorSyncResponse {
+ /// Whether the sync was successful
+ pub synced: bool,
+ /// Current supervisor state after sync
+ pub state: String,
+ /// Optional message about the sync result
+ pub message: Option<String>,
+}
+
+/// Query parameters for heartbeat history endpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct HeartbeatHistoryQuery {
+ /// Maximum number of heartbeats to return (default: 10)
+ pub limit: Option<i32>,
+ /// Offset for pagination (default: 0)
+ pub offset: Option<i32>,
+}
+
// =============================================================================
// Unit Tests
// =============================================================================
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index d1ec3ef..e308df7 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -4860,3 +4860,138 @@ pub async fn get_notification_count_for_task(
.map_err(RepositoryError::Database)?;
Ok(result.0)
}
+
+// =============================================================================
+// Supervisor Status API Helpers
+// =============================================================================
+
+/// Get supervisor status for a contract.
+/// Returns combined information from supervisor_states and tasks tables.
+pub async fn get_supervisor_status(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
+ // Query to get supervisor status by joining supervisor_states with tasks
+ sqlx::query_as::<_, SupervisorStatusInfo>(
+ r#"
+ SELECT
+ ss.task_id,
+ COALESCE(t.status, 'unknown') as supervisor_state,
+ ss.phase,
+ t.progress_summary as current_activity,
+ ss.pending_task_ids,
+ ss.last_activity as last_heartbeat,
+ t.status as task_status,
+ t.daemon_id IS NOT NULL as is_running
+ FROM supervisor_states ss
+ JOIN tasks t ON t.id = ss.task_id
+ WHERE ss.contract_id = $1
+ AND t.owner_id = $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor status query result
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorStatusInfo {
+ pub task_id: Uuid,
+ pub supervisor_state: String,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+ pub last_heartbeat: chrono::DateTime<chrono::Utc>,
+ pub task_status: String,
+ pub is_running: bool,
+}
+
+/// Get supervisor activity history from history_events table.
+/// This provides a timeline of supervisor activities that can serve as "heartbeats".
+pub async fn get_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+ limit: i32,
+ offset: i32,
+) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorActivityEntry>(
+ r#"
+ SELECT
+ created_at as timestamp,
+ COALESCE(event_subtype, 'activity') as state,
+ event_data->>'activity' as activity,
+ (event_data->>'progress')::INTEGER as progress,
+ COALESCE(phase, 'unknown') as phase,
+ CASE
+ WHEN event_data->'pending_task_ids' IS NOT NULL
+ THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
+ ELSE ARRAY[]::UUID[]
+ END as pending_task_ids
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ ORDER BY created_at DESC
+ LIMIT $2 OFFSET $3
+ "#,
+ )
+ .bind(contract_id)
+ .bind(limit)
+ .bind(offset)
+ .fetch_all(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor activity entry
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorActivityEntry {
+ pub timestamp: chrono::DateTime<chrono::Utc>,
+ pub state: String,
+ pub activity: Option<String>,
+ pub progress: Option<i32>,
+ pub phase: String,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Count total supervisor activity history entries for a contract.
+pub async fn count_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ r#"
+ SELECT COUNT(*)
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await?;
+ Ok(result.0)
+}
+
+/// Update supervisor state last_activity timestamp.
+/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
+pub async fn sync_supervisor_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}