diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 22:32:46 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 01:03:04 +0000 |
| commit | 1ed362424dafec690f919154f5116471951cda9c (patch) | |
| tree | 19c7ca9231887394a791223fe32a8ad335a687a8 | |
| parent | 265f8cf14fec9d7116d09af49e4b48b357faceda (diff) | |
| download | soryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz soryu-1ed362424dafec690f919154f5116471951cda9c.zip | |
Add patch checkpointing
| -rw-r--r-- | Cargo.lock | 2 | ||||
| -rw-r--r-- | makima/Cargo.toml | 6 | ||||
| -rw-r--r-- | makima/migrations/20250122000001_checkpoint_patches.sql | 27 | ||||
| -rw-r--r-- | makima/src/bin/makima.rs | 1 | ||||
| -rw-r--r-- | makima/src/daemon/config.rs | 44 | ||||
| -rw-r--r-- | makima/src/daemon/error.rs | 3 | ||||
| -rw-r--r-- | makima/src/daemon/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/daemon/storage/mod.rs | 8 | ||||
| -rw-r--r-- | makima/src/daemon/storage/patch.rs | 293 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 232 | ||||
| -rw-r--r-- | makima/src/daemon/worktree/manager.rs | 135 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 16 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 42 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 113 | ||||
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 18 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 59 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 33 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 43 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 9 |
21 files changed, 1075 insertions, 16 deletions
@@ -2007,6 +2007,7 @@ dependencies = [ "crossterm", "dashmap", "dirs 5.0.1", + "flate2", "futures", "fuzzy-matcher", "hex", @@ -2035,6 +2036,7 @@ dependencies = [ "shell-escape", "sqlx", "symphonia", + "tempfile", "thiserror 2.0.17", "tokenizers 0.21.4", "tokio", diff --git a/makima/Cargo.toml b/makima/Cargo.toml index 650628a..950c123 100644 --- a/makima/Cargo.toml +++ b/makima/Cargo.toml @@ -92,3 +92,9 @@ ahash = "0.8" # TUI ratatui = "0.29" crossterm = "0.28" + +# Compression +flate2 = "1.0" + +[dev-dependencies] +tempfile = "3.10" diff --git a/makima/migrations/20250122000001_checkpoint_patches.sql b/makima/migrations/20250122000001_checkpoint_patches.sql new file mode 100644 index 0000000..19da66e --- /dev/null +++ b/makima/migrations/20250122000001_checkpoint_patches.sql @@ -0,0 +1,27 @@ +-- Checkpoint patches table for storing git diffs to enable task recovery +-- When a local worktree is deleted/corrupted, the patch can be used to restore state + +CREATE TABLE IF NOT EXISTS checkpoint_patches ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE CASCADE, + base_commit_sha VARCHAR(40) NOT NULL, -- Commit to apply patch on top of + patch_data BYTEA NOT NULL, -- Compressed git diff (gzip) + patch_size_bytes INTEGER NOT NULL, + files_count INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ NOT NULL, -- TTL for auto-cleanup + CONSTRAINT patch_size_limit CHECK (patch_size_bytes <= 10485760) -- 10MB limit +); + +CREATE INDEX idx_checkpoint_patches_task ON checkpoint_patches(task_id); +CREATE INDEX idx_checkpoint_patches_expires ON checkpoint_patches(expires_at); + +-- Link checkpoints to their patches +ALTER TABLE task_checkpoints +ADD COLUMN IF NOT EXISTS patch_id UUID REFERENCES checkpoint_patches(id) ON DELETE SET NULL; + +COMMENT ON TABLE checkpoint_patches IS 'Stores compressed git diffs for task recovery when local worktree is lost'; +COMMENT ON COLUMN checkpoint_patches.base_commit_sha IS 'The commit SHA that the patch should be applied on top of'; +COMMENT ON COLUMN checkpoint_patches.patch_data IS 'Gzip-compressed git diff data'; +COMMENT ON COLUMN checkpoint_patches.expires_at IS 'Automatic cleanup time (TTL)'; diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index cb80c7f..f0db69c 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -241,6 +241,7 @@ async fn run_daemon( api_url, api_key: config.server.api_key.clone(), heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs, + checkpoint_patches: config.process.checkpoint_patches.clone(), }; // Create task manager diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index 9a00166..b7cb1e8 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -223,6 +223,48 @@ pub struct ProcessConfig { /// Set to 0 to disable. Default: 300 (5 minutes). #[serde(default = "default_heartbeat_commit_interval", alias = "heartbeatcommitintervalsecs")] pub heartbeat_commit_interval_secs: u64, + + /// Checkpoint patch storage configuration for task recovery. + #[serde(default)] + pub checkpoint_patches: CheckpointPatchConfig, +} + +/// Configuration for checkpoint patch storage in PostgreSQL. +/// Patches are stored to enable task recovery when local worktrees are lost. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct CheckpointPatchConfig { + /// Enable patch storage in PostgreSQL (default: true). + #[serde(default = "default_true")] + pub enabled: bool, + + /// Maximum patch size in bytes (default: 10MB). + /// Patches larger than this will not be stored. + #[serde(default = "default_max_patch_size", alias = "maxpatchsizebytes")] + pub max_patch_size_bytes: usize, + + /// TTL for patches in hours (default: 168 = 7 days). + /// Patches older than this will be automatically cleaned up. + #[serde(default = "default_patch_ttl_hours", alias = "ttlhours")] + pub ttl_hours: u64, +} + +fn default_max_patch_size() -> usize { + 10 * 1024 * 1024 // 10MB +} + +fn default_patch_ttl_hours() -> u64 { + 168 // 7 days +} + +impl Default for CheckpointPatchConfig { + fn default() -> Self { + Self { + enabled: true, + max_patch_size_bytes: default_max_patch_size(), + ttl_hours: default_patch_ttl_hours(), + } + } } fn default_claude_command() -> String { @@ -255,6 +297,7 @@ impl Default for ProcessConfig { env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), heartbeat_commit_interval_secs: default_heartbeat_commit_interval(), + checkpoint_patches: CheckpointPatchConfig::default(), } } } @@ -576,6 +619,7 @@ impl DaemonConfig { env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), heartbeat_commit_interval_secs: 300, + checkpoint_patches: CheckpointPatchConfig::default(), }, local_db: LocalDbConfig { path: PathBuf::from("/tmp/makima-daemon-test/state.db"), diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs index ea00d25..8d7fff0 100644 --- a/makima/src/daemon/error.rs +++ b/makima/src/daemon/error.rs @@ -30,6 +30,9 @@ pub enum DaemonError { #[error("JSON error: {0}")] Json(#[from] serde_json::Error), + #[error("Storage error: {0}")] + Storage(#[from] crate::daemon::storage::PatchError), + #[error("Authentication failed: {0}")] AuthFailed(String), diff --git a/makima/src/daemon/mod.rs b/makima/src/daemon/mod.rs index 18b5e8a..f5793d6 100644 --- a/makima/src/daemon/mod.rs +++ b/makima/src/daemon/mod.rs @@ -14,6 +14,7 @@ pub mod db; pub mod error; pub mod process; pub mod setup; +pub mod storage; pub mod task; pub mod temp; pub mod tui; diff --git a/makima/src/daemon/storage/mod.rs b/makima/src/daemon/storage/mod.rs new file mode 100644 index 0000000..cc5441a --- /dev/null +++ b/makima/src/daemon/storage/mod.rs @@ -0,0 +1,8 @@ +//! Checkpoint storage for task recovery. +//! +//! This module provides functionality to store and restore git patches +//! in PostgreSQL for recovering task worktrees when they are lost. + +mod patch; + +pub use patch::{create_patch, apply_patch, PatchError}; diff --git a/makima/src/daemon/storage/patch.rs b/makima/src/daemon/storage/patch.rs new file mode 100644 index 0000000..45624b5 --- /dev/null +++ b/makima/src/daemon/storage/patch.rs @@ -0,0 +1,293 @@ +//! Git patch creation and application for checkpoint recovery. + +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use flate2::Compression; +use std::io::{Read, Write}; +use std::path::Path; +use thiserror::Error; +use tokio::process::Command; + +/// Errors that can occur during patch operations. +#[derive(Error, Debug)] +pub enum PatchError { + #[error("Git command failed: {0}")] + GitCommand(String), + + #[error("Compression error: {0}")] + Compression(#[from] std::io::Error), + + #[error("Patch too large: {size} bytes (max: {max} bytes)")] + TooLarge { size: usize, max: usize }, + + #[error("Empty patch (no changes)")] + EmptyPatch, + + #[error("Failed to apply patch: {0}")] + ApplyFailed(String), +} + +/// Create a compressed git diff from worktree changes. +/// +/// Generates a diff between `base_sha` and HEAD, then compresses it with gzip. +/// Returns the compressed patch bytes and the number of files changed. +pub async fn create_patch( + worktree_path: &Path, + base_sha: &str, +) -> Result<(Vec<u8>, usize), PatchError> { + // Get the diff between base commit and HEAD + let output = Command::new("git") + .current_dir(worktree_path) + .args(["diff", base_sha, "HEAD", "--binary"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to run git diff: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!("git diff failed: {}", stderr))); + } + + let diff_data = output.stdout; + if diff_data.is_empty() { + return Err(PatchError::EmptyPatch); + } + + // Count files changed + let files_output = Command::new("git") + .current_dir(worktree_path) + .args(["diff", base_sha, "HEAD", "--name-only"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to count files: {}", e)))?; + + let files_count = if files_output.status.success() { + String::from_utf8_lossy(&files_output.stdout) + .lines() + .filter(|l| !l.is_empty()) + .count() + } else { + 0 + }; + + // Compress with gzip + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&diff_data)?; + let compressed = encoder.finish()?; + + Ok((compressed, files_count)) +} + +/// Apply a compressed patch to restore worktree state. +/// +/// The worktree should already be checked out at `base_sha` before calling this. +pub async fn apply_patch(worktree_path: &Path, patch_data: &[u8]) -> Result<(), PatchError> { + // Decompress gzip + let mut decoder = GzDecoder::new(patch_data); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + + if decompressed.is_empty() { + return Err(PatchError::EmptyPatch); + } + + // Apply the patch using git apply + let mut child = Command::new("git") + .current_dir(worktree_path) + .args(["apply", "--binary", "-"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| PatchError::GitCommand(format!("Failed to spawn git apply: {}", e)))?; + + // Write patch to stdin + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + stdin.write_all(&decompressed).await?; + drop(stdin); // Close stdin to signal EOF + } + + let output = child + .wait_with_output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to wait for git apply: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::ApplyFailed(stderr.to_string())); + } + + Ok(()) +} + +/// Get the parent commit SHA (HEAD~1) from a worktree. +pub async fn get_parent_sha(worktree_path: &Path) -> Result<String, PatchError> { + let output = Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD~1"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to get parent SHA: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!( + "git rev-parse HEAD~1 failed: {}", + stderr + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) +} + +/// Checkout a specific commit in the worktree. +pub async fn checkout_commit(worktree_path: &Path, sha: &str) -> Result<(), PatchError> { + let output = Command::new("git") + .current_dir(worktree_path) + .args(["checkout", sha]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to checkout: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!( + "git checkout {} failed: {}", + sha, stderr + ))); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::TempDir; + + async fn setup_test_repo() -> TempDir { + let dir = TempDir::new().unwrap(); + let path = dir.path(); + + // Initialize git repo + Command::new("git") + .current_dir(path) + .args(["init"]) + .output() + .await + .unwrap(); + + // Configure git user + Command::new("git") + .current_dir(path) + .args(["config", "user.email", "test@test.com"]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["config", "user.name", "Test"]) + .output() + .await + .unwrap(); + + // Create initial commit + fs::write(path.join("file.txt"), "initial").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "initial"]) + .output() + .await + .unwrap(); + + dir + } + + #[tokio::test] + async fn test_create_and_apply_patch() { + let dir = setup_test_repo().await; + let path = dir.path(); + + // Get base SHA + let base_sha = get_parent_sha(path).await; + // This will fail since there's only one commit + assert!(base_sha.is_err()); + + // Make another commit first + fs::write(path.join("file.txt"), "modified").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "modified"]) + .output() + .await + .unwrap(); + + // Now get the base SHA + let base_sha = get_parent_sha(path).await.unwrap(); + + // Create patch + let (patch_data, files_count) = create_patch(path, &base_sha).await.unwrap(); + assert!(!patch_data.is_empty()); + assert_eq!(files_count, 1); + + // Reset to base and apply patch + checkout_commit(path, &base_sha).await.unwrap(); + assert_eq!(fs::read_to_string(path.join("file.txt")).unwrap(), "initial"); + + apply_patch(path, &patch_data).await.unwrap(); + assert_eq!( + fs::read_to_string(path.join("file.txt")).unwrap(), + "modified" + ); + } + + #[tokio::test] + async fn test_empty_patch() { + let dir = setup_test_repo().await; + let path = dir.path(); + + // Make another commit + fs::write(path.join("file.txt"), "modified").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "modified"]) + .output() + .await + .unwrap(); + + // Get current HEAD + let head_output = Command::new("git") + .current_dir(path) + .args(["rev-parse", "HEAD"]) + .output() + .await + .unwrap(); + let head_sha = String::from_utf8_lossy(&head_output.stdout) + .trim() + .to_string(); + + // Try to create patch from HEAD to HEAD (no changes) + let result = create_patch(path, &head_sha).await; + assert!(matches!(result, Err(PatchError::EmptyPatch))); + } +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 0cba516..cb4bde2 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use base64::Engine; use rand::Rng; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, RwLock}; @@ -14,8 +15,10 @@ use std::collections::HashSet; use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; +use crate::daemon::config::CheckpointPatchConfig; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; +use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; @@ -989,6 +992,8 @@ pub struct TaskConfig { /// Interval in seconds between heartbeat commits (WIP checkpoints). /// Set to 0 to disable. Default: 300 (5 minutes). pub heartbeat_commit_interval_secs: u64, + /// Checkpoint patch storage configuration. + pub checkpoint_patches: CheckpointPatchConfig, } impl Default for TaskConfig { @@ -1007,6 +1012,7 @@ impl Default for TaskConfig { api_url: "https://api.makima.jp".to_string(), api_key: String::new(), heartbeat_commit_interval_secs: 300, // 5 minutes + checkpoint_patches: CheckpointPatchConfig::default(), } } } @@ -1405,6 +1411,8 @@ impl TaskManager { autonomous_loop, resume_session, conversation_history, + patch_data, + patch_base_sha, } => { tracing::info!( task_id = %task_id, @@ -1431,7 +1439,7 @@ impl TaskManager { parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1529,6 +1537,8 @@ impl TaskManager { false, // autonomous_loop - supervisors don't use this false, // resume_session - respawning from scratch None, // conversation_history - not needed for fresh respawn + None, // patch_data - not available for respawn + None, // patch_base_sha - not available for respawn ).await { tracing::error!( task_id = %task_id, @@ -1755,17 +1765,22 @@ impl TaskManager { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> TaskResult<()> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); // Check if task already exists - allow re-spawning if in terminal state + // or if resuming a supervisor (supervisors stay in Running state after Claude exits) { let mut tasks = self.tasks.write().await; if let Some(existing) = tasks.get(&task_id) { - if existing.state.is_terminal() { - // Task exists but is in terminal state (completed, failed, interrupted) - // Remove it so we can re-spawn - tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + let can_respawn = existing.state.is_terminal() + || (resume_session && existing.is_supervisor); + + if can_respawn { + // Task exists but can be re-spawned (terminal state or supervisor resume) + tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn"); tasks.remove(&task_id); } else { // Task is still active, reject @@ -1825,7 +1840,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -1855,6 +1870,7 @@ impl TaskManager { api_url: self.config.api_url.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.config.checkpoint_patches.clone(), } } @@ -2824,6 +2840,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Task {} not found or has no worktree", task_id)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2854,6 +2873,9 @@ impl TaskManager { lines_removed: None, error: Some("No changes to checkpoint".to_string()), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2878,6 +2900,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to stage changes: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2920,6 +2945,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: Some(format!("Commit failed: {}", stderr)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2936,6 +2964,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to execute git commit: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2943,6 +2974,7 @@ impl TaskManager { }; // Success - send response (checkpoint_number will be assigned by server on DB insert) + // Note: Manual checkpoints don't include patches (only heartbeat commits do) let msg = DaemonMessage::CheckpointCreated { task_id, success: true, @@ -2954,6 +2986,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: None, message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) @@ -3153,6 +3188,8 @@ struct TaskManagerInner { heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, + /// Checkpoint patch storage configuration. + checkpoint_patches: CheckpointPatchConfig, } impl TaskManagerInner { @@ -3193,8 +3230,10 @@ impl TaskManagerInner { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ==="); // If resuming session, try to find existing worktree first let existing_worktree = if resume_session { @@ -3212,8 +3251,83 @@ impl TaskManagerInner { None }; + // Try to restore from patch if worktree is missing but we have patch data + let restored_from_patch = if existing_worktree.is_none() { + if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) { + tracing::info!( + task_id = %task_id, + base_sha = %base_sha, + patch_len = patch_str.len(), + "Attempting to restore worktree from patch" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Restoring worktree from checkpoint patch...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Decode base64 patch data + match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) { + Ok(patch_bytes) => { + match self.worktree_manager.restore_from_patch( + source, + task_id, + &task_name, + base_sha, + &patch_bytes, + ).await { + Ok(worktree_info) => { + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "Successfully restored worktree from patch" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree restored at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + Some(worktree_info.path) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Failed to restore from patch ({}), starting fresh\n", e), + false, + ); + let _ = self.ws_tx.send(msg).await; + None + } + } + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data"); + None + } + } + } else { + None + } + } else { + None + }; + // Determine working directory - let has_existing_worktree = existing_worktree.is_some(); + let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some(); let working_dir = if let Some(existing) = existing_worktree { // Reuse existing worktree for session resume let msg = DaemonMessage::task_output( @@ -3223,6 +3337,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; existing + } else if let Some(restored_path) = restored_from_patch { + // Already restored from patch above + restored_path } else if let Some(ref source) = repo_source { if is_new_repo_request(source) { // Explicit new repo request: new:// or new://project-name @@ -4523,12 +4640,24 @@ impl TaskManagerInner { /// Create a heartbeat commit with all uncommitted changes (WIP checkpoint). /// Returns (commit SHA, push succeeded) on success, or an error message if nothing to commit. + /// Also creates a patch and sends it to the server for recovery purposes. async fn create_heartbeat_commit( &self, task_id: Uuid, worktree_path: &std::path::Path, ) -> Result<(String, bool), String> { - // 1. Check for uncommitted changes using git status --porcelain + // 1. Get parent SHA BEFORE committing (for patch creation) + let parent_sha_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + let parent_sha = parent_sha_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + + // 2. Check for uncommitted changes using git status --porcelain let status_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["status", "--porcelain"]) @@ -4546,7 +4675,7 @@ impl TaskManagerInner { return Err("No changes to commit".into()); } - // 2. Stage all changes + // 3. Stage all changes let add_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["add", "-A"]) @@ -4559,7 +4688,7 @@ impl TaskManagerInner { return Err(format!("git add failed: {}", stderr)); } - // 3. Create WIP commit with timestamp + // 4. Create WIP commit with timestamp let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp); @@ -4575,7 +4704,7 @@ impl TaskManagerInner { return Err(format!("git commit failed: {}", stderr)); } - // 4. Get the commit SHA + // 5. Get the commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["rev-parse", "HEAD"]) @@ -4591,7 +4720,19 @@ impl TaskManagerInner { let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string(); tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit"); - // 5. Push to remote (best effort - don't fail if push fails) + // 6. Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["branch", "--show-current"]) + .output() + .await; + let branch_name = branch_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + // 7. Push to remote (best effort - don't fail if push fails) let push_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["push"]) @@ -4614,6 +4755,68 @@ impl TaskManagerInner { } }; + // 8. Create patch and send CheckpointCreated message to server + let mut patch_data: Option<String> = None; + let mut patch_base_sha: Option<String> = None; + let mut patch_files_count: Option<i32> = None; + + if self.checkpoint_patches.enabled { + if let Some(ref base_sha) = parent_sha { + match storage::create_patch(worktree_path, base_sha).await { + Ok((compressed_patch, files_count)) => { + // Check size limit + if compressed_patch.len() <= self.checkpoint_patches.max_patch_size_bytes { + // Encode as base64 for JSON transport + patch_data = Some(base64::engine::general_purpose::STANDARD.encode(&compressed_patch)); + patch_base_sha = Some(base_sha.clone()); + patch_files_count = Some(files_count as i32); + tracing::debug!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + files_count = files_count, + "Created checkpoint patch" + ); + } else { + tracing::warn!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + max_size = self.checkpoint_patches.max_patch_size_bytes, + "Patch exceeds size limit, not including in checkpoint" + ); + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + sha = %sha, + error = %e, + "Failed to create patch for heartbeat commit" + ); + } + } + } + } + + // Send CheckpointCreated message to server (so it stores the checkpoint and patch) + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: true, + commit_sha: Some(sha.clone()), + branch_name: Some(branch_name), + checkpoint_number: None, // Server will assign + files_changed: None, // Could get from git diff --name-status if needed + lines_added: None, + lines_removed: None, + error: None, + message: commit_msg, + patch_data, + patch_base_sha, + patch_files_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok((sha, pushed)) } } @@ -4633,6 +4836,7 @@ impl Clone for TaskManagerInner { api_url: self.api_url.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.checkpoint_patches.clone(), } } } diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs index 5edd7b1..04cb307 100644 --- a/makima/src/daemon/worktree/manager.rs +++ b/makima/src/daemon/worktree/manager.rs @@ -1697,6 +1697,141 @@ impl WorktreeManager { pub async fn target_directory_exists(&self, target_dir: &Path) -> bool { target_dir.exists() } + + /// Restore a worktree from a stored patch. + /// + /// This is used for task recovery when the local worktree has been lost. + /// 1. Clone/fetch the source repo to get the base commit + /// 2. Create a new worktree at the base commit + /// 3. Apply the patch to restore the task's state + pub async fn restore_from_patch( + &self, + source_repo: &str, + task_id: Uuid, + task_name: &str, + base_commit_sha: &str, + patch_data: &[u8], + ) -> Result<WorktreeInfo, WorktreeError> { + use crate::daemon::storage; + + // Generate directory and branch names + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + let branch_name = format!( + "{}{}-{}", + self.branch_prefix, + sanitize_name(task_name), + short_uuid(task_id) + ); + + // Ensure base directory exists + tokio::fs::create_dir_all(&self.base_dir).await?; + + // Remove existing worktree if present (we're restoring from scratch) + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Removing existing worktree before restore" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + + // Clone the source repo if needed + let repo_path = self.ensure_repo(source_repo).await?; + + // Create worktree at the base commit + // First, we need to make sure the base commit is available + let fetch_output = Command::new("git") + .args(["fetch", "--all"]) + .current_dir(&repo_path) + .output() + .await?; + + if !fetch_output.status.success() { + tracing::warn!( + task_id = %task_id, + stderr = %String::from_utf8_lossy(&fetch_output.stderr), + "git fetch failed, continuing anyway" + ); + } + + // Create the worktree from the base commit + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + worktree_path.to_str().ok_or_else(|| { + WorktreeError::InvalidPath("Invalid worktree path".to_string()) + })?, + base_commit_sha, + ]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // If branch already exists, try without -b flag + if stderr.contains("already exists") { + // Remove the branch and try again + let _ = Command::new("git") + .args(["branch", "-D", &branch_name]) + .current_dir(&repo_path) + .output() + .await; + + let retry_output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + worktree_path.to_str().unwrap(), + base_commit_sha, + ]) + .current_dir(&repo_path) + .output() + .await?; + + if !retry_output.status.success() { + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree after retry: {}", + String::from_utf8_lossy(&retry_output.stderr) + ))); + } + } else { + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + } + + // Apply the patch to restore the task's state + if let Err(e) = storage::apply_patch(&worktree_path, patch_data).await { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to apply patch, worktree is at base commit" + ); + // Don't fail - the worktree is usable, just at the base commit + } else { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Successfully restored worktree from patch" + ); + } + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: repo_path, + }) + } } /// Check if repo_source is a "new repo" request. diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 3b02b53..ec9b09e 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -278,6 +278,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Response to CleanupWorktree command. @@ -387,6 +396,13 @@ pub enum DaemonCommand { /// Used to inject previous conversation context into the prompt. #[serde(rename = "conversationHistory", default)] conversation_history: Option<serde_json::Value>, + /// Base64-encoded gzip-compressed patch for worktree recovery. + /// Used when resume_session=true and the local worktree is missing. + #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply the patch on top of. + #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, }, /// Pause a running task. diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 6ede268..58f4da1 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1966,3 +1966,45 @@ pub struct ForkPoint { pub checkpoint: Option<TaskCheckpoint>, pub timestamp: DateTime<Utc>, } + +// ============================================================================ +// Checkpoint Patches (for task recovery when worktrees are lost) +// ============================================================================ + +/// A stored git patch for checkpoint recovery. +/// Enables task recovery when local worktrees are deleted or corrupted. +#[derive(Debug, Clone, FromRow, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointPatch { + pub id: Uuid, + pub task_id: Uuid, + /// Optional link to a task_checkpoint record + pub checkpoint_id: Option<Uuid>, + /// The commit SHA that the patch should be applied on top of + pub base_commit_sha: String, + /// Compressed git diff data (gzip) + #[sqlx(rename = "patch_data")] + #[serde(skip)] // Don't serialize binary data to JSON + pub patch_data: Vec<u8>, + /// Size of the compressed patch in bytes + pub patch_size_bytes: i32, + /// Number of files affected by this patch + pub files_count: i32, + pub created_at: DateTime<Utc>, + /// When this patch expires and will be automatically deleted + pub expires_at: DateTime<Utc>, +} + +/// Response for checkpoint patch (without binary data) +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointPatchInfo { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_id: Option<Uuid>, + pub base_commit_sha: String, + pub patch_size_bytes: i32, + pub files_count: i32, + pub created_at: DateTime<Utc>, + pub expires_at: DateTime<Utc>, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 84afc8d..da44899 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,8 +6,9 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, - ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, + CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, + ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, + ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, @@ -3717,3 +3718,111 @@ pub async fn cleanup_stale_anonymous_tasks( Ok(result.rows_affected() as i64) } + +// ============================================================================ +// Checkpoint Patches (for task recovery) +// ============================================================================ + +/// Create a checkpoint patch for task recovery. +pub async fn create_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option<Uuid>, + base_commit_sha: &str, + patch_data: &[u8], + files_count: i32, + ttl_hours: i64, +) -> Result<CheckpointPatch, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + r#" + INSERT INTO checkpoint_patches ( + task_id, checkpoint_id, base_commit_sha, patch_data, + patch_size_bytes, files_count, expires_at + ) + VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7) + RETURNING * + "#, + ) + .bind(task_id) + .bind(checkpoint_id) + .bind(base_commit_sha) + .bind(patch_data) + .bind(patch_data.len() as i32) + .bind(files_count) + .bind(ttl_hours) + .fetch_one(pool) + .await +} + +/// Get the latest checkpoint patch for a task. +pub async fn get_latest_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<CheckpointPatch>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + r#" + SELECT * FROM checkpoint_patches + WHERE task_id = $1 AND expires_at > NOW() + ORDER BY created_at DESC + LIMIT 1 + "#, + ) + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Get a checkpoint patch by ID. +pub async fn get_checkpoint_patch( + pool: &PgPool, + id: Uuid, +) -> Result<Option<CheckpointPatch>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + "SELECT * FROM checkpoint_patches WHERE id = $1", + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List all checkpoint patches for a task (without patch data for efficiency). +pub async fn list_checkpoint_patches( + pool: &PgPool, + task_id: Uuid, +) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatchInfo>( + r#" + SELECT id, task_id, checkpoint_id, base_commit_sha, + patch_size_bytes, files_count, created_at, expires_at + FROM checkpoint_patches + WHERE task_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(task_id) + .fetch_all(pool) + .await +} + +/// Delete expired checkpoint patches. +/// Returns the number of deleted patches. +pub async fn cleanup_expired_checkpoint_patches( + pool: &PgPool, +) -> Result<i64, sqlx::Error> { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE expires_at < NOW()") + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} + +/// Delete all checkpoint patches for a task. +pub async fn delete_checkpoint_patches_for_task( + pool: &PgPool, + task_id: Uuid, +) -> Result<i64, sqlx::Error> { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE task_id = $1") + .bind(task_id) + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index c94538d..e2adb72 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1593,8 +1593,11 @@ async fn handle_contract_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = command_sender.send(command).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 53e1587..240e1f7 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -685,8 +685,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -734,8 +737,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { @@ -1135,8 +1141,11 @@ pub async fn send_message( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { @@ -2273,8 +2282,11 @@ pub async fn reassign_task( copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -2597,8 +2609,11 @@ pub async fn continue_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -3490,8 +3505,11 @@ pub async fn branch_task( copy_files: None, contract_id: None, is_supervisor: false, + autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 8e134bd..1ff0724 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1148,8 +1148,11 @@ async fn handle_mesh_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; match state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 6262975..65db373 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -14,6 +14,7 @@ use axum::{ http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; +use base64::Engine; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -410,6 +411,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Notification that git config was inherited GitConfigInherited { @@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re lines_removed, error, message, + patch_data, + patch_base_sha, + patch_files_count, }) => { tracing::info!( task_id = %task_id, success = success, commit_sha = ?commit_sha, + has_patch = patch_data.is_some(), "Checkpoint created notification received" ); @@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Checkpoint stored in database" ); + // Store patch if provided (for task recovery) + if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { + // Decode base64 patch data + match base64::engine::general_purpose::STANDARD.decode(patch_b64) { + Ok(patch_bytes) => { + let files_count = patch_files_count.unwrap_or(0); + // Default TTL: 7 days (168 hours) + let ttl_hours = 168i64; + match repository::create_checkpoint_patch( + pool, + task_id, + Some(checkpoint.id), + base_sha, + &patch_bytes, + files_count, + ttl_hours, + ).await { + Ok(patch) => { + tracing::info!( + task_id = %task_id, + patch_id = %patch.id, + patch_size = patch_bytes.len(), + "Checkpoint patch stored for recovery" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to store checkpoint patch" + ); + } + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to decode patch base64 data" + ); + } + } + } + // Broadcast success as task output state.broadcast_task_output(TaskOutputNotification { task_id, @@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "filesChanged": files_changed, "linesAdded": lines_added, "linesRemoved": lines_removed, + "hasPatch": patch_data.is_some(), }), ).await; } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 57f3f2f..21c9515 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -9,6 +9,7 @@ use axum::{ response::IntoResponse, Json, }; +use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; @@ -364,8 +365,11 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -663,8 +667,11 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1992,6 +1999,29 @@ pub async fn resume_supervisor( .into_response(); } + // Fetch latest checkpoint patch for worktree recovery + let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { + Ok(Some(patch)) => { + tracing::info!( + task_id = %supervisor_state.task_id, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + "Including checkpoint patch for worktree recovery" + ); + // Encode patch as base64 for JSON transport + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); + (None, None) + } + Err(e) => { + tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); + (None, None) + } + }; + // Send SpawnTask with resume_session=true to use Claude's --continue // Include conversation_history as fallback if worktree doesn't exist on target daemon let command = DaemonCommand::SpawnTask { @@ -2010,8 +2040,11 @@ pub async fn resume_supervisor( copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: supervisor_task.contract_id, is_supervisor: true, + autonomous_loop: false, resume_session: true, // Use --continue to preserve conversation conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + patch_data, + patch_base_sha, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 0bc1b92..3a27513 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -248,6 +248,8 @@ const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60; /// Maximum age in days for anonymous tasks before cleanup const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; +/// Interval for checkpoint patch cleanup (hourly) +const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; /// Run the HTTP server with graceful shutdown support. /// @@ -344,6 +346,47 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Clone pool for checkpoint patch cleanup + let checkpoint_patch_cleanup_pool = pool.clone(); + + // Initial cleanup of any expired checkpoint patches + match crate::db::repository::cleanup_expired_checkpoint_patches(&pool).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + "Cleaned up expired checkpoint patches on startup" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches on startup"); + } + _ => {} + } + + // Spawn periodic checkpoint patch cleanup task (runs hourly) + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS) + ); + loop { + interval.tick().await; + match crate::db::repository::cleanup_expired_checkpoint_patches( + &checkpoint_patch_cleanup_pool, + ).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + "Cleaned up expired checkpoint patches" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches"); + } + _ => {} + } + } + }); } let app = make_router(state); diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 28d65d0..5b75281 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -208,12 +208,21 @@ pub enum DaemonCommand { /// Whether this task is a supervisor (long-running contract orchestrator) #[serde(rename = "isSupervisor")] is_supervisor: bool, + /// Whether to run in autonomous loop mode + #[serde(rename = "autonomousLoop", default)] + autonomous_loop: bool, /// Whether to resume from a previous session using --continue flag #[serde(rename = "resumeSession", default)] resume_session: bool, /// Conversation history for fallback when worktree doesn't exist #[serde(rename = "conversationHistory", default)] conversation_history: Option<serde_json::Value>, + /// Base64-encoded gzip-compressed patch for worktree recovery + #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply the patch on top of + #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, }, /// Pause a running task PauseTask { |
