summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-01 01:07:13 +0000
committersoryu <soryu@soryu.co>2026-02-01 01:07:13 +0000
commit59aa17ea650d3afe34cccf83b2acd41bc1a95a35 (patch)
tree3b7cfe5c02b367f8aa9ae8be84eefa6149608ec9
parent7567153e6281b94e39e52be5d060b381ed69597d (diff)
downloadsoryu-makima/task-task-b2c95676-b2c95676.tar.gz
soryu-makima/task-task-b2c95676-b2c95676.zip
[WIP] Heartbeat checkpoint - 2026-02-01 01:07:13 UTCmakima/task-task-b2c95676-b2c95676
-rw-r--r--makima/src/db/models.rs76
-rw-r--r--makima/src/db/repository.rs135
-rw-r--r--makima/src/server/handlers/contracts.rs342
-rw-r--r--makima/src/server/mod.rs4
4 files changed, 557 insertions, 0 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 636d81a..f1e0be0 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -2395,3 +2395,79 @@ impl std::str::FromStr for NotificationSeverity {
}
}
}
+
+// ============================================================================
+// Supervisor Status API Types
+// ============================================================================
+
+/// Response for supervisor status endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorStatusResponse {
+ /// The supervisor task ID
+ pub task_id: Uuid,
+ /// Current supervisor state (from supervisor_states table)
+ pub state: String,
+ /// Current contract phase
+ pub phase: String,
+ /// Description of current activity (from task progress_summary)
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ pub progress: Option<u8>,
+ /// When the supervisor last updated its state
+ pub last_heartbeat: DateTime<Utc>,
+ /// Task IDs the supervisor is currently waiting on
+ pub pending_task_ids: Vec<Uuid>,
+ /// Whether the supervisor is currently running
+ pub is_running: bool,
+}
+
+/// Individual heartbeat entry for history
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatEntry {
+ /// Timestamp of this heartbeat
+ pub timestamp: DateTime<Utc>,
+ /// Supervisor state at this time
+ pub state: String,
+ /// Activity description at this time
+ pub activity: Option<String>,
+ /// Progress at this time
+ pub progress: Option<u8>,
+ /// Contract phase at this time
+ pub phase: String,
+ /// Pending task IDs at this time
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Response for supervisor heartbeat history endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatHistoryResponse {
+ /// List of heartbeat entries
+ pub heartbeats: Vec<SupervisorHeartbeatEntry>,
+ /// Total count of heartbeats (for pagination)
+ pub total: i64,
+}
+
+/// Response for supervisor sync endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorSyncResponse {
+ /// Whether the sync was successful
+ pub synced: bool,
+ /// Current supervisor state after sync
+ pub state: String,
+ /// Optional message about the sync result
+ pub message: Option<String>,
+}
+
+/// Query parameters for heartbeat history endpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct HeartbeatHistoryQuery {
+ /// Maximum number of heartbeats to return (default: 10)
+ pub limit: Option<i32>,
+ /// Offset for pagination (default: 0)
+ pub offset: Option<i32>,
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index b7c5af1..8055488 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -4402,3 +4402,138 @@ pub async fn get_notification_count_for_task(
.map_err(RepositoryError::Database)?;
Ok(result.0)
}
+
+// =============================================================================
+// Supervisor Status API Helpers
+// =============================================================================
+
+/// Get supervisor status for a contract.
+/// Returns combined information from supervisor_states and tasks tables.
+pub async fn get_supervisor_status(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
+ // Query to get supervisor status by joining supervisor_states with tasks
+ sqlx::query_as::<_, SupervisorStatusInfo>(
+ r#"
+ SELECT
+ ss.task_id,
+ COALESCE(t.status, 'unknown') as supervisor_state,
+ ss.phase,
+ t.progress_summary as current_activity,
+ ss.pending_task_ids,
+ ss.last_activity as last_heartbeat,
+ t.status as task_status,
+ t.daemon_id IS NOT NULL as is_running
+ FROM supervisor_states ss
+ JOIN tasks t ON t.id = ss.task_id
+ WHERE ss.contract_id = $1
+ AND t.owner_id = $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor status query result
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorStatusInfo {
+ pub task_id: Uuid,
+ pub supervisor_state: String,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+ pub last_heartbeat: chrono::DateTime<chrono::Utc>,
+ pub task_status: String,
+ pub is_running: bool,
+}
+
+/// Get supervisor activity history from history_events table.
+/// This provides a timeline of supervisor activities that can serve as "heartbeats".
+pub async fn get_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+ limit: i32,
+ offset: i32,
+) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorActivityEntry>(
+ r#"
+ SELECT
+ created_at as timestamp,
+ COALESCE(event_subtype, 'activity') as state,
+ event_data->>'activity' as activity,
+ (event_data->>'progress')::INTEGER as progress,
+ COALESCE(phase, 'unknown') as phase,
+ CASE
+ WHEN event_data->'pending_task_ids' IS NOT NULL
+ THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
+ ELSE ARRAY[]::UUID[]
+ END as pending_task_ids
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ ORDER BY created_at DESC
+ LIMIT $2 OFFSET $3
+ "#,
+ )
+ .bind(contract_id)
+ .bind(limit)
+ .bind(offset)
+ .fetch_all(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor activity entry
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorActivityEntry {
+ pub timestamp: chrono::DateTime<chrono::Utc>,
+ pub state: String,
+ pub activity: Option<String>,
+ pub progress: Option<i32>,
+ pub phase: String,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Count total supervisor activity history entries for a contract.
+pub async fn count_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ r#"
+ SELECT COUNT(*)
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await?;
+ Ok(result.0)
+}
+
+/// Update supervisor state last_activity timestamp.
+/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
+pub async fn sync_supervisor_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index b15667d..b704586 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -1873,6 +1873,348 @@ async fn cleanup_contract_worktrees(
}
// =============================================================================
+// Supervisor Status API
+// =============================================================================
+
+/// Query parameters for supervisor heartbeat history
+#[derive(Debug, Deserialize)]
+pub struct HeartbeatHistoryQuery {
+ /// Maximum number of heartbeats to return (default: 10)
+ pub limit: Option<i32>,
+ /// Offset for pagination (default: 0)
+ pub offset: Option<i32>,
+}
+
+/// Get supervisor status for a contract.
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/status",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor status", body = crate::db::models::SupervisorStatusResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn get_supervisor_status(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if contract has a supervisor
+ let supervisor_task_id = match contract.supervisor_task_id {
+ Some(task_id) => task_id,
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor status from supervisor_states table
+ match repository::get_supervisor_status(pool, id, auth.owner_id).await {
+ Ok(Some(status_info)) => {
+ // Determine if supervisor is actively running
+ let is_running = status_info.is_running && status_info.task_status == "running";
+
+ let response = crate::db::models::SupervisorStatusResponse {
+ task_id: status_info.task_id,
+ state: status_info.supervisor_state,
+ phase: status_info.phase,
+ current_activity: status_info.current_activity,
+ progress: None, // We don't track progress percentage yet
+ last_heartbeat: status_info.last_heartbeat,
+ pending_task_ids: status_info.pending_task_ids,
+ is_running,
+ };
+ Json(response).into_response()
+ }
+ Ok(None) => {
+ // No supervisor state record exists, but supervisor task might exist
+ // Try to get info from the task itself
+ match repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
+ Ok(Some(task)) => {
+ let is_running = task.daemon_id.is_some() && task.status == "running";
+ let response = crate::db::models::SupervisorStatusResponse {
+ task_id: task.id,
+ state: task.status.clone(),
+ phase: contract.phase.clone(),
+ current_activity: task.progress_summary.clone(),
+ progress: None,
+ last_heartbeat: task.updated_at,
+ pending_task_ids: Vec::new(),
+ is_running,
+ };
+ Json(response).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "Supervisor task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to get supervisor task {}: {}", supervisor_task_id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor status for contract {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Get supervisor heartbeat history for a contract.
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/heartbeats",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID"),
+ ("limit" = Option<i32>, Query, description = "Maximum number of heartbeats to return (default: 10)"),
+ ("offset" = Option<i32>, Query, description = "Offset for pagination (default: 0)")
+ ),
+ responses(
+ (status = 200, description = "Supervisor heartbeat history", body = crate::db::models::SupervisorHeartbeatHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn get_supervisor_heartbeats(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ axum::extract::Query(query): axum::extract::Query<HeartbeatHistoryQuery>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(_)) => {}
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+
+ let limit = query.limit.unwrap_or(10).min(100); // Cap at 100
+ let offset = query.offset.unwrap_or(0);
+
+ // Get activity history as heartbeats
+ let activities = match repository::get_supervisor_activity_history(pool, id, limit, offset).await {
+ Ok(activities) => activities,
+ Err(e) => {
+ tracing::error!("Failed to get supervisor heartbeats for contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get total count for pagination
+ let total = match repository::count_supervisor_activity_history(pool, id).await {
+ Ok(count) => count,
+ Err(e) => {
+ tracing::warn!("Failed to count supervisor heartbeats: {}", e);
+ activities.len() as i64
+ }
+ };
+
+ // Convert to heartbeat entries
+ let heartbeats: Vec<crate::db::models::SupervisorHeartbeatEntry> = activities
+ .into_iter()
+ .map(|a| crate::db::models::SupervisorHeartbeatEntry {
+ timestamp: a.timestamp,
+ state: a.state,
+ activity: a.activity,
+ progress: a.progress.map(|p| p as u8),
+ phase: a.phase,
+ pending_task_ids: a.pending_task_ids,
+ })
+ .collect();
+
+ Json(crate::db::models::SupervisorHeartbeatHistoryResponse {
+ heartbeats,
+ total,
+ })
+ .into_response()
+}
+
+/// Sync supervisor state (refresh last_activity timestamp).
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/sync",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor synced", body = crate::db::models::SupervisorSyncResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn sync_supervisor(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if contract has a supervisor
+ if contract.supervisor_task_id.is_none() {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")),
+ )
+ .into_response();
+ }
+
+ // Sync supervisor state (update last_activity)
+ match repository::sync_supervisor_state(pool, id).await {
+ Ok(Some(_state)) => {
+ // Get task status to determine current state
+ let task_status = if let Some(task_id) = contract.supervisor_task_id {
+ match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(task)) => task.status,
+ _ => "unknown".to_string(),
+ }
+ } else {
+ "unknown".to_string()
+ };
+
+ Json(crate::db::models::SupervisorSyncResponse {
+ synced: true,
+ state: task_status,
+ message: Some("Supervisor state synced successfully".to_string()),
+ })
+ .into_response()
+ }
+ Ok(None) => {
+ // No supervisor state exists, return not found
+ (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor state found for this contract")),
+ )
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to sync supervisor state for contract {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+// =============================================================================
// Tests
// =============================================================================
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 8456006..e5415ae 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -175,6 +175,10 @@ pub fn make_router(state: SharedState) -> Router {
// Contract supervisor resume endpoints
.route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor))
.route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation))
+ // Contract supervisor status endpoints
+ .route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status))
+ .route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats))
+ .route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor))
// History endpoints
.route("/contracts/{id}/history", get(history::get_contract_history))
.route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation))