From 96ad3af6051af69e2e8b34b35e8b40926bdd13a1 Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 00:42:53 +0000 Subject: feat: Implement Phase 3 Tasks 3.3 and 3.4 - Supervisor State Persistence and Restoration Task 3.3: Supervisor State Persistence - Add migration 20260201000001_enhanced_supervisor_state.sql with new columns: - state (supervisor state enum) - current_activity (description) - progress (0-100) - error_message (for failed states) - spawned_task_ids (tasks created by supervisor) - pending_questions (questions awaiting user response) - restoration_count, last_restored_at, restoration_source (restoration tracking) - Update SupervisorState model with new fields - Add PendingQuestion struct for tracking unanswered questions - Add SupervisorRestorationContext for returning restoration info - Add StateValidationResult and StateRecoveryAction for state validation State persistence functions in repository.rs: - update_supervisor_detailed_state() - Update state, activity, progress - add_supervisor_spawned_task() - Track spawned tasks - add_supervisor_pending_question() - Track pending questions - remove_supervisor_pending_question() - Clear answered questions - save_supervisor_state_full() - Full state save (UPSERT) - mark_supervisor_restored() - Increment restoration count - get_supervisors_with_pending_questions() - Find supervisors with pending questions - get_supervisor_state_for_restoration() - Load state for restoration - validate_spawned_tasks() - Validate task consistency - update_supervisor_phase() - Update on phase change - update_supervisor_heartbeat_state() - Lightweight heartbeat update State save points: - On task spawn (save_state_on_task_spawn) - On question asked (save_state_on_question_asked) - On question answered (clear_pending_question) - On phase change (save_state_on_phase_change) - On heartbeat (update_supervisor_heartbeat_state) Task 3.4: Supervisor Restoration Protocol - Add restoration detection when supervisor starts with existing state - Implement validate_supervisor_state() for state consistency checks - Implement restore_supervisor() with validation and context generation - Add redeliver_pending_questions() for re-delivering questions after crash - Add generate_restoration_context_message() for Claude context injection - Update resume_supervisor endpoint to return RestorationInfo - Re-deliver pending questions when supervisor resumes Restoration flow: 1. Daemon restarts or task reassigned 2. Load supervisor state from supervisor_states 3. If NOT FOUND: Start fresh, log warning 4. If FOUND: Validate state consistency 5. If INVALID: Start from last checkpoint 6. If VALID: Restore conversation history 7. Check for pending questions - re-deliver to user 8. Check for waiting tasks - resume waiting state 9. Send restoration context to Claude 10. Resume execution from last state Co-Authored-By: Claude Opus 4.5 --- makima/src/server/handlers/mesh_daemon.rs | 126 ++++++++++++++++++++++++------ 1 file changed, 102 insertions(+), 24 deletions(-) (limited to 'makima/src/server/handlers/mesh_daemon.rs') diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 887183a..34e2cc3 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -933,7 +933,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Supervisor heartbeat received" ); - // Store heartbeat in database + // Store heartbeat in database and update supervisor state (Task 3.3) if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let pending_ids = pending_task_ids.clone(); @@ -960,6 +960,22 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } + // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) + if let Err(e) = repository::update_supervisor_heartbeat_state( + &pool, + contract_id, + &state_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::debug!( + contract_id = %contract_id, + error = %e, + "Failed to update supervisor state from heartbeat (may not exist yet)" + ); + } + // Also update the daemon heartbeat if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { if let Some(daemon_id) = task.daemon_id { @@ -1035,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize supervisor_state when supervisor task starts running + // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) if updated_task.is_supervisor && new_status_owned == "running" { if let Some(contract_id) = updated_task.contract_id { - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( + // Check if supervisor state already exists (restoration scenario) + match repository::get_supervisor_state(&pool, contract_id).await { + Ok(Some(existing_state)) => { + // State exists - this is a restoration + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + existing_state = %existing_state.state, + restoration_count = existing_state.restoration_count, + "Supervisor starting with existing state - restoration in progress" + ); + + // Mark as restored (increments restoration_count) + match repository::mark_supervisor_restored( &pool, contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, + "daemon_restart", ).await { - Ok(_) => { + Ok(restored_state) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, - phase = %contract.phase, - "Initialized supervisor state for running supervisor" + restoration_count = restored_state.restoration_count, + "Supervisor restoration marked" ); + + // Check for pending questions to re-deliver + if let Ok(questions) = serde_json::from_value::>( + restored_state.pending_questions.clone() + ) { + if !questions.is_empty() { + tracing::info!( + contract_id = %contract_id, + question_count = questions.len(), + "Pending questions found for re-delivery" + ); + // Questions will be re-delivered by the supervisor when it restores + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to initialize supervisor state" + "Failed to mark supervisor as restored" ); } } } Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); + // No existing state - fresh start + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized fresh supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to get contract for supervisor state" + "Failed to check existing supervisor state" ); } } -- cgit v1.2.3