//! Directive orchestrator — automates the full directive lifecycle.
//!
//! Runs as a background loop, polling every 15s to:
//! 1. Plan: active directives with no steps → spawn planning task
//! 2. Execute: ready steps → spawn execution tasks
//! 3. Monitor: detect task completion → update step status, advance DAG
//! 4. Re-plan: goal updated → spawn new planning task
use sqlx::PgPool;
use uuid::Uuid;
use crate::db::models::{CreateTaskRequest, DirectiveMemory, UpdateTaskRequest};
use crate::db::repository;
use crate::server::state::{DaemonCommand, SharedState};
pub struct DirectiveOrchestrator {
pool: PgPool,
state: SharedState,
}
impl DirectiveOrchestrator {
pub fn new(pool: PgPool, state: SharedState) -> Self {
Self { pool, state }
}
/// Run one orchestration tick — called every 15s.
pub async fn tick(&mut self) -> Result<(), anyhow::Error> {
self.phase_planning().await?;
self.phase_execution().await?;
self.phase_monitoring().await?;
self.phase_replanning().await?;
self.phase_completion().await?;
Ok(())
}
/// Phase 1: Active directives with no steps and no orchestrator task → spawn planning task.
async fn phase_planning(&self) -> Result<(), anyhow::Error> {
let directives = repository::get_directives_needing_planning(&self.pool).await?;
for directive in directives {
tracing::info!(
directive_id = %directive.id,
title = %directive.title,
"Directive needs planning — spawning planning task"
);
// Load memories if memory is enabled for this directive
let memories = if directive.memory_enabled {
match repository::list_directive_memories(&self.pool, directive.id, None).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to load directive memories for planning — continuing without"
);
vec![]
}
}
} else {
vec![]
};
let plan = build_planning_prompt(&directive, &[], 1, &memories);
if let Err(e) = self
.spawn_orchestrator_task(
directive.id,
directive.owner_id,
format!("Plan: {}", directive.title),
plan,
directive.repository_url.as_deref(),
directive.base_branch.as_deref(),
)
.await
{
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to spawn planning task"
);
}
}
Ok(())
}
/// Phase 2: Ready steps with no task → create execution task and dispatch.
/// Also retries pending directive tasks that weren't dispatched previously.
async fn phase_execution(&self) -> Result<(), anyhow::Error> {
// Create tasks for ready steps
let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;
for step in steps {
tracing::info!(
step_id = %step.step_id,
directive_id = %step.directive_id,
step_name = %step.step_name,
"Dispatching execution task for ready step"
);
// Resolve dependency steps to their task IDs for worktree continuation
let dep_tasks =
repository::get_step_dependency_tasks(&self.pool, &step.depends_on).await?;
let continue_from_task_id = dep_tasks.first().map(|d| d.task_id);
let task_plan = step
.task_plan
.as_deref()
.unwrap_or("Execute the step described below.");
// Load memories if memory is enabled for this directive
let memory_context = if step.memory_enabled {
match repository::list_directive_memories(&self.pool, step.directive_id, None).await {
Ok(memories) if !memories.is_empty() => {
format!("\n\nMEMORY CONTEXT (from previous planning/execution cycles):\n{}\n",
format_memories_for_prompt(&memories))
}
Ok(_) => String::new(),
Err(e) => {
tracing::warn!(
directive_id = %step.directive_id,
error = %e,
"Failed to load directive memories for execution — continuing without"
);
String::new()
}
}
} else {
String::new()
};
// Build merge instructions for additional dependencies (beyond the first)
let merge_preamble = if dep_tasks.len() > 1 {
use crate::daemon::worktree::{sanitize_name, short_uuid};
let merge_lines: Vec<String> = dep_tasks[1..]
.iter()
.map(|d| {
let branch = format!(
"makima/task-{}-{}",
sanitize_name(&d.task_name),
short_uuid(d.task_id)
);
format!("git merge origin/{} --no-edit", branch)
})
.collect();
format!(
"IMPORTANT — MERGE DEPENDENCY BRANCHES FIRST:\n\
This step continues from one dependency's worktree, but also depends on \
additional branches. Before starting work, run:\n\
```\ngit fetch origin\n{}\n```\n\
Resolve any merge conflicts sensibly, then proceed.\n\n",
merge_lines.join("\n"),
)
} else {
String::new()
};
let plan = format!(
"You are executing a step in directive \"{directive_title}\".\n\n\
STEP: {step_name}\n\
DESCRIPTION: {description}\n\n\
{merge_preamble}\
INSTRUCTIONS:\n{task_plan}\n\
{memory_context}\
When done, the system will automatically mark this step as completed.\n\
If you cannot complete the task, report the failure clearly.",
directive_title = step.directive_title,
step_name = step.step_name,
description = step.step_description.as_deref().unwrap_or("(none)"),
merge_preamble = merge_preamble,
task_plan = task_plan,
memory_context = memory_context,
);
match self
.spawn_step_task(
step.step_id,
step.directive_id,
step.owner_id,
format!("{}: {}", step.directive_title, step.step_name),
plan,
step.repository_url.as_deref(),
step.base_branch.as_deref(),
continue_from_task_id,
)
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
step_id = %step.step_id,
error = %e,
"Failed to spawn execution task for step"
);
}
}
}
// Retry pending directive tasks that weren't dispatched
let pending = repository::get_pending_directive_tasks(&self.pool).await?;
for task in pending {
if self
.try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version)
.await
{
// Task dispatched — mark its step as running if it has one
if let Some(step_id) = task.directive_step_id {
let _ = repository::set_step_running(&self.pool, step_id).await;
}
}
}
Ok(())
}
/// Phase 3: Monitor running steps and orchestrator tasks.
async fn phase_monitoring(&self) -> Result<(), anyhow::Error> {
// Check running steps
let running = repository::get_running_steps_with_tasks(&self.pool).await?;
for step in running {
match step.task_status.as_str() {
"completed" | "merged" | "done" => {
tracing::info!(
step_id = %step.step_id,
directive_id = %step.directive_id,
task_id = %step.task_id,
"Step task completed — updating step to completed"
);
let update = crate::db::models::UpdateDirectiveStepRequest {
status: Some("completed".to_string()),
..Default::default()
};
repository::update_directive_step(&self.pool, step.step_id, update).await?;
repository::advance_directive_ready_steps(&self.pool, step.directive_id)
.await?;
repository::check_directive_idle(&self.pool, step.directive_id).await?;
}
"failed" | "interrupted" => {
tracing::warn!(
step_id = %step.step_id,
directive_id = %step.directive_id,
task_id = %step.task_id,
task_status = %step.task_status,
"Step task failed — updating step to failed"
);
let update = crate::db::models::UpdateDirectiveStepRequest {
status: Some("failed".to_string()),
..Default::default()
};
repository::update_directive_step(&self.pool, step.step_id, update).await?;
repository::advance_directive_ready_steps(&self.pool, step.directive_id)
.await?;
repository::check_directive_idle(&self.pool, step.directive_id).await?;
}
_ => {
// Still running — do nothing
}
}
}
// Check orchestrator (planning) tasks
let orch_tasks = repository::get_orchestrator_tasks_to_check(&self.pool).await?;
for orch in orch_tasks {
match orch.task_status.as_str() {
"completed" | "merged" | "done" => {
tracing::info!(
directive_id = %orch.directive_id,
task_id = %orch.orchestrator_task_id,
"Planning task completed — clearing orchestrator task"
);
repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
// Advance DAG — planning task should have created steps
repository::advance_directive_ready_steps(&self.pool, orch.directive_id)
.await?;
}
"failed" | "interrupted" => {
tracing::warn!(
directive_id = %orch.directive_id,
task_id = %orch.orchestrator_task_id,
"Planning task failed — pausing directive"
);
repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
repository::set_directive_status(
&self.pool,
orch.owner_id,
orch.directive_id,
"paused",
)
.await?;
}
_ => {}
}
}
Ok(())
}
/// Phase 4: Re-planning — goal updated after latest step creation.
async fn phase_replanning(&self) -> Result<(), anyhow::Error> {
let directives = repository::get_directives_needing_replanning(&self.pool).await?;
for directive in directives {
tracing::info!(
directive_id = %directive.id,
title = %directive.title,
"Directive goal updated — spawning re-planning task"
);
let existing_steps =
repository::list_directive_steps(&self.pool, directive.id).await?;
let generation =
repository::get_directive_max_generation(&self.pool, directive.id).await? + 1;
// Load memories if memory is enabled for this directive
let memories = if directive.memory_enabled {
match repository::list_directive_memories(&self.pool, directive.id, None).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to load directive memories for re-planning — continuing without"
);
vec![]
}
}
} else {
vec![]
};
let plan = build_planning_prompt(&directive, &existing_steps, generation, &memories);
if let Err(e) = self
.spawn_orchestrator_task(
directive.id,
directive.owner_id,
format!("Re-plan: {}", directive.title),
plan,
directive.repository_url.as_deref(),
directive.base_branch.as_deref(),
)
.await
{
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to spawn re-planning task"
);
}
}
Ok(())
}
/// Spawn a planning/re-planning task and assign it as the directive's orchestrator task.
async fn spawn_orchestrator_task(
&self,
directive_id: Uuid,
owner_id: Uuid,
name: String,
plan: String,
repo_url: Option<&str>,
base_branch: Option<&str>,
) -> Result<(), anyhow::Error> {
let req = CreateTaskRequest {
contract_id: None,
name,
description: Some("Directive planning task".to_string()),
plan,
parent_task_id: None,
is_supervisor: false,
priority: 0,
repository_url: repo_url.map(|s| s.to_string()),
base_branch: base_branch.map(|s| s.to_string()),
target_branch: None,
merge_mode: None,
target_repo_path: None,
completion_action: None,
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None,
directive_id: Some(directive_id),
directive_step_id: None,
};
let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
repository::assign_orchestrator_task(&self.pool, directive_id, task.id).await?;
// Try to dispatch to a daemon
self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await;
Ok(())
}
/// Spawn an execution task for a step.
/// Links the task to the step but only marks the step as 'running' once dispatched.
async fn spawn_step_task(
&self,
step_id: Uuid,
directive_id: Uuid,
owner_id: Uuid,
name: String,
plan: String,
repo_url: Option<&str>,
base_branch: Option<&str>,
continue_from_task_id: Option<Uuid>,
) -> Result<(), anyhow::Error> {
let req = CreateTaskRequest {
contract_id: None,
name,
description: Some("Directive step execution task".to_string()),
plan,
parent_task_id: None,
is_supervisor: false,
priority: 0,
repository_url: repo_url.map(|s| s.to_string()),
base_branch: base_branch.map(|s| s.to_string()),
target_branch: None,
merge_mode: None,
target_repo_path: None,
completion_action: Some("branch".to_string()),
continue_from_task_id,
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None,
directive_id: Some(directive_id),
directive_step_id: Some(step_id),
};
let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
// Link the task to the step (sets task_id) but keep step as 'ready' for now
repository::link_task_to_step(&self.pool, step_id, task.id).await?;
// Only mark step as 'running' if we can actually dispatch the task
if self
.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version)
.await
{
repository::set_step_running(&self.pool, step_id).await?;
}
Ok(())
}
/// Try to dispatch a task to an available daemon. Returns true if dispatched.
async fn try_dispatch_task(
&self,
task_id: Uuid,
owner_id: Uuid,
task_name: &str,
plan: &str,
version: i32,
) -> bool {
let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
tracing::info!(
task_id = %task_id,
"No daemon available for directive task — leaving pending for retry"
);
return false;
};
// Update task status to starting and assign daemon
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(daemon_id),
version: Some(version),
..Default::default()
};
match repository::update_task_for_owner(&self.pool, task_id, owner_id, update_req).await {
Ok(Some(updated_task)) => {
let command = DaemonCommand::SpawnTask {
task_id,
task_name: task_name.to_string(),
plan: plan.to_string(),
repo_url: updated_task.repository_url.clone(),
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: None,
depth: 0,
is_orchestrator: false,
target_repo_path: None,
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: None,
contract_id: None,
is_supervisor: false,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only: false,
auto_merge_local: false,
supervisor_worktree_task_id: None,
directive_id: updated_task.directive_id,
};
if let Err(e) = self.state.send_daemon_command(daemon_id, command).await {
tracing::warn!(
task_id = %task_id,
daemon_id = %daemon_id,
error = %e,
"Failed to send SpawnTask to daemon for directive task"
);
return false;
} else {
tracing::info!(
task_id = %task_id,
daemon_id = %daemon_id,
"Dispatched directive task to daemon"
);
return true;
}
}
Ok(None) => {
tracing::warn!(task_id = %task_id, "Task not found when trying to dispatch");
}
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
}
}
false
}
/// Phase 5: Completion — spawn PR-creation tasks for idle directives.
async fn phase_completion(&self) -> Result<(), anyhow::Error> {
// Part 1: Spawn completion tasks for idle directives
let directives = repository::get_idle_directives_needing_completion(&self.pool).await?;
for directive in directives {
tracing::info!(
directive_id = %directive.id,
title = %directive.title,
"Directive idle — spawning completion task for PR"
);
let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?;
if step_tasks.is_empty() {
continue;
}
let base_branch = directive
.base_branch
.as_deref()
.unwrap_or("main");
let directive_branch = format!(
"makima/directive-{}-{}",
crate::daemon::worktree::sanitize_name(&directive.title),
crate::daemon::worktree::short_uuid(directive.id),
);
// Compute step branch names using the same formula as execute_completion_action
let step_branches: Vec<String> = step_tasks
.iter()
.map(|st| {
format!(
"makima/{}-{}",
crate::daemon::worktree::sanitize_name(&st.task_name),
crate::daemon::worktree::short_uuid(st.task_id),
)
})
.collect();
let prompt = build_completion_prompt(
&directive,
&step_tasks,
&step_branches,
&directive_branch,
base_branch,
);
match self
.spawn_completion_task(
directive.id,
directive.owner_id,
format!("PR: {}", directive.title),
prompt,
directive.repository_url.as_deref(),
directive.base_branch.as_deref(),
)
.await
{
Ok(task_id) => {
// Store pr_branch on directive immediately
let update = crate::db::models::UpdateDirectiveRequest {
pr_branch: Some(directive_branch.clone()),
..Default::default()
};
let _ = repository::update_directive_for_owner(
&self.pool,
directive.owner_id,
directive.id,
update,
)
.await;
repository::assign_completion_task(&self.pool, directive.id, task_id).await?;
}
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to spawn completion task"
);
}
}
}
// Part 2: Monitor completion tasks
let checks = repository::get_completion_tasks_to_check(&self.pool).await?;
for check in checks {
match check.task_status.as_str() {
"completed" | "merged" | "done" => {
tracing::info!(
directive_id = %check.directive_id,
task_id = %check.completion_task_id,
"Completion task finished"
);
repository::clear_completion_task(&self.pool, check.directive_id).await?;
}
"failed" | "interrupted" => {
tracing::warn!(
directive_id = %check.directive_id,
task_id = %check.completion_task_id,
"Completion task failed"
);
repository::clear_completion_task(&self.pool, check.directive_id).await?;
}
_ => {
// Still running
}
}
}
Ok(())
}
/// Spawn a completion task that creates/updates a PR from step branches.
async fn spawn_completion_task(
&self,
directive_id: Uuid,
owner_id: Uuid,
name: String,
plan: String,
repo_url: Option<&str>,
base_branch: Option<&str>,
) -> Result<Uuid, anyhow::Error> {
let req = CreateTaskRequest {
contract_id: None,
name,
description: Some("Directive PR completion task".to_string()),
plan,
parent_task_id: None,
is_supervisor: false,
priority: 0,
repository_url: repo_url.map(|s| s.to_string()),
base_branch: base_branch.map(|s| s.to_string()),
target_branch: None,
merge_mode: None,
target_repo_path: None,
completion_action: None,
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None,
directive_id: Some(directive_id),
directive_step_id: None,
};
let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
// Try to dispatch to a daemon
self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version)
.await;
Ok(task.id)
}
}
/// Format memory entries into a readable prompt section.
fn format_memories_for_prompt(memories: &[DirectiveMemory]) -> String {
let mut out = String::new();
for memory in memories {
let cat = memory.category.as_deref().unwrap_or("other");
out.push_str(&format!(
"- [{}] {}: {}\n",
cat, memory.key, memory.value
));
}
out
}
/// Build the planning prompt for a directive.
fn build_planning_prompt(
directive: &crate::db::models::Directive,
existing_steps: &[crate::db::models::DirectiveStep],
generation: i32,
memories: &[DirectiveMemory],
) -> String {
let mut prompt = String::new();
// Include memory context if available
if !memories.is_empty() {
prompt.push_str("MEMORY CONTEXT (insights and decisions from previous cycles):\n");
prompt.push_str(&format_memories_for_prompt(memories));
prompt.push_str("\nUse these memories to inform your planning. Avoid repeating past mistakes and build on prior insights.\n\n");
}
if !existing_steps.is_empty() {
prompt.push_str(&format!(
"EXISTING STEPS (generation {}):\n",
generation - 1
));
for step in existing_steps {
prompt.push_str(&format!(
"- {} [{}]: {}\n",
step.name,
step.status,
step.description.as_deref().unwrap_or("(no description)")
));
}
prompt.push_str(&format!(
"\nAdd new steps that build on or complement existing work. Use generation {}.\n\n",
generation
));
}
prompt.push_str(&format!(
r#"You are planning the implementation of a directive.
DIRECTIVE: "{title}"
GOAL: {goal}
{repo_section}
Your job:
1. Explore the repository to understand the codebase
2. Decompose the goal into concrete, ordered steps
3. Each step = one task for a Claude Code instance to execute
4. Submit ALL steps using the batch command or individual add-step commands
For each step, define:
- name: Short imperative title (e.g., "Add user authentication middleware")
- description: What to do and acceptance criteria
- taskPlan: Full instructions for the Claude instance (include file paths, patterns to follow)
- dependsOn: UUIDs of steps this depends on (use IDs from previous add-step responses)
- orderIndex: Execution phase number. Steps only start after ALL steps with a lower orderIndex complete.
Steps with the same orderIndex run in parallel. Use ascending values (0, 1, 2, ...) to create sequential phases.
Use dependsOn for fine-grained control within the same phase.
Submit steps:
makima directive add-step "Step Name" --description "..." --task-plan "..."
(Use --depends-on "uuid1,uuid2" for dependencies, referencing IDs from earlier add-step outputs)
Or batch:
makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]'
DEPENDENCY WORKTREE CONTINUATION:
When a step has dependsOn, it automatically continues from the first dependency's worktree (inheriting
committed and uncommitted changes). If there are multiple dependencies, the first provides the base worktree
and additional dependency branches are merged in before work starts. Use this for incremental work chains.
IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context.
"#,
title = directive.title,
goal = directive.goal,
repo_section = match &directive.repository_url {
Some(url) => format!("REPOSITORY: {}\n", url),
None => String::new(),
},
));
prompt
}
/// Build the prompt for a completion task that creates or updates a PR.
fn build_completion_prompt(
directive: &crate::db::models::Directive,
step_tasks: &[crate::db::repository::CompletedStepTask],
step_branches: &[String],
directive_branch: &str,
base_branch: &str,
) -> String {
let merge_commands: String = step_branches
.iter()
.map(|b| format!("git merge origin/{} --no-edit", b))
.collect::<Vec<_>>()
.join("\n");
let step_summary: String = step_tasks
.iter()
.zip(step_branches.iter())
.map(|(st, branch)| format!("- {} (branch: {})", st.step_name, branch))
.collect::<Vec<_>>()
.join("\n");
if directive.pr_url.is_some() {
// Re-completion: PR already exists, merge new branches into existing PR branch
format!(
r#"You are updating an existing PR for directive "{title}".
The PR branch `{directive_branch}` already exists. Merge any new step branches into it.
Steps completed:
{step_summary}
Run these commands:
```
git fetch origin
git checkout {directive_branch}
git pull origin {directive_branch}
{merge_commands}
git push origin {directive_branch}
```
Already-merged branches will be a no-op. If there are merge conflicts, resolve them sensibly.
"#,
title = directive.title,
directive_branch = directive_branch,
step_summary = step_summary,
merge_commands = merge_commands,
)
} else {
// First completion: create new PR
format!(
r#"You are creating a PR for directive "{title}".
Goal: {goal}
Steps completed:
{step_summary}
Run these commands to create a combined branch and PR:
```
git fetch origin
git checkout -b {directive_branch} origin/{base_branch}
{merge_commands}
git push -u origin {directive_branch}
```
Then create the PR:
```
gh pr create --title "{title}" --body "{pr_body}" --head {directive_branch} --base {base_branch}
```
After creating the PR, store the URL:
```
makima directive update --pr-url "<the PR URL from gh pr create output>"
```
If there are merge conflicts, resolve them sensibly before pushing.
"#,
title = directive.title,
goal = directive.goal,
directive_branch = directive_branch,
base_branch = base_branch,
step_summary = step_summary,
merge_commands = merge_commands,
pr_body = format!(
"## Directive\\n\\n{}\\n\\n## Steps\\n\\n{}",
directive.goal.replace('\n', "\\n").replace('"', "\\\""),
step_summary.replace('\n', "\\n").replace('"', "\\\""),
),
)
}
}