From bb14010db99b40792372bfcb4348cf4984f30b3f Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 00:20:55 +0000 Subject: feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure Task 3.1: Enhanced Supervisor State Enum - Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser, WaitingForTasks, Blocked, Completed, Failed, Interrupted - Implement Display, FromStr, Default, and serde serialization - Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs Task 3.2: Heartbeat Infrastructure - Create supervisor_heartbeats migration with proper indexes and constraints - Add heartbeat storage functions to repository.rs: - create_supervisor_heartbeat - get_latest_supervisor_heartbeat - get_supervisor_heartbeats - get_contract_supervisor_heartbeats - cleanup_old_heartbeats (24 hour TTL support) - find_stale_supervisors (for dead supervisor detection) - Add SupervisorHeartbeat message to protocol.rs with enhanced fields - Update mesh_daemon.rs to process and store supervisor heartbeats - Add unit tests for SupervisorStateEnum and heartbeat serialization Co-Authored-By: Claude Opus 4.5 --- makima/src/server/handlers/mesh_daemon.rs | 83 +++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) (limited to 'makima/src/server') diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 1152502..887183a 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -15,6 +15,7 @@ use axum::{ response::{IntoResponse, Response}, }; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -262,6 +263,27 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec, }, + /// Enhanced supervisor heartbeat with detailed state + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec, + /// Timestamp of this heartbeat + timestamp: DateTime, + }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::SupervisorHeartbeat { + task_id, + contract_id, + state: supervisor_state, + phase, + current_activity, + progress, + pending_task_ids, + timestamp: _, + }) => { + tracing::debug!( + task_id = %task_id, + contract_id = %contract_id, + state = %supervisor_state, + phase = %phase, + progress = progress, + "Supervisor heartbeat received" + ); + + // Store heartbeat in database + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let pending_ids = pending_task_ids.clone(); + let activity = current_activity.clone(); + let state_str = supervisor_state.clone(); + let phase_str = phase.clone(); + tokio::spawn(async move { + // Store the heartbeat record + if let Err(e) = repository::create_supervisor_heartbeat( + &pool, + task_id, + contract_id, + &state_str, + &phase_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to store supervisor heartbeat" + ); + } + + // Also update the daemon heartbeat + if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { + if let Some(daemon_id) = task.daemon_id { + if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { + tracing::warn!( + daemon_id = %daemon_id, + error = %e, + "Failed to update daemon heartbeat from supervisor" + ); + } + } + } + }); + } + } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { -- cgit v1.2.3