From 2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 17 Jan 2026 05:37:47 +0000 Subject: Add heartbeat commits --- makima/src/server/handlers/contracts.rs | 46 +++++++++++ makima/src/server/handlers/mesh.rs | 15 ++++ makima/src/server/handlers/mesh_daemon.rs | 113 +++++++++++++++++++++++++- makima/src/server/handlers/mesh_supervisor.rs | 15 ++++ 4 files changed, 188 insertions(+), 1 deletion(-) (limited to 'makima/src/server') diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index afca3d7..684ab2b 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -340,6 +340,22 @@ pub async fn create_contract( ); } + // Record history event for contract creation + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "contract", + Some("created"), + Some(&contract.phase), + serde_json::json!({ + "name": &contract.name, + "type": &contract.contract_type, + "description": &contract.description, + }), + ).await; + // Get the summary version with counts match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await { @@ -474,6 +490,21 @@ pub async fn update_contract( tokio::spawn(async move { cleanup_contract_worktrees(&pool_clone, &state_clone, contract_id).await; }); + + // Record history event for contract completion + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "contract", + Some("completed"), + Some(&contract.phase), + serde_json::json!({ + "name": &contract.name, + "status": &contract.status, + }), + ).await; } // Get summary with counts @@ -1255,6 +1286,21 @@ pub async fn change_phase( } } + // Record history event for phase change + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "phase", + Some("changed"), + Some(&contract.phase), + serde_json::json!({ + "contractName": &contract.name, + "newPhase": &contract.phase, + }), + ).await; + // Get summary with counts match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index cdda3fd..5a08a49 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -216,6 +216,21 @@ pub async fn create_task( match repository::create_task_for_owner(pool, auth.owner_id, req).await { Ok(task) => { + // Record history event for task creation + let _ = repository::record_history_event( + pool, + auth.owner_id, + task.contract_id, + Some(task.id), + "task", + Some("created"), + None, + serde_json::json!({ + "name": &task.name, + "isSupervisor": task.is_supervisor, + }), + ).await; + // Notify supervisor of new task creation if task belongs to a contract if let Some(contract_id) = task.contract_id { if !task.is_supervisor { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index beb676e..22a2792 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -753,10 +753,82 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re task_id, owner_id: Some(owner_id), version: updated_task.version, - status: new_status_owned, + status: new_status_owned.clone(), updated_fields: vec!["status".into()], updated_by: "daemon".into(), }); + + // Initialize supervisor_state when supervisor task starts running + if updated_task.is_supervisor && new_status_owned == "running" { + if let Some(contract_id) = updated_task.contract_id { + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized supervisor state for running supervisor" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } + } + } + + // Record history event when task starts running + if new_status_owned == "running" { + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some("started"), + None, + serde_json::json!({ + "name": &updated_task.name, + "isSupervisor": updated_task.is_supervisor, + }), + ).await; + } } Ok(None) => { tracing::warn!( @@ -850,6 +922,23 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re } } } + + // Record history event for task completion + let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some(subtype), + None, + serde_json::json!({ + "name": &updated_task.name, + "status": &updated_task.status, + "error": &updated_task.error_message, + }), + ).await; } Ok(None) => { tracing::warn!( @@ -1225,6 +1314,28 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re duration_ms: None, is_partial: false, }); + + // Record history event for checkpoint + // Get task to get contract_id + if let Ok(Some(task)) = repository::get_task(pool, task_id).await { + let _ = repository::record_history_event( + pool, + task.owner_id, + task.contract_id, + Some(task_id), + "checkpoint", + Some("created"), + None, + serde_json::json!({ + "checkpointNumber": checkpoint.checkpoint_number, + "commitSha": &sha, + "message": &message, + "filesChanged": files_changed, + "linesAdded": lines_added, + "linesRemoved": lines_removed, + }), + ).await; + } } Err(e) => { tracing::error!(error = %e, "Failed to store checkpoint in database"); diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 754d086..29eef81 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -576,6 +576,21 @@ pub async fn spawn_task( "Supervisor spawned new task" ); + // Record history event for task spawned by supervisor + let _ = repository::record_history_event( + pool, + owner_id, + task.contract_id, + Some(task.id), + "task", + Some("spawned"), + None, + serde_json::json!({ + "name": &task.name, + "spawnedBy": supervisor_id.to_string(), + }), + ).await; + // Start task on a daemon // Find a daemon that belongs to this owner let mut updated_task = task; -- cgit v1.2.3