summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-17 05:37:47 +0000
committersoryu <soryu@soryu.co>2026-01-17 05:38:07 +0000
commit2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c (patch)
treec658378488cf6db293f7ca71d3ca957249a6309e
parent75d9644d44ba998a32ed14c072e883a75145ab72 (diff)
downloadsoryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.tar.gz
soryu-2f62df1cc89a23a5bd30e1a3f68a39bcfce9665c.zip
Add heartbeat commits
-rw-r--r--makima/src/bin/makima.rs1
-rw-r--r--makima/src/daemon/config.rs10
-rw-r--r--makima/src/daemon/task/manager.rs108
-rw-r--r--makima/src/server/handlers/contracts.rs46
-rw-r--r--makima/src/server/handlers/mesh.rs15
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs113
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs15
7 files changed, 307 insertions, 1 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 47e627b..37dc81a 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -185,6 +185,7 @@ async fn run_daemon(
disable_verbose: config.process.disable_verbose,
bubblewrap: bubblewrap_config,
api_url,
+ heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs,
};
// Create task manager
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index 512b822..476b57e 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -213,12 +213,21 @@ pub struct ProcessConfig {
/// Bubblewrap sandbox configuration.
#[serde(default)]
pub bubblewrap: BubblewrapConfig,
+
+ /// Interval in seconds between heartbeat commits (WIP checkpoints).
+ /// Set to 0 to disable. Default: 300 (5 minutes).
+ #[serde(default = "default_heartbeat_commit_interval", alias = "heartbeatcommitintervalsecs")]
+ pub heartbeat_commit_interval_secs: u64,
}
fn default_claude_command() -> String {
"claude".to_string()
}
+fn default_heartbeat_commit_interval() -> u64 {
+ 300 // 5 minutes
+}
+
fn default_max_tasks() -> u32 {
4
}
@@ -235,6 +244,7 @@ impl Default for ProcessConfig {
default_timeout_secs: 0,
env_vars: HashMap::new(),
bubblewrap: BubblewrapConfig::default(),
+ heartbeat_commit_interval_secs: default_heartbeat_commit_interval(),
}
}
}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index fccebc5..c3ccfa4 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -980,6 +980,9 @@ pub struct TaskConfig {
pub bubblewrap: Option<crate::daemon::config::BubblewrapConfig>,
/// API URL for spawned tasks (HTTP endpoint for makima CLI).
pub api_url: String,
+ /// Interval in seconds between heartbeat commits (WIP checkpoints).
+ /// Set to 0 to disable. Default: 300 (5 minutes).
+ pub heartbeat_commit_interval_secs: u64,
}
impl Default for TaskConfig {
@@ -995,6 +998,7 @@ impl Default for TaskConfig {
disable_verbose: false,
bubblewrap: None,
api_url: "https://api.makima.jp".to_string(),
+ heartbeat_commit_interval_secs: 300, // 5 minutes
}
}
}
@@ -1587,6 +1591,7 @@ impl TaskManager {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.config.api_url.clone(),
+ heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
}
}
@@ -2882,6 +2887,7 @@ struct TaskManagerInner {
git_user_email: Arc<RwLock<Option<String>>>,
git_user_name: Arc<RwLock<Option<String>>>,
api_url: String,
+ heartbeat_commit_interval_secs: u64,
}
impl TaskManagerInner {
@@ -3526,6 +3532,17 @@ impl TaskManagerInner {
startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let startup_deadline = tokio::time::Instant::now() + startup_timeout;
+ // Heartbeat commit interval (only active if configured and we have a git repo)
+ let heartbeat_enabled = self.heartbeat_commit_interval_secs > 0 && repo_source.is_some();
+ let mut heartbeat_interval = tokio::time::interval(
+ tokio::time::Duration::from_secs(
+ if heartbeat_enabled { self.heartbeat_commit_interval_secs } else { u64::MAX }
+ )
+ );
+ heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ // Skip the first immediate tick
+ heartbeat_interval.tick().await;
+
loop {
tokio::select! {
maybe_line = process.next_output() => {
@@ -3645,6 +3662,23 @@ impl TaskManagerInner {
}
}
}
+ _ = heartbeat_interval.tick(), if heartbeat_enabled => {
+ // Create periodic heartbeat commit to preserve work-in-progress
+ match self.create_heartbeat_commit(task_id, &working_dir).await {
+ Ok(sha) => {
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("[Heartbeat] Created WIP checkpoint: {}\n", &sha[..8]),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ // No changes to commit or git error - this is fine, just log at debug level
+ tracing::debug!(task_id = %task_id, error = %e, "Heartbeat commit skipped");
+ }
+ }
+ }
}
}
@@ -4115,6 +4149,79 @@ impl TaskManagerInner {
}
}
}
+
+ /// Create a heartbeat commit with all uncommitted changes (WIP checkpoint).
+ /// Returns the commit SHA on success, or an error message if nothing to commit.
+ async fn create_heartbeat_commit(
+ &self,
+ task_id: Uuid,
+ worktree_path: &std::path::Path,
+ ) -> Result<String, String> {
+ // 1. Check for uncommitted changes using git status --porcelain
+ let status_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["status", "--porcelain"])
+ .output()
+ .await
+ .map_err(|e| format!("Failed to run git status: {}", e))?;
+
+ if !status_output.status.success() {
+ let stderr = String::from_utf8_lossy(&status_output.stderr);
+ return Err(format!("git status failed: {}", stderr));
+ }
+
+ let status = String::from_utf8_lossy(&status_output.stdout);
+ if status.trim().is_empty() {
+ return Err("No changes to commit".into());
+ }
+
+ // 2. Stage all changes
+ let add_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["add", "-A"])
+ .output()
+ .await
+ .map_err(|e| format!("Failed to run git add: {}", e))?;
+
+ if !add_output.status.success() {
+ let stderr = String::from_utf8_lossy(&add_output.stderr);
+ return Err(format!("git add failed: {}", stderr));
+ }
+
+ // 3. Create WIP commit with timestamp
+ let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
+ let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp);
+
+ let commit_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["commit", "-m", &commit_msg])
+ .output()
+ .await
+ .map_err(|e| format!("Failed to run git commit: {}", e))?;
+
+ if !commit_output.status.success() {
+ let stderr = String::from_utf8_lossy(&commit_output.stderr);
+ return Err(format!("git commit failed: {}", stderr));
+ }
+
+ // 4. Get the commit SHA
+ let sha_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await
+ .map_err(|e| format!("Failed to run git rev-parse: {}", e))?;
+
+ if !sha_output.status.success() {
+ let stderr = String::from_utf8_lossy(&sha_output.stderr);
+ return Err(format!("git rev-parse failed: {}", stderr));
+ }
+
+ let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string();
+ tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit");
+
+ Ok(sha)
+ }
}
impl Clone for TaskManagerInner {
@@ -4130,6 +4237,7 @@ impl Clone for TaskManagerInner {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.api_url.clone(),
+ heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
}
}
}
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index afca3d7..684ab2b 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -340,6 +340,22 @@ pub async fn create_contract(
);
}
+ // Record history event for contract creation
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ Some(contract.id),
+ None,
+ "contract",
+ Some("created"),
+ Some(&contract.phase),
+ serde_json::json!({
+ "name": &contract.name,
+ "type": &contract.contract_type,
+ "description": &contract.description,
+ }),
+ ).await;
+
// Get the summary version with counts
match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await
{
@@ -474,6 +490,21 @@ pub async fn update_contract(
tokio::spawn(async move {
cleanup_contract_worktrees(&pool_clone, &state_clone, contract_id).await;
});
+
+ // Record history event for contract completion
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ Some(contract.id),
+ None,
+ "contract",
+ Some("completed"),
+ Some(&contract.phase),
+ serde_json::json!({
+ "name": &contract.name,
+ "status": &contract.status,
+ }),
+ ).await;
}
// Get summary with counts
@@ -1255,6 +1286,21 @@ pub async fn change_phase(
}
}
+ // Record history event for phase change
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ Some(contract.id),
+ None,
+ "phase",
+ Some("changed"),
+ Some(&contract.phase),
+ serde_json::json!({
+ "contractName": &contract.name,
+ "newPhase": &contract.phase,
+ }),
+ ).await;
+
// Get summary with counts
match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await
{
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index cdda3fd..5a08a49 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -216,6 +216,21 @@ pub async fn create_task(
match repository::create_task_for_owner(pool, auth.owner_id, req).await {
Ok(task) => {
+ // Record history event for task creation
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ task.contract_id,
+ Some(task.id),
+ "task",
+ Some("created"),
+ None,
+ serde_json::json!({
+ "name": &task.name,
+ "isSupervisor": task.is_supervisor,
+ }),
+ ).await;
+
// Notify supervisor of new task creation if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor {
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index beb676e..22a2792 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -753,10 +753,82 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
task_id,
owner_id: Some(owner_id),
version: updated_task.version,
- status: new_status_owned,
+ status: new_status_owned.clone(),
updated_fields: vec!["status".into()],
updated_by: "daemon".into(),
});
+
+ // Initialize supervisor_state when supervisor task starts running
+ if updated_task.is_supervisor && new_status_owned == "running" {
+ if let Some(contract_id) = updated_task.contract_id {
+ // Get contract to get its phase
+ match repository::get_contract_for_owner(
+ &pool,
+ contract_id,
+ updated_task.owner_id,
+ ).await {
+ Ok(Some(contract)) => {
+ match repository::upsert_supervisor_state(
+ &pool,
+ contract_id,
+ task_id,
+ serde_json::json!([]), // Empty conversation
+ &[], // No pending tasks
+ &contract.phase,
+ ).await {
+ Ok(_) => {
+ tracing::info!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ phase = %contract.phase,
+ "Initialized supervisor state for running supervisor"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to initialize supervisor state"
+ );
+ }
+ }
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Contract not found when initializing supervisor state"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = %contract_id,
+ error = %e,
+ "Failed to get contract for supervisor state"
+ );
+ }
+ }
+ }
+ }
+
+ // Record history event when task starts running
+ if new_status_owned == "running" {
+ let _ = repository::record_history_event(
+ &pool,
+ updated_task.owner_id,
+ updated_task.contract_id,
+ Some(task_id),
+ "task",
+ Some("started"),
+ None,
+ serde_json::json!({
+ "name": &updated_task.name,
+ "isSupervisor": updated_task.is_supervisor,
+ }),
+ ).await;
+ }
}
Ok(None) => {
tracing::warn!(
@@ -850,6 +922,23 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
}
}
}
+
+ // Record history event for task completion
+ let subtype = if updated_task.status == "done" { "completed" } else { "failed" };
+ let _ = repository::record_history_event(
+ &pool,
+ updated_task.owner_id,
+ updated_task.contract_id,
+ Some(task_id),
+ "task",
+ Some(subtype),
+ None,
+ serde_json::json!({
+ "name": &updated_task.name,
+ "status": &updated_task.status,
+ "error": &updated_task.error_message,
+ }),
+ ).await;
}
Ok(None) => {
tracing::warn!(
@@ -1225,6 +1314,28 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
duration_ms: None,
is_partial: false,
});
+
+ // Record history event for checkpoint
+ // Get task to get contract_id
+ if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
+ let _ = repository::record_history_event(
+ pool,
+ task.owner_id,
+ task.contract_id,
+ Some(task_id),
+ "checkpoint",
+ Some("created"),
+ None,
+ serde_json::json!({
+ "checkpointNumber": checkpoint.checkpoint_number,
+ "commitSha": &sha,
+ "message": &message,
+ "filesChanged": files_changed,
+ "linesAdded": lines_added,
+ "linesRemoved": lines_removed,
+ }),
+ ).await;
+ }
}
Err(e) => {
tracing::error!(error = %e, "Failed to store checkpoint in database");
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 754d086..29eef81 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -576,6 +576,21 @@ pub async fn spawn_task(
"Supervisor spawned new task"
);
+ // Record history event for task spawned by supervisor
+ let _ = repository::record_history_event(
+ pool,
+ owner_id,
+ task.contract_id,
+ Some(task.id),
+ "task",
+ Some("spawned"),
+ None,
+ serde_json::json!({
+ "name": &task.name,
+ "spawnedBy": supervisor_id.to_string(),
+ }),
+ ).await;
+
// Start task on a daemon
// Find a daemon that belongs to this owner
let mut updated_task = task;