//! Directive orchestrator — automates the full directive lifecycle. //! //! Runs as a background loop, polling every 15s to: //! 1. Plan: active directives with no steps → spawn planning task //! 2. Execute: ready steps → spawn execution tasks //! 3. Monitor: detect task completion → update step status, advance DAG //! 4. Re-plan: goal updated → spawn new planning task use sqlx::PgPool; use uuid::Uuid; use crate::db::models::{CreateTaskRequest, UpdateTaskRequest}; use crate::db::repository; use crate::server::state::{DaemonCommand, SharedState}; pub struct DirectiveOrchestrator { pool: PgPool, state: SharedState, } impl DirectiveOrchestrator { pub fn new(pool: PgPool, state: SharedState) -> Self { Self { pool, state } } /// Run one orchestration tick — called every 15s. pub async fn tick(&mut self) -> Result<(), anyhow::Error> { self.phase_planning().await?; self.phase_execution().await?; self.phase_monitoring().await?; self.phase_replanning().await?; self.phase_completion().await?; Ok(()) } /// Phase 1: Active directives with no steps and no orchestrator task → spawn planning task. async fn phase_planning(&self) -> Result<(), anyhow::Error> { let directives = repository::get_directives_needing_planning(&self.pool).await?; for directive in directives { tracing::info!( directive_id = %directive.id, title = %directive.title, "Directive needs planning — spawning planning task" ); let plan = build_planning_prompt(&directive, &[], 1); if let Err(e) = self .spawn_orchestrator_task( directive.id, directive.owner_id, format!("Plan: {}", directive.title), plan, directive.repository_url.as_deref(), directive.base_branch.as_deref(), ) .await { tracing::warn!( directive_id = %directive.id, error = %e, "Failed to spawn planning task" ); } } Ok(()) } /// Phase 2: Ready steps with no task → create execution task and dispatch. /// Also retries pending directive tasks that weren't dispatched previously. async fn phase_execution(&self) -> Result<(), anyhow::Error> { // Create tasks for ready steps let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?; for step in steps { tracing::info!( step_id = %step.step_id, directive_id = %step.directive_id, step_name = %step.step_name, "Dispatching execution task for ready step" ); // Resolve dependency steps to their task IDs for worktree continuation let dep_tasks = repository::get_step_dependency_tasks(&self.pool, &step.depends_on).await?; let mut continue_from_task_id = dep_tasks.first().map(|d| d.task_id); // If no dependency tasks resolved, try to continue from previous work: // 1) Use the directive's PR branch as base (contains all previous merged work) // 2) Fall back to the last completed step's task for worktree continuation let effective_base_branch = if continue_from_task_id.is_none() { if step.pr_branch.is_some() { tracing::info!( step_id = %step.step_id, pr_branch = ?step.pr_branch, "Step has no deps — using directive PR branch as base" ); step.pr_branch.as_deref() } else { // No PR branch yet — try to continue from the last completed step's worktree match repository::get_last_completed_step_task_id( &self.pool, step.directive_id, ) .await { Ok(Some(task_id)) => { tracing::info!( step_id = %step.step_id, continue_from = %task_id, "Step has no deps, no PR branch — continuing from last completed task" ); continue_from_task_id = Some(task_id); step.base_branch.as_deref() } _ => step.base_branch.as_deref(), } } } else { step.base_branch.as_deref() }; let task_plan = step .task_plan .as_deref() .unwrap_or("Execute the step described below."); // Build merge instructions for additional dependencies (beyond the first) let merge_preamble = if dep_tasks.len() > 1 { use crate::daemon::worktree::{sanitize_name, short_uuid}; let merge_lines: Vec = dep_tasks[1..] .iter() .map(|d| { let branch = format!( "makima/task-{}-{}", sanitize_name(&d.task_name), short_uuid(d.task_id) ); format!("git merge origin/{} --no-edit", branch) }) .collect(); format!( "IMPORTANT — MERGE DEPENDENCY BRANCHES FIRST:\n\ This step continues from one dependency's worktree, but also depends on \ additional branches. Before starting work, run:\n\ ```\ngit fetch origin\n{}\n```\n\ Resolve any merge conflicts sensibly, then proceed.\n\n", merge_lines.join("\n"), ) } else { String::new() }; let plan = format!( "You are executing a step in directive \"{directive_title}\".\n\n\ STEP: {step_name}\n\ DESCRIPTION: {description}\n\n\ {merge_preamble}\ INSTRUCTIONS:\n{task_plan}\n\ When done, the system will automatically mark this step as completed.\n\ If you cannot complete the task, report the failure clearly.", directive_title = step.directive_title, step_name = step.step_name, description = step.step_description.as_deref().unwrap_or("(none)"), merge_preamble = merge_preamble, task_plan = task_plan, ); match self .spawn_step_task( step.step_id, step.directive_id, step.owner_id, format!("{}: {}", step.directive_title, step.step_name), plan, step.repository_url.as_deref(), effective_base_branch, continue_from_task_id, ) .await { Ok(()) => {} Err(e) => { tracing::warn!( step_id = %step.step_id, error = %e, "Failed to spawn execution task for step" ); } } } // Retry pending directive tasks that weren't dispatched let pending = repository::get_pending_directive_tasks(&self.pool).await?; for task in pending { if self .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version) .await { // Task dispatched — mark its step as running if it has one if let Some(step_id) = task.directive_step_id { let _ = repository::set_step_running(&self.pool, step_id).await; } } } Ok(()) } /// Phase 3: Monitor running steps and orchestrator tasks. async fn phase_monitoring(&self) -> Result<(), anyhow::Error> { // Check running steps let running = repository::get_running_steps_with_tasks(&self.pool).await?; for step in running { match step.task_status.as_str() { "completed" | "merged" | "done" => { tracing::info!( step_id = %step.step_id, directive_id = %step.directive_id, task_id = %step.task_id, "Step task completed — updating step to completed" ); let update = crate::db::models::UpdateDirectiveStepRequest { status: Some("completed".to_string()), ..Default::default() }; repository::update_directive_step(&self.pool, step.step_id, update).await?; repository::advance_directive_ready_steps(&self.pool, step.directive_id) .await?; repository::check_directive_idle(&self.pool, step.directive_id).await?; } "failed" | "interrupted" => { tracing::warn!( step_id = %step.step_id, directive_id = %step.directive_id, task_id = %step.task_id, task_status = %step.task_status, "Step task failed — updating step to failed" ); let update = crate::db::models::UpdateDirectiveStepRequest { status: Some("failed".to_string()), ..Default::default() }; repository::update_directive_step(&self.pool, step.step_id, update).await?; repository::advance_directive_ready_steps(&self.pool, step.directive_id) .await?; repository::check_directive_idle(&self.pool, step.directive_id).await?; } _ => { // Still running — do nothing } } } // Check orchestrator (planning) tasks let orch_tasks = repository::get_orchestrator_tasks_to_check(&self.pool).await?; for orch in orch_tasks { match orch.task_status.as_str() { "completed" | "merged" | "done" => { tracing::info!( directive_id = %orch.directive_id, task_id = %orch.orchestrator_task_id, "Planning task completed — clearing orchestrator task" ); repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?; // Advance DAG — planning task should have created steps repository::advance_directive_ready_steps(&self.pool, orch.directive_id) .await?; } "failed" | "interrupted" => { tracing::warn!( directive_id = %orch.directive_id, task_id = %orch.orchestrator_task_id, "Planning task failed — pausing directive" ); repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?; repository::set_directive_status( &self.pool, orch.owner_id, orch.directive_id, "paused", ) .await?; } _ => {} } } Ok(()) } /// Phase 4: Re-planning — goal updated after latest step creation. async fn phase_replanning(&self) -> Result<(), anyhow::Error> { let directives = repository::get_directives_needing_replanning(&self.pool).await?; for directive in directives { tracing::info!( directive_id = %directive.id, title = %directive.title, "Directive goal updated — spawning re-planning task" ); let existing_steps = repository::list_directive_steps(&self.pool, directive.id).await?; let generation = repository::get_directive_max_generation(&self.pool, directive.id).await? + 1; let plan = build_planning_prompt(&directive, &existing_steps, generation); if let Err(e) = self .spawn_orchestrator_task( directive.id, directive.owner_id, format!("Re-plan: {}", directive.title), plan, directive.repository_url.as_deref(), directive.base_branch.as_deref(), ) .await { tracing::warn!( directive_id = %directive.id, error = %e, "Failed to spawn re-planning task" ); } } Ok(()) } /// Spawn a planning/re-planning task and assign it as the directive's orchestrator task. async fn spawn_orchestrator_task( &self, directive_id: Uuid, owner_id: Uuid, name: String, plan: String, repo_url: Option<&str>, base_branch: Option<&str>, ) -> Result<(), anyhow::Error> { let req = CreateTaskRequest { contract_id: None, name, description: Some("Directive planning task".to_string()), plan, parent_task_id: None, is_supervisor: false, priority: 0, repository_url: repo_url.map(|s| s.to_string()), base_branch: base_branch.map(|s| s.to_string()), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, directive_id: Some(directive_id), directive_step_id: None, }; let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?; repository::assign_orchestrator_task(&self.pool, directive_id, task.id).await?; // Try to dispatch to a daemon self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await; Ok(()) } /// Spawn an execution task for a step. /// Links the task to the step but only marks the step as 'running' once dispatched. async fn spawn_step_task( &self, step_id: Uuid, directive_id: Uuid, owner_id: Uuid, name: String, plan: String, repo_url: Option<&str>, base_branch: Option<&str>, continue_from_task_id: Option, ) -> Result<(), anyhow::Error> { let req = CreateTaskRequest { contract_id: None, name, description: Some("Directive step execution task".to_string()), plan, parent_task_id: None, is_supervisor: false, priority: 0, repository_url: repo_url.map(|s| s.to_string()), base_branch: base_branch.map(|s| s.to_string()), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: Some("branch".to_string()), continue_from_task_id, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, directive_id: Some(directive_id), directive_step_id: Some(step_id), }; let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?; // Link the task to the step (sets task_id) but keep step as 'ready' for now repository::link_task_to_step(&self.pool, step_id, task.id).await?; // Only mark step as 'running' if we can actually dispatch the task if self .try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version) .await { repository::set_step_running(&self.pool, step_id).await?; } Ok(()) } /// Try to dispatch a task to an available daemon. Returns true if dispatched. async fn try_dispatch_task( &self, task_id: Uuid, owner_id: Uuid, task_name: &str, plan: &str, version: i32, ) -> bool { let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else { tracing::info!( task_id = %task_id, "No daemon available for directive task — leaving pending for retry" ); return false; }; // Update task status to starting and assign daemon let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(daemon_id), version: Some(version), ..Default::default() }; match repository::update_task_for_owner(&self.pool, task_id, owner_id, update_req).await { Ok(Some(updated_task)) => { let command = DaemonCommand::SpawnTask { task_id, task_name: task_name.to_string(), plan: plan.to_string(), repo_url: updated_task.repository_url.clone(), base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: None, depth: 0, is_orchestrator: false, target_repo_path: None, completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, contract_id: None, is_supervisor: false, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only: false, auto_merge_local: false, supervisor_worktree_task_id: None, directive_id: updated_task.directive_id, }; if let Err(e) = self.state.send_daemon_command(daemon_id, command).await { tracing::warn!( task_id = %task_id, daemon_id = %daemon_id, error = %e, "Failed to send SpawnTask to daemon for directive task" ); return false; } else { tracing::info!( task_id = %task_id, daemon_id = %daemon_id, "Dispatched directive task to daemon" ); return true; } } Ok(None) => { tracing::warn!(task_id = %task_id, "Task not found when trying to dispatch"); } Err(e) => { tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch"); } } false } /// Phase 5: Completion — spawn PR-creation tasks for idle directives. async fn phase_completion(&self) -> Result<(), anyhow::Error> { // Part 1: Spawn completion tasks for idle directives let directives = repository::get_idle_directives_needing_completion(&self.pool).await?; for directive in directives { // Atomically claim this directive for completion using a placeholder. // This prevents a concurrent tick from also spawning a completion task. let placeholder_id = Uuid::new_v4(); let claimed = repository::claim_directive_for_completion( &self.pool, directive.id, placeholder_id, ) .await?; if !claimed { tracing::debug!( directive_id = %directive.id, "Directive already claimed for completion — skipping" ); continue; } tracing::info!( directive_id = %directive.id, title = %directive.title, "Directive idle — spawning completion task for PR" ); let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?; if step_tasks.is_empty() { // Release the claim since there's nothing to complete let _ = repository::clear_completion_task(&self.pool, directive.id).await; continue; } let base_branch = directive .base_branch .as_deref() .unwrap_or("main"); let directive_branch = format!( "makima/directive-{}-{}", crate::daemon::worktree::sanitize_name(&directive.title), crate::daemon::worktree::short_uuid(directive.id), ); // Compute step branch names using the same formula as execute_completion_action let step_branches: Vec = step_tasks .iter() .map(|st| { format!( "makima/{}-{}", crate::daemon::worktree::sanitize_name(&st.task_name), crate::daemon::worktree::short_uuid(st.task_id), ) }) .collect(); let prompt = build_completion_prompt( &directive, &step_tasks, &step_branches, &directive_branch, base_branch, ); match self .spawn_completion_task( directive.id, directive.owner_id, format!("PR: {}", directive.title), prompt, directive.repository_url.as_deref(), directive.base_branch.as_deref(), ) .await { Ok(task_id) => { // Store pr_branch on directive immediately let update = crate::db::models::UpdateDirectiveRequest { pr_branch: Some(directive_branch.clone()), ..Default::default() }; let _ = repository::update_directive_for_owner( &self.pool, directive.owner_id, directive.id, update, ) .await; // Replace placeholder with the real task ID repository::assign_completion_task(&self.pool, directive.id, task_id).await?; } Err(e) => { tracing::warn!( directive_id = %directive.id, error = %e, "Failed to spawn completion task — releasing claim" ); // Release the claim so it can be retried on the next tick let _ = repository::clear_completion_task(&self.pool, directive.id).await; } } } // Part 2: Monitor completion tasks let checks = repository::get_completion_tasks_to_check(&self.pool).await?; for check in checks { match check.task_status.as_str() { "completed" | "merged" | "done" => { tracing::info!( directive_id = %check.directive_id, task_id = %check.completion_task_id, "Completion task finished" ); repository::clear_completion_task(&self.pool, check.directive_id).await?; } "failed" | "interrupted" => { tracing::warn!( directive_id = %check.directive_id, task_id = %check.completion_task_id, "Completion task failed" ); repository::clear_completion_task(&self.pool, check.directive_id).await?; } _ => { // Still running } } } Ok(()) } /// Spawn a completion task that creates/updates a PR from step branches. async fn spawn_completion_task( &self, directive_id: Uuid, owner_id: Uuid, name: String, plan: String, repo_url: Option<&str>, base_branch: Option<&str>, ) -> Result { let req = CreateTaskRequest { contract_id: None, name, description: Some("Directive PR completion task".to_string()), plan, parent_task_id: None, is_supervisor: false, priority: 0, repository_url: repo_url.map(|s| s.to_string()), base_branch: base_branch.map(|s| s.to_string()), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, directive_id: Some(directive_id), directive_step_id: None, }; let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?; // Try to dispatch to a daemon self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version) .await; Ok(task.id) } } /// Build the planning prompt for a directive. fn build_planning_prompt( directive: &crate::db::models::Directive, existing_steps: &[crate::db::models::DirectiveStep], generation: i32, ) -> String { let mut prompt = String::new(); if !existing_steps.is_empty() { prompt.push_str(&format!( "EXISTING STEPS (generation {}):\n", generation - 1 )); let mut last_completed_id: Option = None; for step in existing_steps { prompt.push_str(&format!( "- [{}] {} (id: {}): {}\n", step.status, step.name, step.id, step.description.as_deref().unwrap_or("(no description)") )); if step.status == "completed" { last_completed_id = Some(step.id); } } if let Some(last_id) = last_completed_id { prompt.push_str(&format!( "\nNew steps that build on previous work SHOULD use --depends-on \"{}\" (the last completed step) \ so their worktree inherits all prior changes. Without this dependency, new steps start from a \ fresh checkout and won't see any of the work done by previous steps.\n", last_id )); } prompt.push_str(&format!( "\nAdd new steps that build on or complement existing work. Use generation {}.\n\n", generation )); } prompt.push_str(&format!( r#"You are planning the implementation of a directive. DIRECTIVE: "{title}" GOAL: {goal} {repo_section} Your job: 1. Explore the repository to understand the codebase 2. Decompose the goal into concrete, ordered steps 3. Each step = one task for a Claude Code instance to execute 4. Submit ALL steps using the batch command or individual add-step commands For each step, define: - name: Short imperative title (e.g., "Add user authentication middleware") - description: What to do and acceptance criteria - taskPlan: Full instructions for the Claude instance (include file paths, patterns to follow) - dependsOn: UUIDs of steps this depends on (use IDs from previous add-step responses) - orderIndex: Execution phase number. Steps only start after ALL steps with a lower orderIndex complete. Steps with the same orderIndex run in parallel. Use ascending values (0, 1, 2, ...) to create sequential phases. Use dependsOn for fine-grained control within the same phase. Submit steps: makima directive add-step "Step Name" --description "..." --task-plan "..." (Use --depends-on "uuid1,uuid2" for dependencies, referencing IDs from earlier add-step outputs) Or batch: makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]' DEPENDENCY WORKTREE CONTINUATION: Each step runs in its own git worktree. How that worktree is initialised depends on dependsOn: - With dependsOn: the step continues from the first dependency's worktree (inheriting all committed and uncommitted changes). Additional dependencies are merged in as branches before work starts. - Without dependsOn: the step starts from a FRESH worktree based on the base branch (or the PR branch if a PR already exists from previous completions). Because of this, you MUST chain steps using dependsOn whenever one step's work builds on another's. If step B modifies files created/changed by step A, step B MUST list step A in its dependsOn — otherwise step B will start from a blank worktree and won't see step A's changes at all. Guidelines: - For sequential work, create a linear chain: step1 → step2 → step3 (each depends on the previous). - Only omit dependsOn for truly independent steps that can start from a fresh checkout. - Parallel steps that share no files can omit mutual dependencies, but if they both build on a prior step they should BOTH list that prior step in dependsOn. IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context. "#, title = directive.title, goal = directive.goal, repo_section = match &directive.repository_url { Some(url) => format!("REPOSITORY: {}\n", url), None => String::new(), }, )); prompt } /// Build the prompt for a completion task that creates or updates a PR. fn build_completion_prompt( directive: &crate::db::models::Directive, step_tasks: &[crate::db::repository::CompletedStepTask], step_branches: &[String], directive_branch: &str, base_branch: &str, ) -> String { let merge_commands: String = step_branches .iter() .map(|b| format!("git merge origin/{} --no-edit", b)) .collect::>() .join("\n"); let step_summary: String = step_tasks .iter() .zip(step_branches.iter()) .map(|(st, branch)| format!("- {} (branch: {})", st.step_name, branch)) .collect::>() .join("\n"); if directive.pr_url.is_some() { // Re-completion: PR already exists, merge new branches into existing PR branch format!( r#"You are updating an existing PR for directive "{title}". The PR branch `{directive_branch}` already exists. Merge any new step branches into it. Steps completed: {step_summary} Run these commands: ``` git fetch origin git checkout {directive_branch} git pull origin {directive_branch} {merge_commands} git push origin {directive_branch} ``` Already-merged branches will be a no-op. If there are merge conflicts, resolve them sensibly. "#, title = directive.title, directive_branch = directive_branch, step_summary = step_summary, merge_commands = merge_commands, ) } else { // First completion: create new PR format!( r#"You are creating a PR for directive "{title}". Goal: {goal} Steps completed: {step_summary} Run these commands to create a combined branch and PR: ``` git fetch origin git checkout -b {directive_branch} origin/{base_branch} {merge_commands} git push -u origin {directive_branch} ``` Then create the PR: ``` gh pr create --title "{title}" --body "{pr_body}" --head {directive_branch} --base {base_branch} ``` After creating the PR, store the URL: ``` makima directive update --pr-url "" ``` If there are merge conflicts, resolve them sensibly before pushing. "#, title = directive.title, goal = directive.goal, directive_branch = directive_branch, base_branch = base_branch, step_summary = step_summary, merge_commands = merge_commands, pr_body = format!( "## Directive\\n\\n{}\\n\\n## Steps\\n\\n{}", directive.goal.replace('\n', "\\n").replace('"', "\\\""), step_summary.replace('\n', "\\n").replace('"', "\\\""), ), ) } }