From bb14010db99b40792372bfcb4348cf4984f30b3f Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 00:20:55 +0000 Subject: feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure Task 3.1: Enhanced Supervisor State Enum - Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser, WaitingForTasks, Blocked, Completed, Failed, Interrupted - Implement Display, FromStr, Default, and serde serialization - Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs Task 3.2: Heartbeat Infrastructure - Create supervisor_heartbeats migration with proper indexes and constraints - Add heartbeat storage functions to repository.rs: - create_supervisor_heartbeat - get_latest_supervisor_heartbeat - get_supervisor_heartbeats - get_contract_supervisor_heartbeats - cleanup_old_heartbeats (24 hour TTL support) - find_stale_supervisors (for dead supervisor detection) - Add SupervisorHeartbeat message to protocol.rs with enhanced fields - Update mesh_daemon.rs to process and store supervisor heartbeats - Add unit tests for SupervisorStateEnum and heartbeat serialization Co-Authored-By: Claude Opus 4.5 --- .../20260201000000_supervisor_heartbeats.sql | 36 ++++ makima/src/daemon/ws/protocol.rs | 46 +++++ makima/src/db/models.rs | 201 +++++++++++++++++++++ makima/src/db/repository.rs | 160 +++++++++++++++- makima/src/server/handlers/mesh_daemon.rs | 83 +++++++++ 5 files changed, 523 insertions(+), 3 deletions(-) create mode 100644 makima/migrations/20260201000000_supervisor_heartbeats.sql diff --git a/makima/migrations/20260201000000_supervisor_heartbeats.sql b/makima/migrations/20260201000000_supervisor_heartbeats.sql new file mode 100644 index 0000000..8595f71 --- /dev/null +++ b/makima/migrations/20260201000000_supervisor_heartbeats.sql @@ -0,0 +1,36 @@ +-- Create supervisor_heartbeats table for tracking supervisor state over time. +-- This enables detection of dead/stale supervisors and provides audit trail. + +CREATE TABLE IF NOT EXISTS supervisor_heartbeats ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + supervisor_task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + contract_id UUID NOT NULL REFERENCES contracts(id) ON DELETE CASCADE, + state VARCHAR(50) NOT NULL, + phase VARCHAR(50) NOT NULL, + current_activity TEXT, + progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100), + pending_task_ids UUID[] DEFAULT ARRAY[]::UUID[], + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Index for finding heartbeats by supervisor task +CREATE INDEX idx_heartbeats_supervisor ON supervisor_heartbeats(supervisor_task_id); + +-- Index for finding heartbeats by timestamp (for cleanup and monitoring) +CREATE INDEX idx_heartbeats_timestamp ON supervisor_heartbeats(timestamp); + +-- Index for finding heartbeats by contract +CREATE INDEX idx_heartbeats_contract ON supervisor_heartbeats(contract_id); + +-- Composite index for finding latest heartbeat per supervisor +CREATE INDEX idx_heartbeats_supervisor_timestamp ON supervisor_heartbeats(supervisor_task_id, timestamp DESC); + +COMMENT ON TABLE supervisor_heartbeats IS 'Historical record of supervisor heartbeats for monitoring and dead supervisor detection'; +COMMENT ON COLUMN supervisor_heartbeats.state IS 'Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted'; +COMMENT ON COLUMN supervisor_heartbeats.phase IS 'Current contract phase when heartbeat was sent'; +COMMENT ON COLUMN supervisor_heartbeats.current_activity IS 'Human-readable description of what the supervisor is doing'; +COMMENT ON COLUMN supervisor_heartbeats.progress IS 'Progress percentage (0-100)'; +COMMENT ON COLUMN supervisor_heartbeats.pending_task_ids IS 'Array of task IDs the supervisor is waiting on'; + +-- Note: Cleanup of old heartbeats (24 hour TTL) should be done by a scheduled job +-- or application-level cleanup, not a CHECK constraint (which can't reference NOW()) diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index bfe6326..5c88038 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -2,6 +2,7 @@ //! //! These types mirror the server's protocol exactly for compatibility. +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -26,6 +27,29 @@ pub enum DaemonMessage { active_tasks: Vec, }, + /// Enhanced supervisor heartbeat with detailed state. + /// Sent periodically by supervisor tasks to report their current state. + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec, + /// Timestamp of this heartbeat + timestamp: DateTime, + }, + /// Task output streaming (stdout/stderr from Claude Code). TaskOutput { #[serde(rename = "taskId")] @@ -857,6 +881,28 @@ impl DaemonMessage { pub fn revoke_tool_key(task_id: Uuid) -> Self { Self::RevokeToolKey { task_id } } + + /// Create a supervisor heartbeat message. + pub fn supervisor_heartbeat( + task_id: Uuid, + contract_id: Uuid, + state: &str, + phase: &str, + current_activity: Option, + progress: u8, + pending_task_ids: Vec, + ) -> Self { + Self::SupervisorHeartbeat { + task_id, + contract_id, + state: state.to_string(), + phase: phase.to_string(), + current_activity, + progress, + pending_task_ids, + timestamp: Utc::now(), + } + } } #[cfg(test)] diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 636d81a..cc30465 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2339,6 +2339,111 @@ pub struct CheckpointPatchInfo { // Red Team Types // ============================================================================ +// ============================================================================= +// Supervisor State and Heartbeat Types +// ============================================================================= + +/// Supervisor state for contract supervisor tasks. +/// Captures detailed activity state for monitoring and restoration. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum SupervisorStateEnum { + /// Supervisor is starting up + Initializing, + /// Supervisor is idle, waiting for work + Idle, + /// Supervisor is actively working + Working, + /// Supervisor is waiting for user input/confirmation + WaitingForUser, + /// Supervisor is waiting for spawned tasks to complete + WaitingForTasks, + /// Supervisor is blocked (external dependency, error, etc.) + Blocked, + /// Supervisor has completed its contract + Completed, + /// Supervisor has failed + Failed, + /// Supervisor was interrupted (daemon crash, etc.) + Interrupted, +} + +impl Default for SupervisorStateEnum { + fn default() -> Self { + SupervisorStateEnum::Initializing + } +} + +impl std::fmt::Display for SupervisorStateEnum { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SupervisorStateEnum::Initializing => write!(f, "initializing"), + SupervisorStateEnum::Idle => write!(f, "idle"), + SupervisorStateEnum::Working => write!(f, "working"), + SupervisorStateEnum::WaitingForUser => write!(f, "waiting_for_user"), + SupervisorStateEnum::WaitingForTasks => write!(f, "waiting_for_tasks"), + SupervisorStateEnum::Blocked => write!(f, "blocked"), + SupervisorStateEnum::Completed => write!(f, "completed"), + SupervisorStateEnum::Failed => write!(f, "failed"), + SupervisorStateEnum::Interrupted => write!(f, "interrupted"), + } + } +} + +impl std::str::FromStr for SupervisorStateEnum { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "initializing" => Ok(SupervisorStateEnum::Initializing), + "idle" => Ok(SupervisorStateEnum::Idle), + "working" => Ok(SupervisorStateEnum::Working), + "waiting_for_user" | "waitingforuser" => Ok(SupervisorStateEnum::WaitingForUser), + "waiting_for_tasks" | "waitingfortasks" => Ok(SupervisorStateEnum::WaitingForTasks), + "blocked" => Ok(SupervisorStateEnum::Blocked), + "completed" => Ok(SupervisorStateEnum::Completed), + "failed" => Ok(SupervisorStateEnum::Failed), + "interrupted" => Ok(SupervisorStateEnum::Interrupted), + _ => Err(format!("Unknown supervisor state: {}", s)), + } + } +} + +/// Enhanced heartbeat record for supervisor task monitoring. +/// Stored in the database for historical analysis and dead supervisor detection. +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatRecord { + pub id: Uuid, + pub supervisor_task_id: Uuid, + pub contract_id: Uuid, + pub state: String, + pub phase: String, + pub current_activity: Option, + pub progress: i32, + #[sqlx(try_from = "Vec")] + pub pending_task_ids: Vec, + pub timestamp: DateTime, +} + +/// Request payload for sending a supervisor heartbeat. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatRequest { + pub task_id: Uuid, + pub contract_id: Uuid, + pub state: SupervisorStateEnum, + pub phase: String, + pub current_activity: Option, + /// Progress percentage (0-100) + pub progress: u8, + pub pending_task_ids: Vec, +} + +// ============================================================================= +// Red Team Types +// ============================================================================= + /// Red Team notification record #[derive(Debug, Clone, FromRow, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -2395,3 +2500,99 @@ impl std::str::FromStr for NotificationSeverity { } } } + +// ============================================================================= +// Unit Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + #[test] + fn test_supervisor_state_enum_display() { + assert_eq!(SupervisorStateEnum::Initializing.to_string(), "initializing"); + assert_eq!(SupervisorStateEnum::Idle.to_string(), "idle"); + assert_eq!(SupervisorStateEnum::Working.to_string(), "working"); + assert_eq!(SupervisorStateEnum::WaitingForUser.to_string(), "waiting_for_user"); + assert_eq!(SupervisorStateEnum::WaitingForTasks.to_string(), "waiting_for_tasks"); + assert_eq!(SupervisorStateEnum::Blocked.to_string(), "blocked"); + assert_eq!(SupervisorStateEnum::Completed.to_string(), "completed"); + assert_eq!(SupervisorStateEnum::Failed.to_string(), "failed"); + assert_eq!(SupervisorStateEnum::Interrupted.to_string(), "interrupted"); + } + + #[test] + fn test_supervisor_state_enum_from_str() { + // Standard lowercase + assert_eq!("initializing".parse::().unwrap(), SupervisorStateEnum::Initializing); + assert_eq!("idle".parse::().unwrap(), SupervisorStateEnum::Idle); + assert_eq!("working".parse::().unwrap(), SupervisorStateEnum::Working); + assert_eq!("waiting_for_user".parse::().unwrap(), SupervisorStateEnum::WaitingForUser); + assert_eq!("waiting_for_tasks".parse::().unwrap(), SupervisorStateEnum::WaitingForTasks); + assert_eq!("blocked".parse::().unwrap(), SupervisorStateEnum::Blocked); + assert_eq!("completed".parse::().unwrap(), SupervisorStateEnum::Completed); + assert_eq!("failed".parse::().unwrap(), SupervisorStateEnum::Failed); + assert_eq!("interrupted".parse::().unwrap(), SupervisorStateEnum::Interrupted); + + // Case insensitive + assert_eq!("WORKING".parse::().unwrap(), SupervisorStateEnum::Working); + assert_eq!("Working".parse::().unwrap(), SupervisorStateEnum::Working); + + // Alternative formats + assert_eq!("waitingforuser".parse::().unwrap(), SupervisorStateEnum::WaitingForUser); + assert_eq!("waitingfortasks".parse::().unwrap(), SupervisorStateEnum::WaitingForTasks); + + // Invalid state + assert!("invalid_state".parse::().is_err()); + } + + #[test] + fn test_supervisor_state_enum_serialization() { + // Test JSON serialization + let state = SupervisorStateEnum::Working; + let json = serde_json::to_string(&state).unwrap(); + assert_eq!(json, "\"working\""); + + // Test JSON deserialization + let deserialized: SupervisorStateEnum = serde_json::from_str("\"working\"").unwrap(); + assert_eq!(deserialized, SupervisorStateEnum::Working); + + // Test underscore variants + let json = "\"waiting_for_user\""; + let deserialized: SupervisorStateEnum = serde_json::from_str(json).unwrap(); + assert_eq!(deserialized, SupervisorStateEnum::WaitingForUser); + } + + #[test] + fn test_supervisor_state_enum_default() { + let default_state = SupervisorStateEnum::default(); + assert_eq!(default_state, SupervisorStateEnum::Initializing); + } + + #[test] + fn test_supervisor_heartbeat_request_serialization() { + let request = SupervisorHeartbeatRequest { + task_id: Uuid::nil(), + contract_id: Uuid::nil(), + state: SupervisorStateEnum::Working, + phase: "execute".to_string(), + current_activity: Some("Implementing feature".to_string()), + progress: 50, + pending_task_ids: vec![Uuid::nil()], + }; + + let json = serde_json::to_string(&request).unwrap(); + assert!(json.contains("\"state\":\"working\"")); + assert!(json.contains("\"phase\":\"execute\"")); + assert!(json.contains("\"progress\":50")); + assert!(json.contains("\"currentActivity\":\"Implementing feature\"")); + + // Test deserialization + let deserialized: SupervisorHeartbeatRequest = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized.state, SupervisorStateEnum::Working); + assert_eq!(deserialized.phase, "execute"); + assert_eq!(deserialized.progress, 50); + } +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index b7c5af1..1ac188c 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -12,9 +12,9 @@ use super::models::{ CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, - PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, - UpdateTemplateRequest, + PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState, + Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, + UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. @@ -3404,6 +3404,160 @@ pub async fn update_supervisor_pending_tasks( .await } +// ============================================================================ +// Supervisor Heartbeats +// ============================================================================ + +/// Record a supervisor heartbeat. +/// This creates a historical record for monitoring and dead supervisor detection. +pub async fn create_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, + contract_id: Uuid, + state: &str, + phase: &str, + current_activity: Option<&str>, + progress: i32, + pending_task_ids: &[Uuid], +) -> Result { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + INSERT INTO supervisor_heartbeats ( + supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + RETURNING * + "#, + ) + .bind(supervisor_task_id) + .bind(contract_id) + .bind(state) + .bind(phase) + .bind(current_activity) + .bind(progress) + .bind(pending_task_ids) + .fetch_one(pool) + .await +} + +/// Get the latest heartbeat for a supervisor task. +pub async fn get_latest_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT 1 + "#, + ) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await +} + +/// Get recent heartbeats for a supervisor task. +pub async fn get_supervisor_heartbeats( + pool: &PgPool, + supervisor_task_id: Uuid, + limit: i64, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(supervisor_task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Get recent heartbeats for a contract. +pub async fn get_contract_supervisor_heartbeats( + pool: &PgPool, + contract_id: Uuid, + limit: i64, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE contract_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(contract_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Delete old heartbeats beyond the TTL (24 hours by default). +/// Returns the number of deleted records. +pub async fn cleanup_old_heartbeats( + pool: &PgPool, + ttl_hours: i64, +) -> Result { + let result = sqlx::query( + r#" + DELETE FROM supervisor_heartbeats + WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL + "#, + ) + .bind(ttl_hours.to_string()) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Find supervisors that have not sent a heartbeat within the timeout period. +/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp). +pub async fn find_stale_supervisors( + pool: &PgPool, + timeout_seconds: i64, +) -> Result)>, sqlx::Error> { + let rows = sqlx::query( + r#" + WITH latest_heartbeats AS ( + SELECT DISTINCT ON (supervisor_task_id) + supervisor_task_id, + contract_id, + timestamp + FROM supervisor_heartbeats + ORDER BY supervisor_task_id, timestamp DESC + ) + SELECT + lh.supervisor_task_id, + lh.contract_id, + lh.timestamp + FROM latest_heartbeats lh + JOIN tasks t ON t.id = lh.supervisor_task_id + WHERE t.status = 'running' + AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL + "#, + ) + .bind(timeout_seconds.to_string()) + .fetch_all(pool) + .await?; + + let mut result = Vec::new(); + for row in rows { + use sqlx::Row; + let supervisor_task_id: Uuid = row.get("supervisor_task_id"); + let contract_id: Uuid = row.get("contract_id"); + let timestamp: chrono::DateTime = row.get("timestamp"); + result.push((supervisor_task_id, contract_id, timestamp)); + } + Ok(result) +} + // ============================================================================ // Contract Supervisor // ============================================================================ diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 1152502..887183a 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -15,6 +15,7 @@ use axum::{ response::{IntoResponse, Response}, }; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -262,6 +263,27 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec, }, + /// Enhanced supervisor heartbeat with detailed state + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec, + /// Timestamp of this heartbeat + timestamp: DateTime, + }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::SupervisorHeartbeat { + task_id, + contract_id, + state: supervisor_state, + phase, + current_activity, + progress, + pending_task_ids, + timestamp: _, + }) => { + tracing::debug!( + task_id = %task_id, + contract_id = %contract_id, + state = %supervisor_state, + phase = %phase, + progress = progress, + "Supervisor heartbeat received" + ); + + // Store heartbeat in database + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let pending_ids = pending_task_ids.clone(); + let activity = current_activity.clone(); + let state_str = supervisor_state.clone(); + let phase_str = phase.clone(); + tokio::spawn(async move { + // Store the heartbeat record + if let Err(e) = repository::create_supervisor_heartbeat( + &pool, + task_id, + contract_id, + &state_str, + &phase_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to store supervisor heartbeat" + ); + } + + // Also update the daemon heartbeat + if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { + if let Some(daemon_id) = task.daemon_id { + if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { + tracing::warn!( + daemon_id = %daemon_id, + error = %e, + "Failed to update daemon heartbeat from supervisor" + ); + } + } + } + }); + } + } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { -- cgit v1.2.3 From 96ad3af6051af69e2e8b34b35e8b40926bdd13a1 Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 00:42:53 +0000 Subject: feat: Implement Phase 3 Tasks 3.3 and 3.4 - Supervisor State Persistence and Restoration Task 3.3: Supervisor State Persistence - Add migration 20260201000001_enhanced_supervisor_state.sql with new columns: - state (supervisor state enum) - current_activity (description) - progress (0-100) - error_message (for failed states) - spawned_task_ids (tasks created by supervisor) - pending_questions (questions awaiting user response) - restoration_count, last_restored_at, restoration_source (restoration tracking) - Update SupervisorState model with new fields - Add PendingQuestion struct for tracking unanswered questions - Add SupervisorRestorationContext for returning restoration info - Add StateValidationResult and StateRecoveryAction for state validation State persistence functions in repository.rs: - update_supervisor_detailed_state() - Update state, activity, progress - add_supervisor_spawned_task() - Track spawned tasks - add_supervisor_pending_question() - Track pending questions - remove_supervisor_pending_question() - Clear answered questions - save_supervisor_state_full() - Full state save (UPSERT) - mark_supervisor_restored() - Increment restoration count - get_supervisors_with_pending_questions() - Find supervisors with pending questions - get_supervisor_state_for_restoration() - Load state for restoration - validate_spawned_tasks() - Validate task consistency - update_supervisor_phase() - Update on phase change - update_supervisor_heartbeat_state() - Lightweight heartbeat update State save points: - On task spawn (save_state_on_task_spawn) - On question asked (save_state_on_question_asked) - On question answered (clear_pending_question) - On phase change (save_state_on_phase_change) - On heartbeat (update_supervisor_heartbeat_state) Task 3.4: Supervisor Restoration Protocol - Add restoration detection when supervisor starts with existing state - Implement validate_supervisor_state() for state consistency checks - Implement restore_supervisor() with validation and context generation - Add redeliver_pending_questions() for re-delivering questions after crash - Add generate_restoration_context_message() for Claude context injection - Update resume_supervisor endpoint to return RestorationInfo - Re-deliver pending questions when supervisor resumes Restoration flow: 1. Daemon restarts or task reassigned 2. Load supervisor state from supervisor_states 3. If NOT FOUND: Start fresh, log warning 4. If FOUND: Validate state consistency 5. If INVALID: Start from last checkpoint 6. If VALID: Restore conversation history 7. Check for pending questions - re-deliver to user 8. Check for waiting tasks - resume waiting state 9. Send restoration context to Claude 10. Resume execution from last state Co-Authored-By: Claude Opus 4.5 --- .../20260201000001_enhanced_supervisor_state.sql | 56 +++ makima/src/db/models.rs | 102 +++++ makima/src/db/repository.rs | 304 +++++++++++++ makima/src/server/handlers/contracts.rs | 16 + makima/src/server/handlers/mesh_daemon.rs | 126 ++++-- makima/src/server/handlers/mesh_supervisor.rs | 478 ++++++++++++++++++++- 6 files changed, 1057 insertions(+), 25 deletions(-) create mode 100644 makima/migrations/20260201000001_enhanced_supervisor_state.sql diff --git a/makima/migrations/20260201000001_enhanced_supervisor_state.sql b/makima/migrations/20260201000001_enhanced_supervisor_state.sql new file mode 100644 index 0000000..5411b73 --- /dev/null +++ b/makima/migrations/20260201000001_enhanced_supervisor_state.sql @@ -0,0 +1,56 @@ +-- Enhanced supervisor state persistence for restoration after crashes. +-- Adds additional fields to supervisor_states to track detailed state for recovery. + +-- Add state tracking field (matches SupervisorStateEnum: initializing, idle, working, +-- waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted) +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS state VARCHAR(50) NOT NULL DEFAULT 'initializing'; + +-- Add current activity description for monitoring +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS current_activity TEXT; + +-- Add progress percentage (0-100) +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS progress INTEGER DEFAULT 0 + CHECK (progress >= 0 AND progress <= 100); + +-- Add error message for failed states +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS error_message TEXT; + +-- Add spawned task IDs (tasks this supervisor has created) +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS spawned_task_ids UUID[] DEFAULT ARRAY[]::UUID[]; + +-- Add pending questions (questions waiting for user response) +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS pending_questions JSONB DEFAULT '[]'; + +-- Add restoration metadata +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS restoration_count INTEGER DEFAULT 0; + +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS last_restored_at TIMESTAMPTZ; + +ALTER TABLE supervisor_states + ADD COLUMN IF NOT EXISTS restoration_source VARCHAR(50); + +-- Index for finding supervisors by state (useful for finding blocked/failed supervisors) +CREATE INDEX IF NOT EXISTS idx_supervisor_states_state ON supervisor_states(state); + +-- Index for finding supervisors with pending questions +CREATE INDEX IF NOT EXISTS idx_supervisor_states_pending_questions + ON supervisor_states USING gin(pending_questions) + WHERE pending_questions != '[]'::jsonb; + +COMMENT ON COLUMN supervisor_states.state IS 'Current supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted'; +COMMENT ON COLUMN supervisor_states.current_activity IS 'Human-readable description of current activity'; +COMMENT ON COLUMN supervisor_states.progress IS 'Progress percentage (0-100)'; +COMMENT ON COLUMN supervisor_states.error_message IS 'Error message when state is failed or blocked'; +COMMENT ON COLUMN supervisor_states.spawned_task_ids IS 'Array of task UUIDs spawned by this supervisor'; +COMMENT ON COLUMN supervisor_states.pending_questions IS 'Array of questions awaiting user response: [{id, question, choices, context, asked_at}]'; +COMMENT ON COLUMN supervisor_states.restoration_count IS 'Number of times this supervisor has been restored after interruption'; +COMMENT ON COLUMN supervisor_states.last_restored_at IS 'Timestamp of last restoration'; +COMMENT ON COLUMN supervisor_states.restoration_source IS 'Source of last restoration: daemon_restart, task_reassignment, manual'; diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index cc30465..fcbd044 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1971,6 +1971,50 @@ pub struct SupervisorState { pub last_activity: DateTime, pub created_at: DateTime, pub updated_at: DateTime, + /// Current supervisor state (initializing, idle, working, waiting_for_user, etc.) + pub state: String, + /// Human-readable description of current activity + pub current_activity: Option, + /// Progress percentage (0-100) + pub progress: i32, + /// Error message when state is failed or blocked + pub error_message: Option, + /// Tasks spawned by this supervisor + #[sqlx(try_from = "Vec")] + pub spawned_task_ids: Vec, + /// Pending questions awaiting user response + #[sqlx(json)] + pub pending_questions: serde_json::Value, + /// Number of times this supervisor has been restored + pub restoration_count: i32, + /// Timestamp of last restoration + pub last_restored_at: Option>, + /// Source of last restoration (daemon_restart, task_reassignment, manual) + pub restoration_source: Option, +} + +/// Pending question structure for supervisor state +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PendingQuestion { + /// Unique question ID + pub id: Uuid, + /// The question text + pub question: String, + /// Optional choices (empty for free-form) + #[serde(default)] + pub choices: Vec, + /// Optional context + pub context: Option, + /// Question type: general, phase_confirmation, contract_complete + #[serde(default = "default_question_type")] + pub question_type: String, + /// When the question was asked + pub asked_at: DateTime, +} + +fn default_question_type() -> String { + "general".to_string() } /// Request to update supervisor state @@ -1983,6 +2027,64 @@ pub struct UpdateSupervisorStateRequest { pub pending_task_ids: Option>, /// Current contract phase pub phase: Option, + /// Current supervisor state + pub state: Option, + /// Current activity description + pub current_activity: Option, + /// Progress percentage + pub progress: Option, + /// Error message + pub error_message: Option, + /// Spawned task IDs + pub spawned_task_ids: Option>, + /// Pending questions + pub pending_questions: Option, +} + +/// Restoration context returned when restoring a supervisor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorRestorationContext { + /// Whether restoration was successful + pub success: bool, + /// Previous state before restoration + pub previous_state: SupervisorStateEnum, + /// Restored conversation history + pub conversation_history: serde_json::Value, + /// Pending questions that need re-delivery + pub pending_questions: Vec, + /// Tasks still being waited on + pub waiting_task_ids: Vec, + /// Spawned tasks to check status of + pub spawned_task_ids: Vec, + /// Restoration count (incremented) + pub restoration_count: i32, + /// Context message for Claude + pub restoration_context_message: String, + /// Any warnings during restoration + pub warnings: Vec, +} + +/// Validation result for supervisor state consistency +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateValidationResult { + pub is_valid: bool, + pub issues: Vec, + /// Suggested recovery action + pub recovery_action: StateRecoveryAction, +} + +/// Action to take when state validation fails +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum StateRecoveryAction { + /// State is valid, proceed with restoration + Proceed, + /// Start from last checkpoint + UseCheckpoint, + /// Start fresh + StartFresh, + /// Manual intervention required + ManualIntervention, } // ============================================================================ diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 1ac188c..d1ec3ef 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -3404,6 +3404,310 @@ pub async fn update_supervisor_pending_tasks( .await } +/// Update supervisor state with detailed activity tracking. +/// Called at key save points: LLM response, task spawn, question asked, phase change. +pub async fn update_supervisor_detailed_state( + pool: &PgPool, + contract_id: Uuid, + state: &str, + current_activity: Option<&str>, + progress: i32, + error_message: Option<&str>, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET state = $1, + current_activity = $2, + progress = $3, + error_message = $4, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $5 + RETURNING * + "#, + ) + .bind(state) + .bind(current_activity) + .bind(progress) + .bind(error_message) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Add a spawned task ID to the supervisor's list. +pub async fn add_supervisor_spawned_task( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET spawned_task_ids = array_append(spawned_task_ids, $1), + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(task_id) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Add a pending question to the supervisor state. +pub async fn add_supervisor_pending_question( + pool: &PgPool, + contract_id: Uuid, + question: serde_json::Value, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET pending_questions = pending_questions || $1::jsonb, + state = 'waiting_for_user', + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(question) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Remove a pending question by ID. +pub async fn remove_supervisor_pending_question( + pool: &PgPool, + contract_id: Uuid, + question_id: Uuid, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET pending_questions = ( + SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb) + FROM jsonb_array_elements(pending_questions) elem + WHERE (elem->>'id')::uuid != $1 + ), + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(question_id) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Comprehensive state save - used at major save points. +pub async fn save_supervisor_state_full( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + conversation_history: serde_json::Value, + pending_task_ids: &[Uuid], + phase: &str, + state: &str, + current_activity: Option<&str>, + progress: i32, + error_message: Option<&str>, + spawned_task_ids: &[Uuid], + pending_questions: serde_json::Value, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + INSERT INTO supervisor_states ( + contract_id, task_id, conversation_history, pending_task_ids, phase, + state, current_activity, progress, error_message, spawned_task_ids, + pending_questions, last_activity + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) + ON CONFLICT (contract_id) DO UPDATE SET + task_id = EXCLUDED.task_id, + conversation_history = EXCLUDED.conversation_history, + pending_task_ids = EXCLUDED.pending_task_ids, + phase = EXCLUDED.phase, + state = EXCLUDED.state, + current_activity = EXCLUDED.current_activity, + progress = EXCLUDED.progress, + error_message = EXCLUDED.error_message, + spawned_task_ids = EXCLUDED.spawned_task_ids, + pending_questions = EXCLUDED.pending_questions, + last_activity = NOW(), + updated_at = NOW() + RETURNING * + "#, + ) + .bind(contract_id) + .bind(task_id) + .bind(conversation_history) + .bind(pending_task_ids) + .bind(phase) + .bind(state) + .bind(current_activity) + .bind(progress) + .bind(error_message) + .bind(spawned_task_ids) + .bind(pending_questions) + .fetch_one(pool) + .await +} + +/// Mark supervisor as restored from a crash/interruption. +pub async fn mark_supervisor_restored( + pool: &PgPool, + contract_id: Uuid, + restoration_source: &str, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET restoration_count = restoration_count + 1, + last_restored_at = NOW(), + restoration_source = $1, + state = 'initializing', + error_message = NULL, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(restoration_source) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Get supervisors with pending questions (for re-delivery after restoration). +pub async fn get_supervisors_with_pending_questions( + pool: &PgPool, + owner_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + SELECT ss.* + FROM supervisor_states ss + JOIN contracts c ON c.id = ss.contract_id + WHERE c.owner_id = $1 + AND ss.pending_questions != '[]'::jsonb + AND c.status = 'active' + ORDER BY ss.last_activity DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Get supervisor state with full details for restoration. +/// Includes validation info. +pub async fn get_supervisor_state_for_restoration( + pool: &PgPool, + contract_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + SELECT * FROM supervisor_states WHERE contract_id = $1 + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Validate spawned tasks are in expected states. +/// Returns map of task_id -> (status, updated_at). +pub async fn validate_spawned_tasks( + pool: &PgPool, + task_ids: &[Uuid], +) -> Result)>, sqlx::Error> { + use sqlx::Row; + + let rows = sqlx::query( + r#" + SELECT id, status, updated_at + FROM tasks + WHERE id = ANY($1) + "#, + ) + .bind(task_ids) + .fetch_all(pool) + .await?; + + let mut result = std::collections::HashMap::new(); + for row in rows { + let id: Uuid = row.get("id"); + let status: String = row.get("status"); + let updated_at: chrono::DateTime = row.get("updated_at"); + result.insert(id, (status, updated_at)); + } + Ok(result) +} + +/// Update supervisor state when phase changes. +pub async fn update_supervisor_phase( + pool: &PgPool, + contract_id: Uuid, + new_phase: &str, +) -> Result { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET phase = $1, + state = 'working', + current_activity = 'Phase changed to ' || $1, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(new_phase) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Update supervisor state on heartbeat (lightweight update). +pub async fn update_supervisor_heartbeat_state( + pool: &PgPool, + contract_id: Uuid, + state: &str, + current_activity: Option<&str>, + progress: i32, + pending_task_ids: &[Uuid], +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE supervisor_states + SET state = $1, + current_activity = $2, + progress = $3, + pending_task_ids = $4, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $5 + "#, + ) + .bind(state) + .bind(current_activity) + .bind(progress) + .bind(pending_task_ids) + .bind(contract_id) + .execute(pool) + .await?; + Ok(()) +} + // ============================================================================ // Supervisor Heartbeats // ============================================================================ diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index b15667d..5a87616 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -1461,6 +1461,22 @@ pub async fn change_phase( .await { Ok(PhaseChangeResult::Success(updated_contract)) => { + // Save supervisor state on phase change (Task 3.3) + // This is a key save point for restoration + let new_phase_for_state = updated_contract.phase.clone(); + let contract_id_for_state = updated_contract.id; + let pool_for_state = pool.clone(); + tokio::spawn(async move { + if let Err(e) = repository::update_supervisor_phase(&pool_for_state, contract_id_for_state, &new_phase_for_state).await { + tracing::warn!( + contract_id = %contract_id_for_state, + new_phase = %new_phase_for_state, + error = %e, + "Failed to save supervisor state on phase change" + ); + } + }); + // Notify supervisor of phase change if let Some(supervisor_task_id) = updated_contract.supervisor_task_id { if let Ok(Some(supervisor)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 887183a..34e2cc3 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -933,7 +933,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Supervisor heartbeat received" ); - // Store heartbeat in database + // Store heartbeat in database and update supervisor state (Task 3.3) if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let pending_ids = pending_task_ids.clone(); @@ -960,6 +960,22 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } + // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) + if let Err(e) = repository::update_supervisor_heartbeat_state( + &pool, + contract_id, + &state_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::debug!( + contract_id = %contract_id, + error = %e, + "Failed to update supervisor state from heartbeat (may not exist yet)" + ); + } + // Also update the daemon heartbeat if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { if let Some(daemon_id) = task.daemon_id { @@ -1035,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize supervisor_state when supervisor task starts running + // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) if updated_task.is_supervisor && new_status_owned == "running" { if let Some(contract_id) = updated_task.contract_id { - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( + // Check if supervisor state already exists (restoration scenario) + match repository::get_supervisor_state(&pool, contract_id).await { + Ok(Some(existing_state)) => { + // State exists - this is a restoration + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + existing_state = %existing_state.state, + restoration_count = existing_state.restoration_count, + "Supervisor starting with existing state - restoration in progress" + ); + + // Mark as restored (increments restoration_count) + match repository::mark_supervisor_restored( &pool, contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, + "daemon_restart", ).await { - Ok(_) => { + Ok(restored_state) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, - phase = %contract.phase, - "Initialized supervisor state for running supervisor" + restoration_count = restored_state.restoration_count, + "Supervisor restoration marked" ); + + // Check for pending questions to re-deliver + if let Ok(questions) = serde_json::from_value::>( + restored_state.pending_questions.clone() + ) { + if !questions.is_empty() { + tracing::info!( + contract_id = %contract_id, + question_count = questions.len(), + "Pending questions found for re-delivery" + ); + // Questions will be re-delivered by the supervisor when it restores + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to initialize supervisor state" + "Failed to mark supervisor as restored" ); } } } Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); + // No existing state - fresh start + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized fresh supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to get contract for supervisor state" + "Failed to check existing supervisor state" ); } } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 3411ec0..a29b666 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -14,8 +14,9 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest}; +use crate::db::models::{CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; +use sqlx::PgPool; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; @@ -748,6 +749,9 @@ pub async fn spawn_task( } else { tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); + // Save state: task spawn is a key save point (Task 3.3) + save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await; + // Broadcast task status update notification to WebSocket subscribers state.broadcast_task_update(TaskUpdateNotification { task_id: updated_task.id, @@ -1770,6 +1774,17 @@ pub async fn ask_question( request.question_type.clone(), ); + // Save state: question asked is a key save point (Task 3.3) + let pending_question = PendingQuestion { + id: question_id, + question: request.question.clone(), + choices: request.choices.clone(), + context: request.context.clone(), + question_type: request.question_type.clone(), + asked_at: chrono::Utc::now(), + }; + save_state_on_question_asked(pool, contract_id, pending_question).await; + // Broadcast question as task output entry for the task's chat let question_data = serde_json::json!({ "question_id": question_id.to_string(), @@ -1864,6 +1879,9 @@ pub async fn ask_question( // Clean up the response state.cleanup_question_response(question_id); + // Clear pending question from supervisor state (Task 3.3) + clear_pending_question(pool, contract_id, question_id).await; + return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1879,6 +1897,9 @@ pub async fn ask_question( // Remove the pending question on timeout state.remove_pending_question(question_id); + // Clear pending question from supervisor state on timeout (Task 3.3) + clear_pending_question(pool, contract_id, question_id).await; + return ( StatusCode::REQUEST_TIMEOUT, Json(AskQuestionResponse { @@ -2031,6 +2052,8 @@ pub struct ResumeSupervisorResponse { pub daemon_id: Option, pub resumed_from: ResumedFromInfo, pub status: String, + /// Restoration context (Task 3.4) + pub restoration: Option, } #[derive(Debug, Serialize, ToSchema)] @@ -2041,6 +2064,24 @@ pub struct ResumedFromInfo { pub message_count: i32, } +/// Information about supervisor restoration (Task 3.4) +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RestorationInfo { + /// Previous state before restoration + pub previous_state: String, + /// How many times this supervisor has been restored + pub restoration_count: i32, + /// Number of pending questions to re-deliver + pub pending_questions_count: usize, + /// Number of tasks being waited on + pub waiting_tasks_count: usize, + /// Number of tasks spawned before crash + pub spawned_tasks_count: usize, + /// Any warnings from state validation + pub warnings: Vec, +} + /// Resume interrupted supervisor with specified mode. /// /// POST /api/v1/contracts/{id}/supervisor/resume @@ -2350,6 +2391,31 @@ pub async fn resume_supervisor( "Supervisor resume requested" ); + // Build restoration info (Task 3.4) + let pending_questions: Vec = serde_json::from_value( + supervisor_state.pending_questions.clone() + ).unwrap_or_default(); + + let restoration_info = RestorationInfo { + previous_state: supervisor_state.state.clone(), + restoration_count: supervisor_state.restoration_count, + pending_questions_count: pending_questions.len(), + waiting_tasks_count: supervisor_state.pending_task_ids.len(), + spawned_tasks_count: supervisor_state.spawned_task_ids.len(), + warnings: vec![], // Could add validation warnings here + }; + + // Re-deliver pending questions if any (Task 3.4) + if !pending_questions.is_empty() { + redeliver_pending_questions( + &state, + supervisor_state.task_id, + contract_id, + auth_info.owner_id, + &pending_questions, + ).await; + } + Json(ResumeSupervisorResponse { supervisor_task_id: supervisor_state.task_id, daemon_id: response_daemon_id, @@ -2359,6 +2425,7 @@ pub async fn resume_supervisor( message_count, }, status: response_status, + restoration: Some(restoration_info), }) .into_response() } @@ -2748,3 +2815,412 @@ pub async fn spawn_red_team_task( // It will remain pending and can be started later Ok(task) } + +// ============================================================================= +// Supervisor State Persistence Helpers (Task 3.3) +// ============================================================================= + +use crate::db::models::{ + SupervisorRestorationContext, SupervisorStateEnum, + StateValidationResult, StateRecoveryAction, +}; + +/// Save supervisor state on task spawn. +/// This is called when a supervisor spawns a new task. +pub async fn save_state_on_task_spawn( + pool: &PgPool, + contract_id: Uuid, + spawned_task_id: Uuid, +) { + if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await { + tracing::warn!( + contract_id = %contract_id, + spawned_task_id = %spawned_task_id, + error = %e, + "Failed to save spawned task to supervisor state" + ); + } else { + tracing::debug!( + contract_id = %contract_id, + spawned_task_id = %spawned_task_id, + "Saved spawned task to supervisor state" + ); + } + + // Also update state to working + if let Err(e) = repository::update_supervisor_detailed_state( + pool, + contract_id, + "working", + Some(&format!("Spawned task {}", spawned_task_id)), + 0, // Progress resets when spawning new work + None, + ).await { + tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn"); + } +} + +/// Save supervisor state on question asked. +/// This is called when a supervisor asks a question and is waiting for user input. +pub async fn save_state_on_question_asked( + pool: &PgPool, + contract_id: Uuid, + question: PendingQuestion, +) { + let question_json = match serde_json::to_value(&[&question]) { + Ok(v) => v, + Err(e) => { + tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question"); + return; + } + }; + + if let Err(e) = repository::add_supervisor_pending_question(pool, contract_id, question_json).await { + tracing::warn!( + contract_id = %contract_id, + question_id = %question.id, + error = %e, + "Failed to save pending question to supervisor state" + ); + } else { + tracing::debug!( + contract_id = %contract_id, + question_id = %question.id, + "Saved pending question to supervisor state" + ); + } +} + +/// Clear pending question after it's answered. +pub async fn clear_pending_question( + pool: &PgPool, + contract_id: Uuid, + question_id: Uuid, +) { + if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await { + tracing::warn!( + contract_id = %contract_id, + question_id = %question_id, + error = %e, + "Failed to remove pending question from supervisor state" + ); + } + + // Update state back to working (if no more pending questions) + match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(state)) => { + let questions: Vec = serde_json::from_value(state.pending_questions.clone()) + .unwrap_or_default(); + if questions.is_empty() { + let _ = repository::update_supervisor_detailed_state( + pool, + contract_id, + "working", + Some("Resumed after user response"), + state.progress, + None, + ).await; + } + } + Ok(None) => {} + Err(e) => { + tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question"); + } + } +} + +/// Save supervisor state on phase change. +pub async fn save_state_on_phase_change( + pool: &PgPool, + contract_id: Uuid, + new_phase: &str, +) { + if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await { + tracing::warn!( + contract_id = %contract_id, + new_phase = %new_phase, + error = %e, + "Failed to update supervisor state on phase change" + ); + } else { + tracing::info!( + contract_id = %contract_id, + new_phase = %new_phase, + "Updated supervisor state on phase change" + ); + } +} + +// ============================================================================= +// Supervisor Restoration Protocol (Task 3.4) +// ============================================================================= + +/// Validate supervisor state consistency before restoration. +/// Checks that spawned tasks and pending questions are in expected states. +pub async fn validate_supervisor_state( + pool: &PgPool, + state: &crate::db::models::SupervisorState, +) -> StateValidationResult { + let mut issues = Vec::new(); + + // Validate spawned tasks + if !state.spawned_task_ids.is_empty() { + match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await { + Ok(task_statuses) => { + for task_id in &state.spawned_task_ids { + if !task_statuses.contains_key(task_id) { + issues.push(format!("Spawned task {} not found in database", task_id)); + } + } + } + Err(e) => { + issues.push(format!("Failed to validate spawned tasks: {}", e)); + } + } + } + + // Validate pending questions + let pending_questions: Vec = serde_json::from_value(state.pending_questions.clone()) + .unwrap_or_default(); + + // Check if questions are not too old (e.g., more than 24 hours) + for question in &pending_questions { + let age = chrono::Utc::now() - question.asked_at; + if age.num_hours() > 24 { + issues.push(format!( + "Pending question {} is {} hours old, may be stale", + question.id, age.num_hours() + )); + } + } + + // Validate conversation history + if let Some(history) = state.conversation_history.as_array() { + if history.is_empty() && state.restoration_count > 0 { + issues.push("Conversation history is empty after previous restoration".to_string()); + } + } + + // Determine recovery action + let recovery_action = if issues.is_empty() { + StateRecoveryAction::Proceed + } else if issues.iter().any(|i| i.contains("not found")) { + // Missing tasks suggest corruption - use checkpoint + StateRecoveryAction::UseCheckpoint + } else if issues.len() > 3 { + // Many issues suggest manual intervention needed + StateRecoveryAction::ManualIntervention + } else { + // Minor issues - proceed with warnings + StateRecoveryAction::Proceed + }; + + StateValidationResult { + is_valid: issues.is_empty(), + issues, + recovery_action, + } +} + +/// Restore supervisor from saved state after daemon crash or task reassignment. +/// Returns restoration context to send to the supervisor. +pub async fn restore_supervisor( + pool: &PgPool, + contract_id: Uuid, + restoration_source: &str, +) -> Result { + // Step 1: Load supervisor state + let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + tracing::warn!( + contract_id = %contract_id, + "No supervisor state found for restoration - starting fresh" + ); + return Ok(SupervisorRestorationContext { + success: true, + previous_state: SupervisorStateEnum::Initializing, + conversation_history: serde_json::json!([]), + pending_questions: vec![], + waiting_task_ids: vec![], + spawned_task_ids: vec![], + restoration_count: 0, + restoration_context_message: "No previous state found. Starting fresh.".to_string(), + warnings: vec!["No previous supervisor state found".to_string()], + }); + } + Err(e) => { + return Err(format!("Failed to load supervisor state: {}", e)); + } + }; + + // Step 2: Parse previous state + let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted); + + // Step 3: Validate state consistency + let validation = validate_supervisor_state(pool, &state).await; + let mut warnings = validation.issues.clone(); + + // Step 4: Handle based on validation result + let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action { + StateRecoveryAction::Proceed => { + // State is valid, use it + let questions: Vec = serde_json::from_value(state.pending_questions.clone()) + .unwrap_or_default(); + + let message = format!( + "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.", + state.state, + questions.len(), + state.spawned_task_ids.len(), + state.pending_task_ids.len() + ); + + (state.conversation_history.clone(), questions, message) + } + StateRecoveryAction::UseCheckpoint => { + // State is corrupted, try to use checkpoint + warnings.push("State validation failed, attempting checkpoint recovery".to_string()); + + // TODO: Implement checkpoint-based recovery + // For now, start with empty questions but preserve conversation + let message = "Restored from last checkpoint due to state inconsistency.".to_string(); + (state.conversation_history.clone(), vec![], message) + } + StateRecoveryAction::StartFresh => { + warnings.push("Starting fresh due to unrecoverable state".to_string()); + let message = "Starting fresh due to unrecoverable state corruption.".to_string(); + (serde_json::json!([]), vec![], message) + } + StateRecoveryAction::ManualIntervention => { + warnings.push("Manual intervention may be required".to_string()); + // Still try to restore but with warning + let questions: Vec = serde_json::from_value(state.pending_questions.clone()) + .unwrap_or_default(); + let message = "Restored with warnings - manual intervention may be required.".to_string(); + (state.conversation_history.clone(), questions, message) + } + }; + + // Step 5: Mark supervisor as restored + let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await { + Ok(s) => s, + Err(e) => { + return Err(format!("Failed to mark supervisor as restored: {}", e)); + } + }; + + // Step 6: Build restoration context + let context = SupervisorRestorationContext { + success: true, + previous_state, + conversation_history, + pending_questions, + waiting_task_ids: state.pending_task_ids.clone(), + spawned_task_ids: state.spawned_task_ids.clone(), + restoration_count: new_state.restoration_count, + restoration_context_message: restoration_message, + warnings, + }; + + tracing::info!( + contract_id = %contract_id, + restoration_source = %restoration_source, + restoration_count = new_state.restoration_count, + pending_questions_count = context.pending_questions.len(), + waiting_tasks_count = context.waiting_task_ids.len(), + spawned_tasks_count = context.spawned_task_ids.len(), + "Supervisor restoration completed" + ); + + Ok(context) +} + +/// Re-deliver pending questions to the user after restoration. +/// This ensures questions asked before crash are shown to the user again. +pub async fn redeliver_pending_questions( + state: &SharedState, + supervisor_id: Uuid, + contract_id: Uuid, + owner_id: Uuid, + questions: &[PendingQuestion], +) { + for question in questions { + // Add to in-memory question state + state.add_supervisor_question( + supervisor_id, + contract_id, + owner_id, + question.question.clone(), + question.choices.clone(), + question.context.clone(), + false, // Assume single select for restored questions + question.question_type.clone(), + ); + + // Broadcast to WebSocket clients + let question_data = serde_json::json!({ + "question_id": question.id.to_string(), + "choices": question.choices, + "context": question.context, + "question_type": question.question_type, + "is_restored": true, + "originally_asked_at": question.asked_at.to_rfc3339(), + }); + + state.broadcast_task_output(TaskOutputNotification { + task_id: supervisor_id, + owner_id: Some(owner_id), + message_type: "supervisor_question".to_string(), + content: question.question.clone(), + tool_name: None, + tool_input: Some(question_data), + is_error: None, + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + + tracing::info!( + supervisor_id = %supervisor_id, + question_id = %question.id, + "Re-delivered pending question after restoration" + ); + } +} + +/// Generate restoration context message for Claude. +/// This message is injected into the conversation to inform Claude about the restoration. +pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String { + let mut message = String::new(); + + message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n"); + message.push_str(&format!("This supervisor has been restored after interruption. {}\n\n", context.restoration_context_message)); + message.push_str(&format!("Restoration count: {}\n", context.restoration_count)); + + if !context.pending_questions.is_empty() { + message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len())); + for q in &context.pending_questions { + message.push_str(&format!(" - {}: {}\n", q.id, q.question)); + } + } + + if !context.waiting_task_ids.is_empty() { + message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len())); + } + + if !context.spawned_task_ids.is_empty() { + message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len())); + } + + if !context.warnings.is_empty() { + message.push_str("\nWarnings:\n"); + for warning in &context.warnings { + message.push_str(&format!(" - {}\n", warning)); + } + } + + message.push_str("\n=== END RESTORATION NOTICE ===\n"); + + message +} -- cgit v1.2.3 From 8f144f3a811ab40e26514fe60fafbbdd35bad23d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 01:07:13 +0000 Subject: feat: Add Supervisor Status API endpoints (Phase 3 Task 3.5) Implement REST API endpoints for querying supervisor status: - GET /api/v1/contracts/{id}/supervisor/status Returns current supervisor status including task_id, state, phase, current_activity, progress, last_heartbeat, and pending_task_ids - GET /api/v1/contracts/{id}/supervisor/heartbeats?limit=10 Returns paginated supervisor activity history from history_events - POST /api/v1/contracts/{id}/supervisor/sync Triggers a sync to refresh the supervisor's last_activity timestamp New types added: - SupervisorStatusResponse - Status endpoint response - SupervisorHeartbeatEntry - Individual heartbeat history entry - SupervisorHeartbeatHistoryResponse - Heartbeat history with pagination - SupervisorSyncResponse - Sync endpoint response - HeartbeatHistoryQuery - Query params for heartbeats endpoint Repository helpers: - get_supervisor_status() - Combined info from supervisor_states and tasks - get_supervisor_activity_history() - Activity timeline from history_events - count_supervisor_activity_history() - Total count for pagination - sync_supervisor_state() - Refresh last_activity timestamp Error handling: - 404 for contract not found (CONTRACT_NOT_FOUND) - 404 for no supervisor (SUPERVISOR_NOT_FOUND) - Proper fallback when supervisor_state record doesn't exist but task does Co-Authored-By: Claude Opus 4.5 --- makima/src/db/models.rs | 76 +++++++ makima/src/db/repository.rs | 135 +++++++++++++ makima/src/server/handlers/contracts.rs | 342 ++++++++++++++++++++++++++++++++ makima/src/server/mod.rs | 4 + 4 files changed, 557 insertions(+) diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 636d81a..f1e0be0 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2395,3 +2395,79 @@ impl std::str::FromStr for NotificationSeverity { } } } + +// ============================================================================ +// Supervisor Status API Types +// ============================================================================ + +/// Response for supervisor status endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorStatusResponse { + /// The supervisor task ID + pub task_id: Uuid, + /// Current supervisor state (from supervisor_states table) + pub state: String, + /// Current contract phase + pub phase: String, + /// Description of current activity (from task progress_summary) + pub current_activity: Option, + /// Progress percentage (0-100) + pub progress: Option, + /// When the supervisor last updated its state + pub last_heartbeat: DateTime, + /// Task IDs the supervisor is currently waiting on + pub pending_task_ids: Vec, + /// Whether the supervisor is currently running + pub is_running: bool, +} + +/// Individual heartbeat entry for history +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatEntry { + /// Timestamp of this heartbeat + pub timestamp: DateTime, + /// Supervisor state at this time + pub state: String, + /// Activity description at this time + pub activity: Option, + /// Progress at this time + pub progress: Option, + /// Contract phase at this time + pub phase: String, + /// Pending task IDs at this time + pub pending_task_ids: Vec, +} + +/// Response for supervisor heartbeat history endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatHistoryResponse { + /// List of heartbeat entries + pub heartbeats: Vec, + /// Total count of heartbeats (for pagination) + pub total: i64, +} + +/// Response for supervisor sync endpoint +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorSyncResponse { + /// Whether the sync was successful + pub synced: bool, + /// Current supervisor state after sync + pub state: String, + /// Optional message about the sync result + pub message: Option, +} + +/// Query parameters for heartbeat history endpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option, + /// Offset for pagination (default: 0) + pub offset: Option, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index b7c5af1..8055488 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4402,3 +4402,138 @@ pub async fn get_notification_count_for_task( .map_err(RepositoryError::Database)?; Ok(result.0) } + +// ============================================================================= +// Supervisor Status API Helpers +// ============================================================================= + +/// Get supervisor status for a contract. +/// Returns combined information from supervisor_states and tasks tables. +pub async fn get_supervisor_status( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result, sqlx::Error> { + // Query to get supervisor status by joining supervisor_states with tasks + sqlx::query_as::<_, SupervisorStatusInfo>( + r#" + SELECT + ss.task_id, + COALESCE(t.status, 'unknown') as supervisor_state, + ss.phase, + t.progress_summary as current_activity, + ss.pending_task_ids, + ss.last_activity as last_heartbeat, + t.status as task_status, + t.daemon_id IS NOT NULL as is_running + FROM supervisor_states ss + JOIN tasks t ON t.id = ss.task_id + WHERE ss.contract_id = $1 + AND t.owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Internal struct to hold supervisor status query result +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorStatusInfo { + pub task_id: Uuid, + pub supervisor_state: String, + pub phase: String, + pub current_activity: Option, + #[sqlx(try_from = "Vec")] + pub pending_task_ids: Vec, + pub last_heartbeat: chrono::DateTime, + pub task_status: String, + pub is_running: bool, +} + +/// Get supervisor activity history from history_events table. +/// This provides a timeline of supervisor activities that can serve as "heartbeats". +pub async fn get_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, + limit: i32, + offset: i32, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorActivityEntry>( + r#" + SELECT + created_at as timestamp, + COALESCE(event_subtype, 'activity') as state, + event_data->>'activity' as activity, + (event_data->>'progress')::INTEGER as progress, + COALESCE(phase, 'unknown') as phase, + CASE + WHEN event_data->'pending_task_ids' IS NOT NULL + THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] + ELSE ARRAY[]::UUID[] + END as pending_task_ids + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(contract_id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await +} + +/// Internal struct to hold supervisor activity entry +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct SupervisorActivityEntry { + pub timestamp: chrono::DateTime, + pub state: String, + pub activity: Option, + pub progress: Option, + pub phase: String, + #[sqlx(try_from = "Vec")] + pub pending_task_ids: Vec, +} + +/// Count total supervisor activity history entries for a contract. +pub async fn count_supervisor_activity_history( + pool: &PgPool, + contract_id: Uuid, +) -> Result { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM history_events + WHERE contract_id = $1 + AND event_type IN ('supervisor', 'phase', 'task') + "#, + ) + .bind(contract_id) + .fetch_one(pool) + .await?; + Ok(result.0) +} + +/// Update supervisor state last_activity timestamp. +/// This acts as a "sync" operation to refresh the supervisor's heartbeat. +pub async fn sync_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index b15667d..b704586 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -1872,6 +1872,348 @@ async fn cleanup_contract_worktrees( } } +// ============================================================================= +// Supervisor Status API +// ============================================================================= + +/// Query parameters for supervisor heartbeat history +#[derive(Debug, Deserialize)] +pub struct HeartbeatHistoryQuery { + /// Maximum number of heartbeats to return (default: 10) + pub limit: Option, + /// Offset for pagination (default: 0) + pub offset: Option, +} + +/// Get supervisor status for a contract. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/status", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor status", body = crate::db::models::SupervisorStatusResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_supervisor_status( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if contract has a supervisor + let supervisor_task_id = match contract.supervisor_task_id { + Some(task_id) => task_id, + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")), + ) + .into_response(); + } + }; + + // Get supervisor status from supervisor_states table + match repository::get_supervisor_status(pool, id, auth.owner_id).await { + Ok(Some(status_info)) => { + // Determine if supervisor is actively running + let is_running = status_info.is_running && status_info.task_status == "running"; + + let response = crate::db::models::SupervisorStatusResponse { + task_id: status_info.task_id, + state: status_info.supervisor_state, + phase: status_info.phase, + current_activity: status_info.current_activity, + progress: None, // We don't track progress percentage yet + last_heartbeat: status_info.last_heartbeat, + pending_task_ids: status_info.pending_task_ids, + is_running, + }; + Json(response).into_response() + } + Ok(None) => { + // No supervisor state record exists, but supervisor task might exist + // Try to get info from the task itself + match repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + Ok(Some(task)) => { + let is_running = task.daemon_id.is_some() && task.status == "running"; + let response = crate::db::models::SupervisorStatusResponse { + task_id: task.id, + state: task.status.clone(), + phase: contract.phase.clone(), + current_activity: task.progress_summary.clone(), + progress: None, + last_heartbeat: task.updated_at, + pending_task_ids: Vec::new(), + is_running, + }; + Json(response).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "Supervisor task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get supervisor task {}: {}", supervisor_task_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } + } + Err(e) => { + tracing::error!("Failed to get supervisor status for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get supervisor heartbeat history for a contract. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/heartbeats", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("limit" = Option, Query, description = "Maximum number of heartbeats to return (default: 10)"), + ("offset" = Option, Query, description = "Offset for pagination (default: 0)") + ), + responses( + (status = 200, description = "Supervisor heartbeat history", body = crate::db::models::SupervisorHeartbeatHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_supervisor_heartbeats( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + axum::extract::Query(query): axum::extract::Query, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + let limit = query.limit.unwrap_or(10).min(100); // Cap at 100 + let offset = query.offset.unwrap_or(0); + + // Get activity history as heartbeats + let activities = match repository::get_supervisor_activity_history(pool, id, limit, offset).await { + Ok(activities) => activities, + Err(e) => { + tracing::error!("Failed to get supervisor heartbeats for contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get total count for pagination + let total = match repository::count_supervisor_activity_history(pool, id).await { + Ok(count) => count, + Err(e) => { + tracing::warn!("Failed to count supervisor heartbeats: {}", e); + activities.len() as i64 + } + }; + + // Convert to heartbeat entries + let heartbeats: Vec = activities + .into_iter() + .map(|a| crate::db::models::SupervisorHeartbeatEntry { + timestamp: a.timestamp, + state: a.state, + activity: a.activity, + progress: a.progress.map(|p| p as u8), + phase: a.phase, + pending_task_ids: a.pending_task_ids, + }) + .collect(); + + Json(crate::db::models::SupervisorHeartbeatHistoryResponse { + heartbeats, + total, + }) + .into_response() +} + +/// Sync supervisor state (refresh last_activity timestamp). +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/sync", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor synced", body = crate::db::models::SupervisorSyncResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn sync_supervisor( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if contract has a supervisor + if contract.supervisor_task_id.is_none() { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")), + ) + .into_response(); + } + + // Sync supervisor state (update last_activity) + match repository::sync_supervisor_state(pool, id).await { + Ok(Some(_state)) => { + // Get task status to determine current state + let task_status = if let Some(task_id) = contract.supervisor_task_id { + match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(task)) => task.status, + _ => "unknown".to_string(), + } + } else { + "unknown".to_string() + }; + + Json(crate::db::models::SupervisorSyncResponse { + synced: true, + state: task_status, + message: Some("Supervisor state synced successfully".to_string()), + }) + .into_response() + } + Ok(None) => { + // No supervisor state exists, return not found + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor state found for this contract")), + ) + .into_response() + } + Err(e) => { + tracing::error!("Failed to sync supervisor state for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + // ============================================================================= // Tests // ============================================================================= diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 8456006..e5415ae 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -175,6 +175,10 @@ pub fn make_router(state: SharedState) -> Router { // Contract supervisor resume endpoints .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) + // Contract supervisor status endpoints + .route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status)) + .route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats)) + .route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor)) // History endpoints .route("/contracts/{id}/history", get(history::get_contract_history)) .route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation)) -- cgit v1.2.3 -- cgit v1.2.3