//! Directive orchestration — init, planning completion, chain advancement. use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use serde::Serialize; use crate::db::models::{ ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, UpdateTaskRequest, }; use crate::db::repository; use crate::server::state::{DaemonCommand, SharedState}; /// A single step in the chain plan produced by the planning supervisor. #[derive(Debug, Deserialize)] #[serde(rename_all = "snake_case")] struct ChainPlanStep { name: String, description: String, #[serde(alias = "taskPlan")] task_plan: String, #[serde(default, alias = "dependsOn")] depends_on: Vec, // names of steps this depends on } /// Wrapper for the plan JSON written by the planning supervisor. #[derive(Debug, Deserialize)] struct ChainPlan { steps: Vec, } /// Result written by the monitoring supervisor after evaluating a step. #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] struct MonitoringResult { passed: bool, overall_score: Option, confidence_level: Option, #[serde(default)] criteria_results: serde_json::Value, #[serde(default)] summary_feedback: String, rework_instructions: Option, } /// Dispatch a task to an available daemon. Finds a connected daemon with capacity, /// assigns the task, and sends a SpawnTask command. async fn dispatch_task_to_daemon( pool: &PgPool, state: &SharedState, task: &Task, contract_local_only: bool, contract_auto_merge_local: bool, owner_id: Uuid, ) -> Result<(), String> { // Find available daemons let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[]) .await .map_err(|e| format!("Failed to get available daemons: {}", e))?; let available_daemon = daemons.iter().find(|d| { d.current_task_count < d.max_concurrent_tasks && state.daemon_connections.contains_key(&d.connection_id) }); let daemon = match available_daemon { Some(d) => d, None => { tracing::warn!( task_id = %task.id, "No daemon available to dispatch task — will be picked up by retry loop" ); return Ok(()); } }; // Assign task to daemon let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(daemon.id), version: Some(task.version), ..Default::default() }; let updated = repository::update_task_for_owner(pool, task.id, owner_id, update_req) .await .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?; let Some(updated_task) = updated else { return Err("Task not found when assigning to daemon".to_string()); }; // Get repo URL from task or contract repositories let repo_url = if let Some(url) = &updated_task.repository_url { Some(url.clone()) } else if let Some(contract_id) = updated_task.contract_id { match repository::list_contract_repositories(pool, contract_id).await { Ok(repos) => repos .iter() .find(|r| r.is_primary) .or(repos.first()) .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), Err(_) => None, } } else { None }; let cmd = DaemonCommand::SpawnTask { task_id: updated_task.id, task_name: updated_task.name.clone(), plan: updated_task.plan.clone(), repo_url, base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: updated_task.parent_task_id, depth: updated_task.depth, is_orchestrator: false, target_repo_path: updated_task.target_repo_path.clone(), completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, autonomous_loop: updated_task.contract_id.is_some(), resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only: contract_local_only, auto_merge_local: contract_auto_merge_local, supervisor_worktree_task_id: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!( task_id = %task.id, daemon_id = %daemon.id, error = %e, "Failed to send spawn command — rolling back" ); let rollback = UpdateTaskRequest { status: Some("pending".to_string()), clear_daemon_id: true, ..Default::default() }; let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback).await; return Ok(()); // Non-fatal, retry loop will pick it up } tracing::info!( task_id = %task.id, daemon_id = %daemon.id, "Dispatched directive task to daemon" ); Ok(()) } /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, state: &SharedState, owner_id: Uuid, directive_id: Uuid, ) -> Result { // 1. Get directive, verify status let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if directive.status != "draft" { return Err(format!( "Directive must be in 'draft' status to start, current status: '{}'", directive.status )); } // 2. Create planning contract let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: format!("{} - Planning", directive.title), description: Some(format!( "Planning contract for directive: {}", directive.title )), contract_type: Some("simple".to_string()), template_id: None, initial_phase: Some("plan".to_string()), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create planning contract: {}", e))?; // 3. Mark contract as directive orchestrator repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // 4. Build planning prompt let planning_prompt = build_planning_prompt(&directive); // 5. Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} - Planner", directive.title), description: Some("Decompose directive goal into executable chain steps".to_string()), plan: planning_prompt, parent_task_id: None, is_supervisor: true, priority: 10, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create supervisor task: {}", e))?; // 6. Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to contract: {}", e) } other => format!("Failed to link supervisor to contract: {:?}", other), })?; // 7. Set orchestrator_contract_id on directive repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) .await .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; // 8. Transition directive to "planning" let updated = repository::update_directive_status(pool, directive_id, "planning") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; // 9. Copy repo config to contract if repository_url is set if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive_id, contract_id = %contract.id, task_id = %supervisor_task.id, "Directive started: planning contract created" ); // 10. Dispatch planning task to an available daemon immediately dispatch_task_to_daemon( pool, state, &supervisor_task, contract.local_only, contract.auto_merge_local, owner_id, ).await?; Ok(updated) } /// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction). pub async fn submit_plan( pool: &PgPool, state: &SharedState, owner_id: Uuid, directive_id: Uuid, plan_json: &str, ) -> Result { // 1. Get directive, verify status let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if directive.status != "planning" { return Err(format!( "Directive must be in 'planning' status to submit a plan, current status: '{}'", directive.status )); } // 2. Idempotency: if current_chain_id already set, return existing directive if directive.current_chain_id.is_some() { tracing::info!( directive_id = %directive_id, "Plan already submitted (current_chain_id set), returning existing directive" ); return Ok(directive); } // 3. Parse the plan JSON let chain_plan: ChainPlan = serde_json::from_str(plan_json) .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?; if chain_plan.steps.is_empty() { return Err("Chain plan has no steps".to_string()); } // 4. Create chain and steps, transition to active create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?; // 5. Re-fetch and return the updated directive let updated = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to re-fetch directive: {}", e))? .ok_or("Directive not found after plan submission")?; tracing::info!( directive_id = %directive_id, step_count = chain_plan.steps.len(), "Plan submitted via API, directive now active" ); Ok(updated) } /// Called when any task completes — checks if it's directive-related and advances. /// Called when a contract's status is updated to "completed" via the API. /// This is the primary entry point for directive orchestration because supervisor /// tasks do not send TaskComplete messages — they complete via contract status updates. pub async fn on_contract_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, owner_id: Uuid, ) -> Result<(), String> { if contract.status != "completed" { return Ok(()); } if contract.is_directive_orchestrator { let directive = repository::get_directive_by_orchestrator_contract(pool, contract.id) .await .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; if let Some(directive) = directive { tracing::info!( directive_id = %directive.id, contract_id = %contract.id, "Directive orchestrator contract completed, handling planning completion" ); handle_planning_completion(pool, state, &directive, owner_id).await?; } else { tracing::warn!( contract_id = %contract.id, "Directive orchestrator contract completed but no directive found" ); } } else if let Some(directive_id) = contract.directive_id { // Check if this is a monitoring contract let monitoring_step = repository::get_step_by_monitoring_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; if let Some(step) = monitoring_step { tracing::info!( directive_id = %directive_id, step_id = %step.id, contract_id = %contract.id, "Monitoring contract completed" ); process_monitoring_result(pool, state, contract, &step, owner_id).await?; } else { // Step contract completed let step = repository::get_step_by_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to get step by contract: {}", e))?; if let Some(step) = step { // Idempotency: only dispatch monitoring if step is still "running" // (on_step_completed may also fire via the task path) if step.status != "running" { tracing::info!( step_id = %step.id, status = %step.status, "Skipping step contract completion: step no longer running" ); return Ok(()); } let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; tracing::info!( directive_id = %directive_id, step_id = %step.id, contract_id = %contract.id, "Step contract completed, dispatching monitoring" ); // Step contract completed successfully — dispatch monitoring repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "step_evaluating", "info", None, "system", None, ) .await; dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?; } } } Ok(()) } pub async fn on_task_completed( pool: &PgPool, state: &SharedState, task: &Task, owner_id: Uuid, ) -> Result<(), String> { let Some(contract_id) = task.contract_id else { return Ok(()); }; let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get contract: {}", e))?; let Some(contract) = contract else { return Ok(()); }; if contract.is_directive_orchestrator { // This is a planning contract completion let directive = repository::get_directive_by_orchestrator_contract(pool, contract_id) .await .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; if let Some(directive) = directive { on_planning_completed(pool, state, &directive, task, owner_id).await?; } } else if contract.directive_id.is_some() { // Check if this is a monitoring contract completion let monitoring_step = repository::get_step_by_monitoring_contract_id(pool, contract_id) .await .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; if let Some(step) = monitoring_step { on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?; } else { // This is a step contract completion on_step_completed(pool, state, &contract, task, owner_id).await?; } } Ok(()) } /// Handle planning task completion: parse chain plan, create steps, advance. async fn on_planning_completed( pool: &PgPool, state: &SharedState, directive: &Directive, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // If task failed, fail the directive if task.status == "failed" { tracing::warn!( directive_id = %directive.id, task_id = %task.id, "Planning task failed, marking directive as failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); } // Only process when the supervisor task itself is done if task.status != "done" || !task.is_supervisor { return Ok(()); } handle_planning_completion(pool, state, directive, owner_id).await } /// Handle planning contract/task completion. /// Checks if a plan was submitted via the CLI; if not, retries or fails. async fn handle_planning_completion( pool: &PgPool, state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { // Re-fetch directive to check latest state let current = repository::get_directive(pool, directive.id) .await .map_err(|e| format!("Failed to re-fetch directive: {}", e))? .ok_or("Directive not found")?; // Idempotency: only process if still in "planning" status if current.status != "planning" { tracing::info!( directive_id = %directive.id, status = %current.status, "Skipping handle_planning_completion: directive no longer in planning status" ); return Ok(()); } // If plan was already submitted via CLI (current_chain_id is set), nothing to do if current.current_chain_id.is_some() { tracing::info!( directive_id = %directive.id, "Plan already submitted via CLI, skipping handle_planning_completion" ); return Ok(()); } // No plan was submitted — check retry budget let max_regenerations = current.max_chain_regenerations.unwrap_or(2); if current.chain_generation_count < max_regenerations { tracing::warn!( directive_id = %directive.id, attempt = current.chain_generation_count + 1, max = max_regenerations, "Planning completed without plan submission, retrying" ); let _ = repository::create_directive_event( pool, directive.id, None, None, "planning_retry", "warn", Some(&serde_json::json!({ "attempt": current.chain_generation_count + 1, "maxRegenerations": max_regenerations, "reason": "Planning contract completed without submitting a plan" })), "system", None, ) .await; // Increment generation count repository::increment_chain_generation_count(pool, directive.id) .await .map_err(|e| format!("Failed to increment chain generation count: {}", e))?; // Reset to draft so init_directive can be called again repository::update_directive_status(pool, directive.id, "draft") .await .map_err(|e| format!("Failed to reset directive status: {}", e))?; // Re-init planning init_directive(pool, state, owner_id, directive.id).await?; Ok(()) } else { tracing::error!( directive_id = %directive.id, attempts = current.chain_generation_count, max = max_regenerations, "Planning failed: max regeneration attempts exhausted without plan submission" ); let _ = repository::create_directive_event( pool, directive.id, None, None, "planning_failed", "error", Some(&serde_json::json!({ "attempts": current.chain_generation_count, "maxRegenerations": max_regenerations, "reason": "Max chain regeneration attempts exhausted without plan submission" })), "system", None, ) .await; repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; Ok(()) } } /// Inner helper: create chain, steps, set current chain, transition to active, and advance. /// Extracted so that `process_planning_result` can catch errors and mark the directive failed. async fn create_chain_and_steps( pool: &PgPool, state: &SharedState, directive: &Directive, chain_plan: &ChainPlan, owner_id: Uuid, ) -> Result<(), String> { // Create chain let chain = repository::create_directive_chain( pool, directive.id, &format!("{} - Chain", directive.title), Some("Auto-generated from planning"), None, chain_plan.steps.len() as i32, ) .await .map_err(|e| format!("Failed to create directive chain: {}", e))?; // Create steps (two passes: first create all, then resolve dependencies) let mut step_ids: Vec<(String, Uuid)> = Vec::new(); for (i, plan_step) in chain_plan.steps.iter().enumerate() { let step = repository::create_chain_step( pool, chain.id, &plan_step.name, Some(&plan_step.description), "task", "simple", Some("plan"), Some(&plan_step.task_plan), None, // dependencies set in second pass i as i32, ) .await .map_err(|e| format!("Failed to create chain step: {}", e))?; step_ids.push((plan_step.name.clone(), step.id)); } // Second pass: resolve name-based dependencies to UUIDs and update for (i, plan_step) in chain_plan.steps.iter().enumerate() { if plan_step.depends_on.is_empty() { continue; } let dep_uuids: Vec = plan_step .depends_on .iter() .filter_map(|dep_name| { step_ids .iter() .find(|(name, _)| name == dep_name) .map(|(_, id)| *id) }) .collect(); if !dep_uuids.is_empty() { let step_id = step_ids[i].1; sqlx::query( "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", ) .bind(step_id) .bind(&dep_uuids) .execute(pool) .await .map_err(|e| format!("Failed to update step dependencies: {}", e))?; } } // Set current chain on directive repository::set_directive_current_chain(pool, directive.id, chain.id) .await .map_err(|e| format!("Failed to set current chain: {}", e))?; // Transition directive to active let updated_directive = repository::update_directive_status(pool, directive.id, "active") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; tracing::info!( directive_id = %directive.id, chain_id = %chain.id, step_count = chain_plan.steps.len(), "Chain plan created, advancing chain" ); // Advance chain to dispatch ready steps advance_chain(pool, state, &updated_directive, owner_id).await } /// Handle a step contract task completion. async fn on_step_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // Only process supervisor task completions if !task.is_supervisor { return Ok(()); } let Some(directive_id) = contract.directive_id else { return Ok(()); }; // Find the step linked to this contract let step = repository::get_step_by_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to get step by contract: {}", e))?; let Some(step) = step else { return Ok(()); }; // Idempotency: only process if step is still "running" // (on_contract_completed may also fire via the contract path) if step.status != "running" { tracing::info!( step_id = %step.id, status = %step.status, "Skipping on_step_completed: step no longer running" ); return Ok(()); } // Get the directive for threshold info let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if task.status == "done" { // Step task succeeded — dispatch monitoring evaluation repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "step_evaluating", "info", None, "system", None, ) .await; tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, "Step task done, dispatching monitoring evaluation" ); dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await } else { // Step task failed — mark step failed and advance repository::update_step_status(pool, step.id, "failed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, "Step failed" ); advance_chain(pool, state, &directive, owner_id).await } } /// Check chain progress and dispatch ready steps or mark directive complete. async fn advance_chain( pool: &PgPool, state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { let Some(chain_id) = directive.current_chain_id else { return Ok(()); }; let steps = repository::list_steps_for_chain(pool, chain_id) .await .map_err(|e| format!("Failed to list steps: {}", e))?; // Check if all steps passed let all_passed = steps.iter().all(|s| s.status == "passed"); if all_passed && !steps.is_empty() { repository::update_chain_status(pool, chain_id, "completed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "completed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); return Ok(()); } // Check if any step failed let any_failed = steps.iter().any(|s| s.status == "failed"); if any_failed { repository::update_chain_status(pool, chain_id, "failed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); return Ok(()); } // Find and dispatch ready steps let ready_steps = repository::find_ready_steps(pool, chain_id) .await .map_err(|e| format!("Failed to find ready steps: {}", e))?; for step in ready_steps { if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await { tracing::error!( step_id = %step.id, step_name = %step.name, error = %e, "Failed to dispatch step" ); } } Ok(()) } /// Dispatch a single chain step as a new contract with supervisor. async fn dispatch_step( pool: &PgPool, state: &SharedState, directive: &Directive, step: &crate::db::models::ChainStep, owner_id: Uuid, ) -> Result<(), String> { // Mark step as running repository::update_step_status(pool, step.id, "running") .await .map_err(|e| format!("Failed to update step status: {}", e))?; // Create contract for this step. // Step contracts use the directive's repository config — not local_only, // so they can branch and merge to share work across steps. let has_repo = directive.repository_url.is_some() || directive.local_path.is_some(); let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: step.name.clone(), description: step.description.clone(), contract_type: Some(step.contract_type.clone()), template_id: None, initial_phase: step.initial_phase.clone(), autonomous_loop: Some(true), phase_guard: None, local_only: Some(!has_repo), auto_merge_local: if has_repo { Some(true) } else { None }, }, ) .await .map_err(|e| format!("Failed to create step contract: {}", e))?; // Set directive_id on contract repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // Build the task plan, prepending rework instructions if this is a rework cycle let mut task_plan = step .task_plan .clone() .unwrap_or_else(|| format!("Execute step: {}", step.name)); if let Some(eval_id) = step.last_evaluation_id { if let Ok(Some(evaluation)) = repository::get_directive_evaluation(pool, eval_id).await { if let Some(ref rework) = evaluation.rework_instructions { task_plan = format!( "IMPORTANT — REWORK REQUIRED (attempt #{}):\n\ The previous attempt was evaluated and did NOT pass.\n\ Feedback: {}\n\ Rework instructions: {}\n\n\ ---\n\n\ Original task plan:\n{}", step.rework_count + 1, evaluation.summary_feedback, rework, task_plan, ); } } } // Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} Supervisor", step.name), description: step.description.clone(), plan: task_plan, parent_task_id: None, is_supervisor: true, priority: 5, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; // Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to step contract: {}", e) } other => format!("Failed to link supervisor to step contract: {:?}", other), })?; // Link step to contract/task repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) .await .map_err(|e| format!("Failed to update step contract link: {}", e))?; // Copy repo config from directive to step contract if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive.id, step_id = %step.id, step_name = %step.name, contract_id = %contract.id, task_id = %supervisor_task.id, "Step dispatched" ); // Dispatch step task to an available daemon immediately dispatch_task_to_daemon( pool, state, &supervisor_task, contract.local_only, contract.auto_merge_local, owner_id, ).await?; Ok(()) } /// Build the planning supervisor prompt from a directive. fn build_planning_prompt(directive: &Directive) -> String { format!( r#"You are planning the execution of a directive. DIRECTIVE: {title} GOAL: {goal} REQUIREMENTS: {requirements} ACCEPTANCE CRITERIA: {acceptance_criteria} CONSTRAINTS: {constraints} Your job is to decompose this goal into a sequence of executable steps. Each step will become a separate contract with its own supervisor. The JSON format: {{ "steps": [ {{ "name": "Step name", "description": "What this step accomplishes", "task_plan": "Detailed instructions for the step's supervisor", "depends_on": [] }} ] }} Rules: - Steps with no dependencies (empty depends_on array) will run in parallel. - Steps that depend on other steps will wait until those complete. - The depends_on array contains names of steps this step depends on. - Each step should be a self-contained unit of work. - Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. - Keep the number of steps reasonable (3-10 typically). Submit your plan by piping the JSON to stdin: echo '' | makima directive submit-plan --directive-id {directive_id} After submitting the plan, mark the contract as complete: makima supervisor complete"#, title = directive.title, goal = directive.goal, requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), directive_id = directive.id, ) } /// Extract JSON from file body elements. fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option { use crate::db::models::BodyElement; for element in body { match element { BodyElement::Code { content, .. } => { // Try to parse as JSON let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Paragraph { text } => { let trimmed = text.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Markdown { content } => { // Try to find JSON in markdown content let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } // Try to find JSON in code blocks within markdown if let Some(json_start) = trimmed.find("```json") { let after = &trimmed[json_start + 7..]; if let Some(json_end) = after.find("```") { let json_str = after[..json_end].trim(); if serde_json::from_str::(json_str).is_ok() { return Some(json_str.to_string()); } } } } _ => {} } } // Fallback: concatenate all text content and try to find JSON let all_text: String = body .iter() .map(|el| match el { BodyElement::Code { content, .. } => content.clone(), BodyElement::Paragraph { text } => text.clone(), BodyElement::Markdown { content } => content.clone(), _ => String::new(), }) .collect::>() .join("\n"); let trimmed = all_text.trim(); if let Some(start) = trimmed.find('{') { // Find matching closing brace let substr = &trimmed[start..]; if serde_json::from_str::(substr).is_ok() { return Some(substr.to_string()); } } None } /// Dispatch a monitoring contract to evaluate a completed step. async fn dispatch_monitoring( pool: &PgPool, state: &SharedState, directive: &Directive, step: &ChainStep, step_contract: &crate::db::models::Contract, owner_id: Uuid, ) -> Result<(), String> { // Create monitoring contract let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: format!("{} - Monitor", step.name), description: Some(format!("Monitoring evaluation for step: {}", step.name)), contract_type: Some("monitoring".to_string()), template_id: None, initial_phase: Some("plan".to_string()), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create monitoring contract: {}", e))?; // Mark contract as directive-related (not orchestrator) repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) .await .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?; // Build evaluation prompt let prompt = build_monitoring_prompt(directive, step, step_contract); // Create monitoring task (NOT a supervisor — regular task that exits when done, // which triggers on_task_completed → on_monitoring_completed automatically) let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} - Evaluator", step.name), description: Some("Evaluate step output against directive criteria".to_string()), plan: prompt, parent_task_id: None, is_supervisor: false, priority: 8, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create monitoring task: {}", e))?; // Link monitoring task to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link task to monitoring contract: {}", e) } other => format!("Failed to link task to monitoring contract: {:?}", other), })?; // Link step to monitoring contract/task repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id) .await .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?; // Copy repo config from directive to monitoring contract if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive.id, step_id = %step.id, step_name = %step.name, monitoring_contract_id = %contract.id, monitoring_task_id = %supervisor_task.id, "Monitoring evaluation dispatched" ); // Dispatch monitoring task to an available daemon immediately dispatch_task_to_daemon( pool, state, &supervisor_task, contract.local_only, contract.auto_merge_local, owner_id, ).await?; Ok(()) } /// Build the monitoring supervisor prompt. fn build_monitoring_prompt( directive: &Directive, step: &ChainStep, step_contract: &crate::db::models::Contract, ) -> String { format!( r#"You are evaluating the output of a completed step in a directive chain. DIRECTIVE: {title} GOAL: {goal} REQUIREMENTS: {requirements} ACCEPTANCE CRITERIA: {acceptance_criteria} CONSTRAINTS: {constraints} STEP: {step_name} STEP DESCRIPTION: {step_description} STEP TASK PLAN: {task_plan} STEP CONTRACT ID: {step_contract_id} CONFIDENCE THRESHOLDS: - Green (pass): >= {threshold_green} - Yellow (marginal): >= {threshold_yellow} - Red (fail): < {threshold_yellow} INSTRUCTIONS: 1. Read the step contract's files to understand what was delivered: makima contract files --contract-id {step_contract_id} makima contract file --contract-id {step_contract_id} 2. Evaluate whether the step's output meets the directive's requirements and the step's task plan. 3. Write your evaluation as a JSON file to this monitoring contract. Create a file called evaluation.json with the JSON content first, then upload it: cat > /tmp/eval-result.json << 'EVALEOF' {{ "passed": true, "overallScore": 0.85, "confidenceLevel": "green", "criteriaResults": [ {{ "criterion": "Example criterion", "passed": true, "score": 0.9, "evidence": "What was found" }} ], "summaryFeedback": "Summary of evaluation", "reworkInstructions": null }} EVALEOF makima contract create-file evaluation-result < /tmp/eval-result.json Replace the example values with your actual evaluation results. Scoring guidelines: - overallScore >= {threshold_green}: confidenceLevel = "green", passed = true - overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment - overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false - Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. - Set reworkInstructions to null if the step passed. You are done after writing the evaluation file."#, title = directive.title, goal = directive.goal, requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), step_name = step.name, step_description = step.description.as_deref().unwrap_or("N/A"), task_plan = step.task_plan.as_deref().unwrap_or("N/A"), step_contract_id = step_contract.id, threshold_green = directive.confidence_threshold_green, threshold_yellow = directive.confidence_threshold_yellow, ) } /// Handle monitoring contract task completion — parse evaluation and decide step outcome. async fn on_monitoring_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, step: &ChainStep, task: &Task, owner_id: Uuid, ) -> Result<(), String> { let Some(directive_id) = contract.directive_id else { return Ok(()); }; let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // If monitoring task itself failed, fail-open: mark step as passed if task.status == "failed" { tracing::warn!( directive_id = %directive_id, step_id = %step.id, "Monitoring task failed, fail-open: marking step as passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; let _ = repository::create_directive_event( pool, directive_id, directive.current_chain_id, Some(step.id), "monitoring_failed_open", "warn", None, "system", None, ) .await; return advance_chain(pool, state, &directive, owner_id).await; } if task.status != "done" { return Ok(()); } process_monitoring_result(pool, state, contract, step, owner_id).await } /// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework. /// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path). async fn process_monitoring_result( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, step: &ChainStep, owner_id: Uuid, ) -> Result<(), String> { let Some(directive_id) = contract.directive_id else { return Ok(()); }; // Idempotency guard: re-fetch step and only process if still "evaluating". let current_step = repository::get_chain_step(pool, step.id) .await .map_err(|e| format!("Failed to re-fetch step: {}", e))?; if let Some(ref s) = current_step { if s.status != "evaluating" { tracing::info!( step_id = %step.id, status = %s.status, "Skipping process_monitoring_result: step no longer in evaluating status" ); return Ok(()); } } let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // Read evaluation result from monitoring contract files let files = repository::list_files_in_contract(pool, contract.id, owner_id) .await .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?; let eval_file = files.iter().find(|f| { let name_lower = f.name.to_lowercase(); name_lower.contains("evaluation") || name_lower.contains("eval") }); let eval_file = eval_file.or_else(|| files.first()); let monitoring_result = if let Some(eval_file) = eval_file { let full_file = repository::get_file(pool, eval_file.id) .await .map_err(|e| format!("Failed to get evaluation file: {}", e))?; if let Some(full_file) = full_file { let json_str = extract_plan_json(&full_file.body); json_str.and_then(|s| serde_json::from_str::(&s).ok()) } else { None } } else { None }; // If we couldn't parse the result, fail-open let Some(result) = monitoring_result else { tracing::warn!( directive_id = %directive_id, step_id = %step.id, "Could not parse monitoring result, fail-open: marking step as passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; let _ = repository::create_directive_event( pool, directive_id, directive.current_chain_id, Some(step.id), "monitoring_parse_failed_open", "warn", None, "system", None, ) .await; return advance_chain(pool, state, &directive, owner_id).await; }; // Create evaluation record let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); let evaluation = repository::create_directive_evaluation( pool, directive_id, chain_id, step.id, contract.id, "monitoring", Some("automated"), result.passed, result.overall_score, result.confidence_level.as_deref(), &result.criteria_results, &result.summary_feedback, result.rework_instructions.as_deref(), ) .await .map_err(|e| format!("Failed to create directive evaluation: {}", e))?; // Update step evaluation fields repository::update_step_evaluation_fields( pool, step.id, result.overall_score, result.confidence_level.as_deref(), evaluation.id, ) .await .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?; // Create event let event_data = serde_json::json!({ "passed": result.passed, "overallScore": result.overall_score, "confidenceLevel": result.confidence_level, "summaryFeedback": result.summary_feedback, }); let _ = repository::create_directive_event( pool, directive_id, Some(chain_id), Some(step.id), if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" }, "info", Some(&event_data), "system", None, ) .await; if result.passed { // Evaluation passed — mark step as passed tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, score = ?result.overall_score, "Step evaluation passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; advance_chain(pool, state, &directive, owner_id).await } else { // Evaluation failed — check rework budget let max_rework = directive.max_rework_cycles.unwrap_or(3); if step.rework_count >= max_rework { tracing::warn!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, rework_count = step.rework_count, max_rework = max_rework, "Step evaluation failed, max rework cycles exceeded" ); repository::update_step_status(pool, step.id, "failed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; advance_chain(pool, state, &directive, owner_id).await } else { tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, rework_count = step.rework_count, "Step evaluation failed, scheduling rework" ); repository::increment_step_rework_count(pool, step.id) .await .map_err(|e| format!("Failed to increment rework count: {}", e))?; // Set step back to pending so advance_chain re-dispatches it repository::update_step_status(pool, step.id, "pending") .await .map_err(|e| format!("Failed to update step status: {}", e))?; advance_chain(pool, state, &directive, owner_id).await } } } /// Trigger a manual evaluation for a step. Public for use by handlers. pub async fn trigger_manual_evaluation( pool: &PgPool, state: &SharedState, owner_id: Uuid, directive_id: Uuid, step_id: Uuid, ) -> Result { let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // Get the step — find via chain steps let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?; let steps = repository::list_steps_for_chain(pool, chain_id) .await .map_err(|e| format!("Failed to list steps: {}", e))?; let step = steps .into_iter() .find(|s| s.id == step_id) .ok_or("Step not found in current chain")?; // Step must have a contract_id (must have been executed) let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?; let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get step contract: {}", e))? .ok_or("Step contract not found")?; // Set step to evaluating let updated_step = repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))? .ok_or("Step not found after status update")?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "manual_evaluation_triggered", "info", None, "user", None, ) .await; dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?; Ok(updated_step) }