summaryrefslogtreecommitdiff
path: root/makima/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs83
1 files changed, 83 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 1152502..887183a 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -15,6 +15,7 @@ use axum::{
response::{IntoResponse, Response},
};
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -262,6 +263,27 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -892,6 +914,67 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: supervisor_state,
+ phase,
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: _,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ state = %supervisor_state,
+ phase = %phase,
+ progress = progress,
+ "Supervisor heartbeat received"
+ );
+
+ // Store heartbeat in database
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let pending_ids = pending_task_ids.clone();
+ let activity = current_activity.clone();
+ let state_str = supervisor_state.clone();
+ let phase_str = phase.clone();
+ tokio::spawn(async move {
+ // Store the heartbeat record
+ if let Err(e) = repository::create_supervisor_heartbeat(
+ &pool,
+ task_id,
+ contract_id,
+ &state_str,
+ &phase_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to store supervisor heartbeat"
+ );
+ }
+
+ // Also update the daemon heartbeat
+ if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
+ tracing::warn!(
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to update daemon heartbeat from supervisor"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {