diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 00:20:55 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-01 00:25:43 +0000 |
| commit | bb14010db99b40792372bfcb4348cf4984f30b3f (patch) | |
| tree | d5c12af5ce8e87430daad3f80a979157233e8644 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 7567153e6281b94e39e52be5d060b381ed69597d (diff) | |
| download | soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.tar.gz soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.zip | |
feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure
Task 3.1: Enhanced Supervisor State Enum
- Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser,
WaitingForTasks, Blocked, Completed, Failed, Interrupted
- Implement Display, FromStr, Default, and serde serialization
- Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs
Task 3.2: Heartbeat Infrastructure
- Create supervisor_heartbeats migration with proper indexes and constraints
- Add heartbeat storage functions to repository.rs:
- create_supervisor_heartbeat
- get_latest_supervisor_heartbeat
- get_supervisor_heartbeats
- get_contract_supervisor_heartbeats
- cleanup_old_heartbeats (24 hour TTL support)
- find_stale_supervisors (for dead supervisor detection)
- Add SupervisorHeartbeat message to protocol.rs with enhanced fields
- Update mesh_daemon.rs to process and store supervisor heartbeats
- Add unit tests for SupervisorStateEnum and heartbeat serialization
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 1152502..887183a 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -15,6 +15,7 @@ use axum::{ response::{IntoResponse, Response}, }; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -262,6 +263,27 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, + /// Enhanced supervisor heartbeat with detailed state + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option<String>, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec<Uuid>, + /// Timestamp of this heartbeat + timestamp: DateTime<Utc>, + }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::SupervisorHeartbeat { + task_id, + contract_id, + state: supervisor_state, + phase, + current_activity, + progress, + pending_task_ids, + timestamp: _, + }) => { + tracing::debug!( + task_id = %task_id, + contract_id = %contract_id, + state = %supervisor_state, + phase = %phase, + progress = progress, + "Supervisor heartbeat received" + ); + + // Store heartbeat in database + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let pending_ids = pending_task_ids.clone(); + let activity = current_activity.clone(); + let state_str = supervisor_state.clone(); + let phase_str = phase.clone(); + tokio::spawn(async move { + // Store the heartbeat record + if let Err(e) = repository::create_supervisor_heartbeat( + &pool, + task_id, + contract_id, + &state_str, + &phase_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to store supervisor heartbeat" + ); + } + + // Also update the daemon heartbeat + if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { + if let Some(daemon_id) = task.daemon_id { + if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { + tracing::warn!( + daemon_id = %daemon_id, + error = %e, + "Failed to update daemon heartbeat from supervisor" + ); + } + } + } + }); + } + } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { |
