summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-24 04:31:20 +0000
committersoryu <soryu@soryu.co>2026-01-24 04:31:20 +0000
commit0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802 (patch)
tree8f08fa5421e13381c5955f52a5197379feb44c58
parentf6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff)
downloadsoryu-0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802.tar.gz
soryu-0cf6f6a4c5c75c736e6fe3b1c726ef80c0a6c802.zip
feat: implement dependency-ordered task execution
Add dependency tracking and validation for tasks to enforce execution order (schema changes → backend → UI) as specified in Section 1.3 of ralph-features-spec.md.

Changes:
- Add depends_on field to Task model (Vec<Uuid>) for explicit dependencies
- Create database migration for depends_on column with GIN index
- Add dependency_analysis.rs module with:
  - can_start_task() for checking if all dependencies are complete
  - Auto-detection of dependency patterns from file paths
  - Detection of schema/types/backend/UI categories
  - Warnings for potential dependency violations
- Add DependencyOrderingConfig to daemon config with:
  - enabled: Enable/disable dependency checking
  - auto_detect: Auto-detect dependencies from file patterns
  - warn_on_violation: Warn on detected violations
- Integrate dependency checks into task manager scheduling
- Add depends_on to DaemonCommand::SpawnTask protocol

The daemon performs dependency validation as a sanity check but defers to the server for authoritative scheduling decisions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-rw-r--r--makima/migrations/20250124000000_add_task_depends_on.sql11
-rw-r--r--makima/src/bin/makima.rs29
-rw-r--r--makima/src/daemon/config.rs39
-rw-r--r--makima/src/daemon/task/dependency_analysis.rs483
-rw-r--r--makima/src/daemon/task/manager.rs371
-rw-r--r--makima/src/daemon/task/mod.rs5
-rw-r--r--makima/src/daemon/ws/protocol.rs40
-rw-r--r--makima/src/db/models.rs15
-rw-r--r--makima/src/db/repository.rs32
-rw-r--r--makima/src/server/handlers/contract_chat.rs4
-rw-r--r--makima/src/server/handlers/contracts.rs1
-rw-r--r--makima/src/server/handlers/mesh.rs33
-rw-r--r--makima/src/server/handlers/mesh_chat.rs1
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs117
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs40
-rw-r--r--makima/src/server/handlers/transcript_analysis.rs2
-rw-r--r--makima/src/server/mod.rs61
17 files changed, 1265 insertions, 19 deletions
diff --git a/makima/migrations/20250124000000_add_task_depends_on.sql b/makima/migrations/20250124000000_add_task_depends_on.sql
new file mode 100644
index 0000000..4327bbe
--- /dev/null
+++ b/makima/migrations/20250124000000_add_task_depends_on.sql
@@ -0,0 +1,11 @@
+-- Add depends_on column for dependency-ordered task execution
+-- This allows tasks to declare dependencies on other tasks that must complete first.
+-- Used for enforcing execution order: schema changes -> backend -> UI.
+--
+-- Both statements use IF NOT EXISTS, so re-running this migration is a no-op.
+
+-- The column is declared without NOT NULL, so it remains nullable; rows that do
+-- not supply a value get an empty JSON array.
+ALTER TABLE tasks
+ADD COLUMN IF NOT EXISTS depends_on JSONB DEFAULT '[]'::jsonb;
+
+-- Index for querying tasks by their dependencies
+CREATE INDEX IF NOT EXISTS idx_tasks_depends_on ON tasks USING GIN (depends_on);
+
+-- Human-readable description stored in the database catalog.
+COMMENT ON COLUMN tasks.depends_on IS 'Array of task UUIDs that must complete before this task can start';
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 1a307d1..e2072da 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -165,7 +165,7 @@ async fn run_daemon(
"[3/5] Opening local database: {}",
config.local_db.path.display()
);
- let _local_db = LocalDb::open(&config.local_db.path)?;
+ let local_db = Arc::new(std::sync::Mutex::new(LocalDb::open(&config.local_db.path)?));
eprintln!(" Database opened");
// Initialize worktree directories
@@ -242,10 +242,17 @@ async fn run_daemon(
api_key: config.server.api_key.clone(),
heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs,
checkpoint_patches: config.process.checkpoint_patches.clone(),
+ dependency_ordering: config.dependency_ordering.clone(),
};
- // Create task manager
- let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone()));
+ // Create task manager with local database for crash recovery
+ let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone(), local_db));
+
+ // Recover any orphaned tasks from previous daemon run
+ let recovered = task_manager.recover_orphaned_tasks().await;
+ if !recovered.is_empty() {
+ eprintln!(" Recovered {} orphaned tasks with intact worktrees", recovered.len());
+ }
// Spawn command handler
let task_manager_clone = task_manager.clone();
@@ -260,6 +267,22 @@ async fn run_daemon(
tracing::info!("Command handler stopped");
});
+ // Spawn periodic worktree health check (every 60 seconds)
+ let health_check_manager = task_manager.clone();
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
+ loop {
+ interval.tick().await;
+ let affected = health_check_manager.check_worktree_health().await;
+ if !affected.is_empty() {
+ tracing::info!(
+ count = affected.len(),
+ "Worktree health check detected missing worktrees - tasks marked for retry"
+ );
+ }
+ }
+ });
+
// Handle shutdown signals
let shutdown_signal = async {
tokio::signal::ctrl_c()
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index b7cb1e8..4408fff 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -63,6 +63,44 @@ pub struct DaemonConfig {
/// Repositories to auto-clone on startup.
#[serde(default)]
pub repos: ReposConfig,
+
+ /// Dependency ordering settings for task execution.
+ #[serde(default)]
+ pub dependency_ordering: DependencyOrderingConfig,
+}
+
+/// Dependency ordering configuration.
+/// Controls how task dependencies are validated and auto-detected.
+///
+/// Every switch is `true` in the `Default` impl below, so the feature is
+/// fully on unless the configuration turns something off.
+#[derive(Debug, Clone, Deserialize)]
+// Container-level default: keys missing from the input fall back to defaults.
+// NOTE(review): each field also names `default_true`, which is defined outside
+// this hunk and presumably returns `true`; with the `Default` impl below the
+// two mechanisms look redundant - confirm before removing either.
+#[serde(default)]
+pub struct DependencyOrderingConfig {
+ /// Enable dependency ordering checks.
+ /// When enabled, tasks with unmet dependencies cannot start.
+ #[serde(default = "default_true")]
+ pub enabled: bool,
+
+ /// Auto-detect dependencies from file patterns.
+ /// Analyzes task plans to detect potential dependencies based on:
+ /// - Migration files -> backend code
+ /// - Types/models -> consumers
+ /// - APIs -> UI components
+ #[serde(default = "default_true")]
+ pub auto_detect: bool,
+
+ /// Warn on detected dependency violations.
+ /// Produces warnings when tasks may be executing out of order.
+ #[serde(default = "default_true")]
+ pub warn_on_violation: bool,
+}
+
+// Defaults: all checks enabled.
+impl Default for DependencyOrderingConfig {
+ fn default() -> Self {
+ Self {
+ enabled: true,
+ auto_detect: true,
+ warn_on_violation: true,
+ }
+ }
}
/// Server connection configuration.
@@ -626,6 +664,7 @@ impl DaemonConfig {
},
logging: LoggingConfig::default(),
repos: ReposConfig::default(),
+ dependency_ordering: DependencyOrderingConfig::default(),
}
}
}
diff --git a/makima/src/daemon/task/dependency_analysis.rs b/makima/src/daemon/task/dependency_analysis.rs
new file mode 100644
index 0000000..0891f4e
--- /dev/null
+++ b/makima/src/daemon/task/dependency_analysis.rs
@@ -0,0 +1,483 @@
+//! Dependency analysis for task execution ordering.
+//!
+//! This module provides functionality to:
+//! - Check if a task's dependencies are satisfied before it can start
+//! - Auto-detect dependency patterns based on file patterns
+//! - Warn about potential dependency violations
+
+use std::collections::HashSet;
+use uuid::Uuid;
+
+/// Dependency ordering configuration.
+///
+/// Field-for-field counterpart of `crate::daemon::config::DependencyOrderingConfig`;
+/// the task manager builds this struct by copying `enabled`, `auto_detect` and
+/// `warn_on_violation` from the daemon config before calling into this module.
+#[derive(Debug, Clone)]
+pub struct DependencyOrderingConfig {
+ /// Enable dependency ordering checks.
+ pub enabled: bool,
+ /// Auto-detect dependencies from file patterns.
+ pub auto_detect: bool,
+ /// Warn on detected dependency violations.
+ pub warn_on_violation: bool,
+}
+
+// Defaults: every check is switched on.
+impl Default for DependencyOrderingConfig {
+ fn default() -> Self {
+ Self {
+ enabled: true,
+ auto_detect: true,
+ warn_on_violation: true,
+ }
+ }
+}
+
+/// Outcome of evaluating a task's dependencies.
+#[derive(Debug, Clone)]
+pub struct DependencyCheckResult {
+    /// `true` when every dependency is satisfied and the task may start.
+    pub can_start: bool,
+    /// Dependency task IDs that have not completed yet.
+    pub unmet_dependencies: Vec<Uuid>,
+    /// Human-readable notes about possible ordering problems.
+    pub warnings: Vec<String>,
+}
+
+impl DependencyCheckResult {
+    /// Build the "free to start" result: no unmet dependencies, no warnings.
+    pub fn can_start() -> Self {
+        Self {
+            warnings: Vec::new(),
+            unmet_dependencies: Vec::new(),
+            can_start: true,
+        }
+    }
+
+    /// Build the result for a task held back by the `unmet` dependencies.
+    pub fn blocked(unmet: Vec<Uuid>) -> Self {
+        // Start from the empty "can start" value and override what differs.
+        Self {
+            can_start: false,
+            unmet_dependencies: unmet,
+            ..Self::can_start()
+        }
+    }
+
+    /// Append `warning` and hand the result back, builder-style.
+    pub fn with_warning(mut self, warning: String) -> Self {
+        self.warnings.push(warning);
+        self
+    }
+}
+
+/// File pattern categories for dependency detection.
+///
+/// Variants are declared in pipeline order (schema, types, backend, UI).
+/// Note that the derived `Ord` places `Unknown` last, whereas
+/// [`FileCategory::execution_order`] ranks it alongside `Backend`; use
+/// `execution_order` when comparing pipeline stages.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum FileCategory {
+    /// Schema/migration files (must run first).
+    Schema,
+    /// Type/model definitions.
+    Types,
+    /// Backend/API code.
+    Backend,
+    /// UI components (must run last).
+    Ui,
+    /// Unknown/uncategorized files.
+    Unknown,
+}
+
+impl FileCategory {
+    /// Determine the category of a file based on its path.
+    ///
+    /// Matching is case-insensitive. Backslashes are treated as path
+    /// separators and the path is matched as if it started with `/`, so
+    /// directory patterns such as `/types` or `/api` also recognise top-level
+    /// relative paths (`types/user.ts`) and Windows-style paths.
+    ///
+    /// Categories are tried in the order schema, types, UI, backend; the
+    /// first match wins. Paths matching nothing are `Unknown`.
+    pub fn from_path(path: &str) -> Self {
+        /// True when `haystack` contains at least one of `needles`.
+        fn contains_any(haystack: &str, needles: &[&str]) -> bool {
+            needles.iter().any(|needle| haystack.contains(*needle))
+        }
+
+        /// True when `haystack` ends with at least one of `suffixes`.
+        fn ends_with_any(haystack: &str, suffixes: &[&str]) -> bool {
+            suffixes.iter().any(|suffix| haystack.ends_with(*suffix))
+        }
+
+        let mut normalized = path.to_lowercase().replace('\\', "/");
+        if !normalized.starts_with('/') {
+            // Lets "/dir" patterns match a leading "dir/..." component.
+            normalized.insert(0, '/');
+        }
+
+        // Schema/migration patterns ("prisma/schema" is covered by "schema").
+        if contains_any(&normalized, &["migration", "schema", "drizzle"])
+            || ends_with_any(&normalized, &[".sql"])
+        {
+            return FileCategory::Schema;
+        }
+
+        // Type/model patterns
+        if contains_any(&normalized, &["/types", "/models", "/entities"])
+            || ends_with_any(&normalized, &[".d.ts", "types.ts", "types.rs", "models.rs"])
+        {
+            return FileCategory::Types;
+        }
+
+        // UI patterns (checked before backend so `.tsx` under `/api` is UI)
+        if contains_any(&normalized, &["/components", "/views", "/pages", "/ui/"])
+            || ends_with_any(&normalized, &[".tsx", ".jsx", ".vue", ".svelte"])
+        {
+            return FileCategory::Ui;
+        }
+
+        // Backend patterns
+        if contains_any(
+            &normalized,
+            &["/api", "/server", "/handlers", "/services", "/controllers", "/routes"],
+        ) || ends_with_any(&normalized, &[".rs", ".go", ".py"])
+        {
+            return FileCategory::Backend;
+        }
+
+        FileCategory::Unknown
+    }
+
+    /// Get the expected execution order (lower = earlier).
+    pub fn execution_order(&self) -> u8 {
+        match self {
+            FileCategory::Schema => 0,
+            FileCategory::Types => 1,
+            FileCategory::Backend => 2,
+            FileCategory::Ui => 3,
+            FileCategory::Unknown => 2, // Treat unknown as backend-level
+        }
+    }
+}
+
+/// Analyze a task's plan to determine what file categories it affects.
+///
+/// This is a plain substring heuristic over the lower-cased plan text: a
+/// category is reported as soon as any one of its keywords occurs anywhere in
+/// the plan. Keywords are not word-bounded (for example `"form"` also occurs
+/// inside "information"), so treat the result as a hint, not a fact.
+///
+/// `FileCategory::Unknown` is never returned; a plan that matches no keyword
+/// yields an empty set.
+pub fn analyze_task_files(plan: &str) -> HashSet<FileCategory> {
+    // Keywords are stored already lower-cased so that the only allocation per
+    // call is the lower-cased copy of the plan itself.
+    const CATEGORY_KEYWORDS: [(FileCategory, &[&str]); 4] = [
+        (
+            FileCategory::Schema,
+            &[
+                "migration", "schema", ".sql", "prisma", "drizzle", "database",
+                "alter table", "create table", "drop table",
+            ],
+        ),
+        (
+            FileCategory::Types,
+            &[
+                "type ", "interface ", "struct ", "model ", "entity",
+                "types.ts", "types.rs", "models.rs", ".d.ts",
+            ],
+        ),
+        (
+            FileCategory::Backend,
+            &[
+                "api", "endpoint", "handler", "controller", "route", "service",
+                "server", "backend", "rest", "graphql",
+            ],
+        ),
+        (
+            FileCategory::Ui,
+            &[
+                "component", "view", "page", "ui ", "frontend", "react",
+                ".tsx", ".jsx", ".vue", ".svelte", "button", "form",
+            ],
+        ),
+    ];
+
+    let plan_lower = plan.to_lowercase();
+
+    CATEGORY_KEYWORDS
+        .iter()
+        .filter(|(_, keywords)| keywords.iter().any(|keyword| plan_lower.contains(*keyword)))
+        .map(|(category, _)| *category)
+        .collect()
+}
+
+/// Snapshot of a task used as input to the dependency checks in this module.
+#[derive(Debug, Clone)]
+pub struct TaskDependencyInfo {
+    /// Identifier of the task.
+    pub id: Uuid,
+    /// Status string; only `"done"` and `"merged"` count as complete.
+    pub status: String,
+    /// Plan text, scanned for category keywords during auto-detection.
+    pub plan: String,
+}
+
+impl TaskDependencyInfo {
+    /// Whether dependants may treat this task as finished.
+    pub fn is_complete(&self) -> bool {
+        const COMPLETE_STATUSES: [&str; 2] = ["done", "merged"];
+        COMPLETE_STATUSES.contains(&self.status.as_str())
+    }
+}
+
+/// Check if a task can start based on its explicit dependencies.
+///
+/// * `depends_on` - IDs the task declares as prerequisites.
+/// * `dependency_tasks` - known state of (some of) those prerequisites. An ID
+///   listed in `depends_on` but absent here is treated as unmet, because its
+///   completion cannot be confirmed.
+/// * `config` - when `config.enabled` is `false` the check is skipped and the
+///   task is always allowed to start.
+///
+/// Returns a `DependencyCheckResult` indicating whether the task can start
+/// and listing any unmet dependencies in the order they appear in
+/// `depends_on`.
+pub fn can_start_task(
+    depends_on: &[Uuid],
+    dependency_tasks: &[TaskDependencyInfo],
+    config: &DependencyOrderingConfig,
+) -> DependencyCheckResult {
+    // Nothing to enforce: checks are disabled or no explicit dependencies.
+    if !config.enabled || depends_on.is_empty() {
+        return DependencyCheckResult::can_start();
+    }
+
+    let complete_task_ids: HashSet<Uuid> = dependency_tasks
+        .iter()
+        .filter(|task| task.is_complete())
+        .map(|task| task.id)
+        .collect();
+
+    let unmet: Vec<Uuid> = depends_on
+        .iter()
+        .filter(|dep_id| !complete_task_ids.contains(dep_id))
+        .copied()
+        .collect();
+
+    if unmet.is_empty() {
+        DependencyCheckResult::can_start()
+    } else {
+        DependencyCheckResult::blocked(unmet)
+    }
+}
+
+/// Auto-detect potential dependency violations based on file patterns.
+///
+/// Compares the categories detected in `task_plan` with those of every
+/// sibling that is not yet complete, and emits one warning per sibling whose
+/// earliest category precedes this task's earliest category (for example a
+/// pending migration task while this task is UI work).
+///
+/// * A task whose plan matches no category is ranked as
+///   `FileCategory::Unknown` (backend-level) and labelled "other".
+/// * Siblings whose plan matches no category are never reported.
+/// * Returns no warnings unless both `config.auto_detect` and
+///   `config.warn_on_violation` are set.
+///
+/// Category names inside a warning are listed in pipeline order, so the text
+/// is stable from call to call.
+pub fn detect_dependency_violations(
+    task_plan: &str,
+    sibling_tasks: &[TaskDependencyInfo],
+    config: &DependencyOrderingConfig,
+) -> Vec<String> {
+    if !config.auto_detect || !config.warn_on_violation {
+        return Vec::new();
+    }
+
+    let task_categories = analyze_task_files(task_plan);
+
+    // Earliest pipeline stage this task touches.
+    let task_min_order = task_categories
+        .iter()
+        .map(|category| category.execution_order())
+        .min()
+        .unwrap_or_else(|| FileCategory::Unknown.execution_order());
+    let task_labels = describe_categories(&task_categories);
+
+    sibling_tasks
+        .iter()
+        .filter(|sibling| !sibling.is_complete())
+        .filter_map(|sibling| {
+            let sibling_categories = analyze_task_files(&sibling.plan);
+            // `?` skips siblings with no detectable category.
+            let sibling_min_order = sibling_categories
+                .iter()
+                .map(|category| category.execution_order())
+                .min()?;
+
+            // Warn only when the unfinished sibling works on an earlier stage
+            // than this task does.
+            if sibling_min_order >= task_min_order {
+                return None;
+            }
+
+            Some(format!(
+                "Task may depend on sibling task {} which affects {} (this task affects {})",
+                sibling.id,
+                describe_categories(&sibling_categories),
+                task_labels
+            ))
+        })
+        .collect()
+}
+
+/// Render a category set as a comma-separated list in pipeline order; an
+/// empty set is rendered as "other".
+fn describe_categories(categories: &HashSet<FileCategory>) -> String {
+    if categories.is_empty() {
+        return "other".to_string();
+    }
+
+    let mut ordered: Vec<FileCategory> = categories.iter().copied().collect();
+    ordered.sort_unstable();
+
+    ordered
+        .iter()
+        .map(|category| match category {
+            FileCategory::Schema => "schema/migrations",
+            FileCategory::Types => "types/models",
+            FileCategory::Backend => "backend/API",
+            FileCategory::Ui => "UI components",
+            FileCategory::Unknown => "other",
+        })
+        .collect::<Vec<&str>>()
+        .join(", ")
+}
+
+/// Suggest automatic dependencies based on file pattern analysis.
+///
+/// This analyzes tasks in a contract and suggests which tasks should depend on others
+/// based on the Ralph pattern: schema -> types -> backend -> UI.
+///
+/// A sibling is suggested when the latest stage it touches comes strictly
+/// before the earliest stage `task_plan` touches. Suggestions are returned in
+/// the order of `sibling_tasks`; sibling status is not considered.
+///
+/// * Siblings whose plan matches no category are never suggested: there is
+///   no evidence about what they touch.
+/// * A task whose plan matches no category is ranked as
+///   `FileCategory::Unknown` (backend-level), so it can only be told to wait
+///   for schema- or types-only siblings.
+pub fn suggest_dependencies(
+    task_plan: &str,
+    sibling_tasks: &[TaskDependencyInfo],
+) -> Vec<Uuid> {
+    // Earliest pipeline stage this task touches.
+    let task_min_order = analyze_task_files(task_plan)
+        .iter()
+        .map(|category| category.execution_order())
+        .min()
+        .unwrap_or_else(|| FileCategory::Unknown.execution_order());
+
+    sibling_tasks
+        .iter()
+        .filter(|sibling| {
+            analyze_task_files(&sibling.plan)
+                .iter()
+                .map(|category| category.execution_order())
+                .max()
+                // No detected category means no basis for a suggestion.
+                .map_or(false, |sibling_max_order| sibling_max_order < task_min_order)
+        })
+        .map(|sibling| sibling.id)
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Build a `TaskDependencyInfo` with a fresh random id.
+    fn task(status: &str, plan: &str) -> TaskDependencyInfo {
+        TaskDependencyInfo {
+            id: Uuid::new_v4(),
+            status: status.to_string(),
+            plan: plan.to_string(),
+        }
+    }
+
+    #[test]
+    fn test_file_category_from_path() {
+        assert_eq!(
+            FileCategory::from_path("migrations/001_create_users.sql"),
+            FileCategory::Schema
+        );
+        assert_eq!(
+            FileCategory::from_path("src/types/user.ts"),
+            FileCategory::Types
+        );
+        assert_eq!(
+            FileCategory::from_path("src/api/users.rs"),
+            FileCategory::Backend
+        );
+        assert_eq!(
+            FileCategory::from_path("src/components/UserProfile.tsx"),
+            FileCategory::Ui
+        );
+    }
+
+    #[test]
+    fn test_execution_order() {
+        assert!(FileCategory::Schema.execution_order() < FileCategory::Types.execution_order());
+        assert!(FileCategory::Types.execution_order() < FileCategory::Backend.execution_order());
+        assert!(FileCategory::Backend.execution_order() < FileCategory::Ui.execution_order());
+    }
+
+    #[test]
+    fn test_analyze_task_files() {
+        let plan = "Create a new migration to add the users table";
+        let categories = analyze_task_files(plan);
+        assert!(categories.contains(&FileCategory::Schema));
+
+        let plan = "Implement the UserProfile component with React";
+        let categories = analyze_task_files(plan);
+        assert!(categories.contains(&FileCategory::Ui));
+    }
+
+    #[test]
+    fn test_can_start_task_no_deps() {
+        let result = can_start_task(&[], &[], &DependencyOrderingConfig::default());
+        assert!(result.can_start);
+        assert!(result.unmet_dependencies.is_empty());
+    }
+
+    #[test]
+    fn test_can_start_task_with_complete_deps() {
+        let dep_task = task("done", "Some completed task");
+        let dep_id = dep_task.id;
+
+        let result = can_start_task(
+            &[dep_id],
+            &[dep_task],
+            &DependencyOrderingConfig::default(),
+        );
+        assert!(result.can_start);
+        assert!(result.unmet_dependencies.is_empty());
+    }
+
+    #[test]
+    fn test_can_start_task_with_merged_deps() {
+        // "merged" counts as complete just like "done".
+        let dep_task = task("merged", "Some merged task");
+        let dep_id = dep_task.id;
+
+        let result = can_start_task(
+            &[dep_id],
+            &[dep_task],
+            &DependencyOrderingConfig::default(),
+        );
+        assert!(result.can_start);
+        assert!(result.unmet_dependencies.is_empty());
+    }
+
+    #[test]
+    fn test_can_start_task_with_pending_deps() {
+        let dep_task = task("running", "Some running task");
+        let dep_id = dep_task.id;
+
+        let result = can_start_task(
+            &[dep_id],
+            &[dep_task],
+            &DependencyOrderingConfig::default(),
+        );
+        assert!(!result.can_start);
+        assert_eq!(result.unmet_dependencies, vec![dep_id]);
+    }
+
+    #[test]
+    fn test_can_start_task_with_unknown_dep() {
+        // A dependency with no known state cannot be confirmed complete.
+        let dep_id = Uuid::new_v4();
+
+        let result = can_start_task(&[dep_id], &[], &DependencyOrderingConfig::default());
+        assert!(!result.can_start);
+        assert_eq!(result.unmet_dependencies, vec![dep_id]);
+    }
+
+    #[test]
+    fn test_detect_dependency_violations() {
+        let ui_task_plan = "Create the UserProfile component";
+        let schema_sibling = task("pending", "Add migration to create users table");
+
+        let warnings = detect_dependency_violations(
+            ui_task_plan,
+            &[schema_sibling],
+            &DependencyOrderingConfig::default(),
+        );
+
+        // Should warn that UI task might depend on schema task
+        assert!(!warnings.is_empty());
+    }
+
+    #[test]
+    fn test_detect_dependency_violations_ignores_completed_siblings() {
+        let ui_task_plan = "Create the UserProfile component";
+        let schema_sibling = task("done", "Add migration to create users table");
+
+        let warnings = detect_dependency_violations(
+            ui_task_plan,
+            &[schema_sibling],
+            &DependencyOrderingConfig::default(),
+        );
+
+        assert!(warnings.is_empty());
+    }
+
+    #[test]
+    fn test_detect_dependency_violations_disabled() {
+        let ui_task_plan = "Create the UserProfile component";
+        let schema_sibling = task("pending", "Add migration to create users table");
+        let config = DependencyOrderingConfig {
+            auto_detect: false,
+            ..DependencyOrderingConfig::default()
+        };
+
+        let warnings = detect_dependency_violations(ui_task_plan, &[schema_sibling], &config);
+
+        assert!(warnings.is_empty());
+    }
+
+    #[test]
+    fn test_suggest_dependencies() {
+        let schema_task = task("pending", "Add migration to create users table");
+
+        let ui_task_plan = "Create the UserProfile component";
+        let suggested = suggest_dependencies(ui_task_plan, &[schema_task.clone()]);
+
+        // Should suggest schema task as a dependency for UI task
+        assert!(suggested.contains(&schema_task.id));
+    }
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index cb4bde2..df77171 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -14,6 +14,9 @@ use uuid::Uuid;
use std::collections::HashSet;
use super::completion_gate::{CircuitBreaker, CompletionGate};
+use super::dependency_analysis::{
+ can_start_task, detect_dependency_violations, DependencyOrderingConfig, TaskDependencyInfo,
+};
use super::state::TaskState;
use crate::daemon::config::CheckpointPatchConfig;
use crate::daemon::error::{DaemonError, TaskError, TaskResult};
@@ -21,6 +24,7 @@ use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
use crate::daemon::storage;
use crate::daemon::temp::TempManager;
use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
+use crate::daemon::db::local::LocalDb;
use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
/// Generate a secure random API key for orchestrator tool access.
@@ -952,6 +956,8 @@ pub struct ManagedTask {
pub concurrency_key: Uuid,
/// Whether to run in autonomous loop mode.
pub autonomous_loop: bool,
+ /// Task IDs that must complete before this task can start.
+ pub depends_on: Option<Vec<Uuid>>,
/// Time task was created.
pub created_at: Instant,
/// Time task started running.
@@ -994,6 +1000,8 @@ pub struct TaskConfig {
pub heartbeat_commit_interval_secs: u64,
/// Checkpoint patch storage configuration.
pub checkpoint_patches: CheckpointPatchConfig,
+ /// Dependency ordering configuration.
+ pub dependency_ordering: crate::daemon::config::DependencyOrderingConfig,
}
impl Default for TaskConfig {
@@ -1013,6 +1021,7 @@ impl Default for TaskConfig {
api_key: String::new(),
heartbeat_commit_interval_secs: 300, // 5 minutes
checkpoint_patches: CheckpointPatchConfig::default(),
+ dependency_ordering: crate::daemon::config::DependencyOrderingConfig::default(),
}
}
}
@@ -1045,11 +1054,17 @@ pub struct TaskManager {
git_user_email: Arc<RwLock<Option<String>>>,
/// Inherited git user.name for worktrees.
git_user_name: Arc<RwLock<Option<String>>>,
+ /// Local SQLite database for crash recovery.
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManager {
- /// Create a new task manager.
- pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
+ /// Create a new task manager with local database for crash recovery.
+ pub fn new(
+ config: TaskConfig,
+ ws_tx: mpsc::Sender<DaemonMessage>,
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
+ ) -> Self {
let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
let process_manager = Arc::new(
ProcessManager::with_command(config.claude_command.clone())
@@ -1075,9 +1090,239 @@ impl TaskManager {
active_pids: Arc::new(RwLock::new(HashMap::new())),
git_user_email: Arc::new(RwLock::new(None)),
git_user_name: Arc::new(RwLock::new(None)),
+ local_db,
}
}
+    /// Persist task state to local SQLite database for crash recovery.
+    ///
+    /// The task's monotonic `Instant` timestamps are converted to wall-clock
+    /// time by subtracting their age from "now", so repeated calls record the
+    /// same creation/start/completion times instead of the time of the save.
+    ///
+    /// Best effort: failures are logged and never propagated to the caller.
+    fn persist_task_to_local_db(&self, task: &ManagedTask) {
+        use crate::daemon::db::local::LocalTask;
+
+        let now = chrono::Utc::now();
+        // Falls back to `now` if the age cannot be represented.
+        let to_wall_clock = |instant: Instant| {
+            chrono::Duration::from_std(instant.elapsed())
+                .map(|age| now - age)
+                .unwrap_or(now)
+        };
+
+        let local_task = LocalTask {
+            id: task.id,
+            server_task_id: task.id, // Same as task id
+            state: task.state.clone(),
+            container_id: None,
+            overlay_path: task.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()),
+            repo_url: task.repo_source.clone(),
+            base_branch: task.base_branch.clone(),
+            plan: task.plan.clone(),
+            created_at: to_wall_clock(task.created_at),
+            started_at: task.started_at.map(to_wall_clock),
+            completed_at: task.completed_at.map(to_wall_clock),
+            error_message: task.error.clone(),
+        };
+
+        let db = match self.local_db.lock() {
+            Ok(db) => db,
+            Err(e) => {
+                // A poisoned lock means the task is not recoverable after a crash.
+                tracing::error!(task_id = %task.id, error = %e, "Failed to lock local database - task not persisted");
+                return;
+            }
+        };
+
+        match db.save_task(&local_task) {
+            Ok(_) => {
+                tracing::debug!(task_id = %task.id, state = ?task.state, "Persisted task to local database");
+            }
+            Err(e) => {
+                tracing::warn!(task_id = %task.id, error = %e, "Failed to persist task to local database");
+            }
+        }
+    }
+
+    /// Remove completed/failed task from local database.
+    ///
+    /// Best effort: a failed delete or an unavailable database lock is logged
+    /// and otherwise ignored.
+    fn remove_task_from_local_db(&self, task_id: Uuid) {
+        let db = match self.local_db.lock() {
+            Ok(db) => db,
+            Err(e) => {
+                tracing::error!(task_id = %task_id, error = %e, "Failed to lock local database - task not removed");
+                return;
+            }
+        };
+
+        match db.delete_task(task_id) {
+            Ok(_) => {
+                tracing::debug!(task_id = %task_id, "Removed task from local database");
+            }
+            Err(e) => {
+                tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
+            }
+        }
+    }
+
+ /// Recover orphaned tasks from local database after daemon restart.
+ /// Returns list of task IDs that have worktrees and can potentially be recovered.
+ ///
+ /// For every active task found in the local database:
+ /// - if its worktree is still on disk, a recovery notification is sent over
+ ///   the websocket channel and the ID is included in the return value;
+ /// - otherwise the task is marked `Failed` in the local database.
+ ///
+ /// Returns an empty list if the database cannot be locked or read.
+ pub async fn recover_orphaned_tasks(&self) -> Vec<Uuid> {
+ tracing::info!("=== STARTING ORPHANED TASK RECOVERY ===");
+
+ // The lock guard lives only inside this block, so it is released before
+ // any `.await` below.
+ let active_tasks = {
+ let db = match self.local_db.lock() {
+ Ok(db) => db,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to lock local database for recovery");
+ return Vec::new();
+ }
+ };
+
+ match db.get_active_tasks() {
+ Ok(tasks) => tasks,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to load active tasks from local database");
+ return Vec::new();
+ }
+ }
+ };
+
+ if active_tasks.is_empty() {
+ tracing::info!("No orphaned tasks found in local database");
+ return Vec::new();
+ }
+
+ tracing::info!(count = active_tasks.len(), "Found orphaned tasks in local database");
+
+ let mut recoverable_task_ids = Vec::new();
+
+ for local_task in active_tasks {
+ tracing::info!(
+ task_id = %local_task.id,
+ state = ?local_task.state,
+ overlay_path = ?local_task.overlay_path,
+ "Checking orphaned task"
+ );
+
+ // Check if worktree exists on filesystem
+ // A worktree counts as present only if the directory exists and contains `.git`.
+ let worktree_exists = if let Some(ref path) = local_task.overlay_path {
+ let path = std::path::PathBuf::from(path);
+ path.exists() && path.join(".git").exists()
+ } else {
+ // Try to find worktree by task ID pattern (scan worktrees directory)
+ // The pattern is the first 8 characters of the task UUID's string form.
+ let short_id = &local_task.id.to_string()[..8];
+ let worktrees_dir = self.worktree_manager.base_dir();
+ let mut found = false;
+
+ // A directory that cannot be read is treated the same as "not found".
+ if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let name = entry.file_name();
+ let name_str = name.to_string_lossy();
+ if name_str.starts_with(short_id) {
+ let path = entry.path();
+ if path.join(".git").exists() {
+ found = true;
+ break;
+ }
+ }
+ }
+ }
+ found
+ };
+
+ if worktree_exists {
+ tracing::info!(
+ task_id = %local_task.id,
+ "Found worktree for orphaned task - can be recovered"
+ );
+ recoverable_task_ids.push(local_task.id);
+
+ // Send structured recovery notification to server
+ let msg = DaemonMessage::task_recovery_detected(
+ local_task.id,
+ local_task.state.as_str(),
+ true, // worktree intact
+ local_task.overlay_path.clone(),
+ false, // doesn't need patch since worktree is intact
+ );
+ // NOTE(review): a failed send is discarded here, so the task is still
+ // reported as recoverable to the caller even if the server never hears of it.
+ let _ = self.ws_tx.send(msg).await;
+ } else {
+ tracing::warn!(
+ task_id = %local_task.id,
+ "Worktree missing for orphaned task - marking as lost"
+ );
+
+ // Update local db to mark as failed
+ // NOTE(review): both a lock failure and an update failure are ignored silently.
+ if let Ok(db) = self.local_db.lock() {
+ let _ = db.update_task_state(local_task.id, TaskState::Failed);
+ }
+ }
+ }
+
+ tracing::info!(
+ recoverable = recoverable_task_ids.len(),
+ "=== ORPHANED TASK RECOVERY COMPLETE ==="
+ );
+
+ recoverable_task_ids
+ }
+
+    /// Check worktree health for all running tasks.
+    /// If a worktree is missing, marks the task as interrupted and notifies the server.
+    /// This allows the retry orchestrator to pick up the task and restore it from checkpoint.
+    ///
+    /// Tasks are inspected from a snapshot taken under the read lock. Before a
+    /// task is marked interrupted its state is re-checked under the write lock,
+    /// so a task that finished or was removed while the filesystem was being
+    /// scanned is left untouched and is not reported to the server.
+    ///
+    /// Returns the IDs of the tasks that were marked interrupted.
+    pub async fn check_worktree_health(&self) -> Vec<Uuid> {
+        let mut affected_task_ids = Vec::new();
+
+        // Snapshot running/starting tasks so no lock is held during file I/O.
+        let tasks_snapshot: Vec<(Uuid, Option<PathBuf>)> = {
+            let tasks = self.tasks.read().await;
+            tasks
+                .iter()
+                .filter(|(_, t)| matches!(t.state, TaskState::Running | TaskState::Starting))
+                .map(|(id, t)| (*id, t.worktree.as_ref().map(|w| w.path.clone())))
+                .collect()
+        };
+
+        if tasks_snapshot.is_empty() {
+            return affected_task_ids;
+        }
+
+        for (task_id, worktree_path) in tasks_snapshot {
+            // A worktree counts as present only if it exists and contains `.git`.
+            let worktree_exists = if let Some(ref path) = worktree_path {
+                path.exists() && path.join(".git").exists()
+            } else {
+                // No worktree set - scan by task ID
+                let short_id = &task_id.to_string()[..8];
+                let worktrees_dir = self.worktree_manager.base_dir();
+                let mut found = false;
+
+                if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+                    while let Ok(Some(entry)) = entries.next_entry().await {
+                        let name = entry.file_name();
+                        let name_str = name.to_string_lossy();
+                        if name_str.starts_with(short_id) {
+                            let path = entry.path();
+                            if path.join(".git").exists() {
+                                found = true;
+                                break;
+                            }
+                        }
+                    }
+                }
+                found
+            };
+
+            if worktree_exists {
+                continue;
+            }
+
+            // Update task state to interrupted, but only if the task is still
+            // running/starting: the snapshot may be stale by now.
+            let marked_interrupted = {
+                let mut tasks = self.tasks.write().await;
+                match tasks.get_mut(&task_id) {
+                    Some(task)
+                        if matches!(task.state, TaskState::Running | TaskState::Starting) =>
+                    {
+                        task.state = TaskState::Interrupted;
+                        task.error = Some("Worktree directory was deleted".to_string());
+                        task.completed_at = Some(Instant::now());
+                        true
+                    }
+                    _ => false,
+                }
+            };
+
+            if !marked_interrupted {
+                tracing::debug!(
+                    task_id = %task_id,
+                    "Worktree missing but task is no longer running - skipping"
+                );
+                continue;
+            }
+
+            tracing::warn!(
+                task_id = %task_id,
+                worktree_path = ?worktree_path,
+                "Worktree missing for running task - marking as interrupted for retry"
+            );
+
+            affected_task_ids.push(task_id);
+
+            // Notify server - task needs recovery/retry
+            let msg = DaemonMessage::task_complete(
+                task_id,
+                false,
+                Some("Worktree deleted - task interrupted for recovery".to_string()),
+            );
+            if let Err(e) = self.ws_tx.send(msg).await {
+                tracing::warn!(
+                    task_id = %task_id,
+                    error = %e,
+                    "Failed to notify server about interrupted task"
+                );
+            }
+
+            // Remove from local db since server will handle retry
+            self.remove_task_from_local_db(task_id);
+        }
+
+        if !affected_task_ids.is_empty() {
+            tracing::info!(
+                count = affected_task_ids.len(),
+                "Worktree health check found missing worktrees"
+            );
+        }
+
+        affected_task_ids
+    }
+
/// Check if a task can be spawned given contract-based concurrency limits.
/// Returns the concurrency key to use (contract_id or task_id for standalone).
async fn try_acquire_concurrency_slot(
@@ -1413,6 +1658,7 @@ impl TaskManager {
conversation_history,
patch_data,
patch_base_sha,
+ depends_on,
} => {
tracing::info!(
task_id = %task_id,
@@ -1431,6 +1677,7 @@ impl TaskManager {
continue_from_task_id = ?continue_from_task_id,
copy_files = ?copy_files,
contract_id = ?contract_id,
+ depends_on = ?depends_on,
plan_len = plan.len(),
"Spawning new task"
);
@@ -1439,7 +1686,7 @@ impl TaskManager {
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
copy_files, contract_id, autonomous_loop, resume_session,
- conversation_history, patch_data, patch_base_sha,
+ conversation_history, patch_data, patch_base_sha, depends_on,
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1767,6 +2014,7 @@ impl TaskManager {
conversation_history: Option<serde_json::Value>,
patch_data: Option<String>,
patch_base_sha: Option<String>,
+ depends_on: Option<Vec<Uuid>>,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, patch_available = patch_data.is_some(), "=== SPAWN_TASK START ===");
@@ -1795,6 +2043,30 @@ impl TaskManager {
let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?;
tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired");
+ // Check dependencies if enabled
+ if self.config.dependency_ordering.enabled {
+ if let Some(ref deps) = depends_on {
+ if !deps.is_empty() {
+ // Check if all dependencies are complete
+ let dep_check_result = self.check_task_dependencies(deps, &plan).await;
+ if !dep_check_result.can_start {
+ tracing::warn!(
+ task_id = %task_id,
+ unmet_deps = ?dep_check_result.unmet_dependencies,
+ "Task has unmet dependencies - server should not have dispatched this task"
+ );
+ // Note: We don't block here because the server is the authority on scheduling.
+ // This is just a sanity check and warning.
+ }
+
+ // Log any warnings from auto-detection
+ for warning in &dep_check_result.warnings {
+ tracing::warn!(task_id = %task_id, warning = %warning, "Dependency warning");
+ }
+ }
+ }
+ }
+
// Create task entry
tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
let task = ManagedTask {
@@ -1817,12 +2089,16 @@ impl TaskManager {
contract_id,
concurrency_key,
autonomous_loop,
+ depends_on: depends_on.clone(),
created_at: Instant::now(),
started_at: None,
completed_at: None,
error: None,
};
+ // Persist task to local database for crash recovery
+ self.persist_task_to_local_db(&task);
+
self.tasks.write().await.insert(task_id, task);
tracing::info!(task_id = %task_id, "Task entry created and stored");
@@ -1855,6 +2131,74 @@ impl TaskManager {
Ok(())
}
+ /// Check whether a task's explicit dependencies are satisfied and collect
+ /// ordering warnings.
+ ///
+ /// This method checks against the daemon's local task state only. For full
+ /// accuracy, the server should be the source of truth for task status.
+ ///
+ /// # Arguments
+ /// * `depends_on` - IDs of the tasks that must be complete first.
+ /// * `task_plan` - Plan text of the task being checked; only used for
+ ///   auto-detecting violations against other locally tracked tasks.
+ ///
+ /// # Returns
+ /// The `can_start_task` result for the explicit dependencies, extended with
+ /// one warning per auto-detected violation when both `auto_detect` and
+ /// `warn_on_violation` are enabled.
+ async fn check_task_dependencies(
+     &self,
+     depends_on: &[Uuid],
+     task_plan: &str,
+ ) -> super::dependency_analysis::DependencyCheckResult {
+     /// Snapshot a locally tracked task as dependency-analysis input,
+     /// translating the daemon task state into the analysis status string.
+     fn dependency_info(id: Uuid, task: &ManagedTask) -> TaskDependencyInfo {
+         let status = match task.state {
+             TaskState::Completed => "done",
+             TaskState::Failed => "failed",
+             TaskState::Running => "running",
+             TaskState::Initializing | TaskState::Starting => "pending",
+             _ => "unknown",
+         };
+
+         TaskDependencyInfo {
+             id,
+             status: status.to_string(),
+             plan: task.plan.clone(),
+         }
+     }
+
+     // Convert daemon config to dependency analysis config
+     let config = DependencyOrderingConfig {
+         enabled: self.config.dependency_ordering.enabled,
+         auto_detect: self.config.dependency_ordering.auto_detect,
+         warn_on_violation: self.config.dependency_ordering.warn_on_violation,
+     };
+
+     let tasks = self.tasks.read().await;
+
+     // Explicit dependencies, as seen in local state. Dependencies that are
+     // not tracked by this daemon are omitted from the list.
+     let dependency_tasks: Vec<TaskDependencyInfo> = depends_on
+         .iter()
+         .filter_map(|dep_id| tasks.get(dep_id).map(|t| dependency_info(*dep_id, t)))
+         .collect();
+
+     // Every other locally tracked task is a sibling for auto-detection.
+     let sibling_tasks: Vec<TaskDependencyInfo> = tasks
+         .values()
+         .filter(|t| !depends_on.contains(&t.id))
+         .map(|t| dependency_info(t.id, t))
+         .collect();
+
+     // Release the read lock before running the analysis.
+     drop(tasks);
+
+     // Check explicit dependencies
+     let mut result = can_start_task(depends_on, &dependency_tasks, &config);
+
+     // Add auto-detected warnings
+     if config.auto_detect && config.warn_on_violation {
+         let warnings = detect_dependency_violations(task_plan, &sibling_tasks, &config);
+         for warning in warnings {
+             result = result.with_warning(warning);
+         }
+     }
+
+     result
+ }
+
/// Clone inner state for spawned tasks.
fn clone_inner(&self) -> TaskManagerInner {
TaskManagerInner {
@@ -1871,6 +2215,7 @@ impl TaskManager {
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.config.checkpoint_patches.clone(),
+ local_db: self.local_db.clone(),
}
}
@@ -3190,6 +3535,8 @@ struct TaskManagerInner {
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
/// Checkpoint patch storage configuration.
checkpoint_patches: CheckpointPatchConfig,
+ /// Local SQLite database for crash recovery.
+ local_db: Arc<std::sync::Mutex<LocalDb>>,
}
impl TaskManagerInner {
@@ -3210,6 +3557,17 @@ impl TaskManagerInner {
}
}
+ /// Remove a completed or failed task from the local database.
+ ///
+ /// Best-effort: every failure is logged and none is propagated to the
+ /// caller. A poisoned database mutex is reported as a warning rather than
+ /// being silently ignored, so stale crash-recovery rows can be diagnosed.
+ fn remove_task_from_local_db(&self, task_id: Uuid) {
+     let db = match self.local_db.lock() {
+         Ok(db) => db,
+         Err(e) => {
+             tracing::warn!(task_id = %task_id, error = %e, "Local database lock poisoned; task not removed from local database");
+             return;
+         }
+     };
+
+     if let Err(e) = db.delete_task(task_id) {
+         tracing::warn!(task_id = %task_id, error = %e, "Failed to remove task from local database");
+     } else {
+         tracing::debug!(task_id = %task_id, "Removed task from local database");
+     }
+ }
+
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
@@ -4375,6 +4733,9 @@ impl TaskManagerInner {
tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
let msg = DaemonMessage::task_complete(task_id, success, error);
let _ = self.ws_tx.send(msg).await;
+
+ // Remove completed task from local database (no longer needs crash recovery)
+ self.remove_task_from_local_db(task_id);
}
// Note: Worktrees are kept until explicitly deleted (per user preference)
@@ -4578,6 +4939,9 @@ impl TaskManagerInner {
// Notify server
let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string()));
let _ = self.ws_tx.send(msg).await;
+
+ // Remove failed task from local database
+ self.remove_task_from_local_db(task_id);
}
/// Apply inherited git config to a worktree directory.
@@ -4837,6 +5201,7 @@ impl Clone for TaskManagerInner {
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.checkpoint_patches.clone(),
+ local_db: self.local_db.clone(),
}
}
}
diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs
index 3830e1d..a33655b 100644
--- a/makima/src/daemon/task/mod.rs
+++ b/makima/src/daemon/task/mod.rs
@@ -1,9 +1,14 @@
//! Task management and execution.
pub mod completion_gate;
+pub mod dependency_analysis;
pub mod manager;
pub mod state;
pub use completion_gate::CompletionGate;
+pub use dependency_analysis::{
+ can_start_task, detect_dependency_violations, suggest_dependencies,
+ DependencyCheckResult, DependencyOrderingConfig, FileCategory, TaskDependencyInfo,
+};
pub use manager::{ManagedTask, TaskConfig, TaskManager};
pub use state::TaskState;
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index ec9b09e..7b0a0b4 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -60,6 +60,25 @@ pub enum DaemonMessage {
error: Option<String>,
},
+ /// Task recovery detected after daemon restart.
+ /// Sent when daemon finds orphaned tasks that can be recovered.
+ TaskRecoveryDetected {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Previous state of the task before daemon restart.
+ #[serde(rename = "previousState")]
+ previous_state: String,
+ /// Whether the worktree is still intact.
+ #[serde(rename = "worktreeIntact")]
+ worktree_intact: bool,
+ /// Path to the worktree if available.
+ #[serde(rename = "worktreePath")]
+ worktree_path: Option<String>,
+ /// Whether the task needs a checkpoint patch for recovery.
+ #[serde(rename = "needsPatch")]
+ needs_patch: bool,
+ },
+
/// Register a tool key for orchestrator API access.
RegisterToolKey {
#[serde(rename = "taskId")]
@@ -403,6 +422,10 @@ pub enum DaemonCommand {
/// Commit SHA to apply the patch on top of.
#[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
patch_base_sha: Option<String>,
+ /// Task IDs that must complete before this task can start.
+ /// Used for enforcing execution order: schema changes -> backend -> UI.
+ #[serde(rename = "dependsOn", default, skip_serializing_if = "Option::is_none")]
+ depends_on: Option<Vec<Uuid>>,
},
/// Pause a running task.
@@ -698,6 +721,23 @@ impl DaemonMessage {
}
}
+ /// Build a `TaskRecoveryDetected` message.
+ ///
+ /// `previous_state` is copied into an owned `String`; every other argument
+ /// is stored in the message unchanged.
+ pub fn task_recovery_detected(
+     task_id: Uuid,
+     previous_state: &str,
+     worktree_intact: bool,
+     worktree_path: Option<String>,
+     needs_patch: bool,
+ ) -> Self {
+     let previous_state = String::from(previous_state);
+
+     Self::TaskRecoveryDetected {
+         task_id,
+         previous_state,
+         worktree_intact,
+         worktree_path,
+         needs_patch,
+     }
+ }
+
/// Create a register tool key message.
pub fn register_tool_key(task_id: Uuid, key: String) -> Self {
Self::RegisterToolKey { task_id, key }
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 58f4da1..f3977e0 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -531,6 +531,13 @@ pub struct Task {
/// Standalone completed tasks can be dismissed by the user.
#[serde(default)]
pub hidden: bool,
+
+ // Dependency tracking for dependency-ordered execution
+ /// Task IDs that must complete before this task can start.
+ /// Used for enforcing execution order: schema changes → backend → UI.
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ #[sqlx(json)]
+ pub depends_on: Option<Vec<Uuid>>,
}
impl Task {
@@ -611,8 +618,8 @@ pub struct TaskListResponse {
}
/// Request payload for creating a new task
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
+#[derive(Debug, Default, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase", default)]
pub struct CreateTaskRequest {
/// Contract this task belongs to (optional for branched/anonymous tasks)
pub contract_id: Option<Uuid>,
@@ -653,6 +660,10 @@ pub struct CreateTaskRequest {
pub branched_from_task_id: Option<Uuid>,
/// Conversation history to initialize the task with (JSON array of messages)
pub conversation_history: Option<serde_json::Value>,
+ /// Task IDs that must complete before this task can start.
+ /// Used for enforcing execution order: schema changes → backend → UI.
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub depends_on: Option<Vec<Uuid>>,
}
/// Request payload for updating a task
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index da44899..d9ba97d 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -684,6 +684,7 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+ let depends_on_json = req.depends_on.as_ref().map(|d| serde_json::to_value(d).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
@@ -691,9 +692,9 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state
+ branched_from_task_id, conversation_state, depends_on
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
RETURNING *
"#,
)
@@ -715,6 +716,7 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
+ .bind(&depends_on_json)
.fetch_one(pool)
.await
}
@@ -823,6 +825,26 @@ pub async fn get_pending_tasks_for_contract(
.await
}
+/// List the contracts that have pending tasks eligible for dispatch.
+///
+/// A task qualifies when it belongs to a contract, has an owner, is in the
+/// `pending` state, is not a supervisor task, and has not exhausted its
+/// retries (`retry_count < max_retries`). Pending tasks that were never
+/// retried qualify as well.
+///
+/// Returns distinct `(contract_id, owner_id)` pairs ordered by owner and then
+/// by contract. Rows without an owner are excluded: a NULL `owner_id` cannot
+/// be decoded into `Uuid` and would make the whole query fail.
+pub async fn get_all_pending_task_contracts(
+    pool: &PgPool,
+) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
+    sqlx::query_as::<_, (Uuid, Uuid)>(
+        r#"
+        SELECT DISTINCT contract_id, owner_id
+        FROM tasks
+        WHERE contract_id IS NOT NULL
+          AND owner_id IS NOT NULL
+          AND status = 'pending'
+          AND is_supervisor = false
+          AND retry_count < max_retries
+        ORDER BY owner_id, contract_id
+        "#,
+    )
+    .fetch_all(pool)
+    .await
+}
+
/// Mark a task as pending for retry after daemon failure.
/// Increments retry count and adds the failed daemon to exclusion list.
pub async fn mark_task_for_retry(
@@ -1075,6 +1097,7 @@ pub async fn create_task_for_owner(
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+ let depends_on_json = req.depends_on.as_ref().map(|d| serde_json::to_value(d).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
@@ -1082,9 +1105,9 @@ pub async fn create_task_for_owner(
owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state
+ branched_from_task_id, conversation_state, depends_on
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
RETURNING *
"#,
)
@@ -1107,6 +1130,7 @@ pub async fn create_task_for_owner(
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
+ .bind(&depends_on_json)
.fetch_one(pool)
.await
}
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index e2adb72..86f9500 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -1374,6 +1374,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -1470,6 +1471,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -2079,6 +2081,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -2595,6 +2598,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if repository::create_task_for_owner(pool, owner_id, task_req).await.is_ok() {
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index 462b385..d9a47a2 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -298,6 +298,7 @@ pub async fn create_contract(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await {
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 240e1f7..f89f067 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -6,6 +6,7 @@ use axum::{
response::IntoResponse,
Json,
};
+use base64::Engine;
use uuid::Uuid;
use crate::db::models::{
@@ -2224,6 +2225,7 @@ pub async fn reassign_task(
checkpoint_sha: task.last_checkpoint_sha.clone(),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -2265,6 +2267,30 @@ pub async fn reassign_task(
}
};
+ // Fetch latest checkpoint patch for worktree recovery during reassignment
+ let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ old_task_id = %id,
+ new_task_id = %new_task.id,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ files_count = patch.files_count,
+ "Including checkpoint patch for task reassignment recovery"
+ );
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment");
+ (None, None)
+ }
+ };
+
// Send SpawnTask command to daemon for the new task
let command = DaemonCommand::SpawnTask {
task_id: new_task.id,
@@ -2285,8 +2311,8 @@ pub async fn reassign_task(
autonomous_loop: false,
resume_session: false,
conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
+ patch_data,
+ patch_base_sha,
};
tracing::info!(
@@ -2949,6 +2975,7 @@ pub async fn fork_task(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3106,6 +3133,7 @@ pub async fn resume_from_checkpoint(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3441,6 +3469,7 @@ pub async fn branch_task(
checkpoint_sha: None,
branched_from_task_id: Some(source_task_id),
conversation_history,
+ depends_on: None,
};
let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index 1ff0724..157bad0 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1020,6 +1020,7 @@ async fn handle_mesh_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 65db373..53ee806 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -291,6 +291,19 @@ pub enum DaemonMessage {
success: bool,
error: Option<String>,
},
+ /// Task recovery detected after daemon restart
+ TaskRecoveryDetected {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "previousState")]
+ previous_state: String,
+ #[serde(rename = "worktreeIntact")]
+ worktree_intact: bool,
+ #[serde(rename = "worktreePath")]
+ worktree_path: Option<String>,
+ #[serde(rename = "needsPatch")]
+ needs_patch: bool,
+ },
/// Register a tool key for orchestrator API access
RegisterToolKey {
#[serde(rename = "taskId")]
@@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::TaskRecoveryDetected {
+ task_id,
+ previous_state,
+ worktree_intact,
+ worktree_path,
+ needs_patch,
+ }) => {
+ tracing::info!(
+ task_id = %task_id,
+ previous_state = %previous_state,
+ worktree_intact = worktree_intact,
+ worktree_path = ?worktree_path,
+ needs_patch = needs_patch,
+ "Task recovery detected after daemon restart"
+ );
+
+ // Update task in database based on recovery state
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let state = state.clone();
+ tokio::spawn(async move {
+ if worktree_intact {
+ // Worktree exists - the task is resumable.
+ // Reset status to 'pending' and clear daemon_id so it can be picked up again.
+ match sqlx::query(
+ r#"
+ UPDATE tasks
+ SET status = 'pending',
+ daemon_id = NULL,
+ error_message = 'Daemon restarted - task ready for resumption',
+ interrupted_at = NOW(),
+ updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id
+ "#,
+ )
+ .bind(task_id)
+ .bind(owner_id)
+ .fetch_optional(&pool)
+ .await
+ {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked as pending for resumption"
+ );
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ version: 0,
+ status: "pending".into(),
+ updated_fields: vec![
+ "status".into(),
+ "daemon_id".into(),
+ "interrupted_at".into(),
+ ],
+ updated_by: "daemon_recovery".into(),
+ });
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found during recovery update"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to update task during recovery"
+ );
+ }
+ }
+ } else {
+ // Worktree missing - mark for retry with patch restoration
+ match repository::mark_task_for_retry(
+ &pool,
+ task_id,
+ daemon_uuid, // Mark this daemon as failed
+ ).await {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked for retry (worktree missing)"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found or exceeded retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::Authenticate { .. }) => {
// Already authenticated, ignore
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 21c9515..3d85c45 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -279,8 +279,9 @@ async fn verify_supervisor_auth(
/// Try to start a pending task on an available daemon.
/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
-/// For retried tasks, excludes daemons that previously failed the task.
-async fn try_start_pending_task(
+/// For retried tasks, excludes daemons that previously failed the task and includes
+/// checkpoint patch data for worktree recovery.
+pub async fn try_start_pending_task(
state: &SharedState,
contract_id: Uuid,
owner_id: Uuid,
@@ -348,6 +349,34 @@ async fn try_start_pending_task(
}
};
+ // For retried tasks, fetch checkpoint patch for worktree recovery
+ let (patch_data, patch_base_sha) = if task.retry_count > 0 {
+ // This is a retry - try to restore from checkpoint
+ match repository::get_latest_checkpoint_patch(pool, task.id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ task_id = %task.id,
+ retry_count = task.retry_count,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ "Including checkpoint patch for task retry recovery"
+ );
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry");
+ (None, None)
+ }
+ }
+ } else {
+ (None, None)
+ };
+
// Send spawn command
let cmd = DaemonCommand::SpawnTask {
task_id: updated_task.id,
@@ -366,10 +395,10 @@ async fn try_start_pending_task(
contract_id: updated_task.contract_id,
is_supervisor: false,
autonomous_loop: false,
- resume_session: false,
+ resume_session: task.retry_count > 0, // Use --continue for retried tasks
conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
+ patch_data,
+ patch_base_sha,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -585,6 +614,7 @@ pub async fn spawn_task(
copy_files: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
// Create task in DB
diff --git a/makima/src/server/handlers/transcript_analysis.rs b/makima/src/server/handlers/transcript_analysis.rs
index 3b71eca..cc62eb4 100644
--- a/makima/src/server/handlers/transcript_analysis.rs
+++ b/makima/src/server/handlers/transcript_analysis.rs
@@ -366,6 +366,7 @@ pub async fn create_contract_from_analysis(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await {
@@ -535,6 +536,7 @@ pub async fn update_contract_from_analysis(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await {
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 3a27513..de20569 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
/// Interval for checkpoint patch cleanup (hourly)
const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600;
+/// Interval between retry orchestrator checks for pending tasks (30 seconds)
+const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30;
+
/// Run the HTTP server with graceful shutdown support.
///
/// # Arguments
@@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
}
}
});
+
+ // Clone state and pool for retry orchestrator
+ let retry_pool = pool.clone();
+ let retry_state = state.clone();
+
+ // Spawn retry orchestrator - periodically retries pending tasks on available daemons
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(
+ std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS)
+ );
+ loop {
+ interval.tick().await;
+
+ // Get all contracts with pending tasks awaiting retry
+ match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await {
+ Ok(contract_owners) => {
+ for (contract_id, owner_id) in contract_owners {
+ // Try to start a pending task for this contract
+ match handlers::mesh_supervisor::try_start_pending_task(
+ &retry_state,
+ contract_id,
+ owner_id,
+ ).await {
+ Ok(Some(task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ contract_id = %contract_id,
+ retry_count = task.retry_count,
+ "Retry orchestrator started pending task"
+ );
+ }
+ Ok(None) => {
+ // No tasks could be started (no available daemons, etc.)
+ }
+ Err(e) => {
+ tracing::warn!(
+ contract_id = %contract_id,
+ error = %e,
+ "Retry orchestrator failed to start pending task"
+ );
+ }
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ error = %e,
+ "Retry orchestrator failed to query pending task contracts"
+ );
+ }
+ }
+ }
+ });
+
+ tracing::info!(
+ "Retry orchestrator started (interval: {}s)",
+ RETRY_ORCHESTRATOR_INTERVAL_SECS
+ );
}
let app = make_router(state);