//! Directive orchestrator — automates the full directive lifecycle.
//!
//! Runs as a background loop, polling every 15s to:
//! 1. Plan: active directives with no steps → spawn planning task
//! 2. Execute: ready steps → spawn execution tasks
//! 3. Monitor: detect task completion → update step status, advance DAG
//! 4. Re-plan: goal updated → spawn new planning task
use sqlx::PgPool;
use uuid::Uuid;
use crate::db::models::{CreateTaskRequest, UpdateTaskRequest};
use crate::db::repository;
use crate::server::state::{DaemonCommand, SharedState};
/// Drives directives through their lifecycle: planning, step execution,
/// monitoring of spawned tasks, and re-planning.
///
/// One call to `tick` performs a single pass over all four phases.
pub struct DirectiveOrchestrator {
// Postgres connection pool passed to every `repository::*` call made here.
pool: PgPool,
// Shared server state; used to pick a daemon for a task and to send it
// `DaemonCommand`s.
state: SharedState,
}
impl DirectiveOrchestrator {
pub fn new(pool: PgPool, state: SharedState) -> Self {
Self { pool, state }
}
/// Perform a single orchestration pass — intended to be invoked every 15s.
///
/// The four phases run sequentially: planning, execution, monitoring,
/// re-planning.
///
/// # Errors
///
/// Returns the first error produced by any phase; phases after the failing
/// one are not run during this pass.
pub async fn tick(&mut self) -> Result<(), anyhow::Error> {
    self.phase_planning().await?;
    self.phase_execution().await?;
    self.phase_monitoring().await?;
    // The last phase's result doubles as the result of the whole pass.
    self.phase_replanning().await
}
/// Phase 1: spawn a planning task for each directive the repository reports
/// as needing planning.
///
/// # Errors
///
/// Only the listing query propagates its error. A failure to spawn the task
/// for one directive is logged as a warning and the remaining directives are
/// still processed.
async fn phase_planning(&self) -> Result<(), anyhow::Error> {
    let unplanned = repository::get_directives_needing_planning(&self.pool).await?;

    for directive in unplanned {
        tracing::info!(
            directive_id = %directive.id,
            title = %directive.title,
            "Directive needs planning — spawning planning task"
        );

        // Initial plan: no existing steps to show, and steps start at generation 1.
        let prompt = build_planning_prompt(&directive, &[], 1);
        let task_name = format!("Plan: {}", directive.title);

        let spawned = self
            .spawn_orchestrator_task(
                directive.id,
                directive.owner_id,
                task_name,
                prompt,
                directive.repository_url.as_deref(),
                directive.base_branch.as_deref(),
            )
            .await;

        if let Err(err) = spawned {
            tracing::warn!(
                directive_id = %directive.id,
                error = %err,
                "Failed to spawn planning task"
            );
        }
    }

    Ok(())
}
/// Phase 2: create and dispatch an execution task for every step the
/// repository reports as ready for dispatch.
///
/// The task prompt embeds the directive title, the step name, the step
/// description (or `(none)`) and the step's task plan (or a generic fallback
/// sentence).
///
/// # Errors
///
/// Only the listing query propagates its error. A failure to spawn the task
/// for one step is logged as a warning and the remaining steps are still
/// processed.
async fn phase_execution(&self) -> Result<(), anyhow::Error> {
    let ready_steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;

    for step in ready_steps {
        tracing::info!(
            step_id = %step.step_id,
            directive_id = %step.directive_id,
            step_name = %step.step_name,
            "Dispatching execution task for ready step"
        );

        // Pieces interpolated into the prompt, with fallbacks for the optional ones.
        let directive_title = &step.directive_title;
        let step_name = &step.step_name;
        let description = step.step_description.as_deref().unwrap_or("(none)");
        let task_plan = step
            .task_plan
            .as_deref()
            .unwrap_or("Execute the step described below.");

        let prompt = format!(
            "You are executing a step in directive \"{directive_title}\".\n\n\
             STEP: {step_name}\n\
             DESCRIPTION: {description}\n\n\
             INSTRUCTIONS:\n{task_plan}\n\n\
             When done, the system will automatically mark this step as completed.\n\
             If you cannot complete the task, report the failure clearly.",
            directive_title = directive_title,
            step_name = step_name,
            description = description,
            task_plan = task_plan,
        );
        let task_name = format!("{}: {}", directive_title, step_name);

        let spawned = self
            .spawn_step_task(
                step.step_id,
                step.directive_id,
                step.owner_id,
                task_name,
                prompt,
                step.repository_url.as_deref(),
                step.base_branch.as_deref(),
            )
            .await;

        if let Err(err) = spawned {
            tracing::warn!(
                step_id = %step.step_id,
                error = %err,
                "Failed to spawn execution task for step"
            );
        }
    }

    Ok(())
}
/// Phase 3: Monitor running steps and orchestrator tasks.
async fn phase_monitoring(&self) -> Result<(), anyhow::Error> {
// Check running steps
let running = repository::get_running_steps_with_tasks(&self.pool).await?;
for step in running {
match step.task_status.as_str() {
"completed" | "merged" | "done" => {
tracing::info!(
step_id = %step.step_id,
directive_id = %step.directive_id,
task_id = %step.task_id,
"Step task completed — updating step to completed"
);
let update = crate::db::models::UpdateDirectiveStepRequest {
status: Some("completed".to_string()),
..Default::default()
};
repository::update_directive_step(&self.pool, step.step_id, update).await?;
repository::advance_directive_ready_steps(&self.pool, step.directive_id)
.await?;
repository::check_directive_idle(&self.pool, step.directive_id).await?;
}
"failed" | "interrupted" => {
tracing::warn!(
step_id = %step.step_id,
directive_id = %step.directive_id,
task_id = %step.task_id,
task_status = %step.task_status,
"Step task failed — updating step to failed"
);
let update = crate::db::models::UpdateDirectiveStepRequest {
status: Some("failed".to_string()),
..Default::default()
};
repository::update_directive_step(&self.pool, step.step_id, update).await?;
repository::advance_directive_ready_steps(&self.pool, step.directive_id)
.await?;
repository::check_directive_idle(&self.pool, step.directive_id).await?;
}
_ => {
// Still running — do nothing
}
}
}
// Check orchestrator (planning) tasks
let orch_tasks = repository::get_orchestrator_tasks_to_check(&self.pool).await?;
for orch in orch_tasks {
match orch.task_status.as_str() {
"completed" | "merged" | "done" => {
tracing::info!(
directive_id = %orch.directive_id,
task_id = %orch.orchestrator_task_id,
"Planning task completed — clearing orchestrator task"
);
repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
// Advance DAG — planning task should have created steps
repository::advance_directive_ready_steps(&self.pool, orch.directive_id)
.await?;
}
"failed" | "interrupted" => {
tracing::warn!(
directive_id = %orch.directive_id,
task_id = %orch.orchestrator_task_id,
"Planning task failed — pausing directive"
);
repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
repository::set_directive_status(
&self.pool,
orch.owner_id,
orch.directive_id,
"paused",
)
.await?;
}
_ => {}
}
}
Ok(())
}
/// Phase 4: Re-planning — goal updated after latest step creation.
async fn phase_replanning(&self) -> Result<(), anyhow::Error> {
let directives = repository::get_directives_needing_replanning(&self.pool).await?;
for directive in directives {
tracing::info!(
directive_id = %directive.id,
title = %directive.title,
"Directive goal updated — spawning re-planning task"
);
let existing_steps =
repository::list_directive_steps(&self.pool, directive.id).await?;
let generation =
repository::get_directive_max_generation(&self.pool, directive.id).await? + 1;
let plan = build_planning_prompt(&directive, &existing_steps, generation);
if let Err(e) = self
.spawn_orchestrator_task(
directive.id,
directive.owner_id,
format!("Re-plan: {}", directive.title),
plan,
directive.repository_url.as_deref(),
directive.base_branch.as_deref(),
)
.await
{
tracing::warn!(
directive_id = %directive.id,
error = %e,
"Failed to spawn re-planning task"
);
}
}
Ok(())
}
/// Create a planning (or re-planning) task for a directive, record it as the
/// directive's orchestrator task, then attempt to dispatch it.
///
/// Dispatch is best-effort: `try_dispatch_task` returns nothing, so a task
/// that could not be dispatched still yields `Ok(())` here.
///
/// # Errors
///
/// Fails if the task cannot be created or cannot be assigned to the directive.
async fn spawn_orchestrator_task(
    &self,
    directive_id: Uuid,
    owner_id: Uuid,
    name: String,
    plan: String,
    repo_url: Option<&str>,
    base_branch: Option<&str>,
) -> Result<(), anyhow::Error> {
    let request = CreateTaskRequest {
        // What the task is and what it should do.
        name,
        description: Some("Directive planning task".to_string()),
        plan,
        priority: 0,
        is_supervisor: false,
        // Where it works.
        repository_url: repo_url.map(String::from),
        base_branch: base_branch.map(String::from),
        target_branch: None,
        target_repo_path: None,
        merge_mode: None,
        completion_action: None,
        // Linked to the directive only — a planning task belongs to no step.
        directive_id: Some(directive_id),
        directive_step_id: None,
        // No contract, parent, continuation or supervisor relationships.
        contract_id: None,
        parent_task_id: None,
        continue_from_task_id: None,
        branched_from_task_id: None,
        supervisor_worktree_task_id: None,
        copy_files: None,
        checkpoint_sha: None,
        conversation_history: None,
    };

    let created = repository::create_task_for_owner(&self.pool, owner_id, request).await?;
    repository::assign_orchestrator_task(&self.pool, directive_id, created.id).await?;

    // Try to dispatch to a daemon
    self.try_dispatch_task(
        created.id,
        owner_id,
        &created.name,
        &created.plan,
        created.version,
    )
    .await;

    Ok(())
}
/// Create an execution task for a directive step, attach it to that step,
/// then attempt to dispatch it.
///
/// Dispatch is best-effort: `try_dispatch_task` returns nothing, so a task
/// that could not be dispatched still yields `Ok(())` here.
///
/// # Errors
///
/// Fails if the task cannot be created or cannot be assigned to the step.
async fn spawn_step_task(
    &self,
    step_id: Uuid,
    directive_id: Uuid,
    owner_id: Uuid,
    name: String,
    plan: String,
    repo_url: Option<&str>,
    base_branch: Option<&str>,
) -> Result<(), anyhow::Error> {
    let request = CreateTaskRequest {
        // What the task is and what it should do.
        name,
        description: Some("Directive step execution task".to_string()),
        plan,
        priority: 0,
        is_supervisor: false,
        // Where it works.
        repository_url: repo_url.map(String::from),
        base_branch: base_branch.map(String::from),
        target_branch: None,
        target_repo_path: None,
        merge_mode: None,
        completion_action: None,
        // Linked to both the directive and the specific step it executes.
        directive_id: Some(directive_id),
        directive_step_id: Some(step_id),
        // No contract, parent, continuation or supervisor relationships.
        contract_id: None,
        parent_task_id: None,
        continue_from_task_id: None,
        branched_from_task_id: None,
        supervisor_worktree_task_id: None,
        copy_files: None,
        checkpoint_sha: None,
        conversation_history: None,
    };

    let created = repository::create_task_for_owner(&self.pool, owner_id, request).await?;
    repository::assign_task_to_step(&self.pool, step_id, created.id).await?;

    // Try to dispatch to a daemon
    self.try_dispatch_task(
        created.id,
        owner_id,
        &created.name,
        &created.plan,
        created.version,
    )
    .await;

    Ok(())
}
/// Try to dispatch a task to an available daemon. If none available, leave pending.
///
/// Steps:
/// 1. Ask shared state for a daemon for `owner_id`; if there is none, log and return.
/// 2. Update the task to status `starting` with that daemon assigned, passing
///    `version` along with the update.
/// 3. Send a `DaemonCommand::SpawnTask` built from the updated task row.
/// 4. If sending fails, make a best-effort attempt to put the task back to
///    `pending` so it is not left in `starting` with no daemon working on it.
///
/// This function never returns an error; every failure is logged.
async fn try_dispatch_task(
    &self,
    task_id: Uuid,
    owner_id: Uuid,
    task_name: &str,
    plan: &str,
    version: i32,
) {
    let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
        tracing::info!(
            task_id = %task_id,
            "No daemon available for directive task — leaving pending for retry"
        );
        return;
    };

