diff options
| author | soryu <soryu@soryu.co> | 2026-01-17 05:37:47 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c (patch) | |
| tree | c658378488cf6db293f7ca71d3ca957249a6309e /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 75d9644d44ba998a32ed14c072e883a75145ab72 (diff) | |
| download | soryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.tar.gz soryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.zip | |
Add heartbeat commits
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 113 |
1 files changed, 112 insertions, 1 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index beb676e..22a2792 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -753,10 +753,82 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re task_id, owner_id: Some(owner_id), version: updated_task.version, - status: new_status_owned, + status: new_status_owned.clone(), updated_fields: vec!["status".into()], updated_by: "daemon".into(), }); + + // Initialize supervisor_state when supervisor task starts running + if updated_task.is_supervisor && new_status_owned == "running" { + if let Some(contract_id) = updated_task.contract_id { + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized supervisor state for running supervisor" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } + } + } + + // Record history event when task starts running + if new_status_owned == "running" { + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some("started"), + None, + serde_json::json!({ + "name": &updated_task.name, + "isSupervisor": updated_task.is_supervisor, + }), + ).await; + } } Ok(None) => { tracing::warn!( @@ -850,6 +922,23 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re } } } + + // Record history event for task completion + let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some(subtype), + None, + serde_json::json!({ + "name": &updated_task.name, + "status": &updated_task.status, + "error": &updated_task.error_message, + }), + ).await; } Ok(None) => { tracing::warn!( @@ -1225,6 +1314,28 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re duration_ms: None, is_partial: false, }); + + // Record history event for checkpoint + // Get task to get contract_id + if let Ok(Some(task)) = repository::get_task(pool, task_id).await { + let _ = repository::record_history_event( + pool, + task.owner_id, + task.contract_id, + Some(task_id), + "checkpoint", + Some("created"), + None, + serde_json::json!({ + "checkpointNumber": checkpoint.checkpoint_number, + "commitSha": &sha, + "message": &message, + "filesChanged": files_changed, + "linesAdded": lines_added, + "linesRemoved": lines_removed, + }), + ).await; + } } Err(e) => { tracing::error!(error = %e, "Failed to store checkpoint in database"); |
