diff options
| author | soryu <soryu@soryu.co> | 2026-01-24 04:31:20 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-24 04:31:20 +0000 |
| commit | 0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802 (patch) | |
| tree | 8f08fa5421e13381c5955f52a5197379feb44c58 | |
| parent | f6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff) | |
| download | soryu-0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802.tar.gz soryu-0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802.zip | |
feat: implement dependency-ordered task execution
Add dependency tracking and validation for tasks to enforce execution order
(schema changes → backend → UI) as specified in Section 1.3 of ralph-features-spec.md.
Changes:
- Add depends_on field to Task model (Vec<Uuid>) for explicit dependencies
- Create database migration for depends_on column with GIN index
- Add dependency_analysis.rs module with:
  - can_start_task() for checking if all dependencies are complete
  - Auto-detection of dependency patterns from file paths
  - Detection of schema/types/backend/UI categories
  - Warnings for potential dependency violations
- Add DependencyOrderingConfig to daemon config with:
  - enabled: Enable/disable dependency checking
  - auto_detect: Auto-detect dependencies from file patterns
  - warn_on_violation: Warn on detected violations
- Integrate dependency checks into task manager scheduling
- Add depends_on to DaemonCommand::SpawnTask protocol
The daemon performs dependency validation as a sanity check but defers to
the server for authoritative scheduling decisions.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| -rw-r--r-- | makima/migrations/20250124000000_add_task_depends_on.sql | 11 | ||||
| -rw-r--r-- | makima/src/bin/makima.rs | 29 | ||||
| -rw-r--r-- | makima/src/daemon/config.rs | 39 | ||||
| -rw-r--r-- | makima/src/daemon/task/dependency_analysis.rs | 483 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 371 | ||||
| -rw-r--r-- | makima/src/daemon/task/mod.rs | 5 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 40 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 15 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 32 | ||||
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 4 | ||||
| -rw-r--r-- | makima/src/server/handlers/contracts.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 33 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 117 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 40 | ||||
| -rw-r--r-- | makima/src/server/handlers/transcript_analysis.rs | 2 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 61 |
17 files changed, 1265 insertions, 19 deletions
diff --git a/makima/migrations/20250124000000_add_task_depends_on.sql b/makima/migrations/20250124000000_add_task_depends_on.sql new file mode 100644 index 0000000..4327bbe --- /dev/null +++ b/makima/migrations/20250124000000_add_task_depends_on.sql @@ -0,0 +1,11 @@ +-- Add depends_on column for dependency-ordered task execution +-- This allows tasks to declare dependencies on other tasks that must complete first. +-- Used for enforcing execution order: schema changes -> backend -> UI. + +ALTER TABLE tasks +ADD COLUMN IF NOT EXISTS depends_on JSONB DEFAULT '[]'::jsonb; + +-- Index for querying tasks by their dependencies +CREATE INDEX IF NOT EXISTS idx_tasks_depends_on ON tasks USING GIN (depends_on); + +COMMENT ON COLUMN tasks.depends_on IS 'Array of task UUIDs that must complete before this task can start'; diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 1a307d1..e2072da 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -165,7 +165,7 @@ async fn run_daemon( "[3/5] Opening local database: {}", config.local_db.path.display() ); - let _local_db = LocalDb::open(&config.local_db.path)?; + let local_db = Arc::new(std::sync::Mutex::new(LocalDb::open(&config.local_db.path)?)); eprintln!(" Database opened"); // Initialize worktree directories @@ -242,10 +242,17 @@ async fn run_daemon( api_key: config.server.api_key.clone(), heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs, checkpoint_patches: config.process.checkpoint_patches.clone(), + dependency_ordering: config.dependency_ordering.clone(), }; - // Create task manager - let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone())); + // Create task manager with local database for crash recovery + let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone(), local_db)); + + // Recover any orphaned tasks from previous daemon run + let recovered = task_manager.recover_orphaned_tasks().await; + if !recovered.is_empty() { + 
eprintln!(" Recovered {} orphaned tasks with intact worktrees", recovered.len()); + } // Spawn command handler let task_manager_clone = task_manager.clone(); @@ -260,6 +267,22 @@ async fn run_daemon( tracing::info!("Command handler stopped"); }); + // Spawn periodic worktree health check (every 60 seconds) + let health_check_manager = task_manager.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + loop { + interval.tick().await; + let affected = health_check_manager.check_worktree_health().await; + if !affected.is_empty() { + tracing::info!( + count = affected.len(), + "Worktree health check detected missing worktrees - tasks marked for retry" + ); + } + } + }); + // Handle shutdown signals let shutdown_signal = async { tokio::signal::ctrl_c() diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index b7cb1e8..4408fff 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -63,6 +63,44 @@ pub struct DaemonConfig { /// Repositories to auto-clone on startup. #[serde(default)] pub repos: ReposConfig, + + /// Dependency ordering settings for task execution. + #[serde(default)] + pub dependency_ordering: DependencyOrderingConfig, +} + +/// Dependency ordering configuration. +/// Controls how task dependencies are validated and auto-detected. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct DependencyOrderingConfig { + /// Enable dependency ordering checks. + /// When enabled, tasks with unmet dependencies cannot start. + #[serde(default = "default_true")] + pub enabled: bool, + + /// Auto-detect dependencies from file patterns. + /// Analyzes task plans to detect potential dependencies based on: + /// - Migration files -> backend code + /// - Types/models -> consumers + /// - APIs -> UI components + #[serde(default = "default_true")] + pub auto_detect: bool, + + /// Warn on detected dependency violations. 
+ /// Produces warnings when tasks may be executing out of order. + #[serde(default = "default_true")] + pub warn_on_violation: bool, +} + +impl Default for DependencyOrderingConfig { + fn default() -> Self { + Self { + enabled: true, + auto_detect: true, + warn_on_violation: true, + } + } } /// Server connection configuration. @@ -626,6 +664,7 @@ impl DaemonConfig { }, logging: LoggingConfig::default(), repos: ReposConfig::default(), + dependency_ordering: DependencyOrderingConfig::default(), } } } diff --git a/makima/src/daemon/task/dependency_analysis.rs b/makima/src/daemon/task/dependency_analysis.rs new file mode 100644 index 0000000..0891f4e --- /dev/null +++ b/makima/src/daemon/task/dependency_analysis.rs @@ -0,0 +1,483 @@ +//! Dependency analysis for task execution ordering. +//! +//! This module provides functionality to: +//! - Check if a task's dependencies are satisfied before it can start +//! - Auto-detect dependency patterns based on file patterns +//! - Warn about potential dependency violations + +use std::collections::HashSet; +use uuid::Uuid; + +/// Dependency ordering configuration. +#[derive(Debug, Clone)] +pub struct DependencyOrderingConfig { + /// Enable dependency ordering checks. + pub enabled: bool, + /// Auto-detect dependencies from file patterns. + pub auto_detect: bool, + /// Warn on detected dependency violations. + pub warn_on_violation: bool, +} + +impl Default for DependencyOrderingConfig { + fn default() -> Self { + Self { + enabled: true, + auto_detect: true, + warn_on_violation: true, + } + } +} + +/// Result of a dependency check. +#[derive(Debug, Clone)] +pub struct DependencyCheckResult { + /// Whether the task can start (all dependencies satisfied). + pub can_start: bool, + /// IDs of tasks that are still pending/running. + pub unmet_dependencies: Vec<Uuid>, + /// Warnings about potential dependency issues. + pub warnings: Vec<String>, +} + +impl DependencyCheckResult { + /// Create a result indicating the task can start. 
+ pub fn can_start() -> Self { + Self { + can_start: true, + unmet_dependencies: Vec::new(), + warnings: Vec::new(), + } + } + + /// Create a result indicating the task cannot start due to unmet dependencies. + pub fn blocked(unmet: Vec<Uuid>) -> Self { + Self { + can_start: false, + unmet_dependencies: unmet, + warnings: Vec::new(), + } + } + + /// Add a warning to the result. + pub fn with_warning(mut self, warning: String) -> Self { + self.warnings.push(warning); + self + } +} + +/// File pattern categories for dependency detection. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum FileCategory { + /// Schema/migration files (must run first). + Schema, + /// Type/model definitions. + Types, + /// Backend/API code. + Backend, + /// UI components (must run last). + Ui, + /// Unknown/uncategorized files. + Unknown, +} + +impl FileCategory { + /// Determine the category of a file based on its path. + pub fn from_path(path: &str) -> Self { + let path_lower = path.to_lowercase(); + + // Schema/migration patterns + if path_lower.contains("migration") + || path_lower.contains("schema") + || path_lower.ends_with(".sql") + || path_lower.contains("prisma/schema") + || path_lower.contains("drizzle") + { + return FileCategory::Schema; + } + + // Type/model patterns + if path_lower.contains("/types") + || path_lower.contains("/models") + || path_lower.contains("/entities") + || path_lower.ends_with(".d.ts") + || path_lower.ends_with("types.ts") + || path_lower.ends_with("types.rs") + || path_lower.ends_with("models.rs") + { + return FileCategory::Types; + } + + // UI patterns + if path_lower.contains("/components") + || path_lower.contains("/views") + || path_lower.contains("/pages") + || path_lower.contains("/ui/") + || path_lower.ends_with(".tsx") + || path_lower.ends_with(".jsx") + || path_lower.ends_with(".vue") + || path_lower.ends_with(".svelte") + { + return FileCategory::Ui; + } + + // Backend patterns + if path_lower.contains("/api") + || 
path_lower.contains("/server") + || path_lower.contains("/handlers") + || path_lower.contains("/services") + || path_lower.contains("/controllers") + || path_lower.contains("/routes") + || path_lower.ends_with(".rs") + || path_lower.ends_with(".go") + || path_lower.ends_with(".py") + { + return FileCategory::Backend; + } + + FileCategory::Unknown + } + + /// Get the expected execution order (lower = earlier). + pub fn execution_order(&self) -> u8 { + match self { + FileCategory::Schema => 0, + FileCategory::Types => 1, + FileCategory::Backend => 2, + FileCategory::Ui => 3, + FileCategory::Unknown => 2, // Treat unknown as backend-level + } + } +} + +/// Analyze a task's plan to determine what file categories it affects. +pub fn analyze_task_files(plan: &str) -> HashSet<FileCategory> { + let mut categories = HashSet::new(); + + // Simple heuristic: look for file patterns in the plan text + let patterns_schema = [ + "migration", "schema", ".sql", "prisma", "drizzle", "database", + "ALTER TABLE", "CREATE TABLE", "DROP TABLE", + ]; + let patterns_types = [ + "type ", "interface ", "struct ", "model ", "entity", + "types.ts", "types.rs", "models.rs", ".d.ts", + ]; + let patterns_backend = [ + "api", "endpoint", "handler", "controller", "route", "service", + "server", "backend", "REST", "GraphQL", + ]; + let patterns_ui = [ + "component", "view", "page", "ui ", "frontend", "react", + ".tsx", ".jsx", ".vue", ".svelte", "button", "form", + ]; + + let plan_lower = plan.to_lowercase(); + + for pattern in patterns_schema { + if plan_lower.contains(&pattern.to_lowercase()) { + categories.insert(FileCategory::Schema); + break; + } + } + + for pattern in patterns_types { + if plan_lower.contains(&pattern.to_lowercase()) { + categories.insert(FileCategory::Types); + break; + } + } + + for pattern in patterns_backend { + if plan_lower.contains(&pattern.to_lowercase()) { + categories.insert(FileCategory::Backend); + break; + } + } + + for pattern in patterns_ui { + if 
plan_lower.contains(&pattern.to_lowercase()) { + categories.insert(FileCategory::Ui); + break; + } + } + + categories +} + +/// Information about a task's status for dependency checking. +#[derive(Debug, Clone)] +pub struct TaskDependencyInfo { + pub id: Uuid, + pub status: String, + pub plan: String, +} + +impl TaskDependencyInfo { + /// Check if this task is considered "complete" for dependency purposes. + pub fn is_complete(&self) -> bool { + matches!(self.status.as_str(), "done" | "merged") + } +} + +/// Check if a task can start based on its explicit dependencies. +/// +/// Returns a `DependencyCheckResult` indicating whether the task can start +/// and listing any unmet dependencies. +pub fn can_start_task( + depends_on: &[Uuid], + dependency_tasks: &[TaskDependencyInfo], + _config: &DependencyOrderingConfig, +) -> DependencyCheckResult { + // If no explicit dependencies, the task can start + if depends_on.is_empty() { + return DependencyCheckResult::can_start(); + } + + let complete_task_ids: HashSet<Uuid> = dependency_tasks + .iter() + .filter(|t| t.is_complete()) + .map(|t| t.id) + .collect(); + + let unmet: Vec<Uuid> = depends_on + .iter() + .filter(|dep_id| !complete_task_ids.contains(dep_id)) + .copied() + .collect(); + + if unmet.is_empty() { + DependencyCheckResult::can_start() + } else { + DependencyCheckResult::blocked(unmet) + } +} + +/// Auto-detect potential dependency violations based on file patterns. +/// +/// This analyzes the task's plan and compares it with sibling tasks to warn +/// if execution order might be problematic. 
+pub fn detect_dependency_violations( + task_plan: &str, + sibling_tasks: &[TaskDependencyInfo], + config: &DependencyOrderingConfig, +) -> Vec<String> { + if !config.auto_detect || !config.warn_on_violation { + return Vec::new(); + } + + let mut warnings = Vec::new(); + let task_categories = analyze_task_files(task_plan); + + // Find the minimum execution order this task touches + let task_min_order = task_categories + .iter() + .map(|c| c.execution_order()) + .min() + .unwrap_or(u8::MAX); + + // Check if any pending/running sibling tasks should run first + for sibling in sibling_tasks { + if sibling.is_complete() { + continue; + } + + let sibling_categories = analyze_task_files(&sibling.plan); + let sibling_min_order = sibling_categories + .iter() + .map(|c| c.execution_order()) + .min() + .unwrap_or(u8::MAX); + + // Warn if this task touches "earlier" categories while sibling touches "later" ones + // and the sibling is still pending + if sibling_min_order < task_min_order { + let sibling_cat_names: Vec<&str> = sibling_categories + .iter() + .map(|c| match c { + FileCategory::Schema => "schema/migrations", + FileCategory::Types => "types/models", + FileCategory::Backend => "backend/API", + FileCategory::Ui => "UI components", + FileCategory::Unknown => "other", + }) + .collect(); + + let task_cat_names: Vec<&str> = task_categories + .iter() + .map(|c| match c { + FileCategory::Schema => "schema/migrations", + FileCategory::Types => "types/models", + FileCategory::Backend => "backend/API", + FileCategory::Ui => "UI components", + FileCategory::Unknown => "other", + }) + .collect(); + + warnings.push(format!( + "Task may depend on sibling task {} which affects {} (this task affects {})", + sibling.id, + sibling_cat_names.join(", "), + task_cat_names.join(", ") + )); + } + } + + warnings +} + +/// Suggest automatic dependencies based on file pattern analysis. 
+/// +/// This analyzes tasks in a contract and suggests which tasks should depend on others +/// based on the Ralph pattern: schema -> types -> backend -> UI. +pub fn suggest_dependencies( + task_plan: &str, + sibling_tasks: &[TaskDependencyInfo], +) -> Vec<Uuid> { + let mut suggested = Vec::new(); + let task_categories = analyze_task_files(task_plan); + + // Get this task's minimum execution order + let task_min_order = task_categories + .iter() + .map(|c| c.execution_order()) + .min() + .unwrap_or(u8::MAX); + + // Suggest dependencies on tasks that touch "earlier" categories + for sibling in sibling_tasks { + let sibling_categories = analyze_task_files(&sibling.plan); + let sibling_max_order = sibling_categories + .iter() + .map(|c| c.execution_order()) + .max() + .unwrap_or(0); + + // If sibling's work is at an earlier stage, suggest it as a dependency + if sibling_max_order < task_min_order { + suggested.push(sibling.id); + } + } + + suggested +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_file_category_from_path() { + assert_eq!( + FileCategory::from_path("migrations/001_create_users.sql"), + FileCategory::Schema + ); + assert_eq!( + FileCategory::from_path("src/types/user.ts"), + FileCategory::Types + ); + assert_eq!( + FileCategory::from_path("src/api/users.rs"), + FileCategory::Backend + ); + assert_eq!( + FileCategory::from_path("src/components/UserProfile.tsx"), + FileCategory::Ui + ); + } + + #[test] + fn test_execution_order() { + assert!(FileCategory::Schema.execution_order() < FileCategory::Types.execution_order()); + assert!(FileCategory::Types.execution_order() < FileCategory::Backend.execution_order()); + assert!(FileCategory::Backend.execution_order() < FileCategory::Ui.execution_order()); + } + + #[test] + fn test_analyze_task_files() { + let plan = "Create a new migration to add the users table"; + let categories = analyze_task_files(plan); + assert!(categories.contains(&FileCategory::Schema)); + + let plan = "Implement 
the UserProfile component with React"; + let categories = analyze_task_files(plan); + assert!(categories.contains(&FileCategory::Ui)); + } + + #[test] + fn test_can_start_task_no_deps() { + let result = can_start_task(&[], &[], &DependencyOrderingConfig::default()); + assert!(result.can_start); + assert!(result.unmet_dependencies.is_empty()); + } + + #[test] + fn test_can_start_task_with_complete_deps() { + let dep_id = Uuid::new_v4(); + let dep_task = TaskDependencyInfo { + id: dep_id, + status: "done".to_string(), + plan: "Some completed task".to_string(), + }; + + let result = can_start_task( + &[dep_id], + &[dep_task], + &DependencyOrderingConfig::default(), + ); + assert!(result.can_start); + assert!(result.unmet_dependencies.is_empty()); + } + + #[test] + fn test_can_start_task_with_pending_deps() { + let dep_id = Uuid::new_v4(); + let dep_task = TaskDependencyInfo { + id: dep_id, + status: "running".to_string(), + plan: "Some running task".to_string(), + }; + + let result = can_start_task( + &[dep_id], + &[dep_task], + &DependencyOrderingConfig::default(), + ); + assert!(!result.can_start); + assert_eq!(result.unmet_dependencies, vec![dep_id]); + } + + #[test] + fn test_detect_dependency_violations() { + let ui_task_plan = "Create the UserProfile component"; + let schema_sibling = TaskDependencyInfo { + id: Uuid::new_v4(), + status: "pending".to_string(), + plan: "Add migration to create users table".to_string(), + }; + + let warnings = detect_dependency_violations( + ui_task_plan, + &[schema_sibling], + &DependencyOrderingConfig::default(), + ); + + // Should warn that UI task might depend on schema task + assert!(!warnings.is_empty()); + } + + #[test] + fn test_suggest_dependencies() { + let schema_task = TaskDependencyInfo { + id: Uuid::new_v4(), + status: "pending".to_string(), + plan: "Add migration to create users table".to_string(), + }; + + let ui_task_plan = "Create the UserProfile component"; + let suggested = suggest_dependencies(ui_task_plan, 
&[schema_task.clone()]); + + // Should suggest schema task as a dependency for UI task + assert!(suggested.contains(&schema_task.id)); + } +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index cb4bde2..df77171 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -14,6 +14,9 @@ use uuid::Uuid; use std::collections::HashSet; use super::completion_gate::{CircuitBreaker, CompletionGate}; +use super::dependency_analysis::{ + can_start_task, detect_dependency_violations, DependencyOrderingConfig, TaskDependencyInfo, +}; use super::state::TaskState; use crate::daemon::config::CheckpointPatchConfig; use crate::daemon::error::{DaemonError, TaskError, TaskResult}; @@ -21,6 +24,7 @@ use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; use crate::daemon::storage; use crate::daemon::temp::TempManager; use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::daemon::db::local::LocalDb; use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; /// Generate a secure random API key for orchestrator tool access. @@ -952,6 +956,8 @@ pub struct ManagedTask { pub concurrency_key: Uuid, /// Whether to run in autonomous loop mode. pub autonomous_loop: bool, + /// Task IDs that must complete before this task can start. + pub depends_on: Option<Vec<Uuid>>, /// Time task was created. pub created_at: Instant, /// Time task started running. @@ -994,6 +1000,8 @@ pub struct TaskConfig { pub heartbeat_commit_interval_secs: u64, /// Checkpoint patch storage configuration. pub checkpoint_patches: CheckpointPatchConfig, + /// Dependency ordering configuration. 
+ pub dependency_ordering: crate::daemon::config::DependencyOrderingConfig, } impl Default for TaskConfig { @@ -1013,6 +1021,7 @@ impl Default for TaskConfig { api_key: String::new(), heartbeat_commit_interval_secs: 300, // 5 minutes checkpoint_patches: CheckpointPatchConfig::default(), + dependency_ordering: crate::daemon::config::DependencyOrderingConfig::default(), } } } @@ -1045,11 +1054,17 @@ pub struct TaskManager { git_user_email: Arc<RwLock<Option<String>>>, /// Inherited git user.name for worktrees. git_user_name: Arc<RwLock<Option<String>>>, + /// Local SQLite database for crash recovery. + local_db: Arc<std::sync::Mutex<LocalDb>>, } impl TaskManager { - /// Create a new task manager. - pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { + /// Create a new task manager with local database for crash recovery. + pub fn new( + config: TaskConfig, + ws_tx: mpsc::Sender<DaemonMessage>, + local_db: Arc<std::sync::Mutex<LocalDb>>, + ) -> Self { let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); let process_manager = Arc::new( ProcessManager::with_command(config.claude_command.clone()) @@ -1075,9 +1090,239 @@ impl TaskManager { active_pids: Arc::new(RwLock::new(HashMap::new())), git_user_email: Arc::new(RwLock::new(None)), git_user_name: Arc::new(RwLock::new(None)), + local_db, } } + /// Persist task state to local SQLite database for crash recovery. 
+ fn persist_task_to_local_db(&self, task: &ManagedTask) { + use crate::daemon::db::local::LocalTask; + + let local_task = LocalTask { + id: task.id, + server_task_id: task.id, // Same as task id + state: task.state.clone(), + container_id: None, + overlay_path: task.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), + repo_url: task.repo_source.clone(), + base_branch: task.base_branch.clone(), + plan: task.plan.clone(), + created_at: chrono::Utc::now(), + started_at: task.started_at.map(|_| chrono::Utc::now()), + completed_at: task.completed_at.map(|_| chrono::Utc::now()), + error_message: task.error.clone(), + }; + + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.save_task(&local_task) { + tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database"); + } else { + tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database"); + } + } + } + + /// Remove completed/failed task from local database. + fn remove_task_from_local_db(&self, task_id: Uuid) { + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.delete_task(task_id) { + tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); + } else { + tracing::debug!(task_id = %task_id, "Removed task from local database"); + } + } + } + + /// Recover orphaned tasks from local database after daemon restart. + /// Returns list of task IDs that have worktrees and can potentially be recovered. 
+ pub async fn recover_orphaned_tasks(&self) -> Vec<Uuid> { + tracing::info!("=== STARTING ORPHANED TASK RECOVERY ==="); + + let active_tasks = { + let db = match self.local_db.lock() { + Ok(db) => db, + Err(e) => { + tracing::error!(error = %e, "Failed to lock local database for recovery"); + return Vec::new(); + } + }; + + match db.get_active_tasks() { + Ok(tasks) => tasks, + Err(e) => { + tracing::error!(error = %e, "Failed to load active tasks from local database"); + return Vec::new(); + } + } + }; + + if active_tasks.is_empty() { + tracing::info!("No orphaned tasks found in local database"); + return Vec::new(); + } + + tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database"); + + let mut recoverable_task_ids = Vec::new(); + + for local_task in active_tasks { + tracing::info!( + task_id = %local_task.id, + state = ?local_task.state, + overlay_path = ?local_task.overlay_path, + "Checking orphaned task" + ); + + // Check if worktree exists on filesystem + let worktree_exists = if let Some(ref path) = local_task.overlay_path { + let path = std::path::PathBuf::from(path); + path.exists() && path.join(".git").exists() + } else { + // Try to find worktree by task ID pattern (scan worktrees directory) + let short_id = &local_task.id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + let mut found = false; + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + if path.join(".git").exists() { + found = true; + break; + } + } + } + } + found + }; + + if worktree_exists { + tracing::info!( + task_id = %local_task.id, + "Found worktree for orphaned task - can be recovered" + ); + recoverable_task_ids.push(local_task.id); + + // Send structured recovery notification to server + let msg = 
DaemonMessage::task_recovery_detected( + local_task.id, + local_task.state.as_str(), + true, // worktree intact + local_task.overlay_path.clone(), + false, // doesn't need patch since worktree is intact + ); + let _ = self.ws_tx.send(msg).await; + } else { + tracing::warn!( + task_id = %local_task.id, + "Worktree missing for orphaned task - marking as lost" + ); + + // Update local db to mark as failed + if let Ok(db) = self.local_db.lock() { + let _ = db.update_task_state(local_task.id, TaskState::Failed); + } + } + } + + tracing::info!( + recoverable = recoverable_task_ids.len(), + "=== ORPHANED TASK RECOVERY COMPLETE ===" + ); + + recoverable_task_ids + } + + /// Check worktree health for all running tasks. + /// If a worktree is missing, marks the task as interrupted and notifies the server. + /// This allows the retry orchestrator to pick up the task and restore it from checkpoint. + pub async fn check_worktree_health(&self) -> Vec<Uuid> { + let mut affected_task_ids = Vec::new(); + + // Get all running tasks + let tasks_snapshot: Vec<(Uuid, Option<PathBuf>)> = { + let tasks = self.tasks.read().await; + tasks + .iter() + .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting)) + .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone()))) + .collect() + }; + + if tasks_snapshot.is_empty() { + return affected_task_ids; + } + + for (task_id, worktree_path) in tasks_snapshot { + let worktree_exists = if let Some(ref path) = worktree_path { + path.exists() && path.join(".git").exists() + } else { + // No worktree set - scan by task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + let mut found = false; + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + if 
path.join(".git").exists() { + found = true; + break; + } + } + } + } + found + }; + + if !worktree_exists { + tracing::warn!( + task_id = %task_id, + worktree_path = ?worktree_path, + "Worktree missing for running task - marking as interrupted for retry" + ); + + affected_task_ids.push(task_id); + + // Update task state to interrupted + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Interrupted; + task.error = Some("Worktree directory was deleted".to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server - task needs recovery/retry + let msg = DaemonMessage::task_complete( + task_id, + false, + Some("Worktree deleted - task interrupted for recovery".to_string()), + ); + let _ = self.ws_tx.send(msg).await; + + // Remove from local db since server will handle retry + self.remove_task_from_local_db(task_id); + } + } + + if !affected_task_ids.is_empty() { + tracing::info!( + count = affected_task_ids.len(), + "Worktree health check found missing worktrees" + ); + } + + affected_task_ids + } + /// Check if a task can be spawned given contract-based concurrency limits. /// Returns the concurrency key to use (contract_id or task_id for standalone). 
async fn try_acquire_concurrency_slot( @@ -1413,6 +1658,7 @@ impl TaskManager { conversation_history, patch_data, patch_base_sha, + depends_on, } => { tracing::info!( task_id = %task_id, @@ -1431,6 +1677,7 @@ impl TaskManager { continue_from_task_id = ?continue_from_task_id, copy_files = ?copy_files, contract_id = ?contract_id, + depends_on = ?depends_on, plan_len = plan.len(), "Spawning new task" ); @@ -1439,7 +1686,7 @@ impl TaskManager { parent_task_id, depth, is_orchestrator, is_supervisor, target_repo_path, completion_action, continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session, - conversation_history, patch_data, patch_base_sha, + conversation_history, patch_data, patch_base_sha, depends_on, ).await?; } DaemonCommand::PauseTask { task_id } => { @@ -1767,6 +2014,7 @@ impl TaskManager { conversation_history: Option<serde_json::Value>, patch_data: Option<String>, patch_base_sha: Option<String>, + depends_on: Option<Vec<Uuid>>, ) -> TaskResult<()> { tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ==="); @@ -1795,6 +2043,30 @@ impl TaskManager { let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?; tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired"); + // Check dependencies if enabled + if self.config.dependency_ordering.enabled { + if let Some(ref deps) = depends_on { + if !deps.is_empty() { + // Check if all dependencies are complete + let dep_check_result = self.check_task_dependencies(deps, &plan).await; + if !dep_check_result.can_start { + tracing::warn!( + task_id = %task_id, + unmet_deps = ?dep_check_result.unmet_dependencies, + "Task has unmet dependencies - server should not have dispatched this task" + ); + // Note: We don't block here because the server is the authority on scheduling. 
+ // This is just a sanity check and warning. + } + + // Log any warnings from auto-detection + for warning in &dep_check_result.warnings { + tracing::warn!(task_id = %task_id, warning = %warning, "Dependency warning"); + } + } + } + } + // Create task entry tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); let task = ManagedTask { @@ -1817,12 +2089,16 @@ impl TaskManager { contract_id, concurrency_key, autonomous_loop, + depends_on: depends_on.clone(), created_at: Instant::now(), started_at: None, completed_at: None, error: None, }; + // Persist task to local database for crash recovery + self.persist_task_to_local_db(&task); + self.tasks.write().await.insert(task_id, task); tracing::info!(task_id = %task_id, "Task entry created and stored"); @@ -1855,6 +2131,74 @@ impl TaskManager { Ok(()) } + /// Check if a task's dependencies are satisfied. + /// + /// This method checks against the daemon's local task state. For full accuracy, + /// the server should be the source of truth for task status. 
+ async fn check_task_dependencies( + &self, + depends_on: &[Uuid], + task_plan: &str, + ) -> super::dependency_analysis::DependencyCheckResult { + // Convert daemon config to dependency analysis config + let config = DependencyOrderingConfig { + enabled: self.config.dependency_ordering.enabled, + auto_detect: self.config.dependency_ordering.auto_detect, + warn_on_violation: self.config.dependency_ordering.warn_on_violation, + }; + + // Build list of dependency tasks from local state + let tasks = self.tasks.read().await; + let dependency_tasks: Vec<TaskDependencyInfo> = depends_on + .iter() + .filter_map(|dep_id| { + tasks.get(dep_id).map(|t| TaskDependencyInfo { + id: *dep_id, + status: match t.state { + TaskState::Completed => "done".to_string(), + TaskState::Failed => "failed".to_string(), + TaskState::Running => "running".to_string(), + TaskState::Initializing | TaskState::Starting => "pending".to_string(), + _ => "unknown".to_string(), + }, + plan: t.plan.clone(), + }) + }) + .collect(); + + // Build list of sibling tasks for auto-detection + let sibling_tasks: Vec<TaskDependencyInfo> = tasks + .values() + .filter(|t| !depends_on.contains(&t.id)) + .map(|t| TaskDependencyInfo { + id: t.id, + status: match t.state { + TaskState::Completed => "done".to_string(), + TaskState::Failed => "failed".to_string(), + TaskState::Running => "running".to_string(), + TaskState::Initializing | TaskState::Starting => "pending".to_string(), + _ => "unknown".to_string(), + }, + plan: t.plan.clone(), + }) + .collect(); + + drop(tasks); + + // Check explicit dependencies + let mut result = can_start_task(depends_on, &dependency_tasks, &config); + + // Add auto-detected warnings + if config.auto_detect && config.warn_on_violation { + let warnings = detect_dependency_violations(task_plan, &sibling_tasks, &config); + for warning in warnings { + result = result.with_warning(warning); + } + } + + result + } + /// Clone inner state for spawned tasks. 
fn clone_inner(&self) -> TaskManagerInner { TaskManagerInner { @@ -1871,6 +2215,7 @@ impl TaskManager { heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.config.checkpoint_patches.clone(), + local_db: self.local_db.clone(), } } @@ -3190,6 +3535,8 @@ struct TaskManagerInner { contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, /// Checkpoint patch storage configuration. checkpoint_patches: CheckpointPatchConfig, + /// Local SQLite database for crash recovery. + local_db: Arc<std::sync::Mutex<LocalDb>>, } impl TaskManagerInner { @@ -3210,6 +3557,17 @@ impl TaskManagerInner { } } + /// Remove completed/failed task from local database. + fn remove_task_from_local_db(&self, task_id: Uuid) { + if let Ok(db) = self.local_db.lock() { + if let Err(e) = db.delete_task(task_id) { + tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database"); + } else { + tracing::debug!(task_id = %task_id, "Removed task from local database"); + } + } + } + /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( @@ -4375,6 +4733,9 @@ impl TaskManagerInner { tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); let msg = DaemonMessage::task_complete(task_id, success, error); let _ = self.ws_tx.send(msg).await; + + // Remove completed task from local database (no longer needs crash recovery) + self.remove_task_from_local_db(task_id); } // Note: Worktrees are kept until explicitly deleted (per user preference) @@ -4578,6 +4939,9 @@ impl TaskManagerInner { // Notify server let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); let _ = self.ws_tx.send(msg).await; + + // Remove failed task from local database + self.remove_task_from_local_db(task_id); } /// Apply inherited git config to a worktree directory. 
@@ -4837,6 +5201,7 @@ impl Clone for TaskManagerInner { heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.checkpoint_patches.clone(), + local_db: self.local_db.clone(), } } } diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs index 3830e1d..a33655b 100644 --- a/makima/src/daemon/task/mod.rs +++ b/makima/src/daemon/task/mod.rs @@ -1,9 +1,14 @@ //! Task management and execution. pub mod completion_gate; +pub mod dependency_analysis; pub mod manager; pub mod state; pub use completion_gate::CompletionGate; +pub use dependency_analysis::{ + can_start_task, detect_dependency_violations, suggest_dependencies, + DependencyCheckResult, DependencyOrderingConfig, FileCategory, TaskDependencyInfo, +}; pub use manager::{ManagedTask, TaskConfig, TaskManager}; pub use state::TaskState; diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index ec9b09e..7b0a0b4 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -60,6 +60,25 @@ pub enum DaemonMessage { error: Option<String>, }, + /// Task recovery detected after daemon restart. + /// Sent when daemon finds orphaned tasks that can be recovered. + TaskRecoveryDetected { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Previous state of the task before daemon restart. + #[serde(rename = "previousState")] + previous_state: String, + /// Whether the worktree is still intact. + #[serde(rename = "worktreeIntact")] + worktree_intact: bool, + /// Path to the worktree if available. + #[serde(rename = "worktreePath")] + worktree_path: Option<String>, + /// Whether the task needs a checkpoint patch for recovery. + #[serde(rename = "needsPatch")] + needs_patch: bool, + }, + /// Register a tool key for orchestrator API access. 
RegisterToolKey { #[serde(rename = "taskId")] @@ -403,6 +422,10 @@ pub enum DaemonCommand { /// Commit SHA to apply the patch on top of. #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] patch_base_sha: Option<String>, + /// Task IDs that must complete before this task can start. + /// Used for enforcing execution order: schema changes -> backend -> UI. + #[serde(rename = "dependsOn", default, skip_serializing_if = "Option::is_none")] + depends_on: Option<Vec<Uuid>>, }, /// Pause a running task. @@ -698,6 +721,23 @@ impl DaemonMessage { } } + /// Create a task recovery detected message. + pub fn task_recovery_detected( + task_id: Uuid, + previous_state: &str, + worktree_intact: bool, + worktree_path: Option<String>, + needs_patch: bool, + ) -> Self { + Self::TaskRecoveryDetected { + task_id, + previous_state: previous_state.to_string(), + worktree_intact, + worktree_path, + needs_patch, + } + } + /// Create a register tool key message. pub fn register_tool_key(task_id: Uuid, key: String) -> Self { Self::RegisterToolKey { task_id, key } diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 58f4da1..f3977e0 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -531,6 +531,13 @@ pub struct Task { /// Standalone completed tasks can be dismissed by the user. #[serde(default)] pub hidden: bool, + + // Dependency tracking for dependency-ordered execution + /// Task IDs that must complete before this task can start. + /// Used for enforcing execution order: schema changes → backend → UI. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + #[sqlx(json)] + pub depends_on: Option<Vec<Uuid>>, } impl Task { @@ -611,8 +618,8 @@ pub struct TaskListResponse { } /// Request payload for creating a new task -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase", default)] pub struct CreateTaskRequest { /// Contract this task belongs to (optional for branched/anonymous tasks) pub contract_id: Option<Uuid>, @@ -653,6 +660,10 @@ pub struct CreateTaskRequest { pub branched_from_task_id: Option<Uuid>, /// Conversation history to initialize the task with (JSON array of messages) pub conversation_history: Option<serde_json::Value>, + /// Task IDs that must complete before this task can start. + /// Used for enforcing execution order: schema changes → backend → UI. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub depends_on: Option<Vec<Uuid>>, } /// Request payload for updating a task diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index da44899..d9ba97d 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -684,6 +684,7 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + let depends_on_json = req.depends_on.as_ref().map(|d| serde_json::to_value(d).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" @@ -691,9 +692,9 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state + branched_from_task_id, conversation_state, depends_on ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 
$10, $11, $12, $13, $14, $15, $16, $17, $18) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) RETURNING * "#, ) @@ -715,6 +716,7 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, .bind(&copy_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) + .bind(&depends_on_json) .fetch_one(pool) .await } @@ -823,6 +825,26 @@ pub async fn get_pending_tasks_for_contract( .await } +/// Get all contracts that have pending tasks awaiting retry. +/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks. +pub async fn get_all_pending_task_contracts( + pool: &PgPool, +) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> { + sqlx::query_as::<_, (Uuid, Uuid)>( + r#" + SELECT DISTINCT contract_id, owner_id + FROM tasks + WHERE contract_id IS NOT NULL + AND status = 'pending' + AND is_supervisor = false + AND retry_count < max_retries + ORDER BY owner_id, contract_id + "#, + ) + .fetch_all(pool) + .await +} + /// Mark a task as pending for retry after daemon failure. /// Increments retry count and adds the failed daemon to exclusion list. 
pub async fn mark_task_for_retry( @@ -1075,6 +1097,7 @@ pub async fn create_task_for_owner( }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + let depends_on_json = req.depends_on.as_ref().map(|d| serde_json::to_value(d).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" @@ -1082,9 +1105,9 @@ pub async fn create_task_for_owner( owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state + branched_from_task_id, conversation_state, depends_on ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING * "#, ) @@ -1107,6 +1130,7 @@ pub async fn create_task_for_owner( .bind(&copy_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) + .bind(&depends_on_json) .fetch_one(pool) .await } diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index e2adb72..86f9500 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1374,6 +1374,7 @@ async fn handle_contract_request( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -1470,6 +1471,7 @@ async fn handle_contract_request( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -2079,6 +2081,7 @@ async fn handle_contract_request( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; match 
repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -2595,6 +2598,7 @@ async fn handle_contract_request( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; if repository::create_task_for_owner(pool, owner_id, task_req).await.is_ok() { diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index 462b385..d9a47a2 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -298,6 +298,7 @@ pub async fn create_contract( merge_mode: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 240e1f7..f89f067 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -6,6 +6,7 @@ use axum::{ response::IntoResponse, Json, }; +use base64::Engine; use uuid::Uuid; use crate::db::models::{ @@ -2224,6 +2225,7 @@ pub async fn reassign_task( checkpoint_sha: task.last_checkpoint_sha.clone(), branched_from_task_id: None, conversation_history: None, + depends_on: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -2265,6 +2267,30 @@ pub async fn reassign_task( } }; + // Fetch latest checkpoint patch for worktree recovery during reassignment + let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await { + Ok(Some(patch)) => { + tracing::info!( + old_task_id = %id, + new_task_id = %new_task.id, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + files_count = patch.files_count, + "Including checkpoint patch for task reassignment recovery" + ); + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + 
Ok(None) => { + tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment"); + (None, None) + } + Err(e) => { + tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment"); + (None, None) + } + }; + // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { task_id: new_task.id, @@ -2285,8 +2311,8 @@ pub async fn reassign_task( autonomous_loop: false, resume_session: false, conversation_history: None, - patch_data: None, - patch_base_sha: None, + patch_data, + patch_base_sha, }; tracing::info!( @@ -2949,6 +2975,7 @@ pub async fn fork_task( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, + depends_on: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -3106,6 +3133,7 @@ pub async fn resume_from_checkpoint( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, + depends_on: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -3441,6 +3469,7 @@ pub async fn branch_task( checkpoint_sha: None, branched_from_task_id: Some(source_task_id), conversation_history, + depends_on: None, }; let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 1ff0724..157bad0 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1020,6 +1020,7 @@ async fn handle_mesh_request( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 65db373..53ee806 100644 --- 
a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -291,6 +291,19 @@ pub enum DaemonMessage { success: bool, error: Option<String>, }, + /// Task recovery detected after daemon restart + TaskRecoveryDetected { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "previousState")] + previous_state: String, + #[serde(rename = "worktreeIntact")] + worktree_intact: bool, + #[serde(rename = "worktreePath")] + worktree_path: Option<String>, + #[serde(rename = "needsPatch")] + needs_patch: bool, + }, /// Register a tool key for orchestrator API access RegisterToolKey { #[serde(rename = "taskId")] @@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::TaskRecoveryDetected { + task_id, + previous_state, + worktree_intact, + worktree_path, + needs_patch, + }) => { + tracing::info!( + task_id = %task_id, + previous_state = %previous_state, + worktree_intact = worktree_intact, + worktree_path = ?worktree_path, + needs_patch = needs_patch, + "Task recovery detected after daemon restart" + ); + + // Update task in database based on recovery state + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let state = state.clone(); + tokio::spawn(async move { + if worktree_intact { + // Worktree exists - task can be resumed on this daemon + // Update task status to 'pending' so it can be picked up + match sqlx::query( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + error_message = 'Daemon restarted - task ready for resumption', + interrupted_at = NOW(), + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id + "#, + ) + .bind(task_id) + .bind(owner_id) + .fetch_optional(&pool) + .await + { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked as pending for resumption" + ); + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(owner_id), + version: 0, + 
status: "pending".into(), + updated_fields: vec![ + "status".into(), + "daemon_id".into(), + "interrupted_at".into(), + ], + updated_by: "daemon_recovery".into(), + }); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found during recovery update" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to update task during recovery" + ); + } + } + } else { + // Worktree missing - mark for retry with patch restoration + match repository::mark_task_for_retry( + &pool, + task_id, + daemon_uuid, // Mark this daemon as failed + ).await { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked for retry (worktree missing)" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found or exceeded retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } + }); + } + } Ok(DaemonMessage::Authenticate { .. }) => { // Already authenticated, ignore } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 21c9515..3d85c45 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -279,8 +279,9 @@ async fn verify_supervisor_auth( /// Try to start a pending task on an available daemon. /// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. -/// For retried tasks, excludes daemons that previously failed the task. -async fn try_start_pending_task( +/// For retried tasks, excludes daemons that previously failed the task and includes +/// checkpoint patch data for worktree recovery. 
+pub async fn try_start_pending_task( state: &SharedState, contract_id: Uuid, owner_id: Uuid, @@ -348,6 +349,34 @@ async fn try_start_pending_task( } }; + // For retried tasks, fetch checkpoint patch for worktree recovery + let (patch_data, patch_base_sha) = if task.retry_count > 0 { + // This is a retry - try to restore from checkpoint + match repository::get_latest_checkpoint_patch(pool, task.id).await { + Ok(Some(patch)) => { + tracing::info!( + task_id = %task.id, + retry_count = task.retry_count, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + "Including checkpoint patch for task retry recovery" + ); + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry"); + (None, None) + } + Err(e) => { + tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry"); + (None, None) + } + } + } else { + (None, None) + }; + // Send spawn command let cmd = DaemonCommand::SpawnTask { task_id: updated_task.id, @@ -366,10 +395,10 @@ async fn try_start_pending_task( contract_id: updated_task.contract_id, is_supervisor: false, autonomous_loop: false, - resume_session: false, + resume_session: task.retry_count > 0, // Use --continue for retried tasks conversation_history: None, - patch_data: None, - patch_base_sha: None, + patch_data, + patch_base_sha, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -585,6 +614,7 @@ pub async fn spawn_task( copy_files: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; // Create task in DB diff --git a/makima/src/server/handlers/transcript_analysis.rs b/makima/src/server/handlers/transcript_analysis.rs index 3b71eca..cc62eb4 100644 --- a/makima/src/server/handlers/transcript_analysis.rs +++ b/makima/src/server/handlers/transcript_analysis.rs @@ -366,6 +366,7 @@ 
pub async fn create_contract_from_analysis( merge_mode: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await { @@ -535,6 +536,7 @@ pub async fn update_contract_from_analysis( merge_mode: None, branched_from_task_id: None, conversation_history: None, + depends_on: None, }; if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await { diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 3a27513..de20569 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; /// Interval for checkpoint patch cleanup (hourly) const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; +// Retry orchestrator checks for pending tasks every 30 seconds +const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments @@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Clone state and pool for retry orchestrator + let retry_pool = pool.clone(); + let retry_state = state.clone(); + + // Spawn retry orchestrator - periodically retries pending tasks on available daemons + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS) + ); + loop { + interval.tick().await; + + // Get all contracts with pending tasks awaiting retry + match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await { + Ok(contract_owners) => { + for (contract_id, owner_id) in contract_owners { + // Try to start a pending task for this contract + match handlers::mesh_supervisor::try_start_pending_task( + &retry_state, + contract_id, + owner_id, + ).await { + Ok(Some(task)) => { + tracing::info!( + task_id = %task.id, + contract_id = %contract_id, + retry_count = 
task.retry_count, + "Retry orchestrator started pending task" + ); + } + Ok(None) => { + // No tasks could be started (no available daemons, etc.) + } + Err(e) => { + tracing::warn!( + contract_id = %contract_id, + error = %e, + "Retry orchestrator failed to start pending task" + ); + } + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + "Retry orchestrator failed to query pending task contracts" + ); + } + } + } + }); + + tracing::info!( + "Retry orchestrator started (interval: {}s)", + RETRY_ORCHESTRATOR_INTERVAL_SECS + ); } let app = make_router(state); |
