diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 01:31:04 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-01 01:31:04 +0000 |
| commit | 65eebd078af712d004a5a9e28863a16df30792a6 (patch) | |
| tree | 3a9457f8e2bcfb0a85a7177d55686ec41bebcf89 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 15d680a8a3c22be03a8faacd7bd43214e62a37f4 (diff) | |
| parent | 5055b3f06d8027870b64abd84d9d3875070372e0 (diff) | |
| download | soryu-65eebd078af712d004a5a9e28863a16df30792a6.tar.gz soryu-65eebd078af712d004a5a9e28863a16df30792a6.zip | |
Merge pull request #55 from soryu-co/makima/contract-management-phase3
feat: Implement Phase 3 - Supervisor Resilience and State Management
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 207 |
1 file changed, 184 insertions, 23 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 1152502..34e2cc3 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -15,6 +15,7 @@ use axum::{ response::{IntoResponse, Response}, }; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -262,6 +263,27 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, + /// Enhanced supervisor heartbeat with detailed state + SupervisorHeartbeat { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted + state: String, + /// Current contract phase + phase: String, + /// Description of current activity + #[serde(rename = "currentActivity")] + current_activity: Option<String>, + /// Progress percentage (0-100) + progress: u8, + /// Task IDs the supervisor is waiting on + #[serde(rename = "pendingTaskIds")] + pending_task_ids: Vec<Uuid>, + /// Timestamp of this heartbeat + timestamp: DateTime<Utc>, + }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -892,6 +914,83 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::SupervisorHeartbeat { + task_id, + contract_id, + state: supervisor_state, + phase, + current_activity, + progress, + pending_task_ids, + timestamp: _, + }) => { + tracing::debug!( + task_id = %task_id, + contract_id = %contract_id, + state = %supervisor_state, + phase = %phase, + progress = progress, + "Supervisor heartbeat received" + ); + + // Store heartbeat in database and update supervisor state (Task 3.3) + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let pending_ids = 
pending_task_ids.clone(); + let activity = current_activity.clone(); + let state_str = supervisor_state.clone(); + let phase_str = phase.clone(); + tokio::spawn(async move { + // Store the heartbeat record + if let Err(e) = repository::create_supervisor_heartbeat( + &pool, + task_id, + contract_id, + &state_str, + &phase_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to store supervisor heartbeat" + ); + } + + // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) + if let Err(e) = repository::update_supervisor_heartbeat_state( + &pool, + contract_id, + &state_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::debug!( + contract_id = %contract_id, + error = %e, + "Failed to update supervisor state from heartbeat (may not exist yet)" + ); + } + + // Also update the daemon heartbeat + if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { + if let Some(daemon_id) = task.daemon_id { + if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { + tracing::warn!( + daemon_id = %daemon_id, + error = %e, + "Failed to update daemon heartbeat from supervisor" + ); + } + } + } + }); + } + } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { @@ -952,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize supervisor_state when supervisor task starts running + // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) if updated_task.is_supervisor && new_status_owned == "running" { if let Some(contract_id) = updated_task.contract_id { - // Get contract to get its phase - match 
repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( + // Check if supervisor state already exists (restoration scenario) + match repository::get_supervisor_state(&pool, contract_id).await { + Ok(Some(existing_state)) => { + // State exists - this is a restoration + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + existing_state = %existing_state.state, + restoration_count = existing_state.restoration_count, + "Supervisor starting with existing state - restoration in progress" + ); + + // Mark as restored (increments restoration_count) + match repository::mark_supervisor_restored( &pool, contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, + "daemon_restart", ).await { - Ok(_) => { + Ok(restored_state) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, - phase = %contract.phase, - "Initialized supervisor state for running supervisor" + restoration_count = restored_state.restoration_count, + "Supervisor restoration marked" ); + + // Check for pending questions to re-deliver + if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>( + restored_state.pending_questions.clone() + ) { + if !questions.is_empty() { + tracing::info!( + contract_id = %contract_id, + question_count = questions.len(), + "Pending questions found for re-delivery" + ); + // Questions will be re-delivered by the supervisor when it restores + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to initialize supervisor state" + "Failed to mark supervisor as restored" ); } } } Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); + // No existing state - fresh start + // Get contract to get its phase + match 
repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized fresh supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to get contract for supervisor state" + "Failed to check existing supervisor state" ); } } |
