diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 01:10:26 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-01 01:10:26 +0000 |
| commit | 5055b3f06d8027870b64abd84d9d3875070372e0 (patch) | |
| tree | 529cf092a656d736d049adeaa9463c14c8db9b8a | |
| parent | 96ad3af6051af69e2e8b34b35e8b40926bdd13a1 (diff) | |
| parent | 11db455af392bc6c86a85a2e453fbe947530852f (diff) | |
| download | soryu-makima/contract-management-phase3.tar.gz soryu-makima/contract-management-phase3.zip | |
feat: Implement Phase 3.5 - Supervisor Status APImakima/contract-management-phase3
- Add SupervisorStatusResponse for status endpoint
- Add SupervisorHeartbeatEntry and history response types
- Add SupervisorSyncResponse for sync endpoint
- Add HeartbeatHistoryQuery for pagination
- Resolve merge conflict keeping both API types and unit tests
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| -rw-r--r-- | makima/src/db/models.rs | 76 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 135 | ||||
| -rw-r--r-- | makima/src/server/handlers/contracts.rs | 342 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 4 |
4 files changed, 557 insertions, 0 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index fcbd044..abdcce6 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2603,6 +2603,82 @@ impl std::str::FromStr for NotificationSeverity { } } +// ============================================================================ +// Supervisor Status API Types +// ============================================================================ + +/// Response for supervisor status endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorStatusResponse { + /// The supervisor task ID + pub task_id: Uuid, + /// Current supervisor state (from supervisor_states table) + pub state: String, + /// Current contract phase + pub phase: String, + /// Description of current activity (from task progress_summary) + pub current_activity: Option<String>, + /// Progress percentage (0-100) + pub progress: Option<u8>, + /// When the supervisor last updated its state + pub last_heartbeat: DateTime<Utc>, + /// Task IDs the supervisor is currently waiting on + pub pending_task_ids: Vec<Uuid>, + /// Whether the supervisor is currently running + pub is_running: bool, +} + +/// Individual heartbeat entry for history +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatEntry { + /// Timestamp of this heartbeat + pub timestamp: DateTime<Utc>, + /// Supervisor state at this time + pub state: String, + /// Activity description at this time + pub activity: Option<String>, + /// Progress at this time + pub progress: Option<u8>, + /// Contract phase at this time + pub phase: String, + /// Pending task IDs at this time + pub pending_task_ids: Vec<Uuid>, +} + +/// Response for supervisor heartbeat history endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatHistoryResponse { + /// List of heartbeat entries + pub heartbeats: Vec<SupervisorHeartbeatEntry>, + /// Total count of heartbeats (for pagination) + pub total: i64, +} + +/// Response for supervisor sync endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorSyncResponse { + /// Whether the sync was successful + pub synced: bool, + /// Current supervisor state after sync + pub state: String, + /// Optional message about the sync result + pub message: Option<String>, +} + +/// Query parameters for heartbeat history endpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option<i32>, + /// Offset for pagination (default: 0) + pub offset: Option<i32>, +} + // ============================================================================= // Unit Tests // ============================================================================= diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index d1ec3ef..e308df7 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4860,3 +4860,138 @@ pub async fn get_notification_count_for_task( .map_err(RepositoryError::Database)?; Ok(result.0) } + +// ============================================================================= +// Supervisor Status API Helpers +// ============================================================================= + +/// Get supervisor status for a contract. +/// Returns combined information from supervisor_states and tasks tables. +pub async fn get_supervisor_status( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> { + // Query to get supervisor status by joining supervisor_states with tasks + sqlx::query_as::<_, SupervisorStatusInfo>( + r#" + SELECT + ss.task_id, + COALESCE(t.status, 'unknown') as supervisor_state, + ss.phase, + t.progress_summary as current_activity, + ss.pending_task_ids, + ss.last_activity as last_heartbeat, + t.status as task_status, + t.daemon_id IS NOT NULL as is_running + FROM supervisor_states ss + JOIN tasks t ON t.id = ss.task_id + WHERE ss.contract_id = $1 + AND t.owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Internal struct to hold supervisor status query result +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorStatusInfo { + pub task_id: Uuid, + pub supervisor_state: String, + pub phase: String, + pub current_activity: Option<String>, + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, + pub last_heartbeat: chrono::DateTime<chrono::Utc>, + pub task_status: String, + pub is_running: bool, +} + +/// Get supervisor activity history from history_events table. +/// This provides a timeline of supervisor activities that can serve as "heartbeats". +pub async fn get_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, + limit: i32, + offset: i32, +) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> { + sqlx::query_as::<_, SupervisorActivityEntry>( + r#" + SELECT + created_at as timestamp, + COALESCE(event_subtype, 'activity') as state, + event_data->>'activity' as activity, + (event_data->>'progress')::INTEGER as progress, + COALESCE(phase, 'unknown') as phase, + CASE + WHEN event_data->'pending_task_ids' IS NOT NULL + THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] + ELSE ARRAY[]::UUID[] + END as pending_task_ids + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(contract_id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await +} + +/// Internal struct to hold supervisor activity entry +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorActivityEntry { + pub timestamp: chrono::DateTime<chrono::Utc>, + pub state: String, + pub activity: Option<String>, + pub progress: Option<i32>, + pub phase: String, + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, +} + +/// Count total supervisor activity history entries for a contract. +pub async fn count_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, +) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + "#, + ) + .bind(contract_id) + .fetch_one(pool) + .await?; + Ok(result.0) +} + +/// Update supervisor state last_activity timestamp. +/// This acts as a "sync" operation to refresh the supervisor's heartbeat. +pub async fn sync_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index 5a87616..01b4610 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -1889,6 +1889,348 @@ async fn cleanup_contract_worktrees( } // ============================================================================= +// Supervisor Status API +// ============================================================================= + +/// Query parameters for supervisor heartbeat history +#[derive(Debug, Deserialize)] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option<i32>, + /// Offset for pagination (default: 0) + pub offset: Option<i32>, +} + +/// Get supervisor status for a contract. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/status", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor status", body = crate::db::models::SupervisorStatusResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_supervisor_status( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if contract has a supervisor + let supervisor_task_id = match contract.supervisor_task_id { + Some(task_id) => task_id, + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")), + ) + .into_response(); + } + }; + + // Get supervisor status from supervisor_states table + match repository::get_supervisor_status(pool, id, auth.owner_id).await { + Ok(Some(status_info)) => { + // Determine if supervisor is actively running + let is_running = status_info.is_running && status_info.task_status == "running"; + + let response = crate::db::models::SupervisorStatusResponse { + task_id: status_info.task_id, + state: status_info.supervisor_state, + phase: status_info.phase, + current_activity: status_info.current_activity, + progress: None, // We don't track progress percentage yet + last_heartbeat: status_info.last_heartbeat, + pending_task_ids: status_info.pending_task_ids, + is_running, + }; + Json(response).into_response() + } + Ok(None) => { + // No supervisor state record exists, but supervisor task might exist + // Try to get info from the task itself + match repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + Ok(Some(task)) => { + let is_running = task.daemon_id.is_some() && task.status == "running"; + let response = crate::db::models::SupervisorStatusResponse { + task_id: task.id, + state: task.status.clone(), + phase: contract.phase.clone(), + current_activity: task.progress_summary.clone(), + progress: None, + last_heartbeat: task.updated_at, + pending_task_ids: Vec::new(), + is_running, + }; + Json(response).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "Supervisor task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get supervisor task {}: {}", supervisor_task_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } + } + Err(e) => { + tracing::error!("Failed to get supervisor status for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get supervisor heartbeat history for a contract. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/heartbeats", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("limit" = Option<i32>, Query, description = "Maximum number of heartbeats to return (default: 10)"), + ("offset" = Option<i32>, Query, description = "Offset for pagination (default: 0)") + ), + responses( + (status = 200, description = "Supervisor heartbeat history", body = crate::db::models::SupervisorHeartbeatHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_supervisor_heartbeats( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + axum::extract::Query(query): axum::extract::Query<HeartbeatHistoryQuery>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + let limit = query.limit.unwrap_or(10).min(100); // Cap at 100 + let offset = query.offset.unwrap_or(0); + + // Get activity history as heartbeats + let activities = match repository::get_supervisor_activity_history(pool, id, limit, offset).await { + Ok(activities) => activities, + Err(e) => { + tracing::error!("Failed to get supervisor heartbeats for contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get total count for pagination + let total = match repository::count_supervisor_activity_history(pool, id).await { + Ok(count) => count, + Err(e) => { + tracing::warn!("Failed to count supervisor heartbeats: {}", e); + activities.len() as i64 + } + }; + + // Convert to heartbeat entries + let heartbeats: Vec<crate::db::models::SupervisorHeartbeatEntry> = activities + .into_iter() + .map(|a| crate::db::models::SupervisorHeartbeatEntry { + timestamp: a.timestamp, + state: a.state, + activity: a.activity, + progress: a.progress.map(|p| p as u8), + phase: a.phase, + pending_task_ids: a.pending_task_ids, + }) + .collect(); + + Json(crate::db::models::SupervisorHeartbeatHistoryResponse { + heartbeats, + total, + }) + .into_response() +} + +/// Sync supervisor state (refresh last_activity timestamp). +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/sync", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor synced", body = crate::db::models::SupervisorSyncResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn sync_supervisor( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if contract has a supervisor + if contract.supervisor_task_id.is_none() { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")), + ) + .into_response(); + } + + // Sync supervisor state (update last_activity) + match repository::sync_supervisor_state(pool, id).await { + Ok(Some(_state)) => { + // Get task status to determine current state + let task_status = if let Some(task_id) = contract.supervisor_task_id { + match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(task)) => task.status, + _ => "unknown".to_string(), + } + } else { + "unknown".to_string() + }; + + Json(crate::db::models::SupervisorSyncResponse { + synced: true, + state: task_status, + message: Some("Supervisor state synced successfully".to_string()), + }) + .into_response() + } + Ok(None) => { + // No supervisor state exists, return not found + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor state found for this contract")), + ) + .into_response() + } + Err(e) => { + tracing::error!("Failed to sync supervisor state for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= // Tests // ============================================================================= diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 8456006..e5415ae 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -175,6 +175,10 @@ pub fn make_router(state: SharedState) -> Router { // Contract supervisor resume endpoints .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) + // Contract supervisor status endpoints + .route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status)) + .route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats)) + .route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor)) // History endpoints .route("/contracts/{id}/history", get(history::get_contract_history)) .route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation)) |
