summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/daemon/ws/protocol.rs46
-rw-r--r--makima/src/db/models.rs201
-rw-r--r--makima/src/db/repository.rs160
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs83
4 files changed, 487 insertions, 3 deletions
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index bfe6326..5c88038 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -2,6 +2,7 @@
//!
//! These types mirror the server's protocol exactly for compatibility.
+use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -26,6 +27,29 @@ pub enum DaemonMessage {
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state.
+ /// Sent periodically by supervisor tasks to report their current state.
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
+
/// Task output streaming (stdout/stderr from Claude Code).
TaskOutput {
#[serde(rename = "taskId")]
@@ -857,6 +881,28 @@ impl DaemonMessage {
pub fn revoke_tool_key(task_id: Uuid) -> Self {
Self::RevokeToolKey { task_id }
}
+
+    /// Build a [`DaemonMessage::SupervisorHeartbeat`] stamped with the current UTC time.
+    ///
+    /// `state` and `phase` are copied into owned strings; every other argument
+    /// is moved into the message unchanged. The `timestamp` field is read from
+    /// `Utc::now()` at the moment this constructor runs.
+    pub fn supervisor_heartbeat(
+        task_id: Uuid,
+        contract_id: Uuid,
+        state: &str,
+        phase: &str,
+        current_activity: Option<String>,
+        progress: u8,
+        pending_task_ids: Vec<Uuid>,
+    ) -> Self {
+        let state = String::from(state);
+        let phase = String::from(phase);
+        let timestamp = Utc::now();
+
+        Self::SupervisorHeartbeat {
+            task_id,
+            contract_id,
+            state,
+            phase,
+            current_activity,
+            progress,
+            pending_task_ids,
+            timestamp,
+        }
+    }
}
#[cfg(test)]
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 636d81a..cc30465 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -2339,6 +2339,111 @@ pub struct CheckpointPatchInfo {
// Red Team Types
// ============================================================================
+// =============================================================================
+// Supervisor State and Heartbeat Types
+// =============================================================================
+
+/// Activity state reported by a contract supervisor task.
+/// Captures detailed activity state for monitoring and restoration; serde represents each variant in snake_case (e.g. `waiting_for_user`).
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "snake_case")]
+pub enum SupervisorStateEnum {
+ /// Supervisor is starting up (this is the variant returned by `Default`)
+ Initializing,
+ /// Supervisor is idle, waiting for work
+ Idle,
+ /// Supervisor is actively working
+ Working,
+ /// Supervisor is waiting for user input/confirmation
+ WaitingForUser,
+ /// Supervisor is waiting for spawned tasks to complete
+ WaitingForTasks,
+ /// Supervisor is blocked (external dependency, error, etc.)
+ Blocked,
+ /// Supervisor has completed its contract
+ Completed,
+ /// Supervisor has failed
+ Failed,
+ /// Supervisor was interrupted (daemon crash, etc.)
+ Interrupted,
+}
+
+impl Default for SupervisorStateEnum {
+    /// Returns [`SupervisorStateEnum::Initializing`], the "starting up" state.
+    fn default() -> Self {
+        Self::Initializing
+    }
+}
+
+impl std::fmt::Display for SupervisorStateEnum {
+    /// Writes the lowercase snake_case name of the state (e.g. `waiting_for_user`).
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Pick the label first, then emit it with a single write.
+        let label = match self {
+            Self::Initializing => "initializing",
+            Self::Idle => "idle",
+            Self::Working => "working",
+            Self::WaitingForUser => "waiting_for_user",
+            Self::WaitingForTasks => "waiting_for_tasks",
+            Self::Blocked => "blocked",
+            Self::Completed => "completed",
+            Self::Failed => "failed",
+            Self::Interrupted => "interrupted",
+        };
+        f.write_str(label)
+    }
+}
+
+impl std::str::FromStr for SupervisorStateEnum {
+    type Err = String;
+
+    /// Parses a state name, ignoring letter case.
+    ///
+    /// Besides the snake_case names, the underscore-free spellings
+    /// `waitingforuser` and `waitingfortasks` are accepted. Any other input
+    /// yields an `Err` carrying a message that quotes the original string.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let normalized = s.to_lowercase();
+        let parsed = match normalized.as_str() {
+            "initializing" => Self::Initializing,
+            "idle" => Self::Idle,
+            "working" => Self::Working,
+            "waiting_for_user" | "waitingforuser" => Self::WaitingForUser,
+            "waiting_for_tasks" | "waitingfortasks" => Self::WaitingForTasks,
+            "blocked" => Self::Blocked,
+            "completed" => Self::Completed,
+            "failed" => Self::Failed,
+            "interrupted" => Self::Interrupted,
+            _ => return Err(format!("Unknown supervisor state: {}", s)),
+        };
+        Ok(parsed)
+    }
+}
+
+/// One stored supervisor heartbeat, as read back from the database.
+///
+/// Kept for historical analysis and dead supervisor detection. The struct is
+/// populated from a database row via `FromRow` and serialized to JSON with
+/// camelCase keys.
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatRecord {
+    /// Identifier of this heartbeat row.
+    /// NOTE(review): the repository insert does not supply it, so it is
+    /// presumably defaulted by the database — confirm against the migration.
+    pub id: Uuid,
+    /// Task ID of the supervisor that sent the heartbeat.
+    pub supervisor_task_id: Uuid,
+    /// Contract the supervisor belongs to.
+    pub contract_id: Uuid,
+    /// Supervisor state as a free-form string; this type does not validate it
+    /// against `SupervisorStateEnum`.
+    pub state: String,
+    /// Contract phase reported with the heartbeat.
+    pub phase: String,
+    /// Optional description of what the supervisor was doing.
+    pub current_activity: Option<String>,
+    /// Reported progress percentage. Stored as `i32`; no range is enforced here.
+    pub progress: i32,
+    /// Task IDs the supervisor was waiting on. Decoded directly into
+    /// `Vec<Uuid>`, so no `try_from` conversion attribute is needed.
+    pub pending_task_ids: Vec<Uuid>,
+    /// Time recorded for this heartbeat row.
+    pub timestamp: DateTime<Utc>,
+}
+
+/// Request payload for sending a supervisor heartbeat; (de)serialized with camelCase keys such as `taskId` and `currentActivity`.
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorHeartbeatRequest {
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub state: SupervisorStateEnum,
+ pub phase: String,
+ pub current_activity: Option<String>,
+ /// Progress percentage (0-100); the range is documented only, the `u8` type itself also admits 101-255
+ pub progress: u8,
+ pub pending_task_ids: Vec<Uuid>,
+}
+
+// =============================================================================
+// Red Team Types
+// =============================================================================
+
/// Red Team notification record
#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -2395,3 +2500,99 @@ impl std::str::FromStr for NotificationSeverity {
}
}
}
+
+// =============================================================================
+// Unit Tests
+// =============================================================================
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use uuid::Uuid;
+
+    /// Every state paired with the snake_case name it should print and parse as.
+    const STATE_NAMES: [(SupervisorStateEnum, &str); 9] = [
+        (SupervisorStateEnum::Initializing, "initializing"),
+        (SupervisorStateEnum::Idle, "idle"),
+        (SupervisorStateEnum::Working, "working"),
+        (SupervisorStateEnum::WaitingForUser, "waiting_for_user"),
+        (SupervisorStateEnum::WaitingForTasks, "waiting_for_tasks"),
+        (SupervisorStateEnum::Blocked, "blocked"),
+        (SupervisorStateEnum::Completed, "completed"),
+        (SupervisorStateEnum::Failed, "failed"),
+        (SupervisorStateEnum::Interrupted, "interrupted"),
+    ];
+
+    #[test]
+    fn test_supervisor_state_enum_display() {
+        for &(state, name) in STATE_NAMES.iter() {
+            assert_eq!(state.to_string(), name);
+        }
+    }
+
+    #[test]
+    fn test_supervisor_state_enum_from_str() {
+        // Canonical lowercase names parse back to their variant.
+        for &(state, name) in STATE_NAMES.iter() {
+            assert_eq!(name.parse::<SupervisorStateEnum>().unwrap(), state);
+        }
+
+        // Mixed case and the underscore-free spellings are accepted as well.
+        let lenient_inputs = [
+            ("WORKING", SupervisorStateEnum::Working),
+            ("Working", SupervisorStateEnum::Working),
+            ("waitingforuser", SupervisorStateEnum::WaitingForUser),
+            ("waitingfortasks", SupervisorStateEnum::WaitingForTasks),
+        ];
+        for &(input, expected) in lenient_inputs.iter() {
+            assert_eq!(input.parse::<SupervisorStateEnum>().unwrap(), expected);
+        }
+
+        // Anything else is rejected.
+        assert!("invalid_state".parse::<SupervisorStateEnum>().is_err());
+    }
+
+    #[test]
+    fn test_supervisor_state_enum_serialization() {
+        // Variant -> JSON string.
+        let encoded = serde_json::to_string(&SupervisorStateEnum::Working).unwrap();
+        assert_eq!(encoded, "\"working\"");
+
+        // JSON string -> variant.
+        let decoded: SupervisorStateEnum = serde_json::from_str("\"working\"").unwrap();
+        assert_eq!(decoded, SupervisorStateEnum::Working);
+
+        // Multi-word variants use underscores on the wire.
+        let decoded: SupervisorStateEnum = serde_json::from_str("\"waiting_for_user\"").unwrap();
+        assert_eq!(decoded, SupervisorStateEnum::WaitingForUser);
+    }
+
+    #[test]
+    fn test_supervisor_state_enum_default() {
+        assert_eq!(
+            SupervisorStateEnum::default(),
+            SupervisorStateEnum::Initializing
+        );
+    }
+
+    #[test]
+    fn test_supervisor_heartbeat_request_serialization() {
+        let original = SupervisorHeartbeatRequest {
+            task_id: Uuid::nil(),
+            contract_id: Uuid::nil(),
+            state: SupervisorStateEnum::Working,
+            phase: "execute".to_string(),
+            current_activity: Some("Implementing feature".to_string()),
+            progress: 50,
+            pending_task_ids: vec![Uuid::nil()],
+        };
+
+        // The encoded form must carry these key/value pairs verbatim.
+        let json = serde_json::to_string(&original).unwrap();
+        let expected_fragments = [
+            "\"state\":\"working\"",
+            "\"phase\":\"execute\"",
+            "\"progress\":50",
+            "\"currentActivity\":\"Implementing feature\"",
+        ];
+        for fragment in expected_fragments.iter() {
+            assert!(json.contains(*fragment));
+        }
+
+        // Decoding the encoded form restores the same values.
+        let round_tripped: SupervisorHeartbeatRequest = serde_json::from_str(&json).unwrap();
+        assert_eq!(round_tripped.state, SupervisorStateEnum::Working);
+        assert_eq!(round_tripped.phase, "execute");
+        assert_eq!(round_tripped.progress, 50);
+    }
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index b7c5af1..1ac188c 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -12,9 +12,9 @@ use super::models::{
CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment,
DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent,
HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult,
- PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint,
- TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest,
- UpdateTemplateRequest,
+ PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest,
+ UpdateTaskRequest, UpdateTemplateRequest,
};
/// Repository error types.
@@ -3405,6 +3405,160 @@ pub async fn update_supervisor_pending_tasks(
}
// ============================================================================
+// Supervisor Heartbeats
+// ============================================================================
+
+/// Insert one heartbeat row for a supervisor task and return the stored record.
+///
+/// The row's `timestamp` column is set by the database (`NOW()`) at insert
+/// time; all other columns come from the arguments. The inserted row is read
+/// back through `RETURNING *`.
+///
+/// # Errors
+///
+/// Returns the `sqlx::Error` produced by the insert or by decoding the
+/// returned row.
+pub async fn create_supervisor_heartbeat(
+    pool: &PgPool,
+    supervisor_task_id: Uuid,
+    contract_id: Uuid,
+    state: &str,
+    phase: &str,
+    current_activity: Option<&str>,
+    progress: i32,
+    pending_task_ids: &[Uuid],
+) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
+    const INSERT_HEARTBEAT: &str = r#"
+        INSERT INTO supervisor_heartbeats (
+            supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
+        )
+        VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
+        RETURNING *
+        "#;
+
+    // Bind order must match the $1..$7 placeholders above.
+    let insert = sqlx::query_as::<_, SupervisorHeartbeatRecord>(INSERT_HEARTBEAT)
+        .bind(supervisor_task_id)
+        .bind(contract_id)
+        .bind(state)
+        .bind(phase)
+        .bind(current_activity)
+        .bind(progress)
+        .bind(pending_task_ids);
+
+    insert.fetch_one(pool).await
+}
+
+/// Fetch the most recent heartbeat row for one supervisor task.
+///
+/// Returns `Ok(None)` when the task has no heartbeat rows.
+pub async fn get_latest_supervisor_heartbeat(
+    pool: &PgPool,
+    supervisor_task_id: Uuid,
+) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
+    // Newest first, keep only the top row.
+    let sql = r#"
+        SELECT * FROM supervisor_heartbeats
+        WHERE supervisor_task_id = $1
+        ORDER BY timestamp DESC
+        LIMIT 1
+        "#;
+
+    let lookup = sqlx::query_as::<_, SupervisorHeartbeatRecord>(sql).bind(supervisor_task_id);
+    lookup.fetch_optional(pool).await
+}
+
+/// Fetch up to `limit` heartbeat rows for one supervisor task, newest first.
+pub async fn get_supervisor_heartbeats(
+    pool: &PgPool,
+    supervisor_task_id: Uuid,
+    limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+    let sql = r#"
+        SELECT * FROM supervisor_heartbeats
+        WHERE supervisor_task_id = $1
+        ORDER BY timestamp DESC
+        LIMIT $2
+        "#;
+
+    let listing = sqlx::query_as::<_, SupervisorHeartbeatRecord>(sql)
+        .bind(supervisor_task_id)
+        .bind(limit);
+    listing.fetch_all(pool).await
+}
+
+/// Fetch up to `limit` heartbeat rows recorded for a contract, newest first.
+///
+/// Rows are selected by `contract_id` only, regardless of which supervisor
+/// task wrote them.
+pub async fn get_contract_supervisor_heartbeats(
+    pool: &PgPool,
+    contract_id: Uuid,
+    limit: i64,
+) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
+    let sql = r#"
+        SELECT * FROM supervisor_heartbeats
+        WHERE contract_id = $1
+        ORDER BY timestamp DESC
+        LIMIT $2
+        "#;
+
+    let listing = sqlx::query_as::<_, SupervisorHeartbeatRecord>(sql)
+        .bind(contract_id)
+        .bind(limit);
+    listing.fetch_all(pool).await
+}
+
+/// Delete heartbeat rows whose timestamp is older than `ttl_hours` hours.
+///
+/// The caller chooses the retention window; this function applies no default.
+/// The hour count is sent as text and turned into an interval by the
+/// database (`($1 || ' hours')::INTERVAL`).
+///
+/// Returns the number of rows deleted.
+pub async fn cleanup_old_heartbeats(
+    pool: &PgPool,
+    ttl_hours: i64,
+) -> Result<u64, sqlx::Error> {
+    let sql = r#"
+        DELETE FROM supervisor_heartbeats
+        WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
+        "#;
+
+    let hours_text = ttl_hours.to_string();
+    let outcome = sqlx::query(sql).bind(hours_text).execute(pool).await?;
+
+    Ok(outcome.rows_affected())
+}
+
+/// Find running supervisors whose latest heartbeat is older than `timeout_seconds`.
+///
+/// For each supervisor task the newest heartbeat row is selected
+/// (`DISTINCT ON (supervisor_task_id)` ordered by `timestamp DESC`), joined to
+/// `tasks`, and kept only when the task's status is `'running'` and that
+/// heartbeat is older than the timeout. Because the query starts from the
+/// heartbeat table, a running supervisor that has never written a heartbeat
+/// row is not reported.
+///
+/// Returns a list of `(supervisor_task_id, contract_id, last_heartbeat_timestamp)`.
+///
+/// # Errors
+///
+/// Returns `sqlx::Error` if the query fails or if a result column is missing
+/// or cannot be decoded. Decoding failures are reported as errors rather than
+/// panicking.
+pub async fn find_stale_supervisors(
+    pool: &PgPool,
+    timeout_seconds: i64,
+) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
+    use sqlx::Row;
+
+    let rows = sqlx::query(
+        r#"
+        WITH latest_heartbeats AS (
+            SELECT DISTINCT ON (supervisor_task_id)
+                supervisor_task_id,
+                contract_id,
+                timestamp
+            FROM supervisor_heartbeats
+            ORDER BY supervisor_task_id, timestamp DESC
+        )
+        SELECT
+            lh.supervisor_task_id,
+            lh.contract_id,
+            lh.timestamp
+        FROM latest_heartbeats lh
+        JOIN tasks t ON t.id = lh.supervisor_task_id
+        WHERE t.status = 'running'
+            AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
+        "#,
+    )
+    .bind(timeout_seconds.to_string())
+    .fetch_all(pool)
+    .await?;
+
+    let mut stale = Vec::with_capacity(rows.len());
+    for row in &rows {
+        // `try_get` surfaces a missing/mistyped column as `Err` instead of
+        // panicking the way `get` would.
+        let supervisor_task_id: Uuid = row.try_get("supervisor_task_id")?;
+        let contract_id: Uuid = row.try_get("contract_id")?;
+        let last_heartbeat: chrono::DateTime<Utc> = row.try_get("timestamp")?;
+        stale.push((supervisor_task_id, contract_id, last_heartbeat));
+    }
+    Ok(stale)
+}
+
+// ============================================================================
// Contract Supervisor
// ============================================================================
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 1152502..887183a 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -15,6 +15,7 @@ use axum::{
response::{IntoResponse, Response},
};
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -262,6 +263,27 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: supervisor_state,
+ phase,
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: _,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ state = %supervisor_state,
+ phase = %phase,
+ progress = progress,
+ "Supervisor heartbeat received"
+ );
+
+ // Store heartbeat in database
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let pending_ids = pending_task_ids.clone();
+ let activity = current_activity.clone();
+ let state_str = supervisor_state.clone();
+ let phase_str = phase.clone();
+ tokio::spawn(async move {
+ // Store the heartbeat record
+ if let Err(e) = repository::create_supervisor_heartbeat(
+ &pool,
+ task_id,
+ contract_id,
+ &state_str,
+ &phase_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to store supervisor heartbeat"
+ );
+ }
+
+ // Also update the daemon heartbeat
+ if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
+ tracing::warn!(
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to update daemon heartbeat from supervisor"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {