summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--makima/migrations/20260201000000_supervisor_heartbeats.sql36
-rw-r--r--makima/migrations/20260201000001_enhanced_supervisor_state.sql56
-rw-r--r--makima/src/daemon/ws/protocol.rs46
-rw-r--r--makima/src/db/models.rs379
-rw-r--r--makima/src/db/repository.rs599
-rw-r--r--makima/src/server/handlers/contracts.rs358
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs207
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs478
-rw-r--r--makima/src/server/mod.rs4
9 files changed, 2136 insertions, 27 deletions
diff --git a/makima/migrations/20260201000000_supervisor_heartbeats.sql b/makima/migrations/20260201000000_supervisor_heartbeats.sql
new file mode 100644
index 0000000..8595f71
--- /dev/null
+++ b/makima/migrations/20260201000000_supervisor_heartbeats.sql
@@ -0,0 +1,36 @@
+-- Create supervisor_heartbeats table for tracking supervisor state over time.
+-- This enables detection of dead/stale supervisors and provides audit trail.
+
+CREATE TABLE IF NOT EXISTS supervisor_heartbeats (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ supervisor_task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
+ contract_id UUID NOT NULL REFERENCES contracts(id) ON DELETE CASCADE,
+ state VARCHAR(50) NOT NULL,
+ phase VARCHAR(50) NOT NULL,
+ current_activity TEXT,
+ progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100),
+ pending_task_ids UUID[] DEFAULT ARRAY[]::UUID[],
+ timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- Index for finding heartbeats by supervisor task
+CREATE INDEX idx_heartbeats_supervisor ON supervisor_heartbeats(supervisor_task_id);
+
+-- Index for finding heartbeats by timestamp (for cleanup and monitoring)
+CREATE INDEX idx_heartbeats_timestamp ON supervisor_heartbeats(timestamp);
+
+-- Index for finding heartbeats by contract
+CREATE INDEX idx_heartbeats_contract ON supervisor_heartbeats(contract_id);
+
+-- Composite index for finding latest heartbeat per supervisor
+CREATE INDEX idx_heartbeats_supervisor_timestamp ON supervisor_heartbeats(supervisor_task_id, timestamp DESC);
+
+COMMENT ON TABLE supervisor_heartbeats IS 'Historical record of supervisor heartbeats for monitoring and dead supervisor detection';
+COMMENT ON COLUMN supervisor_heartbeats.state IS 'Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted';
+COMMENT ON COLUMN supervisor_heartbeats.phase IS 'Current contract phase when heartbeat was sent';
+COMMENT ON COLUMN supervisor_heartbeats.current_activity IS 'Human-readable description of what the supervisor is doing';
+COMMENT ON COLUMN supervisor_heartbeats.progress IS 'Progress percentage (0-100)';
+COMMENT ON COLUMN supervisor_heartbeats.pending_task_ids IS 'Array of task IDs the supervisor is waiting on';
+
+-- Note: Cleanup of old heartbeats (24 hour TTL) should be done by a scheduled job
+-- or application-level cleanup, not a CHECK constraint (which can't reference NOW())
diff --git a/makima/migrations/20260201000001_enhanced_supervisor_state.sql b/makima/migrations/20260201000001_enhanced_supervisor_state.sql
new file mode 100644
index 0000000..5411b73
--- /dev/null
+++ b/makima/migrations/20260201000001_enhanced_supervisor_state.sql
@@ -0,0 +1,56 @@
+-- Enhanced supervisor state persistence for restoration after crashes.
+-- Adds additional fields to supervisor_states to track detailed state for recovery.
+
+-- Add state tracking field (matches SupervisorStateEnum: initializing, idle, working,
+-- waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted)
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS state VARCHAR(50) NOT NULL DEFAULT 'initializing';
+
+-- Add current activity description for monitoring
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS current_activity TEXT;
+
+-- Add progress percentage (0-100)
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS progress INTEGER DEFAULT 0
+ CHECK (progress >= 0 AND progress <= 100);
+
+-- Add error message for failed states
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS error_message TEXT;
+
+-- Add spawned task IDs (tasks this supervisor has created)
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS spawned_task_ids UUID[] DEFAULT ARRAY[]::UUID[];
+
+-- Add pending questions (questions waiting for user response)
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS pending_questions JSONB DEFAULT '[]';
+
+-- Add restoration metadata
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS restoration_count INTEGER DEFAULT 0;
+
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS last_restored_at TIMESTAMPTZ;
+
+ALTER TABLE supervisor_states
+ ADD COLUMN IF NOT EXISTS restoration_source VARCHAR(50);
+
+-- Index for finding supervisors by state (useful for finding blocked/failed supervisors)
+CREATE INDEX IF NOT EXISTS idx_supervisor_states_state ON supervisor_states(state);
+
+-- Index for finding supervisors with pending questions
+CREATE INDEX IF NOT EXISTS idx_supervisor_states_pending_questions
+ ON supervisor_states USING gin(pending_questions)
+ WHERE pending_questions != '[]'::jsonb;
+
+COMMENT ON COLUMN supervisor_states.state IS 'Current supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted';
+COMMENT ON COLUMN supervisor_states.current_activity IS 'Human-readable description of current activity';
+COMMENT ON COLUMN supervisor_states.progress IS 'Progress percentage (0-100)';
+COMMENT ON COLUMN supervisor_states.error_message IS 'Error message when state is failed or blocked';
+COMMENT ON COLUMN supervisor_states.spawned_task_ids IS 'Array of task UUIDs spawned by this supervisor';
+COMMENT ON COLUMN supervisor_states.pending_questions IS 'Array of questions awaiting user response: [{id, question, choices, context, asked_at}]';
+COMMENT ON COLUMN supervisor_states.restoration_count IS 'Number of times this supervisor has been restored after interruption';
+COMMENT ON COLUMN supervisor_states.last_restored_at IS 'Timestamp of last restoration';
+COMMENT ON COLUMN supervisor_states.restoration_source IS 'Source of last restoration: daemon_restart, task_reassignment, manual';
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index bfe6326..5c88038 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -2,6 +2,7 @@
//!
//! These types mirror the server's protocol exactly for compatibility.
+use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -26,6 +27,29 @@ pub enum DaemonMessage {
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state.
+ /// Sent periodically by supervisor tasks to report their current state.
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
+
/// Task output streaming (stdout/stderr from Claude Code).
TaskOutput {
#[serde(rename = "taskId")]
@@ -857,6 +881,28 @@ impl DaemonMessage {
pub fn revoke_tool_key(task_id: Uuid) -> Self {
Self::RevokeToolKey { task_id }
}
+
+ /// Create a supervisor heartbeat message.
+ pub fn supervisor_heartbeat(
+ task_id: Uuid,
+ contract_id: Uuid,
+ state: &str,
+ phase: &str,
+ current_activity: Option<String>,
+ progress: u8,
+ pending_task_ids: Vec<Uuid>,
+ ) -> Self {
+ Self::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: state.to_string(),
+ phase: phase.to_string(),
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: Utc::now(),
+ }
+ }
}
#[cfg(test)]
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 636d81a..abdcce6 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -1971,6 +1971,50 @@ pub struct SupervisorState {
pub last_activity: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
+ /// Current supervisor state (initializing, idle, working, waiting_for_user, etc.)
+ pub state: String,
+ /// Human-readable description of current activity
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ pub progress: i32,
+ /// Error message when state is failed or blocked
+ pub error_message: Option<String>,
+ /// Tasks spawned by this supervisor
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub spawned_task_ids: Vec<Uuid>,
+ /// Pending questions awaiting user response
+ #[sqlx(json)]
+ pub pending_questions: serde_json::Value,
+ /// Number of times this supervisor has been restored
+ pub restoration_count: i32,
+ /// Timestamp of last restoration
+ pub last_restored_at: Option<DateTime<Utc>>,
+ /// Source of last restoration (daemon_restart, task_reassignment, manual)
+ pub restoration_source: Option<String>,
+}
+
+/// Pending question structure for supervisor state
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PendingQuestion {
+ /// Unique question ID
+ pub id: Uuid,
+ /// The question text
+ pub question: String,
+ /// Optional choices (empty for free-form)
+ #[serde(default)]
+ pub choices: Vec<String>,
+ /// Optional context
+ pub context: Option<String>,
+ /// Question type: general, phase_confirmation, contract_complete
+ #[serde(default = "default_question_type")]
+ pub question_type: String,
+ /// When the question was asked
+ pub asked_at: DateTime<Utc>,
+}
+
+fn default_question_type() -> String {
+ "general".to_string()
}
/// Request to update supervisor state
@@ -1983,6 +2027,64 @@ pub struct UpdateSupervisorStateRequest {
pub pending_task_ids: Option<Vec<Uuid>>,
/// Current contract phase
pub phase: Option<String>,
+ /// Current supervisor state
+ pub state: Option<String>,
+ /// Current activity description
+ pub current_activity: Option<String>,
+ /// Progress percentage
+ pub progress: Option<i32>,
+ /// Error message
+ pub error_message: Option<String>,
+ /// Spawned task IDs
+ pub spawned_task_ids: Option<Vec<Uuid>>,
+ /// Pending questions
+ pub pending_questions: Option<serde_json::Value>,
+}
+
+/// Restoration context returned when restoring a supervisor
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorRestorationContext {
+ /// Whether restoration was successful
+ pub success: bool,
+ /// Previous state before restoration
+ pub previous_state: SupervisorStateEnum,
+ /// Restored conversation history
+ pub conversation_history: serde_json::Value,
+ /// Pending questions that need re-delivery
+ pub pending_questions: Vec<PendingQuestion>,
+ /// Tasks still being waited on
+ pub waiting_task_ids: Vec<Uuid>,
+ /// Spawned tasks to check status of
+ pub spawned_task_ids: Vec<Uuid>,
+ /// Restoration count (incremented)
+ pub restoration_count: i32,
+ /// Context message for Claude
+ pub restoration_context_message: String,
+ /// Any warnings during restoration
+ pub warnings: Vec<String>,
+}
+
+/// Validation result for supervisor state consistency
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateValidationResult {
+ pub is_valid: bool,
+ pub issues: Vec<String>,
+ /// Suggested recovery action
+ pub recovery_action: StateRecoveryAction,
+}
+
+/// Action to take when state validation fails
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum StateRecoveryAction {
+ /// State is valid, proceed with restoration
+ Proceed,
+ /// Start from last checkpoint
+ UseCheckpoint,
+ /// Start fresh
+ StartFresh,
+ /// Manual intervention required
+ ManualIntervention,
}
// ============================================================================
@@ -2339,6 +2441,111 @@ pub struct CheckpointPatchInfo {
// Red Team Types
// ============================================================================
+// =============================================================================
+// Supervisor State and Heartbeat Types
+// =============================================================================
+
+/// Supervisor state for contract supervisor tasks.
+/// Captures detailed activity state for monitoring and restoration.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "snake_case")]
+pub enum SupervisorStateEnum {
+ /// Supervisor is starting up
+ Initializing,
+ /// Supervisor is idle, waiting for work
+ Idle,
+ /// Supervisor is actively working
+ Working,
+ /// Supervisor is waiting for user input/confirmation
+ WaitingForUser,
+ /// Supervisor is waiting for spawned tasks to complete
+ WaitingForTasks,
+ /// Supervisor is blocked (external dependency, error, etc.)
+ Blocked,
+ /// Supervisor has completed its contract
+ Completed,
+ /// Supervisor has failed
+ Failed,
+ /// Supervisor was interrupted (daemon crash, etc.)
+ Interrupted,
+}
+
+impl Default for SupervisorStateEnum {
+ fn default() -> Self {
+ SupervisorStateEnum::Initializing
+ }
+}
+
+impl std::fmt::Display for SupervisorStateEnum {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ SupervisorStateEnum::Initializing => write!(f, "initializing"),
+ SupervisorStateEnum::Idle => write!(f, "idle"),
+ SupervisorStateEnum::Working => write!(f, "working"),
+ SupervisorStateEnum::WaitingForUser => write!(f, "waiting_for_user"),
+ SupervisorStateEnum::WaitingForTasks => write!(f, "waiting_for_tasks"),
+ SupervisorStateEnum::Blocked => write!(f, "blocked"),
+ SupervisorStateEnum::Completed => write!(f, "completed"),
+ SupervisorStateEnum::Failed => write!(f, "failed"),
+ SupervisorStateEnum::Interrupted => write!(f, "interrupted"),
+ }
+ }
+}
+
+impl std::str::FromStr for SupervisorStateEnum {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "initializing" => Ok(SupervisorStateEnum::Initializing),
+ "idle" => Ok(SupervisorStateEnum::Idle),
+ "working" => Ok(SupervisorStateEnum::Working),
+ "waiting_for_user" | "waitingforuser" => Ok(SupervisorStateEnum::WaitingForUser),
+ "waiting_for_tasks" | "waitingfortasks" => Ok(SupervisorStateEnum::WaitingForTasks),
+ "blocked" => Ok(SupervisorStateEnum::Blocked),
+ "completed" => Ok(SupervisorStateEnum::Completed),
+ "failed" => Ok(SupervisorStateEnum::Failed),
+ "interrupted" => Ok(SupervisorStateEnum::Interrupted),
+ _ => Err(format!("Unknown supervisor state: {}", s)),
+ }
+ }
+}
+
+/// Enhanced heartbeat record for supervisor task monitoring.
+/// Stored in the database for historical analysis and dead supervisor detection.
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatRecord {
+ pub id: Uuid,
+ pub supervisor_task_id: Uuid,
+ pub contract_id: Uuid,
+ pub state: String,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ pub progress: i32,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+ pub timestamp: DateTime<Utc>,
+}
+
+/// Request payload for sending a supervisor heartbeat.
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatRequest {
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub state: SupervisorStateEnum,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ pub progress: u8,
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+// =============================================================================
+// Red Team Types
+// =============================================================================
+
/// Red Team notification record
#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -2395,3 +2602,175 @@ impl std::str::FromStr for NotificationSeverity {
}
}
}
+
+// ============================================================================
+// Supervisor Status API Types
+// ============================================================================
+
+/// Response for supervisor status endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorStatusResponse {
+ /// The supervisor task ID
+ pub task_id: Uuid,
+ /// Current supervisor state (from supervisor_states table)
+ pub state: String,
+ /// Current contract phase
+ pub phase: String,
+ /// Description of current activity (from task progress_summary)
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ pub progress: Option<u8>,
+ /// When the supervisor last updated its state
+ pub last_heartbeat: DateTime<Utc>,
+ /// Task IDs the supervisor is currently waiting on
+ pub pending_task_ids: Vec<Uuid>,
+ /// Whether the supervisor is currently running
+ pub is_running: bool,
+}
+
+/// Individual heartbeat entry for history
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatEntry {
+ /// Timestamp of this heartbeat
+ pub timestamp: DateTime<Utc>,
+ /// Supervisor state at this time
+ pub state: String,
+ /// Activity description at this time
+ pub activity: Option<String>,
+ /// Progress at this time
+ pub progress: Option<u8>,
+ /// Contract phase at this time
+ pub phase: String,
+ /// Pending task IDs at this time
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Response for supervisor heartbeat history endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatHistoryResponse {
+ /// List of heartbeat entries
+ pub heartbeats: Vec<SupervisorHeartbeatEntry>,
+ /// Total count of heartbeats (for pagination)
+ pub total: i64,
+}
+
+/// Response for supervisor sync endpoint
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorSyncResponse {
+ /// Whether the sync was successful
+ pub synced: bool,
+ /// Current supervisor state after sync
+ pub state: String,
+ /// Optional message about the sync result
+ pub message: Option<String>,
+}
+
+/// Query parameters for heartbeat history endpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct HeartbeatHistoryQuery {
+ /// Maximum number of heartbeats to return (default: 10)
+ pub limit: Option<i32>,
+ /// Offset for pagination (default: 0)
+ pub offset: Option<i32>,
+}
+
+// =============================================================================
+// Unit Tests
+// =============================================================================
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use uuid::Uuid;
+
+ #[test]
+ fn test_supervisor_state_enum_display() {
+ assert_eq!(SupervisorStateEnum::Initializing.to_string(), "initializing");
+ assert_eq!(SupervisorStateEnum::Idle.to_string(), "idle");
+ assert_eq!(SupervisorStateEnum::Working.to_string(), "working");
+ assert_eq!(SupervisorStateEnum::WaitingForUser.to_string(), "waiting_for_user");
+ assert_eq!(SupervisorStateEnum::WaitingForTasks.to_string(), "waiting_for_tasks");
+ assert_eq!(SupervisorStateEnum::Blocked.to_string(), "blocked");
+ assert_eq!(SupervisorStateEnum::Completed.to_string(), "completed");
+ assert_eq!(SupervisorStateEnum::Failed.to_string(), "failed");
+ assert_eq!(SupervisorStateEnum::Interrupted.to_string(), "interrupted");
+ }
+
+ #[test]
+ fn test_supervisor_state_enum_from_str() {
+ // Standard lowercase
+ assert_eq!("initializing".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Initializing);
+ assert_eq!("idle".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Idle);
+ assert_eq!("working".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working);
+ assert_eq!("waiting_for_user".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForUser);
+ assert_eq!("waiting_for_tasks".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForTasks);
+ assert_eq!("blocked".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Blocked);
+ assert_eq!("completed".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Completed);
+ assert_eq!("failed".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Failed);
+ assert_eq!("interrupted".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Interrupted);
+
+ // Case insensitive
+ assert_eq!("WORKING".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working);
+ assert_eq!("Working".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working);
+
+ // Alternative formats
+ assert_eq!("waitingforuser".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForUser);
+ assert_eq!("waitingfortasks".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForTasks);
+
+ // Invalid state
+ assert!("invalid_state".parse::<SupervisorStateEnum>().is_err());
+ }
+
+ #[test]
+ fn test_supervisor_state_enum_serialization() {
+ // Test JSON serialization
+ let state = SupervisorStateEnum::Working;
+ let json = serde_json::to_string(&state).unwrap();
+ assert_eq!(json, "\"working\"");
+
+ // Test JSON deserialization
+ let deserialized: SupervisorStateEnum = serde_json::from_str("\"working\"").unwrap();
+ assert_eq!(deserialized, SupervisorStateEnum::Working);
+
+ // Test underscore variants
+ let json = "\"waiting_for_user\"";
+ let deserialized: SupervisorStateEnum = serde_json::from_str(json).unwrap();
+ assert_eq!(deserialized, SupervisorStateEnum::WaitingForUser);
+ }
+
+ #[test]
+ fn test_supervisor_state_enum_default() {
+ let default_state = SupervisorStateEnum::default();
+ assert_eq!(default_state, SupervisorStateEnum::Initializing);
+ }
+
+ #[test]
+ fn test_supervisor_heartbeat_request_serialization() {
+ let request = SupervisorHeartbeatRequest {
+ task_id: Uuid::nil(),
+ contract_id: Uuid::nil(),
+ state: SupervisorStateEnum::Working,
+ phase: "execute".to_string(),
+ current_activity: Some("Implementing feature".to_string()),
+ progress: 50,
+ pending_task_ids: vec![Uuid::nil()],
+ };
+
+ let json = serde_json::to_string(&request).unwrap();
+ assert!(json.contains("\"state\":\"working\""));
+ assert!(json.contains("\"phase\":\"execute\""));
+ assert!(json.contains("\"progress\":50"));
+ assert!(json.contains("\"currentActivity\":\"Implementing feature\""));
+
+ // Test deserialization
+ let deserialized: SupervisorHeartbeatRequest = serde_json::from_str(&json).unwrap();
+ assert_eq!(deserialized.state, SupervisorStateEnum::Working);
+ assert_eq!(deserialized.phase, "execute");
+ assert_eq!(deserialized.progress, 50);
+ }
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index b7c5af1..e308df7 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -12,9 +12,9 @@ use super::models::{
CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment,
DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent,
HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult,
- PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint,
- TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest,
- UpdateTemplateRequest,
+ PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest,
+ UpdateTaskRequest, UpdateTemplateRequest,
};
/// Repository error types.
@@ -3404,6 +3404,464 @@ pub async fn update_supervisor_pending_tasks(
.await
}
+/// Update supervisor state with detailed activity tracking.
+/// Called at key save points: LLM response, task spawn, question asked, phase change.
+pub async fn update_supervisor_detailed_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+ state: &str,
+ current_activity: Option<&str>,
+ progress: i32,
+ error_message: Option<&str>,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET state = $1,
+ current_activity = $2,
+ progress = $3,
+ error_message = $4,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $5
+ RETURNING *
+ "#,
+ )
+ .bind(state)
+ .bind(current_activity)
+ .bind(progress)
+ .bind(error_message)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Add a spawned task ID to the supervisor's list.
+pub async fn add_supervisor_spawned_task(
+ pool: &PgPool,
+ contract_id: Uuid,
+ task_id: Uuid,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET spawned_task_ids = array_append(spawned_task_ids, $1),
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Add a pending question to the supervisor state.
+pub async fn add_supervisor_pending_question(
+ pool: &PgPool,
+ contract_id: Uuid,
+ question: serde_json::Value,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET pending_questions = pending_questions || $1::jsonb,
+ state = 'waiting_for_user',
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(question)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Remove a pending question by ID.
+pub async fn remove_supervisor_pending_question(
+ pool: &PgPool,
+ contract_id: Uuid,
+ question_id: Uuid,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET pending_questions = (
+ SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb)
+ FROM jsonb_array_elements(pending_questions) elem
+ WHERE (elem->>'id')::uuid != $1
+ ),
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(question_id)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Comprehensive state save - used at major save points.
+pub async fn save_supervisor_state_full(
+ pool: &PgPool,
+ contract_id: Uuid,
+ task_id: Uuid,
+ conversation_history: serde_json::Value,
+ pending_task_ids: &[Uuid],
+ phase: &str,
+ state: &str,
+ current_activity: Option<&str>,
+ progress: i32,
+ error_message: Option<&str>,
+ spawned_task_ids: &[Uuid],
+ pending_questions: serde_json::Value,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ INSERT INTO supervisor_states (
+ contract_id, task_id, conversation_history, pending_task_ids, phase,
+ state, current_activity, progress, error_message, spawned_task_ids,
+ pending_questions, last_activity
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
+ ON CONFLICT (contract_id) DO UPDATE SET
+ task_id = EXCLUDED.task_id,
+ conversation_history = EXCLUDED.conversation_history,
+ pending_task_ids = EXCLUDED.pending_task_ids,
+ phase = EXCLUDED.phase,
+ state = EXCLUDED.state,
+ current_activity = EXCLUDED.current_activity,
+ progress = EXCLUDED.progress,
+ error_message = EXCLUDED.error_message,
+ spawned_task_ids = EXCLUDED.spawned_task_ids,
+ pending_questions = EXCLUDED.pending_questions,
+ last_activity = NOW(),
+ updated_at = NOW()
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(task_id)
+ .bind(conversation_history)
+ .bind(pending_task_ids)
+ .bind(phase)
+ .bind(state)
+ .bind(current_activity)
+ .bind(progress)
+ .bind(error_message)
+ .bind(spawned_task_ids)
+ .bind(pending_questions)
+ .fetch_one(pool)
+ .await
+}
+
+/// Mark supervisor as restored from a crash/interruption.
+pub async fn mark_supervisor_restored(
+ pool: &PgPool,
+ contract_id: Uuid,
+ restoration_source: &str,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET restoration_count = restoration_count + 1,
+ last_restored_at = NOW(),
+ restoration_source = $1,
+ state = 'initializing',
+ error_message = NULL,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(restoration_source)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get supervisors with pending questions (for re-delivery after restoration).
+pub async fn get_supervisors_with_pending_questions(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ SELECT ss.*
+ FROM supervisor_states ss
+ JOIN contracts c ON c.id = ss.contract_id
+ WHERE c.owner_id = $1
+ AND ss.pending_questions != '[]'::jsonb
+ AND c.status = 'active'
+ ORDER BY ss.last_activity DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get supervisor state with full details for restoration.
+/// Includes validation info.
+pub async fn get_supervisor_state_for_restoration(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ SELECT * FROM supervisor_states WHERE contract_id = $1
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Validate spawned tasks are in expected states.
+/// Returns map of task_id -> (status, updated_at).
+pub async fn validate_spawned_tasks(
+ pool: &PgPool,
+ task_ids: &[Uuid],
+) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> {
+ use sqlx::Row;
+
+ let rows = sqlx::query(
+ r#"
+ SELECT id, status, updated_at
+ FROM tasks
+ WHERE id = ANY($1)
+ "#,
+ )
+ .bind(task_ids)
+ .fetch_all(pool)
+ .await?;
+
+ let mut result = std::collections::HashMap::new();
+ for row in rows {
+ let id: Uuid = row.get("id");
+ let status: String = row.get("status");
+ let updated_at: chrono::DateTime<Utc> = row.get("updated_at");
+ result.insert(id, (status, updated_at));
+ }
+ Ok(result)
+}
+
+/// Update supervisor state when phase changes.
+pub async fn update_supervisor_phase(
+ pool: &PgPool,
+ contract_id: Uuid,
+ new_phase: &str,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET phase = $1,
+ state = 'working',
+ current_activity = 'Phase changed to ' || $1,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(new_phase)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Update supervisor state on heartbeat (lightweight update).
+pub async fn update_supervisor_heartbeat_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+ state: &str,
+ current_activity: Option<&str>,
+ progress: i32,
+ pending_task_ids: &[Uuid],
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE supervisor_states
+ SET state = $1,
+ current_activity = $2,
+ progress = $3,
+ pending_task_ids = $4,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $5
+ "#,
+ )
+ .bind(state)
+ .bind(current_activity)
+ .bind(progress)
+ .bind(pending_task_ids)
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+// ============================================================================
+// Supervisor Heartbeats
+// ============================================================================
+
+/// Record a supervisor heartbeat.
+/// This creates a historical record for monitoring and dead supervisor detection.
+pub async fn create_supervisor_heartbeat(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+ contract_id: Uuid,
+ state: &str,
+ phase: &str,
+ current_activity: Option<&str>,
+ progress: i32,
+ pending_task_ids: &[Uuid],
+) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ INSERT INTO supervisor_heartbeats (
+ supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
+ RETURNING *
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .bind(contract_id)
+ .bind(state)
+ .bind(phase)
+ .bind(current_activity)
+ .bind(progress)
+ .bind(pending_task_ids)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get the latest heartbeat for a supervisor task.
+pub async fn get_latest_supervisor_heartbeat(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE supervisor_task_id = $1
+ ORDER BY timestamp DESC
+ LIMIT 1
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get recent heartbeats for a supervisor task.
+pub async fn get_supervisor_heartbeats(
+ pool: &PgPool,
+ supervisor_task_id: Uuid,
+ limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE supervisor_task_id = $1
+ ORDER BY timestamp DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get recent heartbeats for a contract.
+pub async fn get_contract_supervisor_heartbeats(
+ pool: &PgPool,
+ contract_id: Uuid,
+ limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorHeartbeatRecord>(
+ r#"
+ SELECT * FROM supervisor_heartbeats
+ WHERE contract_id = $1
+ ORDER BY timestamp DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Delete old heartbeats beyond the TTL (24 hours by default).
+/// Returns the number of deleted records.
+pub async fn cleanup_old_heartbeats(
+ pool: &PgPool,
+ ttl_hours: i64,
+) -> Result<u64, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM supervisor_heartbeats
+ WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
+ "#,
+ )
+ .bind(ttl_hours.to_string())
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
+/// Find supervisors that have not sent a heartbeat within the timeout period.
+/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
+pub async fn find_stale_supervisors(
+ pool: &PgPool,
+ timeout_seconds: i64,
+) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
+ let rows = sqlx::query(
+ r#"
+ WITH latest_heartbeats AS (
+ SELECT DISTINCT ON (supervisor_task_id)
+ supervisor_task_id,
+ contract_id,
+ timestamp
+ FROM supervisor_heartbeats
+ ORDER BY supervisor_task_id, timestamp DESC
+ )
+ SELECT
+ lh.supervisor_task_id,
+ lh.contract_id,
+ lh.timestamp
+ FROM latest_heartbeats lh
+ JOIN tasks t ON t.id = lh.supervisor_task_id
+ WHERE t.status = 'running'
+ AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
+ "#,
+ )
+ .bind(timeout_seconds.to_string())
+ .fetch_all(pool)
+ .await?;
+
+ let mut result = Vec::new();
+ for row in rows {
+ use sqlx::Row;
+ let supervisor_task_id: Uuid = row.get("supervisor_task_id");
+ let contract_id: Uuid = row.get("contract_id");
+ let timestamp: chrono::DateTime<Utc> = row.get("timestamp");
+ result.push((supervisor_task_id, contract_id, timestamp));
+ }
+ Ok(result)
+}
+
// ============================================================================
// Contract Supervisor
// ============================================================================
@@ -4402,3 +4860,138 @@ pub async fn get_notification_count_for_task(
.map_err(RepositoryError::Database)?;
Ok(result.0)
}
+
+// =============================================================================
+// Supervisor Status API Helpers
+// =============================================================================
+
+/// Get supervisor status for a contract.
+/// Returns combined information from supervisor_states and tasks tables.
+pub async fn get_supervisor_status(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
+ // Query to get supervisor status by joining supervisor_states with tasks
+ sqlx::query_as::<_, SupervisorStatusInfo>(
+ r#"
+ SELECT
+ ss.task_id,
+ COALESCE(t.status, 'unknown') as supervisor_state,
+ ss.phase,
+ t.progress_summary as current_activity,
+ ss.pending_task_ids,
+ ss.last_activity as last_heartbeat,
+ t.status as task_status,
+ t.daemon_id IS NOT NULL as is_running
+ FROM supervisor_states ss
+ JOIN tasks t ON t.id = ss.task_id
+ WHERE ss.contract_id = $1
+ AND t.owner_id = $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor status query result
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorStatusInfo {
+ pub task_id: Uuid,
+ pub supervisor_state: String,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+ pub last_heartbeat: chrono::DateTime<chrono::Utc>,
+ pub task_status: String,
+ pub is_running: bool,
+}
+
+/// Get supervisor activity history from history_events table.
+/// This provides a timeline of supervisor activities that can serve as "heartbeats".
+pub async fn get_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+ limit: i32,
+ offset: i32,
+) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorActivityEntry>(
+ r#"
+ SELECT
+ created_at as timestamp,
+ COALESCE(event_subtype, 'activity') as state,
+ event_data->>'activity' as activity,
+ (event_data->>'progress')::INTEGER as progress,
+ COALESCE(phase, 'unknown') as phase,
+ CASE
+ WHEN event_data->'pending_task_ids' IS NOT NULL
+ THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
+ ELSE ARRAY[]::UUID[]
+ END as pending_task_ids
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ ORDER BY created_at DESC
+ LIMIT $2 OFFSET $3
+ "#,
+ )
+ .bind(contract_id)
+ .bind(limit)
+ .bind(offset)
+ .fetch_all(pool)
+ .await
+}
+
+/// Internal struct to hold supervisor activity entry
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct SupervisorActivityEntry {
+ pub timestamp: chrono::DateTime<chrono::Utc>,
+ pub state: String,
+ pub activity: Option<String>,
+ pub progress: Option<i32>,
+ pub phase: String,
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+/// Count total supervisor activity history entries for a contract.
+pub async fn count_supervisor_activity_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ r#"
+ SELECT COUNT(*)
+ FROM history_events
+ WHERE contract_id = $1
+ AND event_type IN ('supervisor', 'phase', 'task')
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await?;
+ Ok(result.0)
+}
+
+/// Update supervisor state last_activity timestamp.
+/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
+pub async fn sync_supervisor_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index b15667d..01b4610 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -1461,6 +1461,22 @@ pub async fn change_phase(
.await
{
Ok(PhaseChangeResult::Success(updated_contract)) => {
+ // Save supervisor state on phase change (Task 3.3)
+ // This is a key save point for restoration
+ let new_phase_for_state = updated_contract.phase.clone();
+ let contract_id_for_state = updated_contract.id;
+ let pool_for_state = pool.clone();
+ tokio::spawn(async move {
+ if let Err(e) = repository::update_supervisor_phase(&pool_for_state, contract_id_for_state, &new_phase_for_state).await {
+ tracing::warn!(
+ contract_id = %contract_id_for_state,
+ new_phase = %new_phase_for_state,
+ error = %e,
+ "Failed to save supervisor state on phase change"
+ );
+ }
+ });
+
// Notify supervisor of phase change
if let Some(supervisor_task_id) = updated_contract.supervisor_task_id {
if let Ok(Some(supervisor)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
@@ -1873,6 +1889,348 @@ async fn cleanup_contract_worktrees(
}
// =============================================================================
+// Supervisor Status API
+// =============================================================================
+
+/// Query parameters for supervisor heartbeat history
+#[derive(Debug, Deserialize)]
+pub struct HeartbeatHistoryQuery {
+ /// Maximum number of heartbeats to return (default: 10)
+ pub limit: Option<i32>,
+ /// Offset for pagination (default: 0)
+ pub offset: Option<i32>,
+}
+
+/// Get supervisor status for a contract.
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/status",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor status", body = crate::db::models::SupervisorStatusResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn get_supervisor_status(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if contract has a supervisor
+ let supervisor_task_id = match contract.supervisor_task_id {
+ Some(task_id) => task_id,
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor status from supervisor_states table
+ match repository::get_supervisor_status(pool, id, auth.owner_id).await {
+ Ok(Some(status_info)) => {
+ // Determine if supervisor is actively running
+ let is_running = status_info.is_running && status_info.task_status == "running";
+
+ let response = crate::db::models::SupervisorStatusResponse {
+ task_id: status_info.task_id,
+ state: status_info.supervisor_state,
+ phase: status_info.phase,
+ current_activity: status_info.current_activity,
+ progress: None, // We don't track progress percentage yet
+ last_heartbeat: status_info.last_heartbeat,
+ pending_task_ids: status_info.pending_task_ids,
+ is_running,
+ };
+ Json(response).into_response()
+ }
+ Ok(None) => {
+ // No supervisor state record exists, but supervisor task might exist
+ // Try to get info from the task itself
+ match repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
+ Ok(Some(task)) => {
+ let is_running = task.daemon_id.is_some() && task.status == "running";
+ let response = crate::db::models::SupervisorStatusResponse {
+ task_id: task.id,
+ state: task.status.clone(),
+ phase: contract.phase.clone(),
+ current_activity: task.progress_summary.clone(),
+ progress: None,
+ last_heartbeat: task.updated_at,
+ pending_task_ids: Vec::new(),
+ is_running,
+ };
+ Json(response).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "Supervisor task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to get supervisor task {}: {}", supervisor_task_id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor status for contract {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Get supervisor heartbeat history for a contract.
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/heartbeats",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID"),
+ ("limit" = Option<i32>, Query, description = "Maximum number of heartbeats to return (default: 10)"),
+ ("offset" = Option<i32>, Query, description = "Offset for pagination (default: 0)")
+ ),
+ responses(
+ (status = 200, description = "Supervisor heartbeat history", body = crate::db::models::SupervisorHeartbeatHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn get_supervisor_heartbeats(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ axum::extract::Query(query): axum::extract::Query<HeartbeatHistoryQuery>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(_)) => {}
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+
+ let limit = query.limit.unwrap_or(10).min(100); // Cap at 100
+ let offset = query.offset.unwrap_or(0);
+
+ // Get activity history as heartbeats
+ let activities = match repository::get_supervisor_activity_history(pool, id, limit, offset).await {
+ Ok(activities) => activities,
+ Err(e) => {
+ tracing::error!("Failed to get supervisor heartbeats for contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get total count for pagination
+ let total = match repository::count_supervisor_activity_history(pool, id).await {
+ Ok(count) => count,
+ Err(e) => {
+ tracing::warn!("Failed to count supervisor heartbeats: {}", e);
+ activities.len() as i64
+ }
+ };
+
+ // Convert to heartbeat entries
+ let heartbeats: Vec<crate::db::models::SupervisorHeartbeatEntry> = activities
+ .into_iter()
+ .map(|a| crate::db::models::SupervisorHeartbeatEntry {
+ timestamp: a.timestamp,
+ state: a.state,
+ activity: a.activity,
+ progress: a.progress.map(|p| p as u8),
+ phase: a.phase,
+ pending_task_ids: a.pending_task_ids,
+ })
+ .collect();
+
+ Json(crate::db::models::SupervisorHeartbeatHistoryResponse {
+ heartbeats,
+ total,
+ })
+ .into_response()
+}
+
+/// Sync supervisor state (refresh last_activity timestamp).
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/sync",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor synced", body = crate::db::models::SupervisorSyncResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Contracts"
+)]
+pub async fn sync_supervisor(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and belongs to owner
+ let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if contract has a supervisor
+ if contract.supervisor_task_id.is_none() {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor task for this contract")),
+ )
+ .into_response();
+ }
+
+ // Sync supervisor state (update last_activity)
+ match repository::sync_supervisor_state(pool, id).await {
+ Ok(Some(_state)) => {
+ // Get task status to determine current state
+ let task_status = if let Some(task_id) = contract.supervisor_task_id {
+ match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(task)) => task.status,
+ _ => "unknown".to_string(),
+ }
+ } else {
+ "unknown".to_string()
+ };
+
+ Json(crate::db::models::SupervisorSyncResponse {
+ synced: true,
+ state: task_status,
+ message: Some("Supervisor state synced successfully".to_string()),
+ })
+ .into_response()
+ }
+ Ok(None) => {
+ // No supervisor state exists, return not found
+ (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("SUPERVISOR_NOT_FOUND", "No supervisor state found for this contract")),
+ )
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to sync supervisor state for contract {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+// =============================================================================
// Tests
// =============================================================================
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 1152502..34e2cc3 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -15,6 +15,7 @@ use axum::{
response::{IntoResponse, Response},
};
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -262,6 +263,27 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -892,6 +914,83 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: supervisor_state,
+ phase,
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: _,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ state = %supervisor_state,
+ phase = %phase,
+ progress = progress,
+ "Supervisor heartbeat received"
+ );
+
+ // Store heartbeat in database and update supervisor state (Task 3.3)
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let pending_ids = pending_task_ids.clone();
+ let activity = current_activity.clone();
+ let state_str = supervisor_state.clone();
+ let phase_str = phase.clone();
+ tokio::spawn(async move {
+ // Store the heartbeat record
+ if let Err(e) = repository::create_supervisor_heartbeat(
+ &pool,
+ task_id,
+ contract_id,
+ &state_str,
+ &phase_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to store supervisor heartbeat"
+ );
+ }
+
+ // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
+ if let Err(e) = repository::update_supervisor_heartbeat_state(
+ &pool,
+ contract_id,
+ &state_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::debug!(
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to update supervisor state from heartbeat (may not exist yet)"
+ );
+ }
+
+ // Also update the daemon heartbeat
+ if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
+ tracing::warn!(
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to update daemon heartbeat from supervisor"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
@@ -952,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Initialize supervisor_state when supervisor task starts running
+ // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
if updated_task.is_supervisor && new_status_owned == "running" {
if let Some(contract_id) = updated_task.contract_id {
- // Get contract to get its phase
- match repository::get_contract_for_owner(
- &pool,
- contract_id,
- updated_task.owner_id,
- ).await {
- Ok(Some(contract)) => {
- match repository::upsert_supervisor_state(
+ // Check if supervisor state already exists (restoration scenario)
+ match repository::get_supervisor_state(&pool, contract_id).await {
+ Ok(Some(existing_state)) => {
+ // State exists - this is a restoration
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ existing_state = %existing_state.state,
+ restoration_count = existing_state.restoration_count,
+ "Supervisor starting with existing state - restoration in progress"
+ );
+
+ // Mark as restored (increments restoration_count)
+ match repository::mark_supervisor_restored(
&pool,
contract_id,
- task_id,
- serde_json::json!([]), // Empty conversation
- &[], // No pending tasks
- &contract.phase,
+ "daemon_restart",
).await {
- Ok(_) => {
+ Ok(restored_state) => {
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
- phase = %contract.phase,
- "Initialized supervisor state for running supervisor"
+ restoration_count = restored_state.restoration_count,
+ "Supervisor restoration marked"
);
+
+ // Check for pending questions to re-deliver
+ if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
+ restored_state.pending_questions.clone()
+ ) {
+ if !questions.is_empty() {
+ tracing::info!(
+ contract_id = %contract_id,
+ question_count = questions.len(),
+ "Pending questions found for re-delivery"
+ );
+ // Questions will be re-delivered by the supervisor when it restores
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to initialize supervisor state"
+ "Failed to mark supervisor as restored"
);
}
}
}
Ok(None) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- "Contract not found when initializing supervisor state"
- );
+ // No existing state - fresh start
+ // Get contract to get its phase
+ match repository::get_contract_for_owner(
+ &pool,
+ contract_id,
+ updated_task.owner_id,
+ ).await {
+ Ok(Some(contract)) => {
+ match repository::upsert_supervisor_state(
+ &pool,
+ contract_id,
+ task_id,
+ serde_json::json!([]), // Empty conversation
+ &[], // No pending tasks
+ &contract.phase,
+ ).await {
+ Ok(_) => {
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ phase = %contract.phase,
+ "Initialized fresh supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to initialize supervisor state"
+ );
+ }
+ }
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Contract not found when initializing supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to get contract for supervisor state"
+ );
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to get contract for supervisor state"
+ "Failed to check existing supervisor state"
);
}
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 3411ec0..a29b666 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -14,8 +14,9 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
-use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest};
+use crate::db::models::{CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
+use sqlx::PgPool;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
@@ -748,6 +749,9 @@ pub async fn spawn_task(
} else {
tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
+ // Save state: task spawn is a key save point (Task 3.3)
+ save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await;
+
// Broadcast task status update notification to WebSocket subscribers
state.broadcast_task_update(TaskUpdateNotification {
task_id: updated_task.id,
@@ -1770,6 +1774,17 @@ pub async fn ask_question(
request.question_type.clone(),
);
+ // Save state: question asked is a key save point (Task 3.3)
+ let pending_question = PendingQuestion {
+ id: question_id,
+ question: request.question.clone(),
+ choices: request.choices.clone(),
+ context: request.context.clone(),
+ question_type: request.question_type.clone(),
+ asked_at: chrono::Utc::now(),
+ };
+ save_state_on_question_asked(pool, contract_id, pending_question).await;
+
// Broadcast question as task output entry for the task's chat
let question_data = serde_json::json!({
"question_id": question_id.to_string(),
@@ -1864,6 +1879,9 @@ pub async fn ask_question(
// Clean up the response
state.cleanup_question_response(question_id);
+ // Clear pending question from supervisor state (Task 3.3)
+ clear_pending_question(pool, contract_id, question_id).await;
+
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1879,6 +1897,9 @@ pub async fn ask_question(
// Remove the pending question on timeout
state.remove_pending_question(question_id);
+ // Clear pending question from supervisor state on timeout (Task 3.3)
+ clear_pending_question(pool, contract_id, question_id).await;
+
return (
StatusCode::REQUEST_TIMEOUT,
Json(AskQuestionResponse {
@@ -2031,6 +2052,8 @@ pub struct ResumeSupervisorResponse {
pub daemon_id: Option<Uuid>,
pub resumed_from: ResumedFromInfo,
pub status: String,
+ /// Restoration context (Task 3.4)
+ pub restoration: Option<RestorationInfo>,
}
#[derive(Debug, Serialize, ToSchema)]
@@ -2041,6 +2064,24 @@ pub struct ResumedFromInfo {
pub message_count: i32,
}
+/// Information about supervisor restoration (Task 3.4)
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RestorationInfo {
+ /// Previous state before restoration
+ pub previous_state: String,
+ /// How many times this supervisor has been restored
+ pub restoration_count: i32,
+ /// Number of pending questions to re-deliver
+ pub pending_questions_count: usize,
+ /// Number of tasks being waited on
+ pub waiting_tasks_count: usize,
+ /// Number of tasks spawned before crash
+ pub spawned_tasks_count: usize,
+ /// Any warnings from state validation
+ pub warnings: Vec<String>,
+}
+
/// Resume interrupted supervisor with specified mode.
///
/// POST /api/v1/contracts/{id}/supervisor/resume
@@ -2350,6 +2391,31 @@ pub async fn resume_supervisor(
"Supervisor resume requested"
);
+ // Build restoration info (Task 3.4)
+ let pending_questions: Vec<PendingQuestion> = serde_json::from_value(
+ supervisor_state.pending_questions.clone()
+ ).unwrap_or_default();
+
+ let restoration_info = RestorationInfo {
+ previous_state: supervisor_state.state.clone(),
+ restoration_count: supervisor_state.restoration_count,
+ pending_questions_count: pending_questions.len(),
+ waiting_tasks_count: supervisor_state.pending_task_ids.len(),
+ spawned_tasks_count: supervisor_state.spawned_task_ids.len(),
+ warnings: vec![], // Could add validation warnings here
+ };
+
+ // Re-deliver pending questions if any (Task 3.4)
+ if !pending_questions.is_empty() {
+ redeliver_pending_questions(
+ &state,
+ supervisor_state.task_id,
+ contract_id,
+ auth_info.owner_id,
+ &pending_questions,
+ ).await;
+ }
+
Json(ResumeSupervisorResponse {
supervisor_task_id: supervisor_state.task_id,
daemon_id: response_daemon_id,
@@ -2359,6 +2425,7 @@ pub async fn resume_supervisor(
message_count,
},
status: response_status,
+ restoration: Some(restoration_info),
})
.into_response()
}
@@ -2748,3 +2815,412 @@ pub async fn spawn_red_team_task(
// It will remain pending and can be started later
Ok(task)
}
+
+// =============================================================================
+// Supervisor State Persistence Helpers (Task 3.3)
+// =============================================================================
+
+use crate::db::models::{
+ SupervisorRestorationContext, SupervisorStateEnum,
+ StateValidationResult, StateRecoveryAction,
+};
+
+/// Save supervisor state on task spawn.
+/// This is called when a supervisor spawns a new task.
+pub async fn save_state_on_task_spawn(
+ pool: &PgPool,
+ contract_id: Uuid,
+ spawned_task_id: Uuid,
+) {
+ if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await {
+ tracing::warn!(
+ contract_id = %contract_id,
+ spawned_task_id = %spawned_task_id,
+ error = %e,
+ "Failed to save spawned task to supervisor state"
+ );
+ } else {
+ tracing::debug!(
+ contract_id = %contract_id,
+ spawned_task_id = %spawned_task_id,
+ "Saved spawned task to supervisor state"
+ );
+ }
+
+ // Also update state to working
+ if let Err(e) = repository::update_supervisor_detailed_state(
+ pool,
+ contract_id,
+ "working",
+ Some(&format!("Spawned task {}", spawned_task_id)),
+ 0, // Progress resets when spawning new work
+ None,
+ ).await {
+ tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn");
+ }
+}
+
+/// Save supervisor state on question asked.
+/// This is called when a supervisor asks a question and is waiting for user input.
+pub async fn save_state_on_question_asked(
+ pool: &PgPool,
+ contract_id: Uuid,
+ question: PendingQuestion,
+) {
+ let question_json = match serde_json::to_value(&[&question]) {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question");
+ return;
+ }
+ };
+
+ if let Err(e) = repository::add_supervisor_pending_question(pool, contract_id, question_json).await {
+ tracing::warn!(
+ contract_id = %contract_id,
+ question_id = %question.id,
+ error = %e,
+ "Failed to save pending question to supervisor state"
+ );
+ } else {
+ tracing::debug!(
+ contract_id = %contract_id,
+ question_id = %question.id,
+ "Saved pending question to supervisor state"
+ );
+ }
+}
+
+/// Clear pending question after it's answered.
+pub async fn clear_pending_question(
+ pool: &PgPool,
+ contract_id: Uuid,
+ question_id: Uuid,
+) {
+ if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await {
+ tracing::warn!(
+ contract_id = %contract_id,
+ question_id = %question_id,
+ error = %e,
+ "Failed to remove pending question from supervisor state"
+ );
+ }
+
+ // Update state back to working (if no more pending questions)
+ match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(state)) => {
+ let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
+ .unwrap_or_default();
+ if questions.is_empty() {
+ let _ = repository::update_supervisor_detailed_state(
+ pool,
+ contract_id,
+ "working",
+ Some("Resumed after user response"),
+ state.progress,
+ None,
+ ).await;
+ }
+ }
+ Ok(None) => {}
+ Err(e) => {
+ tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question");
+ }
+ }
+}
+
+/// Save supervisor state on phase change.
+pub async fn save_state_on_phase_change(
+ pool: &PgPool,
+ contract_id: Uuid,
+ new_phase: &str,
+) {
+ if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await {
+ tracing::warn!(
+ contract_id = %contract_id,
+ new_phase = %new_phase,
+ error = %e,
+ "Failed to update supervisor state on phase change"
+ );
+ } else {
+ tracing::info!(
+ contract_id = %contract_id,
+ new_phase = %new_phase,
+ "Updated supervisor state on phase change"
+ );
+ }
+}
+
+// =============================================================================
+// Supervisor Restoration Protocol (Task 3.4)
+// =============================================================================
+
+/// Validate supervisor state consistency before restoration.
+/// Checks that spawned tasks and pending questions are in expected states.
+pub async fn validate_supervisor_state(
+ pool: &PgPool,
+ state: &crate::db::models::SupervisorState,
+) -> StateValidationResult {
+ let mut issues = Vec::new();
+
+ // Validate spawned tasks
+ if !state.spawned_task_ids.is_empty() {
+ match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await {
+ Ok(task_statuses) => {
+ for task_id in &state.spawned_task_ids {
+ if !task_statuses.contains_key(task_id) {
+ issues.push(format!("Spawned task {} not found in database", task_id));
+ }
+ }
+ }
+ Err(e) => {
+ issues.push(format!("Failed to validate spawned tasks: {}", e));
+ }
+ }
+ }
+
+ // Validate pending questions
+ let pending_questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
+ .unwrap_or_default();
+
+ // Check if questions are not too old (e.g., more than 24 hours)
+ for question in &pending_questions {
+ let age = chrono::Utc::now() - question.asked_at;
+ if age.num_hours() > 24 {
+ issues.push(format!(
+ "Pending question {} is {} hours old, may be stale",
+ question.id, age.num_hours()
+ ));
+ }
+ }
+
+ // Validate conversation history
+ if let Some(history) = state.conversation_history.as_array() {
+ if history.is_empty() && state.restoration_count > 0 {
+ issues.push("Conversation history is empty after previous restoration".to_string());
+ }
+ }
+
+ // Determine recovery action
+ let recovery_action = if issues.is_empty() {
+ StateRecoveryAction::Proceed
+ } else if issues.iter().any(|i| i.contains("not found")) {
+ // Missing tasks suggest corruption - use checkpoint
+ StateRecoveryAction::UseCheckpoint
+ } else if issues.len() > 3 {
+ // Many issues suggest manual intervention needed
+ StateRecoveryAction::ManualIntervention
+ } else {
+ // Minor issues - proceed with warnings
+ StateRecoveryAction::Proceed
+ };
+
+ StateValidationResult {
+ is_valid: issues.is_empty(),
+ issues,
+ recovery_action,
+ }
+}
+
+/// Restore supervisor from saved state after daemon crash or task reassignment.
+/// Returns restoration context to send to the supervisor.
+pub async fn restore_supervisor(
+ pool: &PgPool,
+ contract_id: Uuid,
+ restoration_source: &str,
+) -> Result<SupervisorRestorationContext, String> {
+ // Step 1: Load supervisor state
+ let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ tracing::warn!(
+ contract_id = %contract_id,
+ "No supervisor state found for restoration - starting fresh"
+ );
+ return Ok(SupervisorRestorationContext {
+ success: true,
+ previous_state: SupervisorStateEnum::Initializing,
+ conversation_history: serde_json::json!([]),
+ pending_questions: vec![],
+ waiting_task_ids: vec![],
+ spawned_task_ids: vec![],
+ restoration_count: 0,
+ restoration_context_message: "No previous state found. Starting fresh.".to_string(),
+ warnings: vec!["No previous supervisor state found".to_string()],
+ });
+ }
+ Err(e) => {
+ return Err(format!("Failed to load supervisor state: {}", e));
+ }
+ };
+
+ // Step 2: Parse previous state
+ let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted);
+
+ // Step 3: Validate state consistency
+ let validation = validate_supervisor_state(pool, &state).await;
+ let mut warnings = validation.issues.clone();
+
+ // Step 4: Handle based on validation result
+ let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action {
+ StateRecoveryAction::Proceed => {
+ // State is valid, use it
+ let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
+ .unwrap_or_default();
+
+ let message = format!(
+ "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.",
+ state.state,
+ questions.len(),
+ state.spawned_task_ids.len(),
+ state.pending_task_ids.len()
+ );
+
+ (state.conversation_history.clone(), questions, message)
+ }
+ StateRecoveryAction::UseCheckpoint => {
+ // State is corrupted, try to use checkpoint
+ warnings.push("State validation failed, attempting checkpoint recovery".to_string());
+
+ // TODO: Implement checkpoint-based recovery
+ // For now, start with empty questions but preserve conversation
+ let message = "Restored from last checkpoint due to state inconsistency.".to_string();
+ (state.conversation_history.clone(), vec![], message)
+ }
+ StateRecoveryAction::StartFresh => {
+ warnings.push("Starting fresh due to unrecoverable state".to_string());
+ let message = "Starting fresh due to unrecoverable state corruption.".to_string();
+ (serde_json::json!([]), vec![], message)
+ }
+ StateRecoveryAction::ManualIntervention => {
+ warnings.push("Manual intervention may be required".to_string());
+ // Still try to restore but with warning
+ let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
+ .unwrap_or_default();
+ let message = "Restored with warnings - manual intervention may be required.".to_string();
+ (state.conversation_history.clone(), questions, message)
+ }
+ };
+
+ // Step 5: Mark supervisor as restored
+ let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await {
+ Ok(s) => s,
+ Err(e) => {
+ return Err(format!("Failed to mark supervisor as restored: {}", e));
+ }
+ };
+
+ // Step 6: Build restoration context
+ let context = SupervisorRestorationContext {
+ success: true,
+ previous_state,
+ conversation_history,
+ pending_questions,
+ waiting_task_ids: state.pending_task_ids.clone(),
+ spawned_task_ids: state.spawned_task_ids.clone(),
+ restoration_count: new_state.restoration_count,
+ restoration_context_message: restoration_message,
+ warnings,
+ };
+
+ tracing::info!(
+ contract_id = %contract_id,
+ restoration_source = %restoration_source,
+ restoration_count = new_state.restoration_count,
+ pending_questions_count = context.pending_questions.len(),
+ waiting_tasks_count = context.waiting_task_ids.len(),
+ spawned_tasks_count = context.spawned_task_ids.len(),
+ "Supervisor restoration completed"
+ );
+
+ Ok(context)
+}
+
+/// Re-deliver pending questions to the user after restoration.
+/// This ensures questions asked before crash are shown to the user again.
+pub async fn redeliver_pending_questions(
+ state: &SharedState,
+ supervisor_id: Uuid,
+ contract_id: Uuid,
+ owner_id: Uuid,
+ questions: &[PendingQuestion],
+) {
+ for question in questions {
+ // Add to in-memory question state
+ state.add_supervisor_question(
+ supervisor_id,
+ contract_id,
+ owner_id,
+ question.question.clone(),
+ question.choices.clone(),
+ question.context.clone(),
+ false, // Assume single select for restored questions
+ question.question_type.clone(),
+ );
+
+ // Broadcast to WebSocket clients
+ let question_data = serde_json::json!({
+ "question_id": question.id.to_string(),
+ "choices": question.choices,
+ "context": question.context,
+ "question_type": question.question_type,
+ "is_restored": true,
+ "originally_asked_at": question.asked_at.to_rfc3339(),
+ });
+
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id: supervisor_id,
+ owner_id: Some(owner_id),
+ message_type: "supervisor_question".to_string(),
+ content: question.question.clone(),
+ tool_name: None,
+ tool_input: Some(question_data),
+ is_error: None,
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+
+ tracing::info!(
+ supervisor_id = %supervisor_id,
+ question_id = %question.id,
+ "Re-delivered pending question after restoration"
+ );
+ }
+}
+
+/// Generate restoration context message for Claude.
+/// This message is injected into the conversation to inform Claude about the restoration.
+pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String {
+ let mut message = String::new();
+
+ message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n");
+ message.push_str(&format!("This supervisor has been restored after interruption. {}\n\n", context.restoration_context_message));
+ message.push_str(&format!("Restoration count: {}\n", context.restoration_count));
+
+ if !context.pending_questions.is_empty() {
+ message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len()));
+ for q in &context.pending_questions {
+ message.push_str(&format!(" - {}: {}\n", q.id, q.question));
+ }
+ }
+
+ if !context.waiting_task_ids.is_empty() {
+ message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len()));
+ }
+
+ if !context.spawned_task_ids.is_empty() {
+ message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len()));
+ }
+
+ if !context.warnings.is_empty() {
+ message.push_str("\nWarnings:\n");
+ for warning in &context.warnings {
+ message.push_str(&format!(" - {}\n", warning));
+ }
+ }
+
+ message.push_str("\n=== END RESTORATION NOTICE ===\n");
+
+ message
+}
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 8456006..e5415ae 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -175,6 +175,10 @@ pub fn make_router(state: SharedState) -> Router {
// Contract supervisor resume endpoints
.route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor))
.route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation))
+ // Contract supervisor status endpoints
+ .route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status))
+ .route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats))
+ .route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor))
// History endpoints
.route("/contracts/{id}/history", get(history::get_contract_history))
.route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation))