summaryrefslogtreecommitdiff
path: root/makima/src/daemon/task
diff options
context:
space:
mode:
author soryu <soryu@soryu.co> 2026-01-23 23:52:35 +0000
committer soryu <soryu@soryu.co> 2026-01-23 23:52:35 +0000
commit 579c983d3efb8f1414ffb45b9e031f741cce5f76 (patch)
tree 1a0060f19a4f4eea8fb9cff9eb52a46cedcdc152 /makima/src/daemon/task
parent f6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff)
downloadsoryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.tar.gz
soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.zip
Add resume to daemon tasks
Diffstat (limited to 'makima/src/daemon/task')
-rw-r--r-- makima/src/daemon/task/manager.rs 265
1 file changed, 263 insertions, 2 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index cb4bde2..3fdde9b 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -21,6 +21,7 @@ use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
use crate::daemon::storage;
use crate::daemon::temp::TempManager;
use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
+use crate::daemon::db::local::LocalDb;
use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
/// Generate a secure random API key for orchestrator tool access.
@@ -1045,11 +1046,17 @@ pub struct TaskManager {
git_user_email: Arc<RwLock<Option<String>>>,
/// Inherited git user.name for worktrees.
git_user_name: Arc<RwLock<Option<String>>>,
+ /// Local SQLite database for crash recovery.
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManager {
- /// Create a new task manager.
- pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
+ /// Create a new task manager with local database for crash recovery.
+ pub fn new(
+ config: TaskConfig,
+ ws_tx: mpsc::Sender<DaemonMessage>,
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
+ ) -> Self {
let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
let process_manager = Arc::new(
ProcessManager::with_command(config.claude_command.clone())
@@ -1075,9 +1082,239 @@ impl TaskManager {
active_pids: Arc::new(RwLock::new(HashMap::new())),
git_user_email: Arc::new(RwLock::new(None)),
git_user_name: Arc::new(RwLock::new(None)),
+ local_db,
}
}
+    /// Persist a snapshot of `task` to the local SQLite database for crash recovery.
+    ///
+    /// Best-effort: a poisoned database lock or a failed save is logged and
+    /// otherwise ignored, so this never fails the caller.
+    ///
+    /// `completed_at` is assigned from `Instant::now()` elsewhere in this file, and
+    /// `started_at` is assumed to be the same kind of instant (TODO confirm). Such
+    /// instants carry no calendar meaning, so each one is mapped onto the wall clock
+    /// by subtracting the time elapsed since it from the current UTC time. The stored
+    /// timestamps therefore reflect when the event happened, not when the row was
+    /// written.
+    fn persist_task_to_local_db(&self, task: &ManagedTask) {
+        use crate::daemon::db::local::LocalTask;
+
+        let now = chrono::Utc::now();
+        // Translate "this long ago" into a UTC timestamp; fall back to `now` when
+        // the duration cannot be represented by chrono.
+        let wall_clock_ago = |elapsed: std::time::Duration| {
+            chrono::Duration::from_std(elapsed)
+                .ok()
+                .and_then(|ago| now.checked_sub_signed(ago))
+                .unwrap_or(now)
+        };
+
+        let local_task = LocalTask {
+            id: task.id,
+            server_task_id: task.id, // Same as task id
+            state: task.state.clone(),
+            container_id: None,
+            overlay_path: task
+                .worktree
+                .as_ref()
+                .map(|w| w.path.to_string_lossy().into_owned()),
+            repo_url: task.repo_source.clone(),
+            base_branch: task.base_branch.clone(),
+            plan: task.plan.clone(),
+            // The time of persistence is recorded as the creation time.
+            created_at: now,
+            started_at: task.started_at.map(|t| wall_clock_ago(t.elapsed())),
+            completed_at: task.completed_at.map(|t| wall_clock_ago(t.elapsed())),
+            error_message: task.error.clone(),
+        };
+
+        let db = match self.local_db.lock() {
+            Ok(db) => db,
+            Err(e) => {
+                // A poisoned lock means another thread panicked while holding it;
+                // report it instead of silently skipping persistence.
+                tracing::warn!(task_id = %task.id, error = %e, "Failed to lock local database; task not persisted");
+                return;
+            }
+        };
+
+        let outcome = db.save_task(&local_task);
+        if let Err(e) = outcome {
+            tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database");
+        } else {
+            tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database");
+        }
+    }
+
+    /// Remove a completed/failed task's row from the local database.
+    ///
+    /// Best-effort: a poisoned database lock or a failed delete is logged and
+    /// otherwise ignored, so this never fails the caller.
+    fn remove_task_from_local_db(&self, task_id: Uuid) {
+        let db = match self.local_db.lock() {
+            Ok(db) => db,
+            Err(e) => {
+                // A poisoned lock means another thread panicked while holding it;
+                // report it so a stale recovery row does not go unnoticed.
+                tracing::warn!(task_id = %task_id, error = %e, "Failed to lock local database; task not removed");
+                return;
+            }
+        };
+
+        let outcome = db.delete_task(task_id);
+        if let Err(e) = outcome {
+            tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
+        } else {
+            tracing::debug!(task_id = %task_id, "Removed task from local database");
+        }
+    }
+
+    /// Recover orphaned tasks from the local database after a daemon restart.
+    ///
+    /// Every task the local database reports as active is checked for a worktree
+    /// on disk (a directory containing `.git`):
+    ///
+    /// * worktree present: the task ID is collected and a
+    ///   `task_recovery_detected` message is sent to the server;
+    /// * worktree missing: the task is marked `Failed` in the local database.
+    ///
+    /// Returns the IDs of tasks whose worktree was found. Failures to lock or read
+    /// the database yield an empty list; failures to notify the server or to update
+    /// a lost task are logged and do not abort the scan.
+    pub async fn recover_orphaned_tasks(&self) -> Vec<Uuid> {
+        tracing::info!("=== STARTING ORPHANED TASK RECOVERY ===");
+
+        // Load candidates inside a block so the std mutex guard is released
+        // before any `.await` below.
+        let active_tasks = {
+            let db = match self.local_db.lock() {
+                Ok(db) => db,
+                Err(e) => {
+                    tracing::error!(error = %e, "Failed to lock local database for recovery");
+                    return Vec::new();
+                }
+            };
+
+            match db.get_active_tasks() {
+                Ok(tasks) => tasks,
+                Err(e) => {
+                    tracing::error!(error = %e, "Failed to load active tasks from local database");
+                    return Vec::new();
+                }
+            }
+        };
+
+        if active_tasks.is_empty() {
+            tracing::info!("No orphaned tasks found in local database");
+            return Vec::new();
+        }
+
+        tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database");
+
+        let mut recoverable_task_ids = Vec::new();
+
+        for local_task in active_tasks {
+            tracing::info!(
+                task_id = %local_task.id,
+                state = ?local_task.state,
+                overlay_path = ?local_task.overlay_path,
+                "Checking orphaned task"
+            );
+
+            // Check if worktree exists on filesystem
+            let worktree_exists = if let Some(ref path) = local_task.overlay_path {
+                let path = std::path::PathBuf::from(path);
+                path.exists() && path.join(".git").exists()
+            } else {
+                // No recorded path: scan the worktrees directory for an entry whose
+                // name starts with the first 8 characters of the task ID.
+                let id_text = local_task.id.to_string();
+                let short_id = &id_text[..8];
+                let worktrees_dir = self.worktree_manager.base_dir();
+                let mut found = false;
+
+                if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+                    while let Ok(Some(entry)) = entries.next_entry().await {
+                        let name = entry.file_name();
+                        let name_str = name.to_string_lossy();
+                        if name_str.starts_with(short_id) && entry.path().join(".git").exists() {
+                            found = true;
+                            break;
+                        }
+                    }
+                }
+                found
+            };
+
+            if !worktree_exists {
+                tracing::warn!(
+                    task_id = %local_task.id,
+                    "Worktree missing for orphaned task - marking as lost"
+                );
+
+                // Update local db to mark as failed. Report failures: otherwise the
+                // row stays active and is re-examined on every restart with no
+                // trace of why.
+                match self.local_db.lock() {
+                    Ok(db) => {
+                        if let Err(e) = db.update_task_state(local_task.id, TaskState::Failed) {
+                            tracing::warn!(
+                                task_id = %local_task.id,
+                                error = %e,
+                                "Failed to mark lost task as failed in local database"
+                            );
+                        }
+                    }
+                    Err(e) => {
+                        tracing::warn!(
+                            task_id = %local_task.id,
+                            error = %e,
+                            "Failed to lock local database; lost task not marked as failed"
+                        );
+                    }
+                }
+                continue;
+            }
+
+            tracing::info!(
+                task_id = %local_task.id,
+                "Found worktree for orphaned task - can be recovered"
+            );
+            recoverable_task_ids.push(local_task.id);
+
+            // Send structured recovery notification to server
+            let msg = DaemonMessage::task_recovery_detected(
+                local_task.id,
+                local_task.state.as_str(),
+                true, // worktree intact
+                local_task.overlay_path.clone(),
+                false, // doesn't need patch since worktree is intact
+            );
+            if let Err(e) = self.ws_tx.send(msg).await {
+                // The task stays in the returned list (its worktree is intact), but
+                // the server was not told about it.
+                tracing::warn!(
+                    task_id = %local_task.id,
+                    error = %e,
+                    "Failed to notify server of recoverable task"
+                );
+            }
+        }
+
+        tracing::info!(
+            recoverable = recoverable_task_ids.len(),
+            "=== ORPHANED TASK RECOVERY COMPLETE ==="
+        );
+
+        recoverable_task_ids
+    }
+
+    /// Check worktree health for all tasks in the `Running` or `Starting` state.
+    ///
+    /// For each such task whose worktree (a directory containing `.git`) cannot be
+    /// found, the task is marked `Interrupted`, the server is sent a failed
+    /// `task_complete` message, and the task's row is removed from the local
+    /// database. The intent is that the server side can then retry the task.
+    ///
+    /// The filesystem checks run without holding the task map lock, so a task may
+    /// finish or be removed while they are in progress. The state is therefore
+    /// re-checked under the write lock, and a task that is no longer
+    /// `Running`/`Starting` is left untouched and not reported.
+    ///
+    /// Returns the IDs of the tasks that were marked as interrupted.
+    pub async fn check_worktree_health(&self) -> Vec<Uuid> {
+        // Snapshot (id, worktree path) pairs so the read lock is released before
+        // touching the filesystem.
+        let candidates: Vec<(Uuid, Option<PathBuf>)> = {
+            let tasks = self.tasks.read().await;
+            tasks
+                .iter()
+                .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting))
+                .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone())))
+                .collect()
+        };
+
+        let mut affected_task_ids = Vec::new();
+
+        for (task_id, worktree_path) in candidates {
+            let worktree_exists = if let Some(ref path) = worktree_path {
+                path.exists() && path.join(".git").exists()
+            } else {
+                // No worktree set - scan the worktrees directory for an entry whose
+                // name starts with the first 8 characters of the task ID.
+                // NOTE(review): a `Starting` task whose worktree has not been
+                // created yet would also fail this scan - confirm that is intended.
+                let id_text = task_id.to_string();
+                let short_id = &id_text[..8];
+                let worktrees_dir = self.worktree_manager.base_dir();
+                let mut found = false;
+
+                if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+                    while let Ok(Some(entry)) = entries.next_entry().await {
+                        let name = entry.file_name();
+                        let name_str = name.to_string_lossy();
+                        if name_str.starts_with(short_id) && entry.path().join(".git").exists() {
+                            found = true;
+                            break;
+                        }
+                    }
+                }
+                found
+            };
+
+            if worktree_exists {
+                continue;
+            }
+
+            // Re-check the state under the write lock: the snapshot may be stale,
+            // and a task that already finished must not be flipped to
+            // `Interrupted` or reported to the server a second time.
+            let mut marked_interrupted = false;
+            {
+                let mut tasks = self.tasks.write().await;
+                if let Some(task) = tasks.get_mut(&task_id) {
+                    if matches!(task.state, TaskState::Running | TaskState::Starting) {
+                        task.state = TaskState::Interrupted;
+                        task.error = Some("Worktree directory was deleted".to_string());
+                        task.completed_at = Some(Instant::now());
+                        marked_interrupted = true;
+                    }
+                }
+            }
+
+            if !marked_interrupted {
+                tracing::debug!(
+                    task_id = %task_id,
+                    "Task no longer active when worktree was found missing - skipping"
+                );
+                continue;
+            }
+
+            tracing::warn!(
+                task_id = %task_id,
+                worktree_path = ?worktree_path,
+                "Worktree missing for running task - marking as interrupted for retry"
+            );
+
+            affected_task_ids.push(task_id);
+
+            // Notify server - task needs recovery/retry
+            let msg = DaemonMessage::task_complete(
+                task_id,
+                false,
+                Some("Worktree deleted - task interrupted for recovery".to_string()),
+            );
+            if let Err(e) = self.ws_tx.send(msg).await {
+                tracing::warn!(
+                    task_id = %task_id,
+                    error = %e,
+                    "Failed to notify server of interrupted task"
+                );
+            }
+
+            // Remove from local db since server will handle retry
+            self.remove_task_from_local_db(task_id);
+        }
+
+        if !affected_task_ids.is_empty() {
+            tracing::info!(
+                count = affected_task_ids.len(),
+                "Worktree health check found missing worktrees"
+            );
+        }
+
+        affected_task_ids
+    }
+
/// Check if a task can be spawned given contract-based concurrency limits.
/// Returns the concurrency key to use (contract_id or task_id for standalone).
async fn try_acquire_concurrency_slot(
@@ -1823,6 +2060,9 @@ impl TaskManager {
error: None,
};
+ // Persist task to local database for crash recovery
+ self.persist_task_to_local_db(&task);
+
self.tasks.write().await.insert(task_id, task);
tracing::info!(task_id = %task_id, "Task entry created and stored");
@@ -1871,6 +2111,7 @@ impl TaskManager {
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.config.checkpoint_patches.clone(),
+ local_db: self.local_db.clone(),
}
}
@@ -3190,6 +3431,8 @@ struct TaskManagerInner {
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
/// Checkpoint patch storage configuration.
checkpoint_patches: CheckpointPatchConfig,
+ /// Local SQLite database for crash recovery.
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManagerInner {
@@ -3210,6 +3453,17 @@ impl TaskManagerInner {
}
}
+    /// Remove a completed/failed task's row from the local database.
+    ///
+    /// Best-effort: a poisoned database lock or a failed delete is logged and
+    /// otherwise ignored, so this never fails the caller.
+    fn remove_task_from_local_db(&self, task_id: Uuid) {
+        let db = match self.local_db.lock() {
+            Ok(db) => db,
+            Err(e) => {
+                // A poisoned lock means another thread panicked while holding it;
+                // report it so a stale recovery row does not go unnoticed.
+                tracing::warn!(task_id = %task_id, error = %e, "Failed to lock local database; task not removed");
+                return;
+            }
+        };
+
+        let outcome = db.delete_task(task_id);
+        if let Err(e) = outcome {
+            tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
+        } else {
+            tracing::debug!(task_id = %task_id, "Removed task from local database");
+        }
+    }
+
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
@@ -4375,6 +4629,9 @@ impl TaskManagerInner {
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
let _ = self.ws_tx.send(msg).await;
+
+ // Remove completed task from local database (no longer needs crash recovery)
+ self.remove_task_from_local_db(task_id);
}
// Note: Worktrees are kept until explicitly deleted (per user preference)
@@ -4578,6 +4835,9 @@ impl TaskManagerInner {
// Notify server
let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string()));
let _ = self.ws_tx.send(msg).await;
+
+ // Remove failed task from local database
+ self.remove_task_from_local_db(task_id);
}
/// Apply inherited git config to a worktree directory.
@@ -4837,6 +5097,7 @@ impl Clone for TaskManagerInner {
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.checkpoint_patches.clone(),
+ local_db: self.local_db.clone(),
}
}
}