summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
author soryu <soryu@soryu.co> 2026-02-01 00:42:53 +0000
committer soryu <soryu@soryu.co> 2026-02-01 00:42:53 +0000
commit 96ad3af6051af69e2e8b34b35e8b40926bdd13a1 (patch)
tree 2e2aedd39c66dedf7da301273306a0c77440ecf4 /makima/src/server/handlers/mesh_daemon.rs
parent bb14010db99b40792372bfcb4348cf4984f30b3f (diff)
download soryu-96ad3af6051af69e2e8b34b35e8b40926bdd13a1.tar.gz
soryu-96ad3af6051af69e2e8b34b35e8b40926bdd13a1.zip
feat: Implement Phase 3 Tasks 3.3 and 3.4 - Supervisor State Persistence and Restoration

Task 3.3: Supervisor State Persistence
- Add migration 20260201000001_enhanced_supervisor_state.sql with new columns:
  - state (supervisor state enum)
  - current_activity (description)
  - progress (0-100)
  - error_message (for failed states)
  - spawned_task_ids (tasks created by supervisor)
  - pending_questions (questions awaiting user response)
  - restoration_count, last_restored_at, restoration_source (restoration tracking)
- Update SupervisorState model with new fields
- Add PendingQuestion struct for tracking unanswered questions
- Add SupervisorRestorationContext for returning restoration info
- Add StateValidationResult and StateRecoveryAction for state validation

State persistence functions in repository.rs:
- update_supervisor_detailed_state() - Update state, activity, progress
- add_supervisor_spawned_task() - Track spawned tasks
- add_supervisor_pending_question() - Track pending questions
- remove_supervisor_pending_question() - Clear answered questions
- save_supervisor_state_full() - Full state save (UPSERT)
- mark_supervisor_restored() - Increment restoration count
- get_supervisors_with_pending_questions() - Find supervisors with pending questions
- get_supervisor_state_for_restoration() - Load state for restoration
- validate_spawned_tasks() - Validate task consistency
- update_supervisor_phase() - Update on phase change
- update_supervisor_heartbeat_state() - Lightweight heartbeat update

State save points:
- On task spawn (save_state_on_task_spawn)
- On question asked (save_state_on_question_asked)
- On question answered (clear_pending_question)
- On phase change (save_state_on_phase_change)
- On heartbeat (update_supervisor_heartbeat_state)

Task 3.4: Supervisor Restoration Protocol
- Add restoration detection when supervisor starts with existing state
- Implement validate_supervisor_state() for state consistency checks
- Implement restore_supervisor() with validation and context generation
- Add redeliver_pending_questions() for re-delivering questions after crash
- Add generate_restoration_context_message() for Claude context injection
- Update resume_supervisor endpoint to return RestorationInfo
- Re-deliver pending questions when supervisor resumes

Restoration flow:
1. Daemon restarts or task reassigned
2. Load supervisor state from supervisor_states
3. If NOT FOUND: Start fresh, log warning
4. If FOUND: Validate state consistency
5. If INVALID: Start from last checkpoint
6. If VALID: Restore conversation history
7. Check for pending questions - re-deliver to user
8. Check for waiting tasks - resume waiting state
9. Send restoration context to Claude
10. Resume execution from last state

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r-- makima/src/server/handlers/mesh_daemon.rs 126
1 files changed, 102 insertions, 24 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 887183a..34e2cc3 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -933,7 +933,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"Supervisor heartbeat received"
);
- // Store heartbeat in database
+ // Store heartbeat in database and update supervisor state (Task 3.3)
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let pending_ids = pending_task_ids.clone();
@@ -960,6 +960,22 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
+ // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
+ if let Err(e) = repository::update_supervisor_heartbeat_state(
+ &pool,
+ contract_id,
+ &state_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::debug!(
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to update supervisor state from heartbeat (may not exist yet)"
+ );
+ }
+
// Also update the daemon heartbeat
if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
if let Some(daemon_id) = task.daemon_id {
@@ -1035,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Initialize supervisor_state when supervisor task starts running
+ // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
if updated_task.is_supervisor && new_status_owned == "running" {
if let Some(contract_id) = updated_task.contract_id {
- // Get contract to get its phase
- match repository::get_contract_for_owner(
- &pool,
- contract_id,
- updated_task.owner_id,
- ).await {
- Ok(Some(contract)) => {
- match repository::upsert_supervisor_state(
+ // Check if supervisor state already exists (restoration scenario)
+ match repository::get_supervisor_state(&pool, contract_id).await {
+ Ok(Some(existing_state)) => {
+ // State exists - this is a restoration
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ existing_state = %existing_state.state,
+ restoration_count = existing_state.restoration_count,
+ "Supervisor starting with existing state - restoration in progress"
+ );
+
+ // Mark as restored (increments restoration_count)
+ match repository::mark_supervisor_restored(
&pool,
contract_id,
- task_id,
- serde_json::json!([]), // Empty conversation
- &[], // No pending tasks
- &contract.phase,
+ "daemon_restart",
).await {
- Ok(_) => {
+ Ok(restored_state) => {
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
- phase = %contract.phase,
- "Initialized supervisor state for running supervisor"
+ restoration_count = restored_state.restoration_count,
+ "Supervisor restoration marked"
);
+
+ // Check for pending questions to re-deliver
+ if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
+ restored_state.pending_questions.clone()
+ ) {
+ if !questions.is_empty() {
+ tracing::info!(
+ contract_id = %contract_id,
+ question_count = questions.len(),
+ "Pending questions found for re-delivery"
+ );
+ // Questions will be re-delivered by the supervisor when it restores
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to initialize supervisor state"
+ "Failed to mark supervisor as restored"
);
}
}
}
Ok(None) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- "Contract not found when initializing supervisor state"
- );
+ // No existing state - fresh start
+ // Get contract to get its phase
+ match repository::get_contract_for_owner(
+ &pool,
+ contract_id,
+ updated_task.owner_id,
+ ).await {
+ Ok(Some(contract)) => {
+ match repository::upsert_supervisor_state(
+ &pool,
+ contract_id,
+ task_id,
+ serde_json::json!([]), // Empty conversation
+ &[], // No pending tasks
+ &contract.phase,
+ ).await {
+ Ok(_) => {
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ phase = %contract.phase,
+ "Initialized fresh supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to initialize supervisor state"
+ );
+ }
+ }
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Contract not found when initializing supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to get contract for supervisor state"
+ );
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to get contract for supervisor state"
+ "Failed to check existing supervisor state"
);
}
}