    // Update task status to starting and assign daemon
    let update_req = UpdateTaskRequest {
        status: Some("starting".to_string()),
        daemon_id: Some(daemon_id),
        version: Some(version),
        ..Default::default()
    };
    let updated_task =
        match repository::update_task_for_owner(&self.pool, task_id, owner_id, update_req).await {
            Ok(Some(task)) => task,
            Ok(None) => {
                tracing::warn!(task_id = %task_id, "Task not found when trying to dispatch");
                return;
            }
            Err(e) => {
                tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
                return;
            }
        };

    let command = DaemonCommand::SpawnTask {
        task_id,
        task_name: task_name.to_string(),
        plan: plan.to_string(),
        repo_url: updated_task.repository_url.clone(),
        base_branch: updated_task.base_branch.clone(),
        target_branch: updated_task.target_branch.clone(),
        parent_task_id: None,
        depth: 0,
        is_orchestrator: false,
        target_repo_path: None,
        completion_action: None,
        continue_from_task_id: None,
        copy_files: None,
        contract_id: None,
        is_supervisor: false,
        autonomous_loop: false,
        resume_session: false,
        conversation_history: None,
        patch_data: None,
        patch_base_sha: None,
        local_only: false,
        auto_merge_local: false,
        supervisor_worktree_task_id: None,
    };

    let Err(send_err) = self.state.send_daemon_command(daemon_id, command).await else {
        tracing::info!(
            task_id = %task_id,
            daemon_id = %daemon_id,
            "Dispatched directive task to daemon"
        );
        return;
    };

    tracing::warn!(
        task_id = %task_id,
        daemon_id = %daemon_id,
        error = %send_err,
        "Failed to send SpawnTask to daemon for directive task"
    );

    // The row already says `starting`, but no daemon received the command.
    // Roll the status back so the task is not stranded in `starting`.
    // NOTE(review): "pending" is assumed to be the pre-dispatch status (the
    // no-daemon log message above refers to it) — confirm against the task
    // status vocabulary. The fields visible here give no way to clear
    // `daemon_id`, so it stays set; confirm the pending-task retry path
    // tolerates that. No `version` is sent because the previous update may
    // have changed it.
    let rollback_req = UpdateTaskRequest {
        status: Some("pending".to_string()),
        ..Default::default()
    };
    match repository::update_task_for_owner(&self.pool, task_id, owner_id, rollback_req).await {
        Ok(Some(_)) => {
            tracing::info!(
                task_id = %task_id,
                "Reset directive task to pending after failed dispatch"
            );
        }
        Ok(None) => {
            tracing::warn!(
                task_id = %task_id,
                "Task not found when rolling back failed dispatch"
            );
        }
        Err(e) => {
            tracing::warn!(
                task_id = %task_id,
                error = %e,
                "Failed to roll back task status after failed dispatch"
            );
        }
    }
}
}
/// Assemble the prompt handed to a planning (or re-planning) task.
///
/// When `existing_steps` is non-empty the prompt opens with a listing of those
/// steps (name, status, description) labelled with `generation - 1`, followed
/// by a request to add new steps using `generation`. The fixed planning
/// instructions — directive title, goal, optional repository line and the
/// `makima directive` command reference — always follow.
fn build_planning_prompt(
    directive: &crate::db::models::Directive,
    existing_steps: &[crate::db::models::DirectiveStep],
    generation: i32,
) -> String {
    // Optional preamble describing work that already exists.
    let mut preamble = String::new();
    if !existing_steps.is_empty() {
        preamble += &format!("EXISTING STEPS (generation {}):\n", generation - 1);
        for existing in existing_steps {
            let description = existing
                .description
                .as_deref()
                .unwrap_or("(no description)");
            preamble += &format!(
                "- {} [{}]: {}\n",
                existing.name, existing.status, description
            );
        }
        preamble += &format!(
            "\nAdd new steps that build on or complement existing work. Use generation {}.\n\n",
            generation
        );
    }

    // Empty when the directive has no repository configured.
    let repo_section = directive
        .repository_url
        .as_ref()
        .map(|url| format!("REPOSITORY: {}\n", url))
        .unwrap_or_default();

    let instructions = format!(
        r#"You are planning the implementation of a directive.
DIRECTIVE: "{title}"
GOAL: {goal}
{repo_section}
Your job:
1. Explore the repository to understand the codebase
2. Decompose the goal into concrete, ordered steps
3. Each step = one task for a Claude Code instance to execute
4. Submit ALL steps using the batch command or individual add-step commands
For each step, define:
- name: Short imperative title (e.g., "Add user authentication middleware")
- description: What to do and acceptance criteria
- taskPlan: Full instructions for the Claude instance (include file paths, patterns to follow)
- dependsOn: UUIDs of steps this depends on (use IDs from previous add-step responses)
- orderIndex: Execution order hint
Submit steps:
makima directive add-step "Step Name" --description "..." --task-plan "..."
(Use --depends-on "uuid1,uuid2" for dependencies, referencing IDs from earlier add-step outputs)
Or batch:
makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]'
IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context.
"#,
        title = directive.title,
        goal = directive.goal,
        repo_section = repo_section,
    );

    preamble + &instructions
}