summaryrefslogtreecommitdiff
path: root/makima/src/daemon/task
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-22 22:32:46 +0000
committersoryu <soryu@soryu.co>2026-01-23 01:03:04 +0000
commit1ed362424dafec690f919154f5116471951cda9c (patch)
tree19c7ca9231887394a791223fe32a8ad335a687a8 /makima/src/daemon/task
parent265f8cf14fec9d7116d09af49e4b48b357faceda (diff)
downloadsoryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz
soryu-1ed362424dafec690f919154f5116471951cda9c.zip
Add patch checkpointing
Diffstat (limited to 'makima/src/daemon/task')
-rw-r--r--makima/src/daemon/task/manager.rs232
1 files changed, 218 insertions, 14 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 0cba516..cb4bde2 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -5,6 +5,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
+use base64::Engine;
use rand::Rng;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, RwLock};
@@ -14,8 +15,10 @@ use std::collections::HashSet;
use super::completion_gate::{CircuitBreaker, CompletionGate};
use super::state::TaskState;
+use crate::daemon::config::CheckpointPatchConfig;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
+use crate::daemon::storage;
use crate::daemon::temp::TempManager;
use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
@@ -989,6 +992,8 @@ pub struct TaskConfig {
/// Interval in seconds between heartbeat commits (WIP checkpoints).
/// Set to 0 to disable. Default: 300 (5 minutes).
pub heartbeat_commit_interval_secs: u64,
+ /// Checkpoint patch storage configuration.
+ pub checkpoint_patches: CheckpointPatchConfig,
}
impl Default for TaskConfig {
@@ -1007,6 +1012,7 @@ impl Default for TaskConfig {
api_url: "https://api.makima.jp".to_string(),
api_key: String::new(),
heartbeat_commit_interval_secs: 300, // 5 minutes
+ checkpoint_patches: CheckpointPatchConfig::default(),
}
}
}
@@ -1405,6 +1411,8 @@ impl TaskManager {
autonomous_loop,
resume_session,
conversation_history,
+ patch_data,
+ patch_base_sha,
} => {
tracing::info!(
task_id = %task_id,
@@ -1431,7 +1439,7 @@ impl TaskManager {
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
copy_files, contract_id, autonomous_loop, resume_session,
- conversation_history,
+ conversation_history, patch_data, patch_base_sha,
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1529,6 +1537,8 @@ impl TaskManager {
false, // autonomous_loop - supervisors don't use this
false, // resume_session - respawning from scratch
None, // conversation_history - not needed for fresh respawn
+ None, // patch_data - not available for respawn
+ None, // patch_base_sha - not available for respawn
).await {
tracing::error!(
task_id = %task_id,
@@ -1755,17 +1765,22 @@ impl TaskManager {
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
+ patch_data: Option<String>,
+ patch_base_sha: Option<String>,
) -> TaskResult<()> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ===");
// Check if task already exists - allow re-spawning if in terminal state
+ // or if resuming a supervisor (supervisors stay in Running state after Claude exits)
{
let mut tasks = self.tasks.write().await;
if let Some(existing) = tasks.get(&task_id) {
- if existing.state.is_terminal() {
- // Task exists but is in terminal state (completed, failed, interrupted)
- // Remove it so we can re-spawn
- tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn");
+ let can_respawn = existing.state.is_terminal()
+ || (resume_session && existing.is_supervisor);
+
+ if can_respawn {
+ // Task exists but can be re-spawned (terminal state or supervisor resume)
+ tracing::info!(task_id = %task_id, old_state = ?existing.state, resume_session = resume_session, is_supervisor = existing.is_supervisor, "Removing task to allow re-spawn");
tasks.remove(&task_id);
} else {
// Task is still active, reject
@@ -1825,7 +1840,7 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session,
- conversation_history,
+ conversation_history, patch_data, patch_base_sha,
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -1855,6 +1870,7 @@ impl TaskManager {
api_url: self.config.api_url.clone(),
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
+ checkpoint_patches: self.config.checkpoint_patches.clone(),
}
}
@@ -2824,6 +2840,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Task {} not found or has no worktree", task_id)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2854,6 +2873,9 @@ impl TaskManager {
lines_removed: None,
error: Some("No changes to checkpoint".to_string()),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2878,6 +2900,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Failed to stage changes: {}", e)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2920,6 +2945,9 @@ impl TaskManager {
lines_removed: Some(lines_removed),
error: Some(format!("Commit failed: {}", stderr)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2936,6 +2964,9 @@ impl TaskManager {
lines_removed: None,
error: Some(format!("Failed to execute git commit: {}", e)),
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
return Ok(());
@@ -2943,6 +2974,7 @@ impl TaskManager {
};
// Success - send response (checkpoint_number will be assigned by server on DB insert)
+ // Note: Manual checkpoints don't include patches (only heartbeat commits do)
let msg = DaemonMessage::CheckpointCreated {
task_id,
success: true,
@@ -2954,6 +2986,9 @@ impl TaskManager {
lines_removed: Some(lines_removed),
error: None,
message,
+ patch_data: None,
+ patch_base_sha: None,
+ patch_files_count: None,
};
let _ = self.ws_tx.send(msg).await;
Ok(())
@@ -3153,6 +3188,8 @@ struct TaskManagerInner {
heartbeat_commit_interval_secs: u64,
/// Shared contract task counts for releasing concurrency slots.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
+ /// Checkpoint patch storage configuration.
+ checkpoint_patches: CheckpointPatchConfig,
}
impl TaskManagerInner {
@@ -3193,8 +3230,10 @@ impl TaskManagerInner {
autonomous_loop: bool,
resume_session: bool,
conversation_history: Option<serde_json::Value>,
+ patch_data: Option<String>,
+ patch_base_sha: Option<String>,
) -> Result<(), DaemonError> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ===");
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, has_patch = patch_data.is_some(), "=== RUN_TASK START ===");
// If resuming session, try to find existing worktree first
let existing_worktree = if resume_session {
@@ -3212,8 +3251,83 @@ impl TaskManagerInner {
None
};
+ // Try to restore from patch if worktree is missing but we have patch data
+ let restored_from_patch = if existing_worktree.is_none() {
+ if let (Some(patch_str), Some(base_sha), Some(source)) = (&patch_data, &patch_base_sha, &repo_source) {
+ tracing::info!(
+ task_id = %task_id,
+ base_sha = %base_sha,
+ patch_len = patch_str.len(),
+ "Attempting to restore worktree from patch"
+ );
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Restoring worktree from checkpoint patch...\n"),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Decode base64 patch data
+ match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, patch_str) {
+ Ok(patch_bytes) => {
+ match self.worktree_manager.restore_from_patch(
+ source,
+ task_id,
+ &task_name,
+ base_sha,
+ &patch_bytes,
+ ).await {
+ Ok(worktree_info) => {
+ tracing::info!(
+ task_id = %task_id,
+ path = %worktree_info.path.display(),
+ "Successfully restored worktree from patch"
+ );
+
+ // Store worktree info
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(worktree_info.clone());
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Worktree restored at {}\n", worktree_info.path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ Some(worktree_info.path)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to restore from patch, will clone fresh");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Warning: Failed to restore from patch ({}), starting fresh\n", e),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ None
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to decode patch data");
+ None
+ }
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
// Determine working directory
- let has_existing_worktree = existing_worktree.is_some();
+ let has_existing_worktree = existing_worktree.is_some() || restored_from_patch.is_some();
let working_dir = if let Some(existing) = existing_worktree {
// Reuse existing worktree for session resume
let msg = DaemonMessage::task_output(
@@ -3223,6 +3337,9 @@ impl TaskManagerInner {
);
let _ = self.ws_tx.send(msg).await;
existing
+ } else if let Some(restored_path) = restored_from_patch {
+ // Already restored from patch above
+ restored_path
} else if let Some(ref source) = repo_source {
if is_new_repo_request(source) {
// Explicit new repo request: new:// or new://project-name
@@ -4523,12 +4640,24 @@ impl TaskManagerInner {
/// Create a heartbeat commit with all uncommitted changes (WIP checkpoint).
/// Returns (commit SHA, push succeeded) on success, or an error message if nothing to commit.
+ /// Also creates a patch and sends it to the server for recovery purposes.
async fn create_heartbeat_commit(
&self,
task_id: Uuid,
worktree_path: &std::path::Path,
) -> Result<(String, bool), String> {
- // 1. Check for uncommitted changes using git status --porcelain
+ // 1. Get parent SHA BEFORE committing (for patch creation)
+ let parent_sha_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await;
+ let parent_sha = parent_sha_output
+ .ok()
+ .filter(|o| o.status.success())
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
+
+ // 2. Check for uncommitted changes using git status --porcelain
let status_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["status", "--porcelain"])
@@ -4546,7 +4675,7 @@ impl TaskManagerInner {
return Err("No changes to commit".into());
}
- // 2. Stage all changes
+ // 3. Stage all changes
let add_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["add", "-A"])
@@ -4559,7 +4688,7 @@ impl TaskManagerInner {
return Err(format!("git add failed: {}", stderr));
}
- // 3. Create WIP commit with timestamp
+ // 4. Create WIP commit with timestamp
let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
let commit_msg = format!("[WIP] Heartbeat checkpoint - {}", timestamp);
@@ -4575,7 +4704,7 @@ impl TaskManagerInner {
return Err(format!("git commit failed: {}", stderr));
}
- // 4. Get the commit SHA
+ // 5. Get the commit SHA
let sha_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["rev-parse", "HEAD"])
@@ -4591,7 +4720,19 @@ impl TaskManagerInner {
let sha = String::from_utf8_lossy(&sha_output.stdout).trim().to_string();
tracing::info!(task_id = %task_id, sha = %sha, "Created heartbeat commit");
- // 5. Push to remote (best effort - don't fail if push fails)
+ // 6. Get current branch name
+ let branch_output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["branch", "--show-current"])
+ .output()
+ .await;
+ let branch_name = branch_output
+ .ok()
+ .filter(|o| o.status.success())
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
+ .unwrap_or_else(|| "unknown".to_string());
+
+ // 7. Push to remote (best effort - don't fail if push fails)
let push_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
.args(["push"])
@@ -4614,6 +4755,68 @@ impl TaskManagerInner {
}
};
+ // 8. Create patch and send CheckpointCreated message to server
+ let mut patch_data: Option<String> = None;
+ let mut patch_base_sha: Option<String> = None;
+ let mut patch_files_count: Option<i32> = None;
+
+ if self.checkpoint_patches.enabled {
+ if let Some(ref base_sha) = parent_sha {
+ match storage::create_patch(worktree_path, base_sha).await {
+ Ok((compressed_patch, files_count)) => {
+ // Check size limit
+ if compressed_patch.len() <= self.checkpoint_patches.max_patch_size_bytes {
+ // Encode as base64 for JSON transport
+ patch_data = Some(base64::engine::general_purpose::STANDARD.encode(&compressed_patch));
+ patch_base_sha = Some(base_sha.clone());
+ patch_files_count = Some(files_count as i32);
+ tracing::debug!(
+ task_id = %task_id,
+ sha = %sha,
+ patch_size = compressed_patch.len(),
+ files_count = files_count,
+ "Created checkpoint patch"
+ );
+ } else {
+ tracing::warn!(
+ task_id = %task_id,
+ sha = %sha,
+ patch_size = compressed_patch.len(),
+ max_size = self.checkpoint_patches.max_patch_size_bytes,
+ "Patch exceeds size limit, not including in checkpoint"
+ );
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ sha = %sha,
+ error = %e,
+ "Failed to create patch for heartbeat commit"
+ );
+ }
+ }
+ }
+ }
+
+ // Send CheckpointCreated message to server (so it stores the checkpoint and patch)
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: true,
+ commit_sha: Some(sha.clone()),
+ branch_name: Some(branch_name),
+ checkpoint_number: None, // Server will assign
+ files_changed: None, // Could get from git diff --name-status if needed
+ lines_added: None,
+ lines_removed: None,
+ error: None,
+ message: commit_msg,
+ patch_data,
+ patch_base_sha,
+ patch_files_count,
+ };
+ let _ = self.ws_tx.send(msg).await;
+
Ok((sha, pushed))
}
}
@@ -4633,6 +4836,7 @@ impl Clone for TaskManagerInner {
api_url: self.api_url.clone(),
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
+ checkpoint_patches: self.checkpoint_patches.clone(),
}
}
}