diff options
| author | soryu <soryu@soryu.co> | 2026-01-17 05:37:47 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c (patch) | |
| tree | c658378488cf6db293f7ca71d3ca957249a6309e | |
| parent | 75d9644d44ba998a32ed14c072e883a75145ab72 (diff) | |
| download | soryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.tar.gz soryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.zip | |
Add heartbeat commits
| -rw-r--r-- | makima/src/bin/makima.rs | 1 | ||||
| -rw-r--r-- | makima/src/daemon/config.rs | 10 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 108 | ||||
| -rw-r--r-- | makima/src/server/handlers/contracts.rs | 46 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 15 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 113 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 15 |
7 files changed, 307 insertions, 1 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 47e627b..37dc81a 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -185,6 +185,7 @@ async fn run_daemon( disable_verbose: config.process.disable_verbose, bubblewrap: bubblewrap_config, api_url, + heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs, }; // Create task manager diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index 512b822..476b57e 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -213,12 +213,21 @@ pub struct ProcessConfig { /// Bubblewrap sandbox configuration. #[serde(default)] pub bubblewrap: BubblewrapConfig, + + /// Interval in seconds between heartbeat commits (WIP checkpoints). + /// Set to 0 to disable. Default: 300 (5 minutes). + #[serde(default = "default_heartbeat_commit_interval", alias = "heartbeatcommitintervalsecs")] + pub heartbeat_commit_interval_secs: u64, } fn default_claude_command() -> String { "claude".to_string() } +fn default_heartbeat_commit_interval() -> u64 { + 300 // 5 minutes +} + fn default_max_tasks() -> u32 { 4 } @@ -235,6 +244,7 @@ impl Default for ProcessConfig { default_timeout_secs: 0, env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), + heartbeat_commit_interval_secs: default_heartbeat_commit_interval(), } } } diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index fccebc5..c3ccfa4 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -980,6 +980,9 @@ pub struct TaskConfig { pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>, /// API URL for spawned tasks (HTTP endpoint for makima CLI). pub api_url: String, + /// Interval in seconds between heartbeat commits (WIP checkpoints). + /// Set to 0 to disable. Default: 300 (5 minutes). + pub heartbeat_commit_interval_secs: u64, } impl Default for TaskConfig { @@ -995,6 +998,7 @@ impl Default for TaskConfig { disable_verbose: false, bubblewrap: None, api_url: "https://api.makima.jp".to_string(), + heartbeat_commit_interval_secs: 300, // 5 minutes } } } @@ -1587,6 +1591,7 @@ impl TaskManager { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.config.api_url.clone(), + heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, } } @@ -2882,6 +2887,7 @@ struct TaskManagerInner { git_user_email: Arc<RwLock<Option<String>>>, git_user_name: Arc<RwLock<Option<String>>>, api_url: String, + heartbeat_commit_interval_secs: u64, } impl TaskManagerInner { @@ -3526,6 +3532,17 @@ impl TaskManagerInner { startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let startup_deadline = tokio::time::Instant::now() + startup_timeout; + // Heartbeat commit interval (only active if configured and we have a git repo) + let heartbeat_enabled = self.heartbeat_commit_interval_secs > 0 && repo_source.is_some(); + let mut heartbeat_interval = tokio::time::interval( + tokio::time::Duration::from_secs( + if heartbeat_enabled { self.heartbeat_commit_interval_secs } else { u64::MAX } + ) + ); + heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Skip the first immediate tick + heartbeat_interval.tick().await; + loop { tokio::select! { maybe_line = process.next_output() => { @@ -3645,6 +3662,23 @@ impl TaskManagerInner { } } } + _ = heartbeat_interval.tick(), if heartbeat_enabled => { + // Create periodic heartbeat commit to preserve work-in-progress + match self.create_heartbeat_commit(task_id, &working_dir).await { + Ok(sha) => { + let msg = DaemonMessage::task_output( + task_id, + format!("[Heartbeat] Created WIP checkpoint: {}\n", &sha[..8]), + false, + ); + let _ = ws_tx.send(msg).await; + } + Err(e) => { + // No changes to commit or git error - this is fine, just log at debug level + tracing::debug!(task_id = %task_id, error = %e, "Heartbeat commit skipped"); + } + } + } } } @@ -4115,6 +4149,79 @@ impl TaskManagerInner { } } } + + /// Create a heartbeat commit with all uncommitted changes (WIP checkpoint). + /// Returns the commit SHA on success, or an error message if nothing to commit. + async fn create_heartbeat_commit( + &self, + task_id: Uuid, + worktree_path: &std::path::Path, + ) -> Result<String, String> { + // 1. Check for uncommitted changes using git status --porcelain + let status_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["status", "--porcelain"]) + .output() + .await + .map_err(|e| format!("Failed to run git status: {}", e))?; + + if !status_output.status.success() { + let stderr = String::from_utf8_lossy(&status_output.stderr); + return Err(format!("git status failed: {}", stderr)); + } + + let status = String::from_utf8_lossy(&status_output.stdout); + if status.trim().is_empty() { + return Err("No changes to commit".into()); + } + + // 2. Stage all changes + let add_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["add", "-A"]) + .output() + .await + .map_err(|e| format!("Failed to run git add: {}", e))?; + + if !add_output.status.success() { + let stderr = String::from_utf8_lossy(&add_output.stderr); + return Err(format!("git add failed: {}", stderr)); + } + + // 3. Create WIP commit with timestamp + let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); + let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp); + + let commit_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["commit", "-m", &commit_msg]) + .output() + .await + .map_err(|e| format!("Failed to run git commit: {}", e))?; + + if !commit_output.status.success() { + let stderr = String::from_utf8_lossy(&commit_output.stderr); + return Err(format!("git commit failed: {}", stderr)); + } + + // 4. Get the commit SHA + let sha_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await + .map_err(|e| format!("Failed to run git rev-parse: {}", e))?; + + if !sha_output.status.success() { + let stderr = String::from_utf8_lossy(&sha_output.stderr); + return Err(format!("git rev-parse failed: {}", stderr)); + } + + let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string(); + tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit"); + + Ok(sha) + } } impl Clone for TaskManagerInner { @@ -4130,6 +4237,7 @@ impl Clone for TaskManagerInner { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.api_url.clone(), + heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, } } } diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index afca3d7..684ab2b 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -340,6 +340,22 @@ pub async fn create_contract( ); } + // Record history event for contract creation + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "contract", + Some("created"), + Some(&contract.phase), + serde_json::json!({ + "name": &contract.name, + "type": &contract.contract_type, + "description": &contract.description, + }), + ).await; + // Get the summary version with counts match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await { @@ -474,6 +490,21 @@ pub async fn update_contract( tokio::spawn(async move { cleanup_contract_worktrees(&pool_clone, &state_clone, contract_id).await; }); + + // Record history event for contract completion + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "contract", + Some("completed"), + Some(&contract.phase), + serde_json::json!({ + "name": &contract.name, + "status": &contract.status, + }), + ).await; } // Get summary with counts @@ -1255,6 +1286,21 @@ pub async fn change_phase( } } + // Record history event for phase change + let _ = repository::record_history_event( + pool, + auth.owner_id, + Some(contract.id), + None, + "phase", + Some("changed"), + Some(&contract.phase), + serde_json::json!({ + "contractName": &contract.name, + "newPhase": &contract.phase, + }), + ).await; + // Get summary with counts match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index cdda3fd..5a08a49 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -216,6 +216,21 @@ pub async fn create_task( match repository::create_task_for_owner(pool, auth.owner_id, req).await { Ok(task) => { + // Record history event for task creation + let _ = repository::record_history_event( + pool, + auth.owner_id, + task.contract_id, + Some(task.id), + "task", + Some("created"), + None, + serde_json::json!({ + "name": &task.name, + "isSupervisor": task.is_supervisor, + }), + ).await; + // Notify supervisor of new task creation if task belongs to a contract if let Some(contract_id) = task.contract_id { if !task.is_supervisor { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index beb676e..22a2792 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -753,10 +753,82 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re task_id, owner_id: Some(owner_id), version: updated_task.version, - status: new_status_owned, + status: new_status_owned.clone(), updated_fields: vec!["status".into()], updated_by: "daemon".into(), }); + + // Initialize supervisor_state when supervisor task starts running + if updated_task.is_supervisor && new_status_owned == "running" { + if let Some(contract_id) = updated_task.contract_id { + // Get contract to get its phase + match repository::get_contract_for_owner( + &pool, + contract_id, + updated_task.owner_id, + ).await { + Ok(Some(contract)) => { + match repository::upsert_supervisor_state( + &pool, + contract_id, + task_id, + serde_json::json!([]), // Empty conversation + &[], // No pending tasks + &contract.phase, + ).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + contract_id = %contract_id, + phase = %contract.phase, + "Initialized supervisor state for running supervisor" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to initialize supervisor state" + ); + } + } + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + "Contract not found when initializing supervisor state" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + contract_id = %contract_id, + error = %e, + "Failed to get contract for supervisor state" + ); + } + } + } + } + + // Record history event when task starts running + if new_status_owned == "running" { + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some("started"), + None, + serde_json::json!({ + "name": &updated_task.name, + "isSupervisor": updated_task.is_supervisor, + }), + ).await; + } } Ok(None) => { tracing::warn!( @@ -850,6 +922,23 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re } } } + + // Record history event for task completion + let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; + let _ = repository::record_history_event( + &pool, + updated_task.owner_id, + updated_task.contract_id, + Some(task_id), + "task", + Some(subtype), + None, + serde_json::json!({ + "name": &updated_task.name, + "status": &updated_task.status, + "error": &updated_task.error_message, + }), + ).await; } Ok(None) => { tracing::warn!( @@ -1225,6 +1314,28 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re duration_ms: None, is_partial: false, }); + + // Record history event for checkpoint + // Get task to get contract_id + if let Ok(Some(task)) = repository::get_task(pool, task_id).await { + let _ = repository::record_history_event( + pool, + task.owner_id, + task.contract_id, + Some(task_id), + "checkpoint", + Some("created"), + None, + serde_json::json!({ + "checkpointNumber": checkpoint.checkpoint_number, + "commitSha": &sha, + "message": &message, + "filesChanged": files_changed, + "linesAdded": lines_added, + "linesRemoved": lines_removed, + }), + ).await; + } } Err(e) => { tracing::error!(error = %e, "Failed to store checkpoint in database"); diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 754d086..29eef81 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -576,6 +576,21 @@ pub async fn spawn_task( "Supervisor spawned new task" ); + // Record history event for task spawned by supervisor + let _ = repository::record_history_event( + pool, + owner_id, + task.contract_id, + Some(task.id), + "task", + Some("spawned"), + None, + serde_json::json!({ + "name": &task.name, + "spawnedBy": supervisor_id.to_string(), + }), + ).await; + // Start task on a daemon // Find a daemon that belongs to this owner let mut updated_task = task; |
