summary | refs | log | tree | commit | diff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
author soryu <soryu@soryu.co> 2026-02-01 01:31:04 +0000
committer GitHub <noreply@github.com> 2026-02-01 01:31:04 +0000
commit 65eebd078af712d004a5a9e28863a16df30792a6 (patch)
tree 3a9457f8e2bcfb0a85a7177d55686ec41bebcf89 /makima/src/server/handlers/mesh_daemon.rs
parent 15d680a8a3c22be03a8faacd7bd43214e62a37f4 (diff)
parent 5055b3f06d8027870b64abd84d9d3875070372e0 (diff)
download soryu-65eebd078af712d004a5a9e28863a16df30792a6.tar.gz
soryu-65eebd078af712d004a5a9e28863a16df30792a6.zip
Merge pull request #55 from soryu-co/makima/contract-management-phase3
feat: Implement Phase 3 - Supervisor Resilience and State Management
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r-- makima/src/server/handlers/mesh_daemon.rs 207
1 files changed, 184 insertions, 23 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 1152502..34e2cc3 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -15,6 +15,7 @@ use axum::{
response::{IntoResponse, Response},
};
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -262,6 +263,27 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
+ /// Enhanced supervisor heartbeat with detailed state
+ SupervisorHeartbeat {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
+ state: String,
+ /// Current contract phase
+ phase: String,
+ /// Description of current activity
+ #[serde(rename = "currentActivity")]
+ current_activity: Option<String>,
+ /// Progress percentage (0-100)
+ progress: u8,
+ /// Task IDs the supervisor is waiting on
+ #[serde(rename = "pendingTaskIds")]
+ pending_task_ids: Vec<Uuid>,
+ /// Timestamp of this heartbeat
+ timestamp: DateTime<Utc>,
+ },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -892,6 +914,83 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::SupervisorHeartbeat {
+ task_id,
+ contract_id,
+ state: supervisor_state,
+ phase,
+ current_activity,
+ progress,
+ pending_task_ids,
+ timestamp: _,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ state = %supervisor_state,
+ phase = %phase,
+ progress = progress,
+ "Supervisor heartbeat received"
+ );
+
+ // Store heartbeat in database and update supervisor state (Task 3.3)
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let pending_ids = pending_task_ids.clone();
+ let activity = current_activity.clone();
+ let state_str = supervisor_state.clone();
+ let phase_str = phase.clone();
+ tokio::spawn(async move {
+ // Store the heartbeat record
+ if let Err(e) = repository::create_supervisor_heartbeat(
+ &pool,
+ task_id,
+ contract_id,
+ &state_str,
+ &phase_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to store supervisor heartbeat"
+ );
+ }
+
+ // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
+ if let Err(e) = repository::update_supervisor_heartbeat_state(
+ &pool,
+ contract_id,
+ &state_str,
+ activity.as_deref(),
+ progress as i32,
+ &pending_ids,
+ ).await {
+ tracing::debug!(
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to update supervisor state from heartbeat (may not exist yet)"
+ );
+ }
+
+ // Also update the daemon heartbeat
+ if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
+ tracing::warn!(
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to update daemon heartbeat from supervisor"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
@@ -952,55 +1051,117 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Initialize supervisor_state when supervisor task starts running
+ // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
if updated_task.is_supervisor && new_status_owned == "running" {
if let Some(contract_id) = updated_task.contract_id {
- // Get contract to get its phase
- match repository::get_contract_for_owner(
- &pool,
- contract_id,
- updated_task.owner_id,
- ).await {
- Ok(Some(contract)) => {
- match repository::upsert_supervisor_state(
+ // Check if supervisor state already exists (restoration scenario)
+ match repository::get_supervisor_state(&pool, contract_id).await {
+ Ok(Some(existing_state)) => {
+ // State exists - this is a restoration
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ existing_state = %existing_state.state,
+ restoration_count = existing_state.restoration_count,
+ "Supervisor starting with existing state - restoration in progress"
+ );
+
+ // Mark as restored (increments restoration_count)
+ match repository::mark_supervisor_restored(
&pool,
contract_id,
- task_id,
- serde_json::json!([]), // Empty conversation
- &[], // No pending tasks
- &contract.phase,
+ "daemon_restart",
).await {
- Ok(_) => {
+ Ok(restored_state) => {
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
- phase = %contract.phase,
- "Initialized supervisor state for running supervisor"
+ restoration_count = restored_state.restoration_count,
+ "Supervisor restoration marked"
);
+
+ // Check for pending questions to re-deliver
+ if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
+ restored_state.pending_questions.clone()
+ ) {
+ if !questions.is_empty() {
+ tracing::info!(
+ contract_id = %contract_id,
+ question_count = questions.len(),
+ "Pending questions found for re-delivery"
+ );
+ // Questions will be re-delivered by the supervisor when it restores
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to initialize supervisor state"
+ "Failed to mark supervisor as restored"
);
}
}
}
Ok(None) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- "Contract not found when initializing supervisor state"
- );
+ // No existing state - fresh start
+ // Get contract to get its phase
+ match repository::get_contract_for_owner(
+ &pool,
+ contract_id,
+ updated_task.owner_id,
+ ).await {
+ Ok(Some(contract)) => {
+ match repository::upsert_supervisor_state(
+ &pool,
+ contract_id,
+ task_id,
+ serde_json::json!([]), // Empty conversation
+ &[], // No pending tasks
+ &contract.phase,
+ ).await {
+ Ok(_) => {
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ phase = %contract.phase,
+ "Initialized fresh supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to initialize supervisor state"
+ );
+ }
+ }
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Contract not found when initializing supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to get contract for supervisor state"
+ );
+ }
+ }
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
- "Failed to get contract for supervisor state"
+ "Failed to check existing supervisor state"
);
}
}