//! Directive orchestration — init, planning completion, chain advancement. use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use crate::db::models::{ CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, }; use crate::db::repository; use crate::server::state::SharedState; /// A single step in the chain plan produced by the planning supervisor. #[derive(Debug, Deserialize)] struct ChainPlanStep { name: String, description: String, task_plan: String, #[serde(default)] depends_on: Vec, // names of steps this depends on } /// Wrapper for the plan JSON written by the planning supervisor. #[derive(Debug, Deserialize)] struct ChainPlan { steps: Vec, } /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, _state: &SharedState, owner_id: Uuid, directive_id: Uuid, ) -> Result { // 1. Get directive, verify status let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if directive.status != "draft" { return Err(format!( "Directive must be in 'draft' status to start, current status: '{}'", directive.status )); } // 2. Create planning contract let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: format!("{} - Planning", directive.title), description: Some(format!( "Planning contract for directive: {}", directive.title )), contract_type: Some("simple".to_string()), template_id: None, initial_phase: Some("plan".to_string()), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create planning contract: {}", e))?; // 3. Mark contract as directive orchestrator repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // 4. Build planning prompt let planning_prompt = build_planning_prompt(&directive); // 5. Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} - Planner", directive.title), description: Some("Decompose directive goal into executable chain steps".to_string()), plan: planning_prompt, parent_task_id: None, is_supervisor: true, priority: 10, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create supervisor task: {}", e))?; // 6. Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to contract: {}", e) } other => format!("Failed to link supervisor to contract: {:?}", other), })?; // 7. Set orchestrator_contract_id on directive repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) .await .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; // 8. Transition directive to "planning" let updated = repository::update_directive_status(pool, directive_id, "planning") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; // 9. Copy repo config to contract if repository_url is set if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive_id, contract_id = %contract.id, task_id = %supervisor_task.id, "Directive started: planning contract created" ); Ok(updated) } /// Called when any task completes — checks if it's directive-related and advances. pub async fn on_task_completed( pool: &PgPool, state: &SharedState, task: &Task, owner_id: Uuid, ) -> Result<(), String> { let Some(contract_id) = task.contract_id else { return Ok(()); }; let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get contract: {}", e))?; let Some(contract) = contract else { return Ok(()); }; if contract.is_directive_orchestrator { // This is a planning contract completion let directive = repository::get_directive_by_orchestrator_contract(pool, contract_id) .await .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; if let Some(directive) = directive { on_planning_completed(pool, state, &directive, task, owner_id).await?; } } else if contract.directive_id.is_some() { // This is a step contract completion on_step_completed(pool, state, &contract, task, owner_id).await?; } Ok(()) } /// Handle planning task completion: parse chain plan, create steps, advance. async fn on_planning_completed( pool: &PgPool, state: &SharedState, directive: &Directive, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // If task failed, fail the directive if task.status == "failed" { tracing::warn!( directive_id = %directive.id, task_id = %task.id, "Planning task failed, marking directive as failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); } // Only process when the supervisor task itself is done if task.status != "done" || !task.is_supervisor { return Ok(()); } let Some(contract_id) = task.contract_id else { return Ok(()); }; // Get contract files to find the chain plan let files = repository::list_files_in_contract(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to list contract files: {}", e))?; // Find the chain plan file let plan_file = files.iter().find(|f| { let name_lower = f.name.to_lowercase(); name_lower.contains("chain") || name_lower.contains("plan") }); let plan_file = plan_file.or_else(|| files.first()); let Some(plan_file) = plan_file else { tracing::warn!( directive_id = %directive.id, "No plan file found in planning contract, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); }; // Read the full file to get the body content let full_file = repository::get_file(pool, plan_file.id) .await .map_err(|e| format!("Failed to get plan file: {}", e))? .ok_or("Plan file not found")?; // Extract JSON from the file body elements let plan_json = extract_plan_json(&full_file.body); let Some(plan_json) = plan_json else { tracing::warn!( directive_id = %directive.id, "Could not extract chain plan JSON from file body, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); }; let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| { format!("Failed to parse chain plan JSON: {}", e) })?; if chain_plan.steps.is_empty() { tracing::warn!( directive_id = %directive.id, "Chain plan has no steps, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); } // Create chain let chain = repository::create_directive_chain( pool, directive.id, &format!("{} - Chain", directive.title), Some("Auto-generated from planning"), None, chain_plan.steps.len() as i32, ) .await .map_err(|e| format!("Failed to create directive chain: {}", e))?; // Create steps (two passes: first create all, then resolve dependencies) let mut step_ids: Vec<(String, Uuid)> = Vec::new(); for (i, plan_step) in chain_plan.steps.iter().enumerate() { let step = repository::create_chain_step( pool, chain.id, &plan_step.name, Some(&plan_step.description), "task", "simple", Some("plan"), Some(&plan_step.task_plan), None, // dependencies set in second pass i as i32, ) .await .map_err(|e| format!("Failed to create chain step: {}", e))?; step_ids.push((plan_step.name.clone(), step.id)); } // Second pass: resolve name-based dependencies to UUIDs and update for (i, plan_step) in chain_plan.steps.iter().enumerate() { if plan_step.depends_on.is_empty() { continue; } let dep_uuids: Vec = plan_step .depends_on .iter() .filter_map(|dep_name| { step_ids .iter() .find(|(name, _)| name == dep_name) .map(|(_, id)| *id) }) .collect(); if !dep_uuids.is_empty() { let step_id = step_ids[i].1; sqlx::query( "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", ) .bind(step_id) .bind(&dep_uuids) .execute(pool) .await .map_err(|e| format!("Failed to update step dependencies: {}", e))?; } } // Set current chain on directive repository::set_directive_current_chain(pool, directive.id, chain.id) .await .map_err(|e| format!("Failed to set current chain: {}", e))?; // Transition directive to active let updated_directive = repository::update_directive_status(pool, directive.id, "active") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; tracing::info!( directive_id = %directive.id, chain_id = %chain.id, step_count = chain_plan.steps.len(), "Chain plan created, advancing chain" ); // Advance chain to dispatch ready steps advance_chain(pool, state, &updated_directive, owner_id).await } /// Handle a step contract task completion. async fn on_step_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // Only process supervisor task completions if !task.is_supervisor { return Ok(()); } let Some(directive_id) = contract.directive_id else { return Ok(()); }; // Find the step linked to this contract let step = repository::get_step_by_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to get step by contract: {}", e))?; let Some(step) = step else { return Ok(()); }; // Update step status based on task outcome let new_status = if task.status == "done" { "passed" } else { "failed" }; repository::update_step_status(pool, step.id, new_status) .await .map_err(|e| format!("Failed to update step status: {}", e))?; tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, new_status = new_status, "Step completed" ); // Get the directive and advance let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; advance_chain(pool, state, &directive, owner_id).await } /// Check chain progress and dispatch ready steps or mark directive complete. async fn advance_chain( pool: &PgPool, _state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { let Some(chain_id) = directive.current_chain_id else { return Ok(()); }; let steps = repository::list_steps_for_chain(pool, chain_id) .await .map_err(|e| format!("Failed to list steps: {}", e))?; // Check if all steps passed let all_passed = steps.iter().all(|s| s.status == "passed"); if all_passed && !steps.is_empty() { repository::update_chain_status(pool, chain_id, "completed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "completed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); return Ok(()); } // Check if any step failed let any_failed = steps.iter().any(|s| s.status == "failed"); if any_failed { repository::update_chain_status(pool, chain_id, "failed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); return Ok(()); } // Find and dispatch ready steps let ready_steps = repository::find_ready_steps(pool, chain_id) .await .map_err(|e| format!("Failed to find ready steps: {}", e))?; for step in ready_steps { if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await { tracing::error!( step_id = %step.id, step_name = %step.name, error = %e, "Failed to dispatch step" ); } } Ok(()) } /// Dispatch a single chain step as a new contract with supervisor. async fn dispatch_step( pool: &PgPool, directive: &Directive, step: &crate::db::models::ChainStep, owner_id: Uuid, ) -> Result<(), String> { // Mark step as running repository::update_step_status(pool, step.id, "running") .await .map_err(|e| format!("Failed to update step status: {}", e))?; // Create contract for this step let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: step.name.clone(), description: step.description.clone(), contract_type: Some(step.contract_type.clone()), template_id: None, initial_phase: step.initial_phase.clone(), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create step contract: {}", e))?; // Set directive_id on contract repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // Build the task plan let task_plan = step .task_plan .clone() .unwrap_or_else(|| format!("Execute step: {}", step.name)); // Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} Supervisor", step.name), description: step.description.clone(), plan: task_plan, parent_task_id: None, is_supervisor: true, priority: 5, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; // Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to step contract: {}", e) } other => format!("Failed to link supervisor to step contract: {:?}", other), })?; // Link step to contract/task repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) .await .map_err(|e| format!("Failed to update step contract link: {}", e))?; // Copy repo config from directive to step contract if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive.id, step_id = %step.id, step_name = %step.name, contract_id = %contract.id, task_id = %supervisor_task.id, "Step dispatched" ); Ok(()) } /// Build the planning supervisor prompt from a directive. fn build_planning_prompt(directive: &Directive) -> String { format!( r#"You are planning the execution of a directive. DIRECTIVE: {} GOAL: {} REQUIREMENTS: {} ACCEPTANCE CRITERIA: {} CONSTRAINTS: {} Your job is to decompose this goal into a sequence of executable steps. Each step will become a separate contract with its own supervisor. Write a JSON plan to a contract file named "chain-plan" using: makima contract create-file "chain-plan" < plan.json The JSON format: {{ "steps": [ {{ "name": "Step name", "description": "What this step accomplishes", "task_plan": "Detailed instructions for the step's supervisor", "depends_on": [] }} ] }} Rules: - Steps with no dependencies (empty depends_on array) will run in parallel. - Steps that depend on other steps will wait until those complete. - The depends_on array contains names of steps this step depends on. - Each step should be a self-contained unit of work. - Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. - Keep the number of steps reasonable (3-10 typically). After writing the plan file, mark the contract as complete using: makima supervisor complete"#, directive.title, directive.goal, serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), ) } /// Extract JSON from file body elements. fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option { use crate::db::models::BodyElement; for element in body { match element { BodyElement::Code { content, .. } => { // Try to parse as JSON let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Paragraph { text } => { let trimmed = text.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Markdown { content } => { // Try to find JSON in markdown content let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } // Try to find JSON in code blocks within markdown if let Some(json_start) = trimmed.find("```json") { let after = &trimmed[json_start + 7..]; if let Some(json_end) = after.find("```") { let json_str = after[..json_end].trim(); if serde_json::from_str::(json_str).is_ok() { return Some(json_str.to_string()); } } } } _ => {} } } // Fallback: concatenate all text content and try to find JSON let all_text: String = body .iter() .map(|el| match el { BodyElement::Code { content, .. } => content.clone(), BodyElement::Paragraph { text } => text.clone(), BodyElement::Markdown { content } => content.clone(), _ => String::new(), }) .collect::>() .join("\n"); let trimmed = all_text.trim(); if let Some(start) = trimmed.find('{') { // Find matching closing brace let substr = &trimmed[start..]; if serde_json::from_str::(substr).is_ok() { return Some(substr.to_string()); } } None }