summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-01 00:20:55 +0000
committersoryu <soryu@soryu.co>2026-02-01 00:25:43 +0000
commitbb14010db99b40792372bfcb4348cf4984f30b3f (patch)
treed5c12af5ce8e87430daad3f80a979157233e8644 /makima/src/server/handlers/mesh_daemon.rs
parent7567153e6281b94e39e52be5d060b381ed69597d (diff)
downloadsoryu-bb14010db99b40792372bfcb4348cf4984f30b3f.tar.gz
soryu-bb14010db99b40792372bfcb4348cf4984f30b3f.zip
feat: Implement Phase 3 Tasks 3.1 and 3.2 - SupervisorState enum and Heartbeat Infrastructure
Task 3.1: Enhanced Supervisor State Enum - Add SupervisorStateEnum with states: Initializing, Idle, Working, WaitingForUser, WaitingForTasks, Blocked, Completed, Failed, Interrupted - Implement Display, FromStr, Default, and serde serialization - Add SupervisorHeartbeatRecord and SupervisorHeartbeatRequest structs Task 3.2: Heartbeat Infrastructure - Create supervisor_heartbeats migration with proper indexes and constraints - Add heartbeat storage functions to repository.rs: - create_supervisor_heartbeat - get_latest_supervisor_heartbeat - get_supervisor_heartbeats - get_contract_supervisor_heartbeats - cleanup_old_heartbeats (24 hour TTL support) - find_stale_supervisors (for dead supervisor detection) - Add SupervisorHeartbeat message to protocol.rs with enhanced fields - Update mesh_daemon.rs to process and store supervisor heartbeats - Add unit tests for SupervisorStateEnum and heartbeat serialization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs83
1 files changed, 83 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 1152502..887183a 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -15,6 +15,7 @@ use axum::{
response::{IntoResponse, Response},
};
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -262,6 +263,27 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: supervisor_state,
+ phase,
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: _,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ state = %supervisor_state,
+ phase = %phase,
+ progress = progress,
+ "Supervisor heartbeat received"
+ );
+
+ // Store heartbeat in database
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let pending_ids = pending_task_ids.clone();
+ let activity = current_activity.clone();
+ let state_str = supervisor_state.clone();
+ let phase_str = phase.clone();
+ tokio::spawn(async move {
+ // Store the heartbeat record
+ if let Err(e) = repository::create_supervisor_heartbeat(
+ &pool,
+ task_id,
+ contract_id,
+ &state_str,
+ &phase_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to store supervisor heartbeat"
+ );
+ }
+
+ // Also update the daemon heartbeat
+ if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
+ tracing::warn!(
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to update daemon heartbeat from supervisor"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {