diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 00:42:53 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-01 00:42:53 +0000 |
| commit | 96ad3af6051af69e2e8b34b35e8b40926bdd13a1 (patch) | |
| tree | 2e2aedd39c66dedf7da301273306a0c77440ecf4 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | bb14010db99b40792372bfcb4348cf4984f30b3f (diff) | |
| download | soryu-96ad3af6051af69e2e8b34b35e8b40926bdd13a1.tar.gz soryu-96ad3af6051af69e2e8b34b35e8b40926bdd13a1.zip | |
feat: Implement Phase 3 Tasks 3.3 and 3.4 - Supervisor State Persistence and Restoration
Task 3.3: Supervisor State Persistence
- Add migration 20260201000001_enhanced_supervisor_state.sql with new columns:
  - state (supervisor state enum)
  - current_activity (description)
  - progress (0-100)
  - error_message (for failed states)
  - spawned_task_ids (tasks created by supervisor)
  - pending_questions (questions awaiting user response)
  - restoration_count, last_restored_at, restoration_source (restoration tracking)
- Update SupervisorState model with new fields
- Add PendingQuestion struct for tracking unanswered questions
- Add SupervisorRestorationContext for returning restoration info
- Add StateValidationResult and StateRecoveryAction for state validation
State persistence functions in repository.rs:
- update_supervisor_detailed_state() - Update state, activity, progress
- add_supervisor_spawned_task() - Track spawned tasks
- add_supervisor_pending_question() - Track pending questions
- remove_supervisor_pending_question() - Clear answered questions
- save_supervisor_state_full() - Full state save (UPSERT)
- mark_supervisor_restored() - Increment restoration count
- get_supervisors_with_pending_questions() - Find supervisors with pending questions
- get_supervisor_state_for_restoration() - Load state for restoration
- validate_spawned_tasks() - Validate task consistency
- update_supervisor_phase() - Update on phase change
- update_supervisor_heartbeat_state() - Lightweight heartbeat update
State save points:
- On task spawn (save_state_on_task_spawn)
- On question asked (save_state_on_question_asked)
- On question answered (clear_pending_question)
- On phase change (save_state_on_phase_change)
- On heartbeat (update_supervisor_heartbeat_state)
Task 3.4: Supervisor Restoration Protocol
- Add restoration detection when supervisor starts with existing state
- Implement validate_supervisor_state() for state consistency checks
- Implement restore_supervisor() with validation and context generation
- Add redeliver_pending_questions() for re-delivering questions after crash
- Add generate_restoration_context_message() for Claude context injection
- Update resume_supervisor endpoint to return RestorationInfo
- Re-deliver pending questions when supervisor resumes
Restoration flow:
1. Daemon restarts or task reassigned
2. Load supervisor state from supervisor_states
3. If NOT FOUND: Start fresh, log warning
4. If FOUND: Validate state consistency
5. If INVALID: Start from last checkpoint
6. If VALID: Restore conversation history
7. Check for pending questions - re-deliver to user
8. Check for waiting tasks - resume waiting state
9. Send restoration context to Claude
10. Resume execution from last state
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 126 |
1 file changed, 102 insertions, 24 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 887183a..34e2cc3 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -933,7 +933,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Supervisor heartbeat received" ); - // Store heartbeat in database + // Store heartbeat in database and update supervisor state (Task 3.3) if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let pending_ids = pending_task_ids.clone(); @@ -960,6 +960,22 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } + // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) + if let Err(e) = repository::update_supervisor_heartbeat_state( + &pool, + contract_id, + &state_str, + activity.as_deref(), + progress as i32, + &pending_ids, + ).await { + tracing::debug!( + contract_id = %contract_id, + error = %e, + "Failed to update supervisor state from heartbeat (may not exist yet)" + ); + } + // Also update the daemon heartbeat if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { if let Some(daemon_id) = task.daemon_id { @@ -1035,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize supervisor_state when supervisor task starts running + // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) if updated_task.is_supervisor && new_status_owned == "running" { if let Some(contract_id) = updated_task.contract_id { - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( + // Check if supervisor state already exists (restoration scenario) + match repository::get_supervisor_state(&pool, contract_id).await { 
+ Ok(Some(existing_state)) => { + // State exists - this is a restoration + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + existing_state = %existing_state.state, + restoration_count = existing_state.restoration_count, + "Supervisor starting with existing state - restoration in progress" + ); + + // Mark as restored (increments restoration_count) + match repository::mark_supervisor_restored( &pool, contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, + "daemon_restart", ).await { - Ok(_) => { + Ok(restored_state) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, - phase = %contract.phase, - "Initialized supervisor state for running supervisor" + restoration_count = restored_state.restoration_count, + "Supervisor restoration marked" ); + + // Check for pending questions to re-deliver + if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>( + restored_state.pending_questions.clone() + ) { + if !questions.is_empty() { + tracing::info!( + contract_id = %contract_id, + question_count = questions.len(), + "Pending questions found for re-delivery" + ); + // Questions will be re-delivered by the supervisor when it restores + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to initialize supervisor state" + "Failed to mark supervisor as restored" ); } } } Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); + // No existing state - fresh start + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { 
+ Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized fresh supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, - "Failed to get contract for supervisor state" + "Failed to check existing supervisor state" ); } } |
