//! Directive orchestration — init, planning completion, chain advancement.
use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;
use crate::db::models::{
CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
};
use crate::db::repository;
use crate::server::state::SharedState;
/// A single step in the chain plan produced by the planning supervisor.
#[derive(Debug, Deserialize)]
struct ChainPlanStep {
name: String,
description: String,
task_plan: String,
#[serde(default)]
depends_on: Vec<String>, // names of steps this depends on
}
/// Wrapper for the plan JSON written by the planning supervisor.
#[derive(Debug, Deserialize)]
struct ChainPlan {
steps: Vec<ChainPlanStep>,
}
/// Initialize a directive: create a planning contract and transition to "planning".
pub async fn init_directive(
pool: &PgPool,
_state: &SharedState,
owner_id: Uuid,
directive_id: Uuid,
) -> Result<Directive, String> {
// 1. Get directive, verify status
let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
.await
.map_err(|e| format!("Failed to get directive: {}", e))?
.ok_or("Directive not found")?;
if directive.status != "draft" {
return Err(format!(
"Directive must be in 'draft' status to start, current status: '{}'",
directive.status
));
}
// 2. Create planning contract
let contract = repository::create_contract_for_owner(
pool,
owner_id,
CreateContractRequest {
name: format!("{} - Planning", directive.title),
description: Some(format!(
"Planning contract for directive: {}",
directive.title
)),
contract_type: Some("simple".to_string()),
template_id: None,
initial_phase: Some("plan".to_string()),
autonomous_loop: Some(true),
phase_guard: None,
local_only: Some(true),
auto_merge_local: None,
},
)
.await
.map_err(|e| format!("Failed to create planning contract: {}", e))?;
// 3. Mark contract as directive orchestrator
repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true)
.await
.map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
// 4. Build planning prompt
let planning_prompt = build_planning_prompt(&directive);
// 5. Create supervisor task
let supervisor_task = repository::create_task_for_owner(
pool,
owner_id,
CreateTaskRequest {
contract_id: Some(contract.id),
name: format!("{} - Planner", directive.title),
description: Some("Decompose directive goal into executable chain steps".to_string()),
plan: planning_prompt,
parent_task_id: None,
is_supervisor: true,
priority: 10,
repository_url: directive.repository_url.clone(),
base_branch: directive.base_branch.clone(),
target_branch: None,
merge_mode: None,
target_repo_path: directive.local_path.clone(),
completion_action: None,
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None,
},
)
.await
.map_err(|e| format!("Failed to create supervisor task: {}", e))?;
// 6. Link supervisor to contract
repository::update_contract_for_owner(
pool,
contract.id,
owner_id,
UpdateContractRequest {
supervisor_task_id: Some(supervisor_task.id),
..Default::default()
},
)
.await
.map_err(|e| match e {
crate::db::repository::RepositoryError::Database(e) => {
format!("Failed to link supervisor to contract: {}", e)
}
other => format!("Failed to link supervisor to contract: {:?}", other),
})?;
// 7. Set orchestrator_contract_id on directive
repository::set_directive_orchestrator_contract(pool, directive_id, contract.id)
.await
.map_err(|e| format!("Failed to set orchestrator contract: {}", e))?;
// 8. Transition directive to "planning"
let updated = repository::update_directive_status(pool, directive_id, "planning")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?
.ok_or("Directive not found after status update")?;
// 9. Copy repo config to contract if repository_url is set
if let Some(ref repo_url) = directive.repository_url {
let _ = repository::add_remote_repository(
pool,
contract.id,
"directive-repo",
repo_url,
true,
)
.await;
} else if let Some(ref local_path) = directive.local_path {
let _ = repository::add_local_repository(
pool,
contract.id,
"directive-repo",
local_path,
true,
)
.await;
}
tracing::info!(
directive_id = %directive_id,
contract_id = %contract.id,
task_id = %supervisor_task.id,
"Directive started: planning contract created"
);
Ok(updated)
}
/// Called when any task completes — checks if it's directive-related and advances.
pub async fn on_task_completed(
pool: &PgPool,
state: &SharedState,
task: &Task,
owner_id: Uuid,
) -> Result<(), String> {
let Some(contract_id) = task.contract_id else {
return Ok(());
};
let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to get contract: {}", e))?;
let Some(contract) = contract else {
return Ok(());
};
if contract.is_directive_orchestrator {
// This is a planning contract completion
let directive =
repository::get_directive_by_orchestrator_contract(pool, contract_id)
.await
.map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
if let Some(directive) = directive {
on_planning_completed(pool, state, &directive, task, owner_id).await?;
}
} else if contract.directive_id.is_some() {
// This is a step contract completion
on_step_completed(pool, state, &contract, task, owner_id).await?;
}
Ok(())
}
/// Handle planning task completion: parse chain plan, create steps, advance.
async fn on_planning_completed(
pool: &PgPool,
state: &SharedState,
directive: &Directive,
task: &Task,
owner_id: Uuid,
) -> Result<(), String> {
// If task failed, fail the directive
if task.status == "failed" {
tracing::warn!(
directive_id = %directive.id,
task_id = %task.id,
"Planning task failed, marking directive as failed"
);
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
return Ok(());
}
// Only process when the supervisor task itself is done
if task.status != "done" || !task.is_supervisor {
return Ok(());
}
let Some(contract_id) = task.contract_id else {
return Ok(());
};
// Get contract files to find the chain plan
let files = repository::list_files_in_contract(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to list contract files: {}", e))?;
// Find the chain plan file
let plan_file = files.iter().find(|f| {
let name_lower = f.name.to_lowercase();
name_lower.contains("chain") || name_lower.contains("plan")
});
let plan_file = plan_file.or_else(|| files.first());
let Some(plan_file) = plan_file else {
tracing::warn!(
directive_id = %directive.id,
"No plan file found in planning contract, marking directive failed"
);
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
return Ok(());
};
// Read the full file to get the body content
let full_file = repository::get_file(pool, plan_file.id)
.await
.map_err(|e| format!("Failed to get plan file: {}", e))?
.ok_or("Plan file not found")?;
// Extract JSON from the file body elements
let plan_json = extract_plan_json(&full_file.body);
let Some(plan_json) = plan_json else {
tracing::warn!(
directive_id = %directive.id,
"Could not extract chain plan JSON from file body, marking directive failed"
);
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
return Ok(());
};
let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| {
format!("Failed to parse chain plan JSON: {}", e)
})?;
if chain_plan.steps.is_empty() {
tracing::warn!(
directive_id = %directive.id,
"Chain plan has no steps, marking directive failed"
);
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
return Ok(());
}
// Create chain
let chain = repository::create_directive_chain(
pool,
directive.id,
&format!("{} - Chain", directive.title),
Some("Auto-generated from planning"),
None,
chain_plan.steps.len() as i32,
)
.await
.map_err(|e| format!("Failed to create directive chain: {}", e))?;
// Create steps (two passes: first create all, then resolve dependencies)
let mut step_ids: Vec<(String, Uuid)> = Vec::new();
for (i, plan_step) in chain_plan.steps.iter().enumerate() {
let step = repository::create_chain_step(
pool,
chain.id,
&plan_step.name,
Some(&plan_step.description),
"task",
"simple",
Some("plan"),
Some(&plan_step.task_plan),
None, // dependencies set in second pass
i as i32,
)
.await
.map_err(|e| format!("Failed to create chain step: {}", e))?;
step_ids.push((plan_step.name.clone(), step.id));
}
// Second pass: resolve name-based dependencies to UUIDs and update
for (i, plan_step) in chain_plan.steps.iter().enumerate() {
if plan_step.depends_on.is_empty() {
continue;
}
let dep_uuids: Vec<Uuid> = plan_step
.depends_on
.iter()
.filter_map(|dep_name| {
step_ids
.iter()
.find(|(name, _)| name == dep_name)
.map(|(_, id)| *id)
})
.collect();
if !dep_uuids.is_empty() {
let step_id = step_ids[i].1;
sqlx::query(
"UPDATE chain_steps SET depends_on = $2 WHERE id = $1",
)
.bind(step_id)
.bind(&dep_uuids)
.execute(pool)
.await
.map_err(|e| format!("Failed to update step dependencies: {}", e))?;
}
}
// Set current chain on directive
repository::set_directive_current_chain(pool, directive.id, chain.id)
.await
.map_err(|e| format!("Failed to set current chain: {}", e))?;
// Transition directive to active
let updated_directive = repository::update_directive_status(pool, directive.id, "active")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?
.ok_or("Directive not found after status update")?;
tracing::info!(
directive_id = %directive.id,
chain_id = %chain.id,
step_count = chain_plan.steps.len(),
"Chain plan created, advancing chain"
);
// Advance chain to dispatch ready steps
advance_chain(pool, state, &updated_directive, owner_id).await
}
/// Handle a step contract task completion.
async fn on_step_completed(
pool: &PgPool,
state: &SharedState,
contract: &crate::db::models::Contract,
task: &Task,
owner_id: Uuid,
) -> Result<(), String> {
// Only process supervisor task completions
if !task.is_supervisor {
return Ok(());
}
let Some(directive_id) = contract.directive_id else {
return Ok(());
};
// Find the step linked to this contract
let step = repository::get_step_by_contract_id(pool, contract.id)
.await
.map_err(|e| format!("Failed to get step by contract: {}", e))?;
let Some(step) = step else {
return Ok(());
};
// Update step status based on task outcome
let new_status = if task.status == "done" {
"passed"
} else {
"failed"
};
repository::update_step_status(pool, step.id, new_status)
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
tracing::info!(
directive_id = %directive_id,
step_id = %step.id,
step_name = %step.name,
new_status = new_status,
"Step completed"
);
// Get the directive and advance
let directive = repository::get_directive(pool, directive_id)
.await
.map_err(|e| format!("Failed to get directive: {}", e))?
.ok_or("Directive not found")?;
advance_chain(pool, state, &directive, owner_id).await
}
/// Check chain progress and dispatch ready steps or mark directive complete.
async fn advance_chain(
pool: &PgPool,
_state: &SharedState,
directive: &Directive,
owner_id: Uuid,
) -> Result<(), String> {
let Some(chain_id) = directive.current_chain_id else {
return Ok(());
};
let steps = repository::list_steps_for_chain(pool, chain_id)
.await
.map_err(|e| format!("Failed to list steps: {}", e))?;
// Check if all steps passed
let all_passed = steps.iter().all(|s| s.status == "passed");
if all_passed && !steps.is_empty() {
repository::update_chain_status(pool, chain_id, "completed")
.await
.map_err(|e| format!("Failed to update chain status: {}", e))?;
repository::update_directive_status(pool, directive.id, "completed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed");
return Ok(());
}
// Check if any step failed
let any_failed = steps.iter().any(|s| s.status == "failed");
if any_failed {
repository::update_chain_status(pool, chain_id, "failed")
.await
.map_err(|e| format!("Failed to update chain status: {}", e))?;
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected");
return Ok(());
}
// Find and dispatch ready steps
let ready_steps = repository::find_ready_steps(pool, chain_id)
.await
.map_err(|e| format!("Failed to find ready steps: {}", e))?;
for step in ready_steps {
if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await {
tracing::error!(
step_id = %step.id,
step_name = %step.name,
error = %e,
"Failed to dispatch step"
);
}
}
Ok(())
}
/// Dispatch a single chain step as a new contract with supervisor.
async fn dispatch_step(
pool: &PgPool,
directive: &Directive,
step: &crate::db::models::ChainStep,
owner_id: Uuid,
) -> Result<(), String> {
// Mark step as running
repository::update_step_status(pool, step.id, "running")
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
// Create contract for this step
let contract = repository::create_contract_for_owner(
pool,
owner_id,
CreateContractRequest {
name: step.name.clone(),
description: step.description.clone(),
contract_type: Some(step.contract_type.clone()),
template_id: None,
initial_phase: step.initial_phase.clone(),
autonomous_loop: Some(true),
phase_guard: None,
local_only: Some(true),
auto_merge_local: None,
},
)
.await
.map_err(|e| format!("Failed to create step contract: {}", e))?;
// Set directive_id on contract
repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
.await
.map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
// Build the task plan
let task_plan = step
.task_plan
.clone()
.unwrap_or_else(|| format!("Execute step: {}", step.name));
// Create supervisor task
let supervisor_task = repository::create_task_for_owner(
pool,
owner_id,
CreateTaskRequest {
contract_id: Some(contract.id),
name: format!("{} Supervisor", step.name),
description: step.description.clone(),
plan: task_plan,
parent_task_id: None,
is_supervisor: true,
priority: 5,
repository_url: directive.repository_url.clone(),
base_branch: directive.base_branch.clone(),
target_branch: None,
merge_mode: None,
target_repo_path: directive.local_path.clone(),
completion_action: None,
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None,
},
)
.await
.map_err(|e| format!("Failed to create step supervisor task: {}", e))?;
// Link supervisor to contract
repository::update_contract_for_owner(
pool,
contract.id,
owner_id,
UpdateContractRequest {
supervisor_task_id: Some(supervisor_task.id),
..Default::default()
},
)
.await
.map_err(|e| match e {
crate::db::repository::RepositoryError::Database(e) => {
format!("Failed to link supervisor to step contract: {}", e)
}
other => format!("Failed to link supervisor to step contract: {:?}", other),
})?;
// Link step to contract/task
repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id)
.await
.map_err(|e| format!("Failed to update step contract link: {}", e))?;
// Copy repo config from directive to step contract
if let Some(ref repo_url) = directive.repository_url {
let _ = repository::add_remote_repository(
pool,
contract.id,
"directive-repo",
repo_url,
true,
)
.await;
} else if let Some(ref local_path) = directive.local_path {
let _ = repository::add_local_repository(
pool,
contract.id,
"directive-repo",
local_path,
true,
)
.await;
}
tracing::info!(
directive_id = %directive.id,
step_id = %step.id,
step_name = %step.name,
contract_id = %contract.id,
task_id = %supervisor_task.id,
"Step dispatched"
);
Ok(())
}
/// Build the planning supervisor prompt from a directive.
fn build_planning_prompt(directive: &Directive) -> String {
format!(
r#"You are planning the execution of a directive.
DIRECTIVE: {}
GOAL: {}
REQUIREMENTS: {}
ACCEPTANCE CRITERIA: {}
CONSTRAINTS: {}
Your job is to decompose this goal into a sequence of executable steps.
Each step will become a separate contract with its own supervisor.
Write a JSON plan to a contract file named "chain-plan" using:
makima contract create-file "chain-plan" < plan.json
The JSON format:
{{
"steps": [
{{
"name": "Step name",
"description": "What this step accomplishes",
"task_plan": "Detailed instructions for the step's supervisor",
"depends_on": []
}}
]
}}
Rules:
- Steps with no dependencies (empty depends_on array) will run in parallel.
- Steps that depend on other steps will wait until those complete.
- The depends_on array contains names of steps this step depends on.
- Each step should be a self-contained unit of work.
- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible.
- Keep the number of steps reasonable (3-10 typically).
After writing the plan file, mark the contract as complete using:
makima supervisor complete"#,
directive.title,
directive.goal,
serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
)
}
/// Extract JSON from file body elements.
fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> {
use crate::db::models::BodyElement;
for element in body {
match element {
BodyElement::Code { content, .. } => {
// Try to parse as JSON
let trimmed = content.trim();
if trimmed.starts_with('{') || trimmed.starts_with('[') {
if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
return Some(trimmed.to_string());
}
}
}
BodyElement::Paragraph { text } => {
let trimmed = text.trim();
if trimmed.starts_with('{') || trimmed.starts_with('[') {
if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
return Some(trimmed.to_string());
}
}
}
BodyElement::Markdown { content } => {
// Try to find JSON in markdown content
let trimmed = content.trim();
if trimmed.starts_with('{') || trimmed.starts_with('[') {
if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
return Some(trimmed.to_string());
}
}
// Try to find JSON in code blocks within markdown
if let Some(json_start) = trimmed.find("```json") {
let after = &trimmed[json_start + 7..];
if let Some(json_end) = after.find("```") {
let json_str = after[..json_end].trim();
if serde_json::from_str::<serde_json::Value>(json_str).is_ok() {
return Some(json_str.to_string());
}
}
}
}
_ => {}
}
}
// Fallback: concatenate all text content and try to find JSON
let all_text: String = body
.iter()
.map(|el| match el {
BodyElement::Code { content, .. } => content.clone(),
BodyElement::Paragraph { text } => text.clone(),
BodyElement::Markdown { content } => content.clone(),
_ => String::new(),
})
.collect::<Vec<_>>()
.join("\n");
let trimmed = all_text.trim();
if let Some(start) = trimmed.find('{') {
// Find matching closing brace
let substr = &trimmed[start..];
if serde_json::from_str::<serde_json::Value>(substr).is_ok() {
return Some(substr.to_string());
}
}
None
}