diff options
Diffstat (limited to 'makima/src/daemon')
| -rw-r--r-- | makima/src/daemon/config.rs | 44 | ||||
| -rw-r--r-- | makima/src/daemon/error.rs | 3 | ||||
| -rw-r--r-- | makima/src/daemon/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/daemon/storage/mod.rs | 8 | ||||
| -rw-r--r-- | makima/src/daemon/storage/patch.rs | 293 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 232 | ||||
| -rw-r--r-- | makima/src/daemon/worktree/manager.rs | 135 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 16 |
8 files changed, 718 insertions, 14 deletions
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index 9a00166..b7cb1e8 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -223,6 +223,48 @@ pub struct ProcessConfig { /// Set to 0 to disable. Default: 300 (5 minutes). #[serde(default = "default_heartbeat_commit_interval", alias = "heartbeatcommitintervalsecs")] pub heartbeat_commit_interval_secs: u64, + + /// Checkpoint patch storage configuration for task recovery. + #[serde(default)] + pub checkpoint_patches: CheckpointPatchConfig, +} + +/// Configuration for checkpoint patch storage in PostgreSQL. +/// Patches are stored to enable task recovery when local worktrees are lost. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct CheckpointPatchConfig { + /// Enable patch storage in PostgreSQL (default: true). + #[serde(default = "default_true")] + pub enabled: bool, + + /// Maximum patch size in bytes (default: 10MB). + /// Patches larger than this will not be stored. + #[serde(default = "default_max_patch_size", alias = "maxpatchsizebytes")] + pub max_patch_size_bytes: usize, + + /// TTL for patches in hours (default: 168 = 7 days). + /// Patches older than this will be automatically cleaned up. + #[serde(default = "default_patch_ttl_hours", alias = "ttlhours")] + pub ttl_hours: u64, +} + +fn default_max_patch_size() -> usize { + 10 * 1024 * 1024 // 10MB +} + +fn default_patch_ttl_hours() -> u64 { + 168 // 7 days +} + +impl Default for CheckpointPatchConfig { + fn default() -> Self { + Self { + enabled: true, + max_patch_size_bytes: default_max_patch_size(), + ttl_hours: default_patch_ttl_hours(), + } + } } fn default_claude_command() -> String { @@ -255,6 +297,7 @@ impl Default for ProcessConfig { env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), heartbeat_commit_interval_secs: default_heartbeat_commit_interval(), + checkpoint_patches: CheckpointPatchConfig::default(), } } } @@ -576,6 +619,7 @@ impl DaemonConfig { env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), heartbeat_commit_interval_secs: 300, + checkpoint_patches: CheckpointPatchConfig::default(), }, local_db: LocalDbConfig { path: PathBuf::from("/tmp/makima-daemon-test/state.db"), diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs index ea00d25..8d7fff0 100644 --- a/makima/src/daemon/error.rs +++ b/makima/src/daemon/error.rs @@ -30,6 +30,9 @@ pub enum DaemonError { #[error("JSON error: {0}")] Json(#[from] serde_json::Error), + #[error("Storage error: {0}")] + Storage(#[from] crate::daemon::storage::PatchError), + #[error("Authentication failed: {0}")] AuthFailed(String), diff --git a/makima/src/daemon/mod.rs b/makima/src/daemon/mod.rs index 18b5e8a..f5793d6 100644 --- a/makima/src/daemon/mod.rs +++ b/makima/src/daemon/mod.rs @@ -14,6 +14,7 @@ pub mod db; pub mod error; pub mod process; pub mod setup; +pub mod storage; pub mod task; pub mod temp; pub mod tui; diff --git a/makima/src/daemon/storage/mod.rs b/makima/src/daemon/storage/mod.rs new file mode 100644 index 0000000..cc5441a --- /dev/null +++ b/makima/src/daemon/storage/mod.rs @@ -0,0 +1,8 @@ +//! Checkpoint storage for task recovery. +//! +//! This module provides functionality to store and restore git patches +//! in PostgreSQL for recovering task worktrees when they are lost. + +mod patch; + +pub use patch::{create_patch, apply_patch, PatchError}; diff --git a/makima/src/daemon/storage/patch.rs b/makima/src/daemon/storage/patch.rs new file mode 100644 index 0000000..45624b5 --- /dev/null +++ b/makima/src/daemon/storage/patch.rs @@ -0,0 +1,293 @@ +//! Git patch creation and application for checkpoint recovery. + +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use flate2::Compression; +use std::io::{Read, Write}; +use std::path::Path; +use thiserror::Error; +use tokio::process::Command; + +/// Errors that can occur during patch operations. +#[derive(Error, Debug)] +pub enum PatchError { + #[error("Git command failed: {0}")] + GitCommand(String), + + #[error("Compression error: {0}")] + Compression(#[from] std::io::Error), + + #[error("Patch too large: {size} bytes (max: {max} bytes)")] + TooLarge { size: usize, max: usize }, + + #[error("Empty patch (no changes)")] + EmptyPatch, + + #[error("Failed to apply patch: {0}")] + ApplyFailed(String), +} + +/// Create a compressed git diff from worktree changes. +/// +/// Generates a diff between `base_sha` and HEAD, then compresses it with gzip. +/// Returns the compressed patch bytes and the number of files changed. +pub async fn create_patch( + worktree_path: &Path, + base_sha: &str, +) -> Result<(Vec<u8>, usize), PatchError> { + // Get the diff between base commit and HEAD + let output = Command::new("git") + .current_dir(worktree_path) + .args(["diff", base_sha, "HEAD", "--binary"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to run git diff: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!("git diff failed: {}", stderr))); + } + + let diff_data = output.stdout; + if diff_data.is_empty() { + return Err(PatchError::EmptyPatch); + } + + // Count files changed + let files_output = Command::new("git") + .current_dir(worktree_path) + .args(["diff", base_sha, "HEAD", "--name-only"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to count files: {}", e)))?; + + let files_count = if files_output.status.success() { + String::from_utf8_lossy(&files_output.stdout) + .lines() + .filter(|l| !l.is_empty()) + .count() + } else { + 0 + }; + + // Compress with gzip + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&diff_data)?; + let compressed = encoder.finish()?; + + Ok((compressed, files_count)) +} + +/// Apply a compressed patch to restore worktree state. +/// +/// The worktree should already be checked out at `base_sha` before calling this. +pub async fn apply_patch(worktree_path: &Path, patch_data: &[u8]) -> Result<(), PatchError> { + // Decompress gzip + let mut decoder = GzDecoder::new(patch_data); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + + if decompressed.is_empty() { + return Err(PatchError::EmptyPatch); + } + + // Apply the patch using git apply + let mut child = Command::new("git") + .current_dir(worktree_path) + .args(["apply", "--binary", "-"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| PatchError::GitCommand(format!("Failed to spawn git apply: {}", e)))?; + + // Write patch to stdin + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + stdin.write_all(&decompressed).await?; + drop(stdin); // Close stdin to signal EOF + } + + let output = child + .wait_with_output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to wait for git apply: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::ApplyFailed(stderr.to_string())); + } + + Ok(()) +} + +/// Get the parent commit SHA (HEAD~1) from a worktree. +pub async fn get_parent_sha(worktree_path: &Path) -> Result<String, PatchError> { + let output = Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD~1"]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to get parent SHA: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!( + "git rev-parse HEAD~1 failed: {}", + stderr + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) +} + +/// Checkout a specific commit in the worktree. +pub async fn checkout_commit(worktree_path: &Path, sha: &str) -> Result<(), PatchError> { + let output = Command::new("git") + .current_dir(worktree_path) + .args(["checkout", sha]) + .output() + .await + .map_err(|e| PatchError::GitCommand(format!("Failed to checkout: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(PatchError::GitCommand(format!( + "git checkout {} failed: {}", + sha, stderr + ))); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::TempDir; + + async fn setup_test_repo() -> TempDir { + let dir = TempDir::new().unwrap(); + let path = dir.path(); + + // Initialize git repo + Command::new("git") + .current_dir(path) + .args(["init"]) + .output() + .await + .unwrap(); + + // Configure git user + Command::new("git") + .current_dir(path) + .args(["config", "user.email", "test@test.com"]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["config", "user.name", "Test"]) + .output() + .await + .unwrap(); + + // Create initial commit + fs::write(path.join("file.txt"), "initial").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "initial"]) + .output() + .await + .unwrap(); + + dir + } + + #[tokio::test] + async fn test_create_and_apply_patch() { + let dir = setup_test_repo().await; + let path = dir.path(); + + // Get base SHA + let base_sha = get_parent_sha(path).await; + // This will fail since there's only one commit + assert!(base_sha.is_err()); + + // Make another commit first + fs::write(path.join("file.txt"), "modified").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "modified"]) + .output() + .await + .unwrap(); + + // Now get the base SHA + let base_sha = get_parent_sha(path).await.unwrap(); + + // Create patch + let (patch_data, files_count) = create_patch(path, &base_sha).await.unwrap(); + assert!(!patch_data.is_empty()); + assert_eq!(files_count, 1); + + // Reset to base and apply patch + checkout_commit(path, &base_sha).await.unwrap(); + assert_eq!(fs::read_to_string(path.join("file.txt")).unwrap(), "initial"); + + apply_patch(path, &patch_data).await.unwrap(); + assert_eq!( + fs::read_to_string(path.join("file.txt")).unwrap(), + "modified" + ); + } + + #[tokio::test] + async fn test_empty_patch() { + let dir = setup_test_repo().await; + let path = dir.path(); + + // Make another commit + fs::write(path.join("file.txt"), "modified").unwrap(); + Command::new("git") + .current_dir(path) + .args(["add", "."]) + .output() + .await + .unwrap(); + Command::new("git") + .current_dir(path) + .args(["commit", "-m", "modified"]) + .output() + .await + .unwrap(); + + // Get current HEAD + let head_output = Command::new("git") + .current_dir(path) + .args(["rev-parse", "HEAD"]) + .output() + .await + .unwrap(); + let head_sha = String::from_utf8_lossy(&head_output.stdout) + .trim() + .to_string(); + + // Try to create patch from HEAD to HEAD (no changes) + let result = create_patch(path, &head_sha).await; + assert!(matches!(result, Err(PatchError::EmptyPatch))); + } +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 0cba516..cb4bde2 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use base64::Engine; use rand::Rng; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, RwLock}; @@ -14,8 +15,10 @@ use std::collections::HashSet; use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; +use crate::daemon::config::CheckpointPatchConfig; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; +use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; @@ -989,6 +992,8 @@ pub struct TaskConfig { /// Interval in seconds between heartbeat commits (WIP checkpoints). /// Set to 0 to disable. Default: 300 (5 minutes). pub heartbeat_commit_interval_secs: u64, + /// Checkpoint patch storage configuration. + pub checkpoint_patches: CheckpointPatchConfig, } impl Default for TaskConfig { @@ -1007,6 +1012,7 @@ impl Default for TaskConfig { api_url: "https://api.makima.jp".to_string(), api_key: String::new(), heartbeat_commit_interval_secs: 300, // 5 minutes + checkpoint_patches: CheckpointPatchConfig::default(), } } } @@ -1405,6 +1411,8 @@ impl TaskManager { autonomous_loop, resume_session, conversation_history, + patch_data, + patch_base_sha, } => { tracing::info!( task_id = %task_id, @@ -1431,7 +1439,7 @@ impl TaskManager { parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1529,6 +1537,8 @@ impl TaskManager { false, // autonomous_loop - supervisors don't use this false, // resume_session - respawning from scratch None, // conversation_history - not needed for fresh respawn + None, // patch_data - not available for respawn + None, // patch_base_sha - not available for respawn ).await { tracing::error!( task_id = %task_id, @@ -1755,17 +1765,22 @@ impl TaskManager { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> TaskResult<()> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); // Check if task already exists - allow re-spawning if in terminal state + // or if resuming a supervisor (supervisors stay in Running state after Claude exits) { let mut tasks = self.tasks.write().await; if let Some(existing) = tasks.get(&task_id) { - if existing.state.is_terminal() { - // Task exists but is in terminal state (completed, failed, interrupted) - // Remove it so we can re-spawn - tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + let can_respawn = existing.state.is_terminal() + || (resume_session && existing.is_supervisor); + + if can_respawn { + // Task exists but can be re-spawned (terminal state or supervisor resume) + tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn"); tasks.remove(&task_id); } else { // Task is still active, reject @@ -1825,7 +1840,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -1855,6 +1870,7 @@ impl TaskManager { api_url: self.config.api_url.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.config.checkpoint_patches.clone(), } } @@ -2824,6 +2840,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Task {} not found or has no worktree", task_id)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2854,6 +2873,9 @@ impl TaskManager { lines_removed: None, error: Some("No changes to checkpoint".to_string()), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2878,6 +2900,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to stage changes: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2920,6 +2945,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: Some(format!("Commit failed: {}", stderr)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2936,6 +2964,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to execute git commit: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2943,6 +2974,7 @@ impl TaskManager { }; // Success - send response (checkpoint_number will be assigned by server on DB insert) + // Note: Manual checkpoints don't include patches (only heartbeat commits do) let msg = DaemonMessage::CheckpointCreated { task_id, success: true, @@ -2954,6 +2986,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: None, message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) @@ -3153,6 +3188,8 @@ struct TaskManagerInner { heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, + /// Checkpoint patch storage configuration. + checkpoint_patches: CheckpointPatchConfig, } impl TaskManagerInner { @@ -3193,8 +3230,10 @@ impl TaskManagerInner { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ==="); // If resuming session, try to find existing worktree first let existing_worktree = if resume_session { @@ -3212,8 +3251,83 @@ impl TaskManagerInner { None }; + // Try to restore from patch if worktree is missing but we have patch data + let restored_from_patch = if existing_worktree.is_none() { + if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) { + tracing::info!( + task_id = %task_id, + base_sha = %base_sha, + patch_len = patch_str.len(), + "Attempting to restore worktree from patch" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Restoring worktree from checkpoint patch...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Decode base64 patch data + match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) { + Ok(patch_bytes) => { + match self.worktree_manager.restore_from_patch( + source, + task_id, + &task_name, + base_sha, + &patch_bytes, + ).await { + Ok(worktree_info) => { + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "Successfully restored worktree from patch" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree restored at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + Some(worktree_info.path) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Failed to restore from patch ({}), starting fresh\n", e), + false, + ); + let _ = self.ws_tx.send(msg).await; + None + } + } + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data"); + None + } + } + } else { + None + } + } else { + None + }; + // Determine working directory - let has_existing_worktree = existing_worktree.is_some(); + let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some(); let working_dir = if let Some(existing) = existing_worktree { // Reuse existing worktree for session resume let msg = DaemonMessage::task_output( @@ -3223,6 +3337,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; existing + } else if let Some(restored_path) = restored_from_patch { + // Already restored from patch above + restored_path } else if let Some(ref source) = repo_source { if is_new_repo_request(source) { // Explicit new repo request: new:// or new://project-name @@ -4523,12 +4640,24 @@ impl TaskManagerInner { /// Create a heartbeat commit with all uncommitted changes (WIP checkpoint). /// Returns (commit SHA, push succeeded) on success, or an error message if nothing to commit. + /// Also creates a patch and sends it to the server for recovery purposes. async fn create_heartbeat_commit( &self, task_id: Uuid, worktree_path: &std::path::Path, ) -> Result<(String, bool), String> { - // 1. Check for uncommitted changes using git status --porcelain + // 1. Get parent SHA BEFORE committing (for patch creation) + let parent_sha_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + let parent_sha = parent_sha_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + + // 2. Check for uncommitted changes using git status --porcelain let status_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["status", "--porcelain"]) @@ -4546,7 +4675,7 @@ impl TaskManagerInner { return Err("No changes to commit".into()); } - // 2. Stage all changes + // 3. Stage all changes let add_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["add", "-A"]) @@ -4559,7 +4688,7 @@ impl TaskManagerInner { return Err(format!("git add failed: {}", stderr)); } - // 3. Create WIP commit with timestamp + // 4. Create WIP commit with timestamp let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp); @@ -4575,7 +4704,7 @@ impl TaskManagerInner { return Err(format!("git commit failed: {}", stderr)); } - // 4. Get the commit SHA + // 5. Get the commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["rev-parse", "HEAD"]) @@ -4591,7 +4720,19 @@ impl TaskManagerInner { let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string(); tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit"); - // 5. Push to remote (best effort - don't fail if push fails) + // 6. Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["branch", "--show-current"]) + .output() + .await; + let branch_name = branch_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + // 7. Push to remote (best effort - don't fail if push fails) let push_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["push"]) @@ -4614,6 +4755,68 @@ impl TaskManagerInner { } }; + // 8. Create patch and send CheckpointCreated message to server + let mut patch_data: Option<String> = None; + let mut patch_base_sha: Option<String> = None; + let mut patch_files_count: Option<i32> = None; + + if self.checkpoint_patches.enabled { + if let Some(ref base_sha) = parent_sha { + match storage::create_patch(worktree_path, base_sha).await { + Ok((compressed_patch, files_count)) => { + // Check size limit + if compressed_patch.len() <= self.checkpoint_patches.max_patch_size_bytes { + // Encode as base64 for JSON transport + patch_data = Some(base64::engine::general_purpose::STANDARD.encode(&compressed_patch)); + patch_base_sha = Some(base_sha.clone()); + patch_files_count = Some(files_count as i32); + tracing::debug!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + files_count = files_count, + "Created checkpoint patch" + ); + } else { + tracing::warn!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + max_size = self.checkpoint_patches.max_patch_size_bytes, + "Patch exceeds size limit, not including in checkpoint" + ); + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + sha = %sha, + error = %e, + "Failed to create patch for heartbeat commit" + ); + } + } + } + } + + // Send CheckpointCreated message to server (so it stores the checkpoint and patch) + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: true, + commit_sha: Some(sha.clone()), + branch_name: Some(branch_name), + checkpoint_number: None, // Server will assign + files_changed: None, // Could get from git diff --name-status if needed + lines_added: None, + lines_removed: None, + error: None, + message: commit_msg, + patch_data, + patch_base_sha, + patch_files_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok((sha, pushed)) } } @@ -4633,6 +4836,7 @@ impl Clone for TaskManagerInner { api_url: self.api_url.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.checkpoint_patches.clone(), } } } diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs index 5edd7b1..04cb307 100644 --- a/makima/src/daemon/worktree/manager.rs +++ b/makima/src/daemon/worktree/manager.rs @@ -1697,6 +1697,141 @@ impl WorktreeManager { pub async fn target_directory_exists(&self, target_dir: &Path) -> bool { target_dir.exists() } + + /// Restore a worktree from a stored patch. + /// + /// This is used for task recovery when the local worktree has been lost. + /// 1. Clone/fetch the source repo to get the base commit + /// 2. Create a new worktree at the base commit + /// 3. Apply the patch to restore the task's state + pub async fn restore_from_patch( + &self, + source_repo: &str, + task_id: Uuid, + task_name: &str, + base_commit_sha: &str, + patch_data: &[u8], + ) -> Result<WorktreeInfo, WorktreeError> { + use crate::daemon::storage; + + // Generate directory and branch names + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + let branch_name = format!( + "{}{}-{}", + self.branch_prefix, + sanitize_name(task_name), + short_uuid(task_id) + ); + + // Ensure base directory exists + tokio::fs::create_dir_all(&self.base_dir).await?; + + // Remove existing worktree if present (we're restoring from scratch) + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Removing existing worktree before restore" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + + // Clone the source repo if needed + let repo_path = self.ensure_repo(source_repo).await?; + + // Create worktree at the base commit + // First, we need to make sure the base commit is available + let fetch_output = Command::new("git") + .args(["fetch", "--all"]) + .current_dir(&repo_path) + .output() + .await?; + + if !fetch_output.status.success() { + tracing::warn!( + task_id = %task_id, + stderr = %String::from_utf8_lossy(&fetch_output.stderr), + "git fetch failed, continuing anyway" + ); + } + + // Create the worktree from the base commit + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + worktree_path.to_str().ok_or_else(|| { + WorktreeError::InvalidPath("Invalid worktree path".to_string()) + })?, + base_commit_sha, + ]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // If branch already exists, try without -b flag + if stderr.contains("already exists") { + // Remove the branch and try again + let _ = Command::new("git") + .args(["branch", "-D", &branch_name]) + .current_dir(&repo_path) + .output() + .await; + + let retry_output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + worktree_path.to_str().unwrap(), + base_commit_sha, + ]) + .current_dir(&repo_path) + .output() + .await?; + + if !retry_output.status.success() { + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree after retry: {}", + String::from_utf8_lossy(&retry_output.stderr) + ))); + } + } else { + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + } + + // Apply the patch to restore the task's state + if let Err(e) = storage::apply_patch(&worktree_path, patch_data).await { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to apply patch, worktree is at base commit" + ); + // Don't fail - the worktree is usable, just at the base commit + } else { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Successfully restored worktree from patch" + ); + } + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: repo_path, + }) + } } /// Check if repo_source is a "new repo" request. diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 3b02b53..ec9b09e 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -278,6 +278,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Response to CleanupWorktree command. @@ -387,6 +396,13 @@ pub enum DaemonCommand { /// Used to inject previous conversation context into the prompt. #[serde(rename = "conversationHistory", default)] conversation_history: Option<serde_json::Value>, + /// Base64-encoded gzip-compressed patch for worktree recovery. + /// Used when resume_session=true and the local worktree is missing. + #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply the patch on top of. + #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, }, /// Pause a running task. |
