summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-22 22:32:46 +0000
committersoryu <soryu@soryu.co>2026-01-23 01:03:04 +0000
commit1ed362424dafec690f919154f5116471951cda9c (patch)
tree19c7ca9231887394a791223fe32a8ad335a687a8
parent265f8cf14fec9d7116d09af49e4b48b357faceda (diff)
downloadsoryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz
soryu-1ed362424dafec690f919154f5116471951cda9c.zip
Add patch checkpointing
-rw-r--r--Cargo.lock2
-rw-r--r--makima/Cargo.toml6
-rw-r--r--makima/migrations/20250122000001_checkpoint_patches.sql27
-rw-r--r--makima/src/bin/makima.rs1
-rw-r--r--makima/src/daemon/config.rs44
-rw-r--r--makima/src/daemon/error.rs3
-rw-r--r--makima/src/daemon/mod.rs1
-rw-r--r--makima/src/daemon/storage/mod.rs8
-rw-r--r--makima/src/daemon/storage/patch.rs293
-rw-r--r--makima/src/daemon/task/manager.rs232
-rw-r--r--makima/src/daemon/worktree/manager.rs135
-rw-r--r--makima/src/daemon/ws/protocol.rs16
-rw-r--r--makima/src/db/models.rs42
-rw-r--r--makima/src/db/repository.rs113
-rw-r--r--makima/src/server/handlers/contract_chat.rs3
-rw-r--r--makima/src/server/handlers/mesh.rs18
-rw-r--r--makima/src/server/handlers/mesh_chat.rs3
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs59
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs33
-rw-r--r--makima/src/server/mod.rs43
-rw-r--r--makima/src/server/state.rs9
21 files changed, 1075 insertions, 16 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1e1be6c..1aeb184 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2007,6 +2007,7 @@ dependencies = [
"crossterm",
"dashmap",
"dirs 5.0.1",
+ "flate2",
"futures",
"fuzzy-matcher",
"hex",
@@ -2035,6 +2036,7 @@ dependencies = [
"shell-escape",
"sqlx",
"symphonia",
+ "tempfile",
"thiserror 2.0.17",
"tokenizers 0.21.4",
"tokio",
diff --git a/makima/Cargo.toml b/makima/Cargo.toml
index 650628a..950c123 100644
--- a/makima/Cargo.toml
+++ b/makima/Cargo.toml
@@ -92,3 +92,9 @@ ahash = "0.8"
# TUI
ratatui = "0.29"
crossterm = "0.28"
+
+# Compression
+flate2 = "1.0"
+
+[dev-dependencies]
+tempfile = "3.10"
diff --git a/makima/migrations/20250122000001_checkpoint_patches.sql b/makima/migrations/20250122000001_checkpoint_patches.sql
new file mode 100644
index 0000000..19da66e
--- /dev/null
+++ b/makima/migrations/20250122000001_checkpoint_patches.sql
@@ -0,0 +1,27 @@
+-- Checkpoint patches table for storing git diffs to enable task recovery.
+-- When a local worktree is deleted/corrupted, the patch can be used to restore state.
+
+CREATE TABLE IF NOT EXISTS checkpoint_patches (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
+    checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE CASCADE,
+    base_commit_sha VARCHAR(40) NOT NULL, -- Commit to apply patch on top of
+    patch_data BYTEA NOT NULL, -- Compressed git diff (gzip)
+    patch_size_bytes INTEGER NOT NULL,
+    files_count INTEGER NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    expires_at TIMESTAMPTZ NOT NULL, -- TTL for auto-cleanup
+    CONSTRAINT patch_size_limit CHECK (patch_size_bytes <= 10485760) -- 10MB limit
+);
+
+CREATE INDEX IF NOT EXISTS idx_checkpoint_patches_task ON checkpoint_patches(task_id); -- re-runnable, like the table
+CREATE INDEX IF NOT EXISTS idx_checkpoint_patches_expires ON checkpoint_patches(expires_at); -- supports TTL sweeps
+
+-- Link checkpoints to their patches
+ALTER TABLE task_checkpoints
+ADD COLUMN IF NOT EXISTS patch_id UUID REFERENCES checkpoint_patches(id) ON DELETE SET NULL;
+
+COMMENT ON TABLE checkpoint_patches IS 'Stores compressed git diffs for task recovery when local worktree is lost';
+COMMENT ON COLUMN checkpoint_patches.base_commit_sha IS 'The commit SHA that the patch should be applied on top of';
+COMMENT ON COLUMN checkpoint_patches.patch_data IS 'Gzip-compressed git diff data';
+COMMENT ON COLUMN checkpoint_patches.expires_at IS 'Automatic cleanup time (TTL)';
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index cb80c7f..f0db69c 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -241,6 +241,7 @@ async fn run_daemon(
api_url,
api_key: config.server.api_key.clone(),
heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs,
+ checkpoint_patches: config.process.checkpoint_patches.clone(),
};
// Create task manager
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index 9a00166..b7cb1e8 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -223,6 +223,48 @@ pub struct ProcessConfig {
/// Set to 0 to disable. Default: 300 (5 minutes).
#[serde(default = "default_heartbeat_commit_interval", alias = "heartbeatcommitintervalsecs")]
pub heartbeat_commit_interval_secs: u64,
+
+ /// Checkpoint patch storage configuration for task recovery.
+ #[serde(default)]
+ pub checkpoint_patches: CheckpointPatchConfig,
+}
+
+/// Configuration for checkpoint patch storage in PostgreSQL.
+/// Patches are stored to enable task recovery when local worktrees are lost.
+///
+/// The container-level `#[serde(default)]` means a config file may omit this
+/// whole section, or any individual key in it, and still deserialize.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct CheckpointPatchConfig {
+ /// Enable patch storage in PostgreSQL (default: true).
+ #[serde(default = "default_true")]
+ pub enabled: bool,
+
+ /// Maximum patch size in bytes (default: 10MB).
+ /// Patches larger than this will not be stored.
+ /// The heartbeat path compares this against the gzip-compressed patch length.
+ /// NOTE(review): the checkpoint_patches migration hard-codes a 10485760-byte
+ /// CHECK on patch_size_bytes; values above 10MB here presumably fail at insert
+ /// time - confirm against the server-side insert.
+ #[serde(default = "default_max_patch_size", alias = "maxpatchsizebytes")]
+ pub max_patch_size_bytes: usize,
+
+ /// TTL for patches in hours (default: 168 = 7 days).
+ /// Patches older than this will be automatically cleaned up.
+ #[serde(default = "default_patch_ttl_hours", alias = "ttlhours")]
+ pub ttl_hours: u64,
+}
+
+/// Serde default for `max_patch_size_bytes`: ten mebibytes.
+fn default_max_patch_size() -> usize {
+    const MIB: usize = 1024 * 1024;
+    10 * MIB // 10MB
+}
+
+/// Serde default for `ttl_hours`: one week expressed in hours.
+fn default_patch_ttl_hours() -> u64 {
+    const HOURS_PER_DAY: u64 = 24;
+    7 * HOURS_PER_DAY // 7 days
+}
+
+impl Default for CheckpointPatchConfig {
+    /// Patch storage enabled, with the same size and TTL defaults serde uses
+    /// for missing keys.
+    fn default() -> Self {
+        let max_patch_size_bytes = default_max_patch_size();
+        let ttl_hours = default_patch_ttl_hours();
+        Self {
+            enabled: true,
+            max_patch_size_bytes,
+            ttl_hours,
+        }
+    }
 }
fn default_claude_command() -> String {
@@ -255,6 +297,7 @@ impl Default for ProcessConfig {
env_vars: HashMap::new(),
bubblewrap: BubblewrapConfig::default(),
heartbeat_commit_interval_secs: default_heartbeat_commit_interval(),
+ checkpoint_patches: CheckpointPatchConfig::default(),
}
}
}
@@ -576,6 +619,7 @@ impl DaemonConfig {
env_vars: HashMap::new(),
bubblewrap: BubblewrapConfig::default(),
heartbeat_commit_interval_secs: 300,
+ checkpoint_patches: CheckpointPatchConfig::default(),
},
local_db: LocalDbConfig {
path: PathBuf::from("/tmp/makima-daemon-test/state.db"),
diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs
index ea00d25..8d7fff0 100644
--- a/makima/src/daemon/error.rs
+++ b/makima/src/daemon/error.rs
@@ -30,6 +30,9 @@ pub enum DaemonError {
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
+ #[error("Storage error: {0}")]
+ Storage(#[from] crate::daemon::storage::PatchError),
+
#[error("Authentication failed: {0}")]
AuthFailed(String),
diff --git a/makima/src/daemon/mod.rs b/makima/src/daemon/mod.rs
index 18b5e8a..f5793d6 100644
--- a/makima/src/daemon/mod.rs
+++ b/makima/src/daemon/mod.rs
@@ -14,6 +14,7 @@ pub mod db;
pub mod error;
pub mod process;
pub mod setup;
+pub mod storage;
pub mod task;
pub mod temp;
pub mod tui;
diff --git a/makima/src/daemon/storage/mod.rs b/makima/src/daemon/storage/mod.rs
new file mode 100644
index 0000000..cc5441a
--- /dev/null
+++ b/makima/src/daemon/storage/mod.rs
@@ -0,0 +1,8 @@
+//! Checkpoint storage for task recovery.
+//!
+//! This module provides functionality to store and restore git patches
+//! in PostgreSQL for recovering task worktrees when they are lost.
+
+mod patch;
+
+pub use patch::{create_patch, apply_patch, PatchError};
diff --git a/makima/src/daemon/storage/patch.rs b/makima/src/daemon/storage/patch.rs
new file mode 100644
index 0000000..45624b5
--- /dev/null
+++ b/makima/src/daemon/storage/patch.rs
@@ -0,0 +1,293 @@
+//! Git patch creation and application for checkpoint recovery.
+
+use flate2::read::GzDecoder;
+use flate2::write::GzEncoder;
+use flate2::Compression;
+use std::io::{Read, Write};
+use std::path::Path;
+use thiserror::Error;
+use tokio::process::Command;
+
+/// Errors that can occur during patch operations.
+#[derive(Error, Debug)]
+pub enum PatchError {
+ /// A git subprocess could not be spawned/awaited, or exited unsuccessfully.
+ /// The string carries the failing step and, where available, git's stderr.
+ #[error("Git command failed: {0}")]
+ GitCommand(String),
+
+ /// An I/O error. Because of `#[from]`, any `std::io::Error` propagated with
+ /// `?` in this module converts into this variant (gzip encode/decode included).
+ #[error("Compression error: {0}")]
+ Compression(#[from] std::io::Error),
+
+ /// The patch exceeds a size limit.
+ /// NOTE(review): not constructed anywhere in this module; the heartbeat
+ /// caller performs its own size check - confirm whether this is still needed.
+ #[error("Patch too large: {size} bytes (max: {max} bytes)")]
+ TooLarge { size: usize, max: usize },
+
+ /// The diff (on create) or the decompressed payload (on apply) was empty.
+ #[error("Empty patch (no changes)")]
+ EmptyPatch,
+
+ /// `git apply` exited unsuccessfully; the string is its stderr.
+ #[error("Failed to apply patch: {0}")]
+ ApplyFailed(String),
+}
+
+/// Create a compressed git diff from worktree changes.
+///
+/// Generates a diff between `base_sha` and HEAD, then compresses it with gzip.
+/// Returns the compressed patch bytes and the number of files changed.
+///
+/// `git diff` is a porcelain command and honours user/repository configuration.
+/// The output format is therefore pinned explicitly so the stored patch can
+/// always be fed back to `git apply`:
+/// - `--no-ext-diff` / `--no-textconv`: external drivers and textconv filters
+///   produce human-oriented output that cannot be applied.
+/// - `--no-color`: guards against `color.diff=always`.
+/// - `--src-prefix=a/` / `--dst-prefix=b/`: overrides `diff.noprefix` and
+///   `diff.mnemonicPrefix`, which would break `git apply`'s default `-p1`.
+///
+/// # Errors
+/// - [`PatchError::GitCommand`] if `git diff` cannot be run or exits non-zero.
+/// - [`PatchError::EmptyPatch`] if there is no difference between the commits.
+/// - [`PatchError::Compression`] if gzip encoding fails.
+pub async fn create_patch(
+    worktree_path: &Path,
+    base_sha: &str,
+) -> Result<(Vec<u8>, usize), PatchError> {
+    // Get the diff between base commit and HEAD in a config-independent format.
+    // The trailing "--" makes git treat both preceding arguments as revisions.
+    let output = Command::new("git")
+        .current_dir(worktree_path)
+        .args([
+            "diff",
+            "--binary",
+            "--no-color",
+            "--no-ext-diff",
+            "--no-textconv",
+            "--src-prefix=a/",
+            "--dst-prefix=b/",
+            base_sha,
+            "HEAD",
+            "--",
+        ])
+        .output()
+        .await
+        .map_err(|e| PatchError::GitCommand(format!("Failed to run git diff: {}", e)))?;
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        return Err(PatchError::GitCommand(format!("git diff failed: {}", stderr)));
+    }
+
+    let diff_data = output.stdout;
+    if diff_data.is_empty() {
+        return Err(PatchError::EmptyPatch);
+    }
+
+    // Count files changed. The count is informational only, so a non-zero exit
+    // here deliberately degrades to 0 rather than discarding a usable patch.
+    let files_output = Command::new("git")
+        .current_dir(worktree_path)
+        .args(["diff", "--name-only", base_sha, "HEAD", "--"])
+        .output()
+        .await
+        .map_err(|e| PatchError::GitCommand(format!("Failed to count files: {}", e)))?;
+
+    let files_count = if files_output.status.success() {
+        String::from_utf8_lossy(&files_output.stdout)
+            .lines()
+            .filter(|l| !l.is_empty())
+            .count()
+    } else {
+        0
+    };
+
+    // Compress with gzip
+    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+    encoder.write_all(&diff_data)?;
+    let compressed = encoder.finish()?;
+
+    Ok((compressed, files_count))
+}
+
+/// Apply a compressed patch to restore worktree state.
+///
+/// The worktree should already be checked out at `base_sha` before calling this.
+///
+/// `patch_data` is the gzip-compressed diff produced by `create_patch`. It is
+/// decompressed in memory and piped to `git apply --binary -` run inside
+/// `worktree_path`.
+///
+/// # Errors
+/// - [`PatchError::Compression`] if the payload is not valid gzip.
+/// - [`PatchError::EmptyPatch`] if the payload decompresses to nothing.
+/// - [`PatchError::ApplyFailed`] (with git's stderr) if `git apply` exits non-zero.
+/// - [`PatchError::GitCommand`] if git cannot be spawned or awaited, or if the
+///   patch could not be written to its stdin.
+pub async fn apply_patch(worktree_path: &Path, patch_data: &[u8]) -> Result<(), PatchError> {
+    // Decompress gzip
+    let mut decoder = GzDecoder::new(patch_data);
+    let mut decompressed = Vec::new();
+    decoder.read_to_end(&mut decompressed)?;
+
+    if decompressed.is_empty() {
+        return Err(PatchError::EmptyPatch);
+    }
+
+    // Apply the patch using git apply
+    let mut child = Command::new("git")
+        .current_dir(worktree_path)
+        .args(["apply", "--binary", "-"])
+        .stdin(std::process::Stdio::piped())
+        .stdout(std::process::Stdio::piped())
+        .stderr(std::process::Stdio::piped())
+        .spawn()
+        .map_err(|e| PatchError::GitCommand(format!("Failed to spawn git apply: {}", e)))?;
+
+    // Write patch to stdin. A failure here (typically a broken pipe because git
+    // exited early) is held back rather than returned immediately, so that the
+    // child is still reaped and git's own stderr can be reported instead of a
+    // bare I/O error.
+    let write_result = match child.stdin.take() {
+        Some(mut stdin) => {
+            use tokio::io::AsyncWriteExt;
+            let result = stdin.write_all(&decompressed).await;
+            drop(stdin); // Close stdin to signal EOF
+            result
+        }
+        None => Ok(()),
+    };
+
+    let output = child
+        .wait_with_output()
+        .await
+        .map_err(|e| PatchError::GitCommand(format!("Failed to wait for git apply: {}", e)))?;
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        return Err(PatchError::ApplyFailed(stderr.to_string()));
+    }
+
+    // git reported success but never received the full patch: do not claim success.
+    if let Err(e) = write_result {
+        return Err(PatchError::GitCommand(format!(
+            "Failed to write patch to git apply: {}",
+            e
+        )));
+    }
+
+    Ok(())
+}
+
+/// Resolve `HEAD~1` (the first parent of the current commit) in a worktree.
+///
+/// Returns the trimmed stdout of `git rev-parse HEAD~1`, or
+/// [`PatchError::GitCommand`] if git cannot be run or exits non-zero (for
+/// example when HEAD has no parent).
+pub async fn get_parent_sha(worktree_path: &Path) -> Result<String, PatchError> {
+    let mut git = Command::new("git");
+    git.current_dir(worktree_path).args(["rev-parse", "HEAD~1"]);
+
+    let result = git
+        .output()
+        .await
+        .map_err(|err| PatchError::GitCommand(format!("Failed to get parent SHA: {}", err)))?;
+
+    if result.status.success() {
+        let sha = String::from_utf8_lossy(&result.stdout);
+        return Ok(sha.trim().to_string());
+    }
+
+    let reason = String::from_utf8_lossy(&result.stderr);
+    Err(PatchError::GitCommand(format!(
+        "git rev-parse HEAD~1 failed: {}",
+        reason
+    )))
+}
+
+/// Run `git checkout <sha>` inside the worktree.
+///
+/// Returns [`PatchError::GitCommand`] if git cannot be run or exits non-zero;
+/// in the latter case the message includes `sha` and git's stderr.
+pub async fn checkout_commit(worktree_path: &Path, sha: &str) -> Result<(), PatchError> {
+    let mut git = Command::new("git");
+    git.current_dir(worktree_path).args(["checkout", sha]);
+
+    let result = git
+        .output()
+        .await
+        .map_err(|err| PatchError::GitCommand(format!("Failed to checkout: {}", err)))?;
+
+    if result.status.success() {
+        return Ok(());
+    }
+
+    let reason = String::from_utf8_lossy(&result.stderr);
+    Err(PatchError::GitCommand(format!(
+        "git checkout {} failed: {}",
+        sha, reason
+    )))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::fs;
+    use tempfile::TempDir;
+
+    /// Run `git <args>` in `dir` and return its trimmed stdout.
+    ///
+    /// Panics with git's stderr if the command cannot be spawned or exits
+    /// non-zero, so a broken fixture fails at the step that broke instead of
+    /// at a later, unrelated assertion.
+    async fn git(dir: &Path, args: &[&str]) -> String {
+        let output = Command::new("git")
+            .current_dir(dir)
+            .args(args)
+            .output()
+            .await
+            .expect("failed to spawn git");
+        assert!(
+            output.status.success(),
+            "git {:?} failed: {}",
+            args,
+            String::from_utf8_lossy(&output.stderr)
+        );
+        String::from_utf8_lossy(&output.stdout).trim().to_string()
+    }
+
+    /// Overwrite `file.txt` with `contents` and commit it with `message`.
+    async fn commit_file(dir: &Path, contents: &str, message: &str) {
+        fs::write(dir.join("file.txt"), contents).unwrap();
+        git(dir, &["add", "."]).await;
+        git(dir, &["commit", "-m", message]).await;
+    }
+
+    /// Create a throwaway repository containing a single "initial" commit.
+    async fn setup_test_repo() -> TempDir {
+        let dir = TempDir::new().unwrap();
+        let path = dir.path();
+
+        // Initialize git repo
+        git(path, &["init"]).await;
+
+        // Configure git user; disable signing so a global gpgsign setting
+        // cannot make the fixture commits fail.
+        git(path, &["config", "user.email", "test@test.com"]).await;
+        git(path, &["config", "user.name", "Test"]).await;
+        git(path, &["config", "commit.gpgsign", "false"]).await;
+
+        // Create initial commit
+        commit_file(path, "initial", "initial").await;
+
+        dir
+    }
+
+    #[tokio::test]
+    async fn test_create_and_apply_patch() {
+        let dir = setup_test_repo().await;
+        let path = dir.path();
+
+        // With only one commit there is no HEAD~1 to resolve.
+        let base_sha = get_parent_sha(path).await;
+        assert!(base_sha.is_err());
+
+        // Make another commit first
+        commit_file(path, "modified", "modified").await;
+
+        // Now get the base SHA
+        let base_sha = get_parent_sha(path).await.unwrap();
+
+        // Create patch
+        let (patch_data, files_count) = create_patch(path, &base_sha).await.unwrap();
+        assert!(!patch_data.is_empty());
+        assert_eq!(files_count, 1);
+
+        // Reset to base and apply patch
+        checkout_commit(path, &base_sha).await.unwrap();
+        assert_eq!(fs::read_to_string(path.join("file.txt")).unwrap(), "initial");
+
+        apply_patch(path, &patch_data).await.unwrap();
+        assert_eq!(
+            fs::read_to_string(path.join("file.txt")).unwrap(),
+            "modified"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_empty_patch() {
+        let dir = setup_test_repo().await;
+        let path = dir.path();
+
+        // Make another commit
+        commit_file(path, "modified", "modified").await;
+
+        // Get current HEAD
+        let head_sha = git(path, &["rev-parse", "HEAD"]).await;
+
+        // Try to create patch from HEAD to HEAD (no changes)
+        let result = create_patch(path, &head_sha).await;
+        assert!(matches!(result, Err(PatchError::EmptyPatch)));
+    }
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 0cba516..cb4bde2 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -5,6 +5,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
+use base64::Engine;
use rand::Rng;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, RwLock};
@@ -14,8 +15,10 @@ use std::collections::HashSet;
use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
+use crate::daemon::config::CheckpointPatchConfig;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
+use crate::daemon::storage;
use crate::daemon::temp::TempManager;
use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
@@ -989,6 +992,8 @@ pub struct TaskConfig {
/// Interval in seconds between heartbeat commits (WIP checkpoints).
/// Set to 0 to disable. Default: 300 (5 minutes).
pub heartbeat_commit_interval_secs: u64,
+ /// Checkpoint patch storage configuration.
+ pub checkpoint_patches: CheckpointPatchConfig,
}
impl Default for TaskConfig {
@@ -1007,6 +1012,7 @@ impl Default for TaskConfig {
api_url: "https://api.makima.jp".to_string(),
api_key: String::new(),
heartbeat_commit_interval_secs: 300, // 5 minutes
+ checkpoint_patches: CheckpointPatchConfig::default(),
}
}
}
@@ -1405,6 +1411,8 @@ impl TaskManager {
autonomous_loop,
resume_session,
conversation_history,
+ patch_data,
+ patch_base_sha,
} => {
tracing::info!(
task_id = %task_id,
@@ -1431,7 +1439,7 @@ impl TaskManager {
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
copy_files, contract_id, autonomous_loop, resume_session,
- conversation_history,
+ conversation_history, patch_data, patch_base_sha,
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1529,6 +1537,8 @@ impl TaskManager {
false, // autonomous_loop - supervisors don't use this
false, // resume_session - respawning from scratch
None, // conversation_history - not needed for fresh respawn
+ None, // patch_data - not available for respawn
+ None, // patch_base_sha - not available for respawn
).await {
tracing::error!(
task_id = %task_id,
@@ -1755,17 +1765,22 @@ impl TaskManager {
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
+ patch_data: Option<String>,
+ patch_base_sha: Option<String>,
) -> TaskResult<()> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ===");
// Check if task already exists - allow re-spawning if in terminal state
+ // or if resuming a supervisor (supervisors stay in Running state after Claude exits)
{
let mut tasks = self.tasks.write().await;
if let Some(existing) = tasks.get(&task_id) {
- if existing.state.is_terminal() {
- // Task exists but is in terminal state (completed, failed, interrupted)
- // Remove it so we can re-spawn
- tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn");
+ let can_respawn = existing.state.is_terminal()
+ || (resume_session && existing.is_supervisor);
+
+ if can_respawn {
+ // Task exists but can be re-spawned (terminal state or supervisor resume)
+ tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn");
tasks.remove(&task_id);
} else {
// Task is still active, reject
@@ -1825,7 +1840,7 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session,
- conversation_history,
+ conversation_history, patch_data, patch_base_sha,
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -1855,6 +1870,7 @@ impl TaskManager {
api_url: self.config.api_url.clone(),
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
+ checkpoint_patches: self.config.checkpoint_patches.clone(),
}
}
@@ -2824,6 +2840,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Task {} not found or has no worktree", task_id)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2854,6 +2873,9 @@ impl TaskManager {
lines_removed: None,
error: Some("No changes to checkpoint".to_string()),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2878,6 +2900,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Failed to stage changes: {}", e)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2920,6 +2945,9 @@ impl TaskManager {
lines_removed: Some(lines_removed),
error: Some(format!("Commit failed: {}", stderr)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2936,6 +2964,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Failed to execute git commit: {}", e)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2943,6 +2974,7 @@ impl TaskManager {
};
// Success - send response (checkpoint_number will be assigned by server on DB insert)
+ // Note: Manual checkpoints don't include patches (only heartbeat commits do)
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: true,
@@ -2954,6 +2986,9 @@ impl TaskManager {
lines_removed: Some(lines_removed),
error: None,
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
@@ -3153,6 +3188,8 @@ struct TaskManagerInner {
heartbeat_commit_interval_secs: u64,
/// Shared contract task counts for releasing concurrency slots.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
+ /// Checkpoint patch storage configuration.
+ checkpoint_patches: CheckpointPatchConfig,
}
impl TaskManagerInner {
@@ -3193,8 +3230,10 @@ impl TaskManagerInner {
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
+ patch_data: Option<String>,
+ patch_base_sha: Option<String>,
) -> Result<(), DaemonError> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ===");
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ===");
// If resuming session, try to find existing worktree first
let existing_worktree = if resume_session {
@@ -3212,8 +3251,83 @@ impl TaskManagerInner {
None
};
+ // Try to restore from patch if worktree is missing but we have patch data
+ let restored_from_patch = if existing_worktree.is_none() {
+ if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) {
+ tracing::info!(
+ task_id = %task_id,
+ base_sha = %base_sha,
+ patch_len = patch_str.len(),
+ "Attempting to restore worktree from patch"
+ );
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Restoring worktree from checkpoint patch...\n"),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Decode base64 patch data
+ match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) {
+ Ok(patch_bytes) => {
+ match self.worktree_manager.restore_from_patch(
+ source,
+ task_id,
+ &task_name,
+ base_sha,
+ &patch_bytes,
+ ).await {
+ Ok(worktree_info) => {
+ tracing::info!(
+ task_id = %task_id,
+ path = %worktree_info.path.display(),
+ "Successfully restored worktree from patch"
+ );
+
+ // Store worktree info
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(worktree_info.clone());
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Worktree restored at {}\n", worktree_info.path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ Some(worktree_info.path)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Warning: Failed to restore from patch ({}), starting fresh\n", e),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ None
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data");
+ None
+ }
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
// Determine working directory
- let has_existing_worktree = existing_worktree.is_some();
+ let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some();
let working_dir = if let Some(existing) = existing_worktree {
// Reuse existing worktree for session resume
let msg = DaemonMessage::task_output(
@@ -3223,6 +3337,9 @@ impl TaskManagerInner {
);
let _ = self.ws_tx.send(msg).await;
existing
+ } else if let Some(restored_path) = restored_from_patch {
+ // Already restored from patch above
+ restored_path
} else if let Some(ref source) = repo_source {
if is_new_repo_request(source) {
// Explicit new repo request: new:// or new://project-name
@@ -4523,12 +4640,24 @@ impl TaskManagerInner {
/// Create a heartbeat commit with all uncommitted changes (WIP checkpoint).
/// Returns (commit SHA, push succeeded) on success, or an error message if nothing to commit.
+ /// Also creates a patch and sends it to the server for recovery purposes.
async fn create_heartbeat_commit(
&self,
task_id: Uuid,
worktree_path: &std::path::Path,
) -> Result<(String, bool), String> {
- // 1. Check for uncommitted changes using git status --porcelain
+ // 1. Get parent SHA BEFORE committing (for patch creation)
+ let parent_sha_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await;
+ let parent_sha = parent_sha_output
+ .ok()
+ .filter(|o| o.status.success())
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
+
+ // 2. Check for uncommitted changes using git status --porcelain
let status_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["status", "--porcelain"])
@@ -4546,7 +4675,7 @@ impl TaskManagerInner {
return Err("No changes to commit".into());
}
- // 2. Stage all changes
+ // 3. Stage all changes
let add_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["add", "-A"])
@@ -4559,7 +4688,7 @@ impl TaskManagerInner {
return Err(format!("git add failed: {}", stderr));
}
- // 3. Create WIP commit with timestamp
+ // 4. Create WIP commit with timestamp
let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp);
@@ -4575,7 +4704,7 @@ impl TaskManagerInner {
return Err(format!("git commit failed: {}", stderr));
}
- // 4. Get the commit SHA
+ // 5. Get the commit SHA
let sha_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["rev-parse", "HEAD"])
@@ -4591,7 +4720,19 @@ impl TaskManagerInner {
let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string();
tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit");
- // 5. Push to remote (best effort - don't fail if push fails)
+ // 6. Get current branch name
+ let branch_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["branch", "--show-current"])
+ .output()
+ .await;
+ let branch_name = branch_output
+ .ok()
+ .filter(|o| o.status.success())
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
+ .unwrap_or_else(|| "unknown".to_string());
+
+ // 7. Push to remote (best effort - don't fail if push fails)
let push_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["push"])
@@ -4614,6 +4755,68 @@ impl TaskManagerInner {
}
};
+ // 8. Create patch and send CheckpointCreated message to server
+ let mut patch_data: Option<String> = None;
+ let mut patch_base_sha: Option<String> = None;
+ let mut patch_files_count: Option<i32> = None;
+
+ if self.checkpoint_patches.enabled {
+ if let Some(ref base_sha) = parent_sha {
+ match storage::create_patch(worktree_path, base_sha).await {
+ Ok((compressed_patch, files_count)) => {
+ // Check size limit
+ if compressed_patch.len() <= self.checkpoint_patches.max_patch_size_bytes {
+ // Encode as base64 for JSON transport
+ patch_data = Some(base64::engine::general_purpose::STANDARD.encode(&compressed_patch));
+ patch_base_sha = Some(base_sha.clone());
+ patch_files_count = Some(files_count as i32);
+ tracing::debug!(
+ task_id = %task_id,
+ sha = %sha,
+ patch_size = compressed_patch.len(),
+ files_count = files_count,
+ "Created checkpoint patch"
+ );
+ } else {
+ tracing::warn!(
+ task_id = %task_id,
+ sha = %sha,
+ patch_size = compressed_patch.len(),
+ max_size = self.checkpoint_patches.max_patch_size_bytes,
+ "Patch exceeds size limit, not including in checkpoint"
+ );
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ sha = %sha,
+ error = %e,
+ "Failed to create patch for heartbeat commit"
+ );
+ }
+ }
+ }
+ }
+
+ // Send CheckpointCreated message to server (so it stores the checkpoint and patch)
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: true,
+ commit_sha: Some(sha.clone()),
+ branch_name: Some(branch_name),
+ checkpoint_number: None, // Server will assign
+ files_changed: None, // Could get from git diff --name-status if needed
+ lines_added: None,
+ lines_removed: None,
+ error: None,
+ message: commit_msg,
+ patch_data,
+ patch_base_sha,
+ patch_files_count,
+ };
+ let _ = self.ws_tx.send(msg).await;
+
Ok((sha, pushed))
}
}
@@ -4633,6 +4836,7 @@ impl Clone for TaskManagerInner {
api_url: self.api_url.clone(),
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
+ checkpoint_patches: self.checkpoint_patches.clone(),
}
}
}
diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs
index 5edd7b1..04cb307 100644
--- a/makima/src/daemon/worktree/manager.rs
+++ b/makima/src/daemon/worktree/manager.rs
@@ -1697,6 +1697,141 @@ impl WorktreeManager {
pub async fn target_directory_exists(&self, target_dir: &Path) -> bool {
target_dir.exists()
}
+
+    /// Restore a worktree from a stored patch.
+    ///
+    /// This is used for task recovery when the local worktree has been lost.
+    /// 1. Clone/fetch the source repo to get the base commit
+    /// 2. Prune stale worktree registrations and create a new worktree (on the
+    ///    task branch) at the base commit
+    /// 3. Apply the patch to restore the task's state
+    ///
+    /// Patch application is best-effort: if it fails, the error is logged and the
+    /// worktree is still returned, checked out at `base_commit_sha`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the worktree path is not valid UTF-8, the base
+    /// directory cannot be created, an existing worktree directory cannot be
+    /// removed, the source repo cannot be prepared, a git process cannot be
+    /// spawned, or `git worktree add` fails (including after the retry).
+    pub async fn restore_from_patch(
+        &self,
+        source_repo: &str,
+        task_id: Uuid,
+        task_name: &str,
+        base_commit_sha: &str,
+        patch_data: &[u8],
+    ) -> Result<WorktreeInfo, WorktreeError> {
+        use crate::daemon::storage;
+
+        // Generate directory and branch names
+        let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name));
+        let worktree_path = self.base_dir.join(&dir_name);
+        let branch_name = format!(
+            "{}{}-{}",
+            self.branch_prefix,
+            sanitize_name(task_name),
+            short_uuid(task_id)
+        );
+
+        // git receives the path as a string argument. Validate it once, before any
+        // destructive step, so the retry below does not need an `unwrap()`.
+        let worktree_path_str = worktree_path
+            .to_str()
+            .ok_or_else(|| WorktreeError::InvalidPath("Invalid worktree path".to_string()))?;
+
+        // Ensure base directory exists
+        tokio::fs::create_dir_all(&self.base_dir).await?;
+
+        // Remove existing worktree if present (we're restoring from scratch)
+        if worktree_path.exists() {
+            tracing::info!(
+                task_id = %task_id,
+                worktree_path = %worktree_path.display(),
+                "Removing existing worktree before restore"
+            );
+            tokio::fs::remove_dir_all(&worktree_path).await?;
+        }
+
+        // Clone the source repo if needed
+        let repo_path = self.ensure_repo(source_repo).await?;
+
+        // Make sure the base commit is available locally. A failed fetch is only
+        // logged: the commit may already be present in the local clone.
+        let fetch_output = Command::new("git")
+            .args(["fetch", "--all"])
+            .current_dir(&repo_path)
+            .output()
+            .await?;
+
+        if !fetch_output.status.success() {
+            tracing::warn!(
+                task_id = %task_id,
+                stderr = %String::from_utf8_lossy(&fetch_output.stderr),
+                "git fetch failed, continuing anyway"
+            );
+        }
+
+        // A worktree directory that was deleted outside of git (the case this
+        // function exists for, and what the removal above does) stays registered
+        // in the repository. git then refuses to add a worktree at the same path
+        // and keeps the task branch marked as checked out, so it cannot be
+        // deleted either. Drop such stale registrations first.
+        let prune_output = Command::new("git")
+            .args(["worktree", "prune"])
+            .current_dir(&repo_path)
+            .output()
+            .await?;
+
+        if !prune_output.status.success() {
+            tracing::warn!(
+                task_id = %task_id,
+                stderr = %String::from_utf8_lossy(&prune_output.stderr),
+                "git worktree prune failed, continuing anyway"
+            );
+        }
+
+        // Create the worktree from the base commit on a new task branch
+        let add_args = [
+            "worktree",
+            "add",
+            "-b",
+            branch_name.as_str(),
+            worktree_path_str,
+            base_commit_sha,
+        ];
+        let output = Command::new("git")
+            .args(add_args)
+            .current_dir(&repo_path)
+            .output()
+            .await?;
+
+        if !output.status.success() {
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            if !stderr.contains("already exists") {
+                return Err(WorktreeError::GitCommand(format!(
+                    "Failed to create worktree: {}",
+                    stderr
+                )));
+            }
+
+            // The task branch is left over from the lost worktree: delete it and
+            // recreate it at the base commit. The delete result is deliberately
+            // ignored; if it did not work, the retry reports the real failure.
+            let _ = Command::new("git")
+                .args(["branch", "-D", &branch_name])
+                .current_dir(&repo_path)
+                .output()
+                .await;
+
+            let retry_output = Command::new("git")
+                .args(add_args)
+                .current_dir(&repo_path)
+                .output()
+                .await?;
+
+            if !retry_output.status.success() {
+                return Err(WorktreeError::GitCommand(format!(
+                    "Failed to create worktree after retry: {}",
+                    String::from_utf8_lossy(&retry_output.stderr)
+                )));
+            }
+        }
+
+        // Apply the patch to restore the task's state
+        if let Err(e) = storage::apply_patch(&worktree_path, patch_data).await {
+            tracing::error!(
+                task_id = %task_id,
+                error = %e,
+                "Failed to apply patch, worktree is at base commit"
+            );
+            // Don't fail - the worktree is usable, just at the base commit
+        } else {
+            tracing::info!(
+                task_id = %task_id,
+                worktree_path = %worktree_path.display(),
+                "Successfully restored worktree from patch"
+            );
+        }
+
+        Ok(WorktreeInfo {
+            path: worktree_path,
+            branch: branch_name,
+            source_repo: repo_path,
+        })
+    }
}
/// Check if repo_source is a "new repo" request.
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index 3b02b53..ec9b09e 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -278,6 +278,15 @@ pub enum DaemonMessage {
error: Option<String>,
/// User-provided checkpoint message
message: String,
+ /// Base64-encoded gzip-compressed patch data for recovery
+ #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
+ patch_data: Option<String>,
+ /// Commit SHA to apply patch on top of (for recovery)
+ #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
+ patch_base_sha: Option<String>,
+ /// Number of files in the patch
+ #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
+ patch_files_count: Option<i32>,
},
/// Response to CleanupWorktree command.
@@ -387,6 +396,13 @@ pub enum DaemonCommand {
/// Used to inject previous conversation context into the prompt.
#[serde(rename = "conversationHistory", default)]
conversation_history: Option<serde_json::Value>,
+ /// Base64-encoded gzip-compressed patch for worktree recovery.
+ /// Used when resume_session=true and the local worktree is missing.
+ #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")]
+ patch_data: Option<String>,
+ /// Commit SHA to apply the patch on top of.
+ #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
+ patch_base_sha: Option<String>,
},
/// Pause a running task.
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 6ede268..58f4da1 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -1966,3 +1966,45 @@ pub struct ForkPoint {
pub checkpoint: Option<TaskCheckpoint>,
pub timestamp: DateTime<Utc>,
}
+
+// ============================================================================
+// Checkpoint Patches (for task recovery when worktrees are lost)
+// ============================================================================
+
+ /// A stored git patch for checkpoint recovery, as loaded from the `checkpoint_patches` table.
+ /// Enables task recovery when local worktrees are deleted or corrupted. Serialized with camelCase keys.
+ #[derive(Debug, Clone, FromRow, Serialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct CheckpointPatch {
+ pub id: Uuid, // identifier of this patch record
+ pub task_id: Uuid, // task the patch was created for
+ /// Optional link to a task_checkpoint record (`None` when the patch is not tied to one)
+ pub checkpoint_id: Option<Uuid>,
+ /// The commit SHA that the patch should be applied on top of
+ pub base_commit_sha: String,
+ /// Compressed git diff data (gzip), stored as raw bytes
+ #[sqlx(rename = "patch_data")]
+ #[serde(skip)] // Don't serialize binary data to JSON
+ pub patch_data: Vec<u8>,
+ /// Size of the compressed patch in bytes
+ pub patch_size_bytes: i32,
+ /// Number of files affected by this patch
+ pub files_count: i32,
+ pub created_at: DateTime<Utc>, // creation time; "latest patch" lookups order by this
+ /// When this patch expires; expired rows are ignored by the latest-patch lookup and removed by cleanup
+ pub expires_at: DateTime<Utc>,
+ }
+
+ /// Metadata-only view of a checkpoint patch for responses: the same fields as `CheckpointPatch` minus the binary `patch_data`.
+ #[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+ #[serde(rename_all = "camelCase")]
+ pub struct CheckpointPatchInfo {
+ pub id: Uuid, // identifier of the patch record
+ pub task_id: Uuid, // task the patch was created for
+ pub checkpoint_id: Option<Uuid>, // linked task checkpoint, if any
+ pub base_commit_sha: String, // commit the patch applies on top of
+ pub patch_size_bytes: i32, // size of the compressed patch in bytes
+ pub files_count: i32, // number of files affected by the patch
+ pub created_at: DateTime<Utc>, // creation time
+ pub expires_at: DateTime<Utc>, // time after which the patch counts as expired
+ }
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 84afc8d..da44899 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,8 +6,9 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository,
- ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest,
+ CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
+ ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
+ ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest,
CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary,
FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord,
SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
@@ -3717,3 +3718,111 @@ pub async fn cleanup_stale_anonymous_tasks(
Ok(result.rows_affected() as i64)
}
+
+// ============================================================================
+// Checkpoint Patches (for task recovery)
+// ============================================================================
+
+/// Create a checkpoint patch for task recovery.
+///
+/// Inserts one row into `checkpoint_patches` and returns the stored record.
+/// `patch_size_bytes` is derived from `patch_data.len()`, and `expires_at` is
+/// set by the database to `NOW()` plus `ttl_hours` hours.
+///
+/// # Errors
+///
+/// Returns [`sqlx::Error::Protocol`] if the patch length does not fit the `i32`
+/// size field, or any error produced while executing the insert.
+pub async fn create_checkpoint_patch(
+    pool: &PgPool,
+    task_id: Uuid,
+    checkpoint_id: Option<Uuid>,
+    base_commit_sha: &str,
+    patch_data: &[u8],
+    files_count: i32,
+    ttl_hours: i64,
+) -> Result<CheckpointPatch, sqlx::Error> {
+    // An `as i32` cast would silently wrap for patches larger than `i32::MAX`
+    // bytes and record a bogus (possibly negative) size; reject them instead.
+    let patch_size_bytes = i32::try_from(patch_data.len()).map_err(|_| {
+        sqlx::Error::Protocol(format!(
+            "checkpoint patch of {} bytes is too large to store",
+            patch_data.len()
+        ))
+    })?;
+
+    sqlx::query_as::<_, CheckpointPatch>(
+        r#"
+        INSERT INTO checkpoint_patches (
+            task_id, checkpoint_id, base_commit_sha, patch_data,
+            patch_size_bytes, files_count, expires_at
+        )
+        VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7)
+        RETURNING *
+        "#,
+    )
+    .bind(task_id)
+    .bind(checkpoint_id)
+    .bind(base_commit_sha)
+    .bind(patch_data)
+    .bind(patch_size_bytes)
+    .bind(files_count)
+    .bind(ttl_hours)
+    .fetch_one(pool)
+    .await
+}
+
+ /// Get the most recently created checkpoint patch for a task that has not yet expired (`Ok(None)` if there is none).
+ pub async fn get_latest_checkpoint_patch(
+ pool: &PgPool,
+ task_id: Uuid,
+ ) -> Result<Option<CheckpointPatch>, sqlx::Error> {
+ sqlx::query_as::<_, CheckpointPatch>(
+ r#"
+ SELECT * FROM checkpoint_patches
+ WHERE task_id = $1 AND expires_at > NOW()
+ ORDER BY created_at DESC
+ LIMIT 1
+ "#,
+ )
+ .bind(task_id) // $1
+ .fetch_optional(pool) // zero rows is not an error here
+ .await
+ }
+
+/// Look up a single checkpoint patch by its `id`.
+///
+/// Yields `Ok(None)` when no row matches. Unlike the latest-patch lookup, the
+/// query does not filter on `expires_at`.
+pub async fn get_checkpoint_patch(
+    pool: &PgPool,
+    id: Uuid,
+) -> Result<Option<CheckpointPatch>, sqlx::Error> {
+    let lookup = sqlx::query_as::<_, CheckpointPatch>(
+        "SELECT * FROM checkpoint_patches WHERE id = $1",
+    )
+    .bind(id);
+
+    lookup.fetch_optional(pool).await
+}
+
+ /// List all checkpoint patches for a task, newest first, selecting only metadata columns (no patch data, for efficiency). Expired rows are not filtered out.
+ pub async fn list_checkpoint_patches(
+ pool: &PgPool,
+ task_id: Uuid,
+ ) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> {
+ sqlx::query_as::<_, CheckpointPatchInfo>(
+ r#"
+ SELECT id, task_id, checkpoint_id, base_commit_sha,
+ patch_size_bytes, files_count, created_at, expires_at
+ FROM checkpoint_patches
+ WHERE task_id = $1
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(task_id) // $1
+ .fetch_all(pool) // empty Vec when the task has no patches
+ .await
+ }
+
+/// Purge every checkpoint patch whose `expires_at` lies in the past.
+///
+/// On success, yields how many rows the `DELETE` removed.
+pub async fn cleanup_expired_checkpoint_patches(
+    pool: &PgPool,
+) -> Result<i64, sqlx::Error> {
+    sqlx::query("DELETE FROM checkpoint_patches WHERE expires_at < NOW()")
+        .execute(pool)
+        .await
+        .map(|outcome| outcome.rows_affected() as i64)
+}
+
+/// Remove every checkpoint patch stored for `task_id`, expired or not.
+///
+/// On success, yields how many rows the `DELETE` removed.
+pub async fn delete_checkpoint_patches_for_task(
+    pool: &PgPool,
+    task_id: Uuid,
+) -> Result<i64, sqlx::Error> {
+    let purge = sqlx::query("DELETE FROM checkpoint_patches WHERE task_id = $1").bind(task_id);
+    let outcome = purge.execute(pool).await?;
+
+    Ok(outcome.rows_affected() as i64)
+}
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index c94538d..e2adb72 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -1593,8 +1593,11 @@ async fn handle_contract_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = command_sender.send(command).await {
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 53e1587..240e1f7 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -685,8 +685,11 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -734,8 +737,11 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
@@ -1135,8 +1141,11 @@ pub async fn send_message(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
@@ -2273,8 +2282,11 @@ pub async fn reassign_task(
copy_files: None,
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -2597,8 +2609,11 @@ pub async fn continue_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -3490,8 +3505,11 @@ pub async fn branch_task(
copy_files: None,
contract_id: None,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: message_count > 0, // Resume if we have conversation history
conversation_history: updated_task.conversation_state.clone(),
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index 8e134bd..1ff0724 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1148,8 +1148,11 @@ async fn handle_mesh_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
match state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 6262975..65db373 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -14,6 +14,7 @@ use axum::{
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
+use base64::Engine;
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -410,6 +411,15 @@ pub enum DaemonMessage {
error: Option<String>,
/// User-provided checkpoint message
message: String,
+ /// Base64-encoded gzip-compressed patch data for recovery
+ #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
+ patch_data: Option<String>,
+ /// Commit SHA to apply patch on top of (for recovery)
+ #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
+ patch_base_sha: Option<String>,
+ /// Number of files in the patch
+ #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
+ patch_files_count: Option<i32>,
},
/// Notification that git config was inherited
GitConfigInherited {
@@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
lines_removed,
error,
message,
+ patch_data,
+ patch_base_sha,
+ patch_files_count,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
commit_sha = ?commit_sha,
+ has_patch = patch_data.is_some(),
"Checkpoint created notification received"
);
@@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"Checkpoint stored in database"
);
+ // Store patch if provided (for task recovery)
+ if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
+ // Decode base64 patch data
+ match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
+ Ok(patch_bytes) => {
+ let files_count = patch_files_count.unwrap_or(0);
+ // Default TTL: 7 days (168 hours)
+ let ttl_hours = 168i64;
+ match repository::create_checkpoint_patch(
+ pool,
+ task_id,
+ Some(checkpoint.id),
+ base_sha,
+ &patch_bytes,
+ files_count,
+ ttl_hours,
+ ).await {
+ Ok(patch) => {
+ tracing::info!(
+ task_id = %task_id,
+ patch_id = %patch.id,
+ patch_size = patch_bytes.len(),
+ "Checkpoint patch stored for recovery"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to store checkpoint patch"
+ );
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to decode patch base64 data"
+ );
+ }
+ }
+ }
+
// Broadcast success as task output
state.broadcast_task_output(TaskOutputNotification {
task_id,
@@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"filesChanged": files_changed,
"linesAdded": lines_added,
"linesRemoved": lines_removed,
+ "hasPatch": patch_data.is_some(),
}),
).await;
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 57f3f2f..21c9515 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -9,6 +9,7 @@ use axum::{
response::IntoResponse,
Json,
};
+use base64::Engine;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
@@ -364,8 +365,11 @@ async fn try_start_pending_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -663,8 +667,11 @@ pub async fn spawn_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -1992,6 +1999,29 @@ pub async fn resume_supervisor(
.into_response();
}
+ // Fetch latest checkpoint patch for worktree recovery
+ let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ task_id = %supervisor_state.task_id,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ "Including checkpoint patch for worktree recovery"
+ );
+ // Encode patch as base64 for JSON transport
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch");
+ (None, None)
+ }
+ };
+
// Send SpawnTask with resume_session=true to use Claude's --continue
// Include conversation_history as fallback if worktree doesn't exist on target daemon
let command = DaemonCommand::SpawnTask {
@@ -2010,8 +2040,11 @@ pub async fn resume_supervisor(
copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: supervisor_task.contract_id,
is_supervisor: true,
+ autonomous_loop: false,
resume_session: true, // Use --continue to preserve conversation
conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
+ patch_data,
+ patch_base_sha,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 0bc1b92..3a27513 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -248,6 +248,8 @@ const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120;
const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60;
/// Maximum age in days for anonymous tasks before cleanup
const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
+ /// Interval, in seconds, between runs of the periodic expired-checkpoint-patch cleanup task (3600 s = hourly)
+ const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600;
/// Run the HTTP server with graceful shutdown support.
///
@@ -344,6 +346,47 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
}
}
});
+
+ // Clone pool for checkpoint patch cleanup
+ let checkpoint_patch_cleanup_pool = pool.clone();
+
+ // Initial cleanup of any expired checkpoint patches
+ match crate::db::repository::cleanup_expired_checkpoint_patches(&pool).await {
+ Ok(deleted) if deleted > 0 => {
+ tracing::info!(
+ deleted = deleted,
+ "Cleaned up expired checkpoint patches on startup"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches on startup");
+ }
+ _ => {}
+ }
+
+ // Spawn periodic checkpoint patch cleanup task (runs hourly)
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(
+ std::time::Duration::from_secs(CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS)
+ );
+ loop {
+ interval.tick().await;
+ match crate::db::repository::cleanup_expired_checkpoint_patches(
+ &checkpoint_patch_cleanup_pool,
+ ).await {
+ Ok(deleted) if deleted > 0 => {
+ tracing::info!(
+ deleted = deleted,
+ "Cleaned up expired checkpoint patches"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches");
+ }
+ _ => {}
+ }
+ }
+ });
}
let app = make_router(state);
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 28d65d0..5b75281 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -208,12 +208,21 @@ pub enum DaemonCommand {
/// Whether this task is a supervisor (long-running contract orchestrator)
#[serde(rename = "isSupervisor")]
is_supervisor: bool,
+ /// Whether to run in autonomous loop mode
+ #[serde(rename = "autonomousLoop", default)]
+ autonomous_loop: bool,
/// Whether to resume from a previous session using --continue flag
#[serde(rename = "resumeSession", default)]
resume_session: bool,
/// Conversation history for fallback when worktree doesn't exist
#[serde(rename = "conversationHistory", default)]
conversation_history: Option<serde_json::Value>,
+ /// Base64-encoded gzip-compressed patch for worktree recovery
+ #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")]
+ patch_data: Option<String>,
+ /// Commit SHA to apply the patch on top of
+ #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
+ patch_base_sha: Option<String>,
},
/// Pause a running task
PauseTask {