diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 22:32:46 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 01:03:04 +0000 |
| commit | 1ed362424dafec690f919154f5116471951cda9c (patch) | |
| tree | 19c7ca9231887394a791223fe32a8ad335a687a8 /makima/src/daemon/task | |
| parent | 265f8cf14fec9d7116d09af49e4b48b357faceda (diff) | |
| download | soryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz soryu-1ed362424dafec690f919154f5116471951cda9c.zip | |
Add patch checkpointing
Diffstat (limited to 'makima/src/daemon/task')
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 232 |
1 files changed, 218 insertions, 14 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 0cba516..cb4bde2 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use base64::Engine; use rand::Rng; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, RwLock}; @@ -14,8 +15,10 @@ use std::collections::HashSet; use super::completion_gate::{CircuitBreaker, CompletionGate}; use super::state::TaskState; +use crate::daemon::config::CheckpointPatchConfig; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; +use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; @@ -989,6 +992,8 @@ pub struct TaskConfig { /// Interval in seconds between heartbeat commits (WIP checkpoints). /// Set to 0 to disable. Default: 300 (5 minutes). pub heartbeat_commit_interval_secs: u64, + /// Checkpoint patch storage configuration. + pub checkpoint_patches: CheckpointPatchConfig, } impl Default for TaskConfig { @@ -1007,6 +1012,7 @@ impl Default for TaskConfig { api_url: "https://api.makima.jp".to_string(), api_key: String::new(), heartbeat_commit_interval_secs: 300, // 5 minutes + checkpoint_patches: CheckpointPatchConfig::default(), } } } @@ -1405,6 +1411,8 @@ impl TaskManager { autonomous_loop, resume_session, conversation_history, + patch_data, + patch_base_sha, } => { tracing::info!( task_id = %task_id, @@ -1431,7 +1439,7 @@ impl TaskManager { parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1529,6 +1537,8 @@ impl TaskManager { false, // autonomous_loop - supervisors don't use this false, // resume_session - respawning from scratch None, // conversation_history - not needed for fresh respawn + None, // patch_data - not available for respawn + None, // patch_base_sha - not available for respawn ).await { tracing::error!( task_id = %task_id, @@ -1755,17 +1765,22 @@ impl TaskManager { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> TaskResult<()> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); // Check if task already exists - allow re-spawning if in terminal state + // or if resuming a supervisor (supervisors stay in Running state after Claude exits) { let mut tasks = self.tasks.write().await; if let Some(existing) = tasks.get(&task_id) { - if existing.state.is_terminal() { - // Task exists but is in terminal state (completed, failed, interrupted) - // Remove it so we can re-spawn - tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + let can_respawn = existing.state.is_terminal() + || (resume_session && existing.is_supervisor); + + if can_respawn { + // Task exists but can be re-spawned (terminal state or supervisor resume) + tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn"); tasks.remove(&task_id); } else { // Task is still active, reject @@ -1825,7 +1840,7 @@ impl TaskManager { task_id, task_name, plan, repo_url, base_branch, target_branch, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, + conversation_history, patch_data, patch_base_sha, ).await { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; @@ -1855,6 +1870,7 @@ impl TaskManager { api_url: self.config.api_url.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.config.checkpoint_patches.clone(), } } @@ -2824,6 +2840,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Task {} not found or has no worktree", task_id)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2854,6 +2873,9 @@ impl TaskManager { lines_removed: None, error: Some("No changes to checkpoint".to_string()), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2878,6 +2900,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to stage changes: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2920,6 +2945,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: Some(format!("Commit failed: {}", stderr)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2936,6 +2964,9 @@ impl TaskManager { lines_removed: None, error: Some(format!("Failed to execute git commit: {}", e)), message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; return Ok(()); @@ -2943,6 +2974,7 @@ impl TaskManager { }; // Success - send response (checkpoint_number will be assigned by server on DB insert) + // Note: Manual checkpoints don't include patches (only heartbeat commits do) let msg = DaemonMessage::CheckpointCreated { task_id, success: true, @@ -2954,6 +2986,9 @@ impl TaskManager { lines_removed: Some(lines_removed), error: None, message, + patch_data: None, + patch_base_sha: None, + patch_files_count: None, }; let _ = self.ws_tx.send(msg).await; Ok(()) @@ -3153,6 +3188,8 @@ struct TaskManagerInner { heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, + /// Checkpoint patch storage configuration. + checkpoint_patches: CheckpointPatchConfig, } impl TaskManagerInner { @@ -3193,8 +3230,10 @@ impl TaskManagerInner { autonomous_loop: bool, resume_session: bool, conversation_history: Option<serde_json::Value>, + patch_data: Option<String>, + patch_base_sha: Option<String>, ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ==="); + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ==="); // If resuming session, try to find existing worktree first let existing_worktree = if resume_session { @@ -3212,8 +3251,83 @@ impl TaskManagerInner { None }; + // Try to restore from patch if worktree is missing but we have patch data + let restored_from_patch = if existing_worktree.is_none() { + if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) { + tracing::info!( + task_id = %task_id, + base_sha = %base_sha, + patch_len = patch_str.len(), + "Attempting to restore worktree from patch" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Restoring worktree from checkpoint patch...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Decode base64 patch data + match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) { + Ok(patch_bytes) => { + match self.worktree_manager.restore_from_patch( + source, + task_id, + &task_name, + base_sha, + &patch_bytes, + ).await { + Ok(worktree_info) => { + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "Successfully restored worktree from patch" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree restored at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + Some(worktree_info.path) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Failed to restore from patch ({}), starting fresh\n", e), + false, + ); + let _ = self.ws_tx.send(msg).await; + None + } + } + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data"); + None + } + } + } else { + None + } + } else { + None + }; + // Determine working directory - let has_existing_worktree = existing_worktree.is_some(); + let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some(); let working_dir = if let Some(existing) = existing_worktree { // Reuse existing worktree for session resume let msg = DaemonMessage::task_output( @@ -3223,6 +3337,9 @@ impl TaskManagerInner { ); let _ = self.ws_tx.send(msg).await; existing + } else if let Some(restored_path) = restored_from_patch { + // Already restored from patch above + restored_path } else if let Some(ref source) = repo_source { if is_new_repo_request(source) { // Explicit new repo request: new:// or new://project-name @@ -4523,12 +4640,24 @@ impl TaskManagerInner { /// Create a heartbeat commit with all uncommitted changes (WIP checkpoint). /// Returns (commit SHA, push succeeded) on success, or an error message if nothing to commit. + /// Also creates a patch and sends it to the server for recovery purposes. async fn create_heartbeat_commit( &self, task_id: Uuid, worktree_path: &std::path::Path, ) -> Result<(String, bool), String> { - // 1. Check for uncommitted changes using git status --porcelain + // 1. Get parent SHA BEFORE committing (for patch creation) + let parent_sha_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + let parent_sha = parent_sha_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + + // 2. Check for uncommitted changes using git status --porcelain let status_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["status", "--porcelain"]) @@ -4546,7 +4675,7 @@ impl TaskManagerInner { return Err("No changes to commit".into()); } - // 2. Stage all changes + // 3. Stage all changes let add_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["add", "-A"]) @@ -4559,7 +4688,7 @@ impl TaskManagerInner { return Err(format!("git add failed: {}", stderr)); } - // 3. Create WIP commit with timestamp + // 4. Create WIP commit with timestamp let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC"); let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp); @@ -4575,7 +4704,7 @@ impl TaskManagerInner { return Err(format!("git commit failed: {}", stderr)); } - // 4. Get the commit SHA + // 5. Get the commit SHA let sha_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["rev-parse", "HEAD"]) @@ -4591,7 +4720,19 @@ impl TaskManagerInner { let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string(); tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit"); - // 5. Push to remote (best effort - don't fail if push fails) + // 6. Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["branch", "--show-current"]) + .output() + .await; + let branch_name = branch_output + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + // 7. Push to remote (best effort - don't fail if push fails) let push_output = tokio::process::Command::new("git") .current_dir(worktree_path) .args(["push"]) @@ -4614,6 +4755,68 @@ impl TaskManagerInner { } }; + // 8. Create patch and send CheckpointCreated message to server + let mut patch_data: Option<String> = None; + let mut patch_base_sha: Option<String> = None; + let mut patch_files_count: Option<i32> = None; + + if self.checkpoint_patches.enabled { + if let Some(ref base_sha) = parent_sha { + match storage::create_patch(worktree_path, base_sha).await { + Ok((compressed_patch, files_count)) => { + // Check size limit + if compressed_patch.len() <= self.checkpoint_patches.max_patch_size_bytes { + // Encode as base64 for JSON transport + patch_data = Some(base64::engine::general_purpose::STANDARD.encode(&compressed_patch)); + patch_base_sha = Some(base_sha.clone()); + patch_files_count = Some(files_count as i32); + tracing::debug!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + files_count = files_count, + "Created checkpoint patch" + ); + } else { + tracing::warn!( + task_id = %task_id, + sha = %sha, + patch_size = compressed_patch.len(), + max_size = self.checkpoint_patches.max_patch_size_bytes, + "Patch exceeds size limit, not including in checkpoint" + ); + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + sha = %sha, + error = %e, + "Failed to create patch for heartbeat commit" + ); + } + } + } + } + + // Send CheckpointCreated message to server (so it stores the checkpoint and patch) + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: true, + commit_sha: Some(sha.clone()), + branch_name: Some(branch_name), + checkpoint_number: None, // Server will assign + files_changed: None, // Could get from git diff --name-status if needed + lines_added: None, + lines_removed: None, + error: None, + message: commit_msg, + patch_data, + patch_base_sha, + patch_files_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok((sha, pushed)) } } @@ -4633,6 +4836,7 @@ impl Clone for TaskManagerInner { api_url: self.api_url.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), + checkpoint_patches: self.checkpoint_patches.clone(), } } } |
