diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 00:20:55 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-01 00:25:43 +0000 |
| commit | bb14010db99b40792372bfcb4348cf4984f30b3f (patch) | |
| tree | d5c12af5ce8e87430daad3f80a979157233e8644 /makima | |
| parent | 7567153e6281b94e39e52be5d060b381ed69597d (diff) | |
| download | soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.tar.gz soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.zip | |
feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure
Task 3.1: Enhanced Supervisor State Enum
- Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser,
WaitingForTasks, Blocked, Completed, Failed, Interrupted
- Implement Display, FromStr, Default, and serde serialization
- Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs
Task 3.2: Heartbeat Infrastructure
- Create supervisor_heartbeats migration with proper indexes and constraints
- Add heartbeat storage functions to repository.rs:
- create_supervisor_heartbeat
- get_latest_supervisor_heartbeat
- get_supervisor_heartbeats
- get_contract_supervisor_heartbeats
- cleanup_old_heartbeats (24 hour TTL support)
- find_stale_supervisors (for dead supervisor detection)
- Add SupervisorHeartbeat message to protocol.rs with enhanced fields
- Update mesh_daemon.rs to process and store supervisor heartbeats
- Add unit tests for SupervisorStateEnum and heartbeat serialization
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/migrations/20260201000000_supervisor_heartbeats.sql | 36 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 46 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 201 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 160 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 83 |
5 files changed, 523 insertions, 3 deletions
diff --git a/makima/migrations/20260201000000_supervisor_heartbeats.sql b/makima/migrations/20260201000000_supervisor_heartbeats.sql new file mode 100644 index 0000000..8595f71 --- /dev/null +++ b/makima/migrations/20260201000000_supervisor_heartbeats.sql @@ -0,0 +1,36 @@ +-- Create supervisor_heartbeats table for tracking supervisor state over time. +-- This enables detection of dead/stale supervisors and provides audit trail. + +CREATE TABLE IF NOT EXISTS supervisor_heartbeats ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + supervisor_task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + contract_id UUID NOT NULL REFERENCES contracts(id) ON DELETE CASCADE, + state VARCHAR(50) NOT NULL, + phase VARCHAR(50) NOT NULL, + current_activity TEXT, + progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100), + pending_task_ids UUID[] DEFAULT ARRAY[]::UUID[], + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Index for finding heartbeats by supervisor task +CREATE INDEX idx_heartbeats_supervisor ON supervisor_heartbeats(supervisor_task_id); + +-- Index for finding heartbeats by timestamp (for cleanup and monitoring) +CREATE INDEX idx_heartbeats_timestamp ON supervisor_heartbeats(timestamp); + +-- Index for finding heartbeats by contract +CREATE INDEX idx_heartbeats_contract ON supervisor_heartbeats(contract_id); + +-- Composite index for finding latest heartbeat per supervisor +CREATE INDEX idx_heartbeats_supervisor_timestamp ON supervisor_heartbeats(supervisor_task_id, timestamp DESC); + +COMMENT ON TABLE supervisor_heartbeats IS 'Historical record of supervisor heartbeats for monitoring and dead supervisor detection'; +COMMENT ON COLUMN supervisor_heartbeats.state IS 'Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted'; +COMMENT ON COLUMN supervisor_heartbeats.phase IS 'Current contract phase when heartbeat was sent'; +COMMENT ON COLUMN supervisor_heartbeats.current_activity IS 'Human-readable description of what the supervisor is doing'; +COMMENT ON COLUMN supervisor_heartbeats.progress IS 'Progress percentage (0-100)'; +COMMENT ON COLUMN supervisor_heartbeats.pending_task_ids IS 'Array of task IDs the supervisor is waiting on'; + +-- Note: Cleanup of old heartbeats (24 hour TTL) should be done by a scheduled job +-- or application-level cleanup, not a CHECK constraint (which can't reference NOW()) diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index bfe6326..5c88038 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -2,6 +2,7 @@ //! //! These types mirror the server's protocol exactly for compatibility. +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -26,6 +27,29 @@ pub enum DaemonMessage { active_tasks: Vec<Uuid>, }, + /// Enhanced supervisor heartbeat with detailed state. + /// Sent periodically by supervisor tasks to report their current state. + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option<String>, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec<Uuid>, + /// Timestamp of this heartbeat + timestamp: DateTime<Utc>, + }, + /// Task output streaming (stdout/stderr from Claude Code). TaskOutput { #[serde(rename = "taskId")] @@ -857,6 +881,28 @@ impl DaemonMessage { pub fn revoke_tool_key(task_id: Uuid) -> Self { Self::RevokeToolKey { task_id } } + + /// Create a supervisor heartbeat message. + pub fn supervisor_heartbeat( + task_id: Uuid, + contract_id: Uuid, + state: &str, + phase: &str, + current_activity: Option<String>, + progress: u8, + pending_task_ids: Vec<Uuid>, + ) -> Self { + Self::SupervisorHeartbeat { + task_id, + contract_id, + state: state.to_string(), + phase: phase.to_string(), + current_activity, + progress, + pending_task_ids, + timestamp: Utc::now(), + } + } } #[cfg(test)] diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 636d81a..cc30465 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2339,6 +2339,111 @@ pub struct CheckpointPatchInfo { // Red Team Types // ============================================================================ +// ============================================================================= +// Supervisor State and Heartbeat Types +// ============================================================================= + +/// Supervisor state for contract supervisor tasks. +/// Captures detailed activity state for monitoring and restoration. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum SupervisorStateEnum { + /// Supervisor is starting up + Initializing, + /// Supervisor is idle, waiting for work + Idle, + /// Supervisor is actively working + Working, + /// Supervisor is waiting for user input/confirmation + WaitingForUser, + /// Supervisor is waiting for spawned tasks to complete + WaitingForTasks, + /// Supervisor is blocked (external dependency, error, etc.) + Blocked, + /// Supervisor has completed its contract + Completed, + /// Supervisor has failed + Failed, + /// Supervisor was interrupted (daemon crash, etc.) + Interrupted, +} + +impl Default for SupervisorStateEnum { + fn default() -> Self { + SupervisorStateEnum::Initializing + } +} + +impl std::fmt::Display for SupervisorStateEnum { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SupervisorStateEnum::Initializing => write!(f, "initializing"), + SupervisorStateEnum::Idle => write!(f, "idle"), + SupervisorStateEnum::Working => write!(f, "working"), + SupervisorStateEnum::WaitingForUser => write!(f, "waiting_for_user"), + SupervisorStateEnum::WaitingForTasks => write!(f, "waiting_for_tasks"), + SupervisorStateEnum::Blocked => write!(f, "blocked"), + SupervisorStateEnum::Completed => write!(f, "completed"), + SupervisorStateEnum::Failed => write!(f, "failed"), + SupervisorStateEnum::Interrupted => write!(f, "interrupted"), + } + } +} + +impl std::str::FromStr for SupervisorStateEnum { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "initializing" => Ok(SupervisorStateEnum::Initializing), + "idle" => Ok(SupervisorStateEnum::Idle), + "working" => Ok(SupervisorStateEnum::Working), + "waiting_for_user" | "waitingforuser" => Ok(SupervisorStateEnum::WaitingForUser), + "waiting_for_tasks" | "waitingfortasks" => Ok(SupervisorStateEnum::WaitingForTasks), + "blocked" => Ok(SupervisorStateEnum::Blocked), + "completed" => Ok(SupervisorStateEnum::Completed), + "failed" => Ok(SupervisorStateEnum::Failed), + "interrupted" => Ok(SupervisorStateEnum::Interrupted), + _ => Err(format!("Unknown supervisor state: {}", s)), + } + } +} + +/// Enhanced heartbeat record for supervisor task monitoring. +/// Stored in the database for historical analysis and dead supervisor detection. +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatRecord { + pub id: Uuid, + pub supervisor_task_id: Uuid, + pub contract_id: Uuid, + pub state: String, + pub phase: String, + pub current_activity: Option<String>, + pub progress: i32, + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, + pub timestamp: DateTime<Utc>, +} + +/// Request payload for sending a supervisor heartbeat. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorHeartbeatRequest { + pub task_id: Uuid, + pub contract_id: Uuid, + pub state: SupervisorStateEnum, + pub phase: String, + pub current_activity: Option<String>, + /// Progress percentage (0-100) + pub progress: u8, + pub pending_task_ids: Vec<Uuid>, +} + +// ============================================================================= +// Red Team Types +// ============================================================================= + /// Red Team notification record #[derive(Debug, Clone, FromRow, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -2395,3 +2500,99 @@ impl std::str::FromStr for NotificationSeverity { } } } + +// ============================================================================= +// Unit Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + #[test] + fn test_supervisor_state_enum_display() { + assert_eq!(SupervisorStateEnum::Initializing.to_string(), "initializing"); + assert_eq!(SupervisorStateEnum::Idle.to_string(), "idle"); + assert_eq!(SupervisorStateEnum::Working.to_string(), "working"); + assert_eq!(SupervisorStateEnum::WaitingForUser.to_string(), "waiting_for_user"); + assert_eq!(SupervisorStateEnum::WaitingForTasks.to_string(), "waiting_for_tasks"); + assert_eq!(SupervisorStateEnum::Blocked.to_string(), "blocked"); + assert_eq!(SupervisorStateEnum::Completed.to_string(), "completed"); + assert_eq!(SupervisorStateEnum::Failed.to_string(), "failed"); + assert_eq!(SupervisorStateEnum::Interrupted.to_string(), "interrupted"); + } + + #[test] + fn test_supervisor_state_enum_from_str() { + // Standard lowercase + assert_eq!("initializing".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Initializing); + assert_eq!("idle".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Idle); + assert_eq!("working".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working); + assert_eq!("waiting_for_user".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForUser); + assert_eq!("waiting_for_tasks".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForTasks); + assert_eq!("blocked".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Blocked); + assert_eq!("completed".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Completed); + assert_eq!("failed".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Failed); + assert_eq!("interrupted".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Interrupted); + + // Case insensitive + assert_eq!("WORKING".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working); + assert_eq!("Working".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::Working); + + // Alternative formats + assert_eq!("waitingforuser".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForUser); + assert_eq!("waitingfortasks".parse::<SupervisorStateEnum>().unwrap(), SupervisorStateEnum::WaitingForTasks); + + // Invalid state + assert!("invalid_state".parse::<SupervisorStateEnum>().is_err()); + } + + #[test] + fn test_supervisor_state_enum_serialization() { + // Test JSON serialization + let state = SupervisorStateEnum::Working; + let json = serde_json::to_string(&state).unwrap(); + assert_eq!(json, "\"working\""); + + // Test JSON deserialization + let deserialized: SupervisorStateEnum = serde_json::from_str("\"working\"").unwrap(); + assert_eq!(deserialized, SupervisorStateEnum::Working); + + // Test underscore variants + let json = "\"waiting_for_user\""; + let deserialized: SupervisorStateEnum = serde_json::from_str(json).unwrap(); + assert_eq!(deserialized, SupervisorStateEnum::WaitingForUser); + } + + #[test] + fn test_supervisor_state_enum_default() { + let default_state = SupervisorStateEnum::default(); + assert_eq!(default_state, SupervisorStateEnum::Initializing); + } + + #[test] + fn test_supervisor_heartbeat_request_serialization() { + let request = SupervisorHeartbeatRequest { + task_id: Uuid::nil(), + contract_id: Uuid::nil(), + state: SupervisorStateEnum::Working, + phase: "execute".to_string(), + current_activity: Some("Implementing feature".to_string()), + progress: 50, + pending_task_ids: vec![Uuid::nil()], + }; + + let json = serde_json::to_string(&request).unwrap(); + assert!(json.contains("\"state\":\"working\"")); + assert!(json.contains("\"phase\":\"execute\"")); + assert!(json.contains("\"progress\":50")); + assert!(json.contains("\"currentActivity\":\"Implementing feature\"")); + + // Test deserialization + let deserialized: SupervisorHeartbeatRequest = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized.state, SupervisorStateEnum::Working); + assert_eq!(deserialized.phase, "execute"); + assert_eq!(deserialized.progress, 50); + } +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index b7c5af1..1ac188c 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -12,9 +12,9 @@ use super::models::{ CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, - PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, - UpdateTemplateRequest, + PhaseConfig, PhaseDefinition, RedTeamNotification, SupervisorHeartbeatRecord, SupervisorState, + Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, + UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. @@ -3405,6 +3405,160 @@ pub async fn update_supervisor_pending_tasks( } // ============================================================================ +// Supervisor Heartbeats +// ============================================================================ + +/// Record a supervisor heartbeat. +/// This creates a historical record for monitoring and dead supervisor detection. +pub async fn create_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, + contract_id: Uuid, + state: &str, + phase: &str, + current_activity: Option<&str>, + progress: i32, + pending_task_ids: &[Uuid], +) -> Result<SupervisorHeartbeatRecord, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + INSERT INTO supervisor_heartbeats ( + supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + RETURNING * + "#, + ) + .bind(supervisor_task_id) + .bind(contract_id) + .bind(state) + .bind(phase) + .bind(current_activity) + .bind(progress) + .bind(pending_task_ids) + .fetch_one(pool) + .await +} + +/// Get the latest heartbeat for a supervisor task. +pub async fn get_latest_supervisor_heartbeat( + pool: &PgPool, + supervisor_task_id: Uuid, +) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT 1 + "#, + ) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await +} + +/// Get recent heartbeats for a supervisor task. +pub async fn get_supervisor_heartbeats( + pool: &PgPool, + supervisor_task_id: Uuid, + limit: i64, +) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE supervisor_task_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(supervisor_task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Get recent heartbeats for a contract. +pub async fn get_contract_supervisor_heartbeats( + pool: &PgPool, + contract_id: Uuid, + limit: i64, +) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { + sqlx::query_as::<_, SupervisorHeartbeatRecord>( + r#" + SELECT * FROM supervisor_heartbeats + WHERE contract_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + "#, + ) + .bind(contract_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Delete old heartbeats beyond the TTL (24 hours by default). +/// Returns the number of deleted records. +pub async fn cleanup_old_heartbeats( + pool: &PgPool, + ttl_hours: i64, +) -> Result<u64, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM supervisor_heartbeats + WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL + "#, + ) + .bind(ttl_hours.to_string()) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Find supervisors that have not sent a heartbeat within the timeout period. +/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp). +pub async fn find_stale_supervisors( + pool: &PgPool, + timeout_seconds: i64, +) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> { + let rows = sqlx::query( + r#" + WITH latest_heartbeats AS ( + SELECT DISTINCT ON (supervisor_task_id) + supervisor_task_id, + contract_id, + timestamp + FROM supervisor_heartbeats + ORDER BY supervisor_task_id, timestamp DESC + ) + SELECT + lh.supervisor_task_id, + lh.contract_id, + lh.timestamp + FROM latest_heartbeats lh + JOIN tasks t ON t.id = lh.supervisor_task_id + WHERE t.status = 'running' + AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL + "#, + ) + .bind(timeout_seconds.to_string()) + .fetch_all(pool) + .await?; + + let mut result = Vec::new(); + for row in rows { + use sqlx::Row; + let supervisor_task_id: Uuid = row.get("supervisor_task_id"); + let contract_id: Uuid = row.get("contract_id"); + let timestamp: chrono::DateTime<Utc> = row.get("timestamp"); + result.push((supervisor_task_id, contract_id, timestamp)); + } + Ok(result) +} + +// ============================================================================ // Contract Supervisor // ============================================================================ diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 1152502..887183a 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -15,6 +15,7 @@ use axum::{ response::{IntoResponse, Response}, }; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -262,6 +263,27 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, + /// Enhanced supervisor heartbeat with detailed state + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option<String>, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec<Uuid>, + /// Timestamp of this heartbeat + timestamp: DateTime<Utc>, + }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::SupervisorHeartbeat { + task_id, + contract_id, + state: supervisor_state, + phase, + current_activity, + progress, + pending_task_ids, + timestamp: _, + }) => { + tracing::debug!( + task_id = %task_id, + contract_id = %contract_id, + state = %supervisor_state, + phase = %phase, + progress = progress, + "Supervisor heartbeat received" + ); + + // Store heartbeat in database + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let pending_ids = pending_task_ids.clone(); + let activity = current_activity.clone(); + let state_str = supervisor_state.clone(); + let phase_str = phase.clone(); + tokio::spawn(async move { + // Store the heartbeat record + if let Err(e) = repository::create_supervisor_heartbeat( + &pool, + task_id, + contract_id, + &state_str, + &phase_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to store supervisor heartbeat" + ); + } + + // Also update the daemon heartbeat + if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { + if let Some(daemon_id) = task.daemon_id { + if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { + tracing::warn!( + daemon_id = %daemon_id, + error = %e, + "Failed to update daemon heartbeat from supervisor" + ); + } + } + } + }); + } + } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { |
