diff options
Diffstat (limited to 'makima/src/daemon/task/manager.rs')
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 265 |
1 files changed, 263 insertions, 2 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index cb4bde2..3fdde9b 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -21,6 +21,7 @@ use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::daemon::db::local::LocalDb; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; /// Generate a secure random API key for orchestrator tool access. @@ -1045,11 +1046,17 @@ pub struct TaskManager { git_user_email: Arc<RwLock<Option<String>>>, /// Inherited git user.name for worktrees. git_user_name: Arc<RwLock<Option<String>>>, + /// Local SQLite database for crash recovery. + local_db: Arc<std::sync::Mutex<LocalDb>>, } impl TaskManager { - /// Create a new task manager. - pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { + /// Create a new task manager with local database for crash recovery. + pub fn new( + config: TaskConfig, + ws_tx: mpsc::Sender<DaemonMessage>, + local_db: Arc<std::sync::Mutex<LocalDb>>, + ) -> Self { let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); let process_manager = Arc::new( ProcessManager::with_command(config.claude_command.clone()) @@ -1075,9 +1082,239 @@ impl TaskManager { active_pids: Arc::new(RwLock::new(HashMap::new())), git_user_email: Arc::new(RwLock::new(None)), git_user_name: Arc::new(RwLock::new(None)), + local_db, } } + /// Persist task state to local SQLite database for crash recovery. + fn persist_task_to_local_db(&self, task: &ManagedTask) { + use crate::daemon::db::local::LocalTask; + + let local_task = LocalTask { + id: task.id, + server_task_id: task.id, // Same as task id + state: task.state.clone(), + container_id: None, + overlay_path: task.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), + repo_url: task.repo_source.clone(), + base_branch: task.base_branch.clone(), + plan: task.plan.clone(), + created_at: chrono::Utc::now(), + started_at: task.started_at.map(|_| chrono::Utc::now()), + completed_at: task.completed_at.map(|_| chrono::Utc::now()), + error_message: task.error.clone(), + }; + + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.save_task(&local_task) { + tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database"); + } else { + tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database"); + } + } + } + + /// Remove completed/failed task from local database. + fn remove_task_from_local_db(&self, task_id: Uuid) { + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.delete_task(task_id) { + tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); + } else { + tracing::debug!(task_id = %task_id, "Removed task from local database"); + } + } + } + + /// Recover orphaned tasks from local database after daemon restart. + /// Returns list of task IDs that have worktrees and can potentially be recovered. + pub async fn recover_orphaned_tasks(&self) -> Vec<Uuid> { + tracing::info!("=== STARTING ORPHANED TASK RECOVERY ==="); + + let active_tasks = { + let db = match self.local_db.lock() { + Ok(db) => db, + Err(e) => { + tracing::error!(error = %e, "Failed to lock local database for recovery"); + return Vec::new(); + } + }; + + match db.get_active_tasks() { + Ok(tasks) => tasks, + Err(e) => { + tracing::error!(error = %e, "Failed to load active tasks from local database"); + return Vec::new(); + } + } + }; + + if active_tasks.is_empty() { + tracing::info!("No orphaned tasks found in local database"); + return Vec::new(); + } + + tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database"); + + let mut recoverable_task_ids = Vec::new(); + + for local_task in active_tasks { + tracing::info!( + task_id = %local_task.id, + state = ?local_task.state, + overlay_path = ?local_task.overlay_path, + "Checking orphaned task" + ); + + // Check if worktree exists on filesystem + let worktree_exists = if let Some(ref path) = local_task.overlay_path { + let path = std::path::PathBuf::from(path); + path.exists() && path.join(".git").exists() + } else { + // Try to find worktree by task ID pattern (scan worktrees directory) + let short_id = &local_task.id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + let mut found = false; + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + if path.join(".git").exists() { + found = true; + break; + } + } + } + } + found + }; + + if worktree_exists { + tracing::info!( + task_id = %local_task.id, + "Found worktree for orphaned task - can be recovered" + ); + recoverable_task_ids.push(local_task.id); + + // Send structured recovery notification to server + let msg = DaemonMessage::task_recovery_detected( + local_task.id, + local_task.state.as_str(), + true, // worktree intact + local_task.overlay_path.clone(), + false, // doesn't need patch since worktree is intact + ); + let _ = self.ws_tx.send(msg).await; + } else { + tracing::warn!( + task_id = %local_task.id, + "Worktree missing for orphaned task - marking as lost" + ); + + // Update local db to mark as failed + if let Ok(db) = self.local_db.lock() { + let _ = db.update_task_state(local_task.id, TaskState::Failed); + } + } + } + + tracing::info!( + recoverable = recoverable_task_ids.len(), + "=== ORPHANED TASK RECOVERY COMPLETE ===" + ); + + recoverable_task_ids + } + + /// Check worktree health for all running tasks. + /// If a worktree is missing, marks the task as interrupted and notifies the server. + /// This allows the retry orchestrator to pick up the task and restore it from checkpoint. + pub async fn check_worktree_health(&self) -> Vec<Uuid> { + let mut affected_task_ids = Vec::new(); + + // Get all running tasks + let tasks_snapshot: Vec<(Uuid, Option<PathBuf>)> = { + let tasks = self.tasks.read().await; + tasks + .iter() + .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting)) + .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()))) + .collect() + }; + + if tasks_snapshot.is_empty() { + return affected_task_ids; + } + + for (task_id, worktree_path) in tasks_snapshot { + let worktree_exists = if let Some(ref path) = worktree_path { + path.exists() && path.join(".git").exists() + } else { + // No worktree set - scan by task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + let mut found = false; + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + if path.join(".git").exists() { + found = true; + break; + } + } + } + } + found + }; + + if !worktree_exists { + tracing::warn!( + task_id = %task_id, + worktree_path = ?worktree_path, + "Worktree missing for running task - marking as interrupted for retry" + ); + + affected_task_ids.push(task_id); + + // Update task state to interrupted + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Interrupted; + task.error = Some("Worktree directory was deleted".to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server - task needs recovery/retry + let msg = DaemonMessage::task_complete( + task_id, + false, + Some("Worktree deleted - task interrupted for recovery".to_string()), + ); + let _ = self.ws_tx.send(msg).await; + + // Remove from local db since server will handle retry + self.remove_task_from_local_db(task_id); + } + } + + if !affected_task_ids.is_empty() { + tracing::info!( + count = affected_task_ids.len(), + "Worktree health check found missing worktrees" + ); + } + + affected_task_ids + } + /// Check if a task can be spawned given contract-based concurrency limits. /// Returns the concurrency key to use (contract_id or task_id for standalone). async fn try_acquire_concurrency_slot( @@ -1823,6 +2060,9 @@ impl TaskManager { error: None, }; + // Persist task to local database for crash recovery + self.persist_task_to_local_db(&task); + self.tasks.write().await.insert(task_id, task); tracing::info!(task_id = %task_id, "Task entry created and stored"); @@ -1871,6 +2111,7 @@ impl TaskManager { heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.config.checkpoint_patches.clone(), + local_db: self.local_db.clone(), } } @@ -3190,6 +3431,8 @@ struct TaskManagerInner { contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, /// Checkpoint patch storage configuration. checkpoint_patches: CheckpointPatchConfig, + /// Local SQLite database for crash recovery. + local_db: Arc<std::sync::Mutex<LocalDb>>, } impl TaskManagerInner { @@ -3210,6 +3453,17 @@ impl TaskManagerInner { } } + /// Remove completed/failed task from local database. + fn remove_task_from_local_db(&self, task_id: Uuid) { + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.delete_task(task_id) { + tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); + } else { + tracing::debug!(task_id = %task_id, "Removed task from local database"); + } + } + } + /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( @@ -4375,6 +4629,9 @@ impl TaskManagerInner { tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); let _ = self.ws_tx.send(msg).await; + + // Remove completed task from local database (no longer needs crash recovery) + self.remove_task_from_local_db(task_id); } // Note: Worktrees are kept until explicitly deleted (per user preference) @@ -4578,6 +4835,9 @@ impl TaskManagerInner { // Notify server let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); let _ = self.ws_tx.send(msg).await; + + // Remove failed task from local database + self.remove_task_from_local_db(task_id); } /// Apply inherited git config to a worktree directory. @@ -4837,6 +5097,7 @@ impl Clone for TaskManagerInner { heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.checkpoint_patches.clone(), + local_db: self.local_db.clone(), } } } |
