//! Directive orchestration — init, planning completion, chain advancement. use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use serde::Serialize; use crate::db::models::{ ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, }; use crate::db::repository; use crate::server::state::SharedState; /// A single step in the chain plan produced by the planning supervisor. #[derive(Debug, Deserialize)] struct ChainPlanStep { name: String, description: String, task_plan: String, #[serde(default)] depends_on: Vec, // names of steps this depends on } /// Wrapper for the plan JSON written by the planning supervisor. #[derive(Debug, Deserialize)] struct ChainPlan { steps: Vec, } /// Result written by the monitoring supervisor after evaluating a step. #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] struct MonitoringResult { passed: bool, overall_score: Option, confidence_level: Option, #[serde(default)] criteria_results: serde_json::Value, #[serde(default)] summary_feedback: String, rework_instructions: Option, } /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, _state: &SharedState, owner_id: Uuid, directive_id: Uuid, ) -> Result { // 1. Get directive, verify status let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if directive.status != "draft" { return Err(format!( "Directive must be in 'draft' status to start, current status: '{}'", directive.status )); } // 2. Create planning contract let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: format!("{} - Planning", directive.title), description: Some(format!( "Planning contract for directive: {}", directive.title )), contract_type: Some("simple".to_string()), template_id: None, initial_phase: Some("plan".to_string()), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create planning contract: {}", e))?; // 3. Mark contract as directive orchestrator repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // 4. Build planning prompt let planning_prompt = build_planning_prompt(&directive); // 5. Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} - Planner", directive.title), description: Some("Decompose directive goal into executable chain steps".to_string()), plan: planning_prompt, parent_task_id: None, is_supervisor: true, priority: 10, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create supervisor task: {}", e))?; // 6. Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to contract: {}", e) } other => format!("Failed to link supervisor to contract: {:?}", other), })?; // 7. Set orchestrator_contract_id on directive repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) .await .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; // 8. Transition directive to "planning" let updated = repository::update_directive_status(pool, directive_id, "planning") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; // 9. Copy repo config to contract if repository_url is set if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive_id, contract_id = %contract.id, task_id = %supervisor_task.id, "Directive started: planning contract created" ); Ok(updated) } /// Called when any task completes — checks if it's directive-related and advances. /// Called when a contract's status is updated to "completed" via the API. /// This is the primary entry point for directive orchestration because supervisor /// tasks do not send TaskComplete messages — they complete via contract status updates. pub async fn on_contract_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, owner_id: Uuid, ) -> Result<(), String> { if contract.status != "completed" { return Ok(()); } if contract.is_directive_orchestrator { let directive = repository::get_directive_by_orchestrator_contract(pool, contract.id) .await .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; if let Some(directive) = directive { tracing::info!( directive_id = %directive.id, contract_id = %contract.id, "Directive orchestrator contract completed, processing plan" ); process_planning_result(pool, state, &directive, contract.id, owner_id).await?; } else { tracing::warn!( contract_id = %contract.id, "Directive orchestrator contract completed but no directive found" ); } } else if let Some(directive_id) = contract.directive_id { // Check if this is a monitoring contract let monitoring_step = repository::get_step_by_monitoring_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; if let Some(step) = monitoring_step { tracing::info!( directive_id = %directive_id, step_id = %step.id, contract_id = %contract.id, "Monitoring contract completed" ); process_monitoring_result(pool, state, contract, &step, owner_id).await?; } else { // Step contract completed let step = repository::get_step_by_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to get step by contract: {}", e))?; if let Some(step) = step { let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; tracing::info!( directive_id = %directive_id, step_id = %step.id, contract_id = %contract.id, "Step contract completed, dispatching monitoring" ); // Step contract completed successfully — dispatch monitoring repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "step_evaluating", "info", None, "system", None, ) .await; dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?; } } } Ok(()) } pub async fn on_task_completed( pool: &PgPool, state: &SharedState, task: &Task, owner_id: Uuid, ) -> Result<(), String> { let Some(contract_id) = task.contract_id else { return Ok(()); }; let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get contract: {}", e))?; let Some(contract) = contract else { return Ok(()); }; if contract.is_directive_orchestrator { // This is a planning contract completion let directive = repository::get_directive_by_orchestrator_contract(pool, contract_id) .await .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; if let Some(directive) = directive { on_planning_completed(pool, state, &directive, task, owner_id).await?; } } else if contract.directive_id.is_some() { // Check if this is a monitoring contract completion let monitoring_step = repository::get_step_by_monitoring_contract_id(pool, contract_id) .await .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; if let Some(step) = monitoring_step { on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?; } else { // This is a step contract completion on_step_completed(pool, state, &contract, task, owner_id).await?; } } Ok(()) } /// Handle planning task completion: parse chain plan, create steps, advance. async fn on_planning_completed( pool: &PgPool, state: &SharedState, directive: &Directive, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // If task failed, fail the directive if task.status == "failed" { tracing::warn!( directive_id = %directive.id, task_id = %task.id, "Planning task failed, marking directive as failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); } // Only process when the supervisor task itself is done if task.status != "done" || !task.is_supervisor { return Ok(()); } let Some(contract_id) = task.contract_id else { return Ok(()); }; process_planning_result(pool, state, directive, contract_id, owner_id).await } /// Core logic: read plan from contract files, create chain and steps, advance. /// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path). async fn process_planning_result( pool: &PgPool, state: &SharedState, directive: &Directive, contract_id: Uuid, owner_id: Uuid, ) -> Result<(), String> { // Get contract files to find the chain plan let files = repository::list_files_in_contract(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to list contract files: {}", e))?; // Find the chain plan file let plan_file = files.iter().find(|f| { let name_lower = f.name.to_lowercase(); name_lower.contains("chain") || name_lower.contains("plan") }); let plan_file = plan_file.or_else(|| files.first()); let Some(plan_file) = plan_file else { tracing::warn!( directive_id = %directive.id, "No plan file found in planning contract, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); }; // Read the full file to get the body content let full_file = repository::get_file(pool, plan_file.id) .await .map_err(|e| format!("Failed to get plan file: {}", e))? .ok_or("Plan file not found")?; // Extract JSON from the file body elements let plan_json = extract_plan_json(&full_file.body); let Some(plan_json) = plan_json else { tracing::warn!( directive_id = %directive.id, "Could not extract chain plan JSON from file body, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); }; let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| { format!("Failed to parse chain plan JSON: {}", e) })?; if chain_plan.steps.is_empty() { tracing::warn!( directive_id = %directive.id, "Chain plan has no steps, marking directive failed" ); repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); } // Create chain let chain = repository::create_directive_chain( pool, directive.id, &format!("{} - Chain", directive.title), Some("Auto-generated from planning"), None, chain_plan.steps.len() as i32, ) .await .map_err(|e| format!("Failed to create directive chain: {}", e))?; // Create steps (two passes: first create all, then resolve dependencies) let mut step_ids: Vec<(String, Uuid)> = Vec::new(); for (i, plan_step) in chain_plan.steps.iter().enumerate() { let step = repository::create_chain_step( pool, chain.id, &plan_step.name, Some(&plan_step.description), "task", "simple", Some("plan"), Some(&plan_step.task_plan), None, // dependencies set in second pass i as i32, ) .await .map_err(|e| format!("Failed to create chain step: {}", e))?; step_ids.push((plan_step.name.clone(), step.id)); } // Second pass: resolve name-based dependencies to UUIDs and update for (i, plan_step) in chain_plan.steps.iter().enumerate() { if plan_step.depends_on.is_empty() { continue; } let dep_uuids: Vec = plan_step .depends_on .iter() .filter_map(|dep_name| { step_ids .iter() .find(|(name, _)| name == dep_name) .map(|(_, id)| *id) }) .collect(); if !dep_uuids.is_empty() { let step_id = step_ids[i].1; sqlx::query( "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", ) .bind(step_id) .bind(&dep_uuids) .execute(pool) .await .map_err(|e| format!("Failed to update step dependencies: {}", e))?; } } // Set current chain on directive repository::set_directive_current_chain(pool, directive.id, chain.id) .await .map_err(|e| format!("Failed to set current chain: {}", e))?; // Transition directive to active let updated_directive = repository::update_directive_status(pool, directive.id, "active") .await .map_err(|e| format!("Failed to update directive status: {}", e))? .ok_or("Directive not found after status update")?; tracing::info!( directive_id = %directive.id, chain_id = %chain.id, step_count = chain_plan.steps.len(), "Chain plan created, advancing chain" ); // Advance chain to dispatch ready steps advance_chain(pool, state, &updated_directive, owner_id).await } /// Handle a step contract task completion. async fn on_step_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // Only process supervisor task completions if !task.is_supervisor { return Ok(()); } let Some(directive_id) = contract.directive_id else { return Ok(()); }; // Find the step linked to this contract let step = repository::get_step_by_contract_id(pool, contract.id) .await .map_err(|e| format!("Failed to get step by contract: {}", e))?; let Some(step) = step else { return Ok(()); }; // Get the directive for threshold info let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; if task.status == "done" { // Step task succeeded — dispatch monitoring evaluation repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "step_evaluating", "info", None, "system", None, ) .await; tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, "Step task done, dispatching monitoring evaluation" ); dispatch_monitoring(pool, &directive, &step, contract, owner_id).await } else { // Step task failed — mark step failed and advance repository::update_step_status(pool, step.id, "failed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, "Step failed" ); advance_chain(pool, state, &directive, owner_id).await } } /// Check chain progress and dispatch ready steps or mark directive complete. async fn advance_chain( pool: &PgPool, _state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { let Some(chain_id) = directive.current_chain_id else { return Ok(()); }; let steps = repository::list_steps_for_chain(pool, chain_id) .await .map_err(|e| format!("Failed to list steps: {}", e))?; // Check if all steps passed let all_passed = steps.iter().all(|s| s.status == "passed"); if all_passed && !steps.is_empty() { repository::update_chain_status(pool, chain_id, "completed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "completed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); return Ok(()); } // Check if any step failed let any_failed = steps.iter().any(|s| s.status == "failed"); if any_failed { repository::update_chain_status(pool, chain_id, "failed") .await .map_err(|e| format!("Failed to update chain status: {}", e))?; repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); return Ok(()); } // Find and dispatch ready steps let ready_steps = repository::find_ready_steps(pool, chain_id) .await .map_err(|e| format!("Failed to find ready steps: {}", e))?; for step in ready_steps { if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await { tracing::error!( step_id = %step.id, step_name = %step.name, error = %e, "Failed to dispatch step" ); } } Ok(()) } /// Dispatch a single chain step as a new contract with supervisor. async fn dispatch_step( pool: &PgPool, directive: &Directive, step: &crate::db::models::ChainStep, owner_id: Uuid, ) -> Result<(), String> { // Mark step as running repository::update_step_status(pool, step.id, "running") .await .map_err(|e| format!("Failed to update step status: {}", e))?; // Create contract for this step let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: step.name.clone(), description: step.description.clone(), contract_type: Some(step.contract_type.clone()), template_id: None, initial_phase: step.initial_phase.clone(), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create step contract: {}", e))?; // Set directive_id on contract repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; // Build the task plan let task_plan = step .task_plan .clone() .unwrap_or_else(|| format!("Execute step: {}", step.name)); // Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} Supervisor", step.name), description: step.description.clone(), plan: task_plan, parent_task_id: None, is_supervisor: true, priority: 5, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; // Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to step contract: {}", e) } other => format!("Failed to link supervisor to step contract: {:?}", other), })?; // Link step to contract/task repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) .await .map_err(|e| format!("Failed to update step contract link: {}", e))?; // Copy repo config from directive to step contract if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive.id, step_id = %step.id, step_name = %step.name, contract_id = %contract.id, task_id = %supervisor_task.id, "Step dispatched" ); Ok(()) } /// Build the planning supervisor prompt from a directive. fn build_planning_prompt(directive: &Directive) -> String { format!( r#"You are planning the execution of a directive. DIRECTIVE: {} GOAL: {} REQUIREMENTS: {} ACCEPTANCE CRITERIA: {} CONSTRAINTS: {} Your job is to decompose this goal into a sequence of executable steps. Each step will become a separate contract with its own supervisor. Write a JSON plan to a contract file named "chain-plan" using: makima contract create-file "chain-plan" < plan.json The JSON format: {{ "steps": [ {{ "name": "Step name", "description": "What this step accomplishes", "task_plan": "Detailed instructions for the step's supervisor", "depends_on": [] }} ] }} Rules: - Steps with no dependencies (empty depends_on array) will run in parallel. - Steps that depend on other steps will wait until those complete. - The depends_on array contains names of steps this step depends on. - Each step should be a self-contained unit of work. - Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. - Keep the number of steps reasonable (3-10 typically). After writing the plan file, mark the contract as complete using: makima supervisor complete"#, directive.title, directive.goal, serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), ) } /// Extract JSON from file body elements. fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option { use crate::db::models::BodyElement; for element in body { match element { BodyElement::Code { content, .. } => { // Try to parse as JSON let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Paragraph { text } => { let trimmed = text.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } } BodyElement::Markdown { content } => { // Try to find JSON in markdown content let trimmed = content.trim(); if trimmed.starts_with('{') || trimmed.starts_with('[') { if serde_json::from_str::(trimmed).is_ok() { return Some(trimmed.to_string()); } } // Try to find JSON in code blocks within markdown if let Some(json_start) = trimmed.find("```json") { let after = &trimmed[json_start + 7..]; if let Some(json_end) = after.find("```") { let json_str = after[..json_end].trim(); if serde_json::from_str::(json_str).is_ok() { return Some(json_str.to_string()); } } } } _ => {} } } // Fallback: concatenate all text content and try to find JSON let all_text: String = body .iter() .map(|el| match el { BodyElement::Code { content, .. } => content.clone(), BodyElement::Paragraph { text } => text.clone(), BodyElement::Markdown { content } => content.clone(), _ => String::new(), }) .collect::>() .join("\n"); let trimmed = all_text.trim(); if let Some(start) = trimmed.find('{') { // Find matching closing brace let substr = &trimmed[start..]; if serde_json::from_str::(substr).is_ok() { return Some(substr.to_string()); } } None } /// Dispatch a monitoring contract to evaluate a completed step. async fn dispatch_monitoring( pool: &PgPool, directive: &Directive, step: &ChainStep, step_contract: &crate::db::models::Contract, owner_id: Uuid, ) -> Result<(), String> { // Create monitoring contract let contract = repository::create_contract_for_owner( pool, owner_id, CreateContractRequest { name: format!("{} - Monitor", step.name), description: Some(format!("Monitoring evaluation for step: {}", step.name)), contract_type: Some("monitoring".to_string()), template_id: None, initial_phase: Some("plan".to_string()), autonomous_loop: Some(true), phase_guard: None, local_only: Some(true), auto_merge_local: None, }, ) .await .map_err(|e| format!("Failed to create monitoring contract: {}", e))?; // Mark contract as directive-related (not orchestrator) repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) .await .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?; // Build evaluation prompt let prompt = build_monitoring_prompt(directive, step, step_contract); // Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: format!("{} - Evaluator", step.name), description: Some("Evaluate step output against directive criteria".to_string()), plan: prompt, parent_task_id: None, is_supervisor: true, priority: 8, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: directive.local_path.clone(), completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| format!("Failed to create monitoring supervisor task: {}", e))?; // Link supervisor to contract repository::update_contract_for_owner( pool, contract.id, owner_id, UpdateContractRequest { supervisor_task_id: Some(supervisor_task.id), ..Default::default() }, ) .await .map_err(|e| match e { crate::db::repository::RepositoryError::Database(e) => { format!("Failed to link supervisor to monitoring contract: {}", e) } other => format!("Failed to link supervisor to monitoring contract: {:?}", other), })?; // Link step to monitoring contract/task repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id) .await .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?; // Copy repo config from directive to monitoring contract if let Some(ref repo_url) = directive.repository_url { let _ = repository::add_remote_repository( pool, contract.id, "directive-repo", repo_url, true, ) .await; } else if let Some(ref local_path) = directive.local_path { let _ = repository::add_local_repository( pool, contract.id, "directive-repo", local_path, true, ) .await; } tracing::info!( directive_id = %directive.id, step_id = %step.id, step_name = %step.name, monitoring_contract_id = %contract.id, monitoring_task_id = %supervisor_task.id, "Monitoring evaluation dispatched" ); Ok(()) } /// Build the monitoring supervisor prompt. fn build_monitoring_prompt( directive: &Directive, step: &ChainStep, step_contract: &crate::db::models::Contract, ) -> String { format!( r#"You are evaluating the output of a completed step in a directive chain. DIRECTIVE: {title} GOAL: {goal} REQUIREMENTS: {requirements} ACCEPTANCE CRITERIA: {acceptance_criteria} CONSTRAINTS: {constraints} STEP: {step_name} STEP DESCRIPTION: {step_description} STEP TASK PLAN: {task_plan} STEP CONTRACT ID: {step_contract_id} CONFIDENCE THRESHOLDS: - Green (pass): >= {threshold_green} - Yellow (marginal): >= {threshold_yellow} - Red (fail): < {threshold_yellow} Your job: 1. Read the step contract's files to understand what was delivered: makima contract files --contract-id {step_contract_id} makima contract file --contract-id {step_contract_id} 2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan. 3. Write your evaluation result as a JSON file named "evaluation-result" to this contract: makima contract create-file "evaluation-result" < evaluation.json The JSON format: {{ "passed": true/false, "overallScore": 0.0-1.0, "confidenceLevel": "green" | "yellow" | "red", "criteriaResults": [ {{ "criterion": "Description of what was checked", "passed": true/false, "score": 0.0-1.0, "evidence": "Evidence supporting the assessment" }} ], "summaryFeedback": "Brief summary of the evaluation", "reworkInstructions": "If failed, specific instructions for rework (null if passed)" }} Scoring guidelines: - Score >= {threshold_green}: confidenceLevel = "green", passed = true - Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed - Score < {threshold_yellow}: confidenceLevel = "red", passed = false - Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. After writing the evaluation file, mark the contract as complete: makima supervisor complete"#, title = directive.title, goal = directive.goal, requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), step_name = step.name, step_description = step.description.as_deref().unwrap_or("N/A"), task_plan = step.task_plan.as_deref().unwrap_or("N/A"), step_contract_id = step_contract.id, threshold_green = directive.confidence_threshold_green, threshold_yellow = directive.confidence_threshold_yellow, ) } /// Handle monitoring contract task completion — parse evaluation and decide step outcome. async fn on_monitoring_completed( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, step: &ChainStep, task: &Task, owner_id: Uuid, ) -> Result<(), String> { // Only process supervisor task completions if !task.is_supervisor { return Ok(()); } let Some(directive_id) = contract.directive_id else { return Ok(()); }; let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // If monitoring task itself failed, fail-open: mark step as passed if task.status == "failed" { tracing::warn!( directive_id = %directive_id, step_id = %step.id, "Monitoring task failed, fail-open: marking step as passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive_id, directive.current_chain_id, Some(step.id), "monitoring_failed_open", "warn", None, "system", None, ) .await; return advance_chain(pool, state, &directive, owner_id).await; } if task.status != "done" { return Ok(()); } process_monitoring_result(pool, state, contract, step, owner_id).await } /// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework. /// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path). async fn process_monitoring_result( pool: &PgPool, state: &SharedState, contract: &crate::db::models::Contract, step: &ChainStep, owner_id: Uuid, ) -> Result<(), String> { let Some(directive_id) = contract.directive_id else { return Ok(()); }; let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // Read evaluation result from monitoring contract files let files = repository::list_files_in_contract(pool, contract.id, owner_id) .await .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?; let eval_file = files.iter().find(|f| { let name_lower = f.name.to_lowercase(); name_lower.contains("evaluation") || name_lower.contains("eval") }); let eval_file = eval_file.or_else(|| files.first()); let monitoring_result = if let Some(eval_file) = eval_file { let full_file = repository::get_file(pool, eval_file.id) .await .map_err(|e| format!("Failed to get evaluation file: {}", e))?; if let Some(full_file) = full_file { let json_str = extract_plan_json(&full_file.body); json_str.and_then(|s| serde_json::from_str::(&s).ok()) } else { None } } else { None }; // If we couldn't parse the result, fail-open let Some(result) = monitoring_result else { tracing::warn!( directive_id = %directive_id, step_id = %step.id, "Could not parse monitoring result, fail-open: marking step as passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; let _ = repository::create_directive_event( pool, directive_id, directive.current_chain_id, Some(step.id), "monitoring_parse_failed_open", "warn", None, "system", None, ) .await; return advance_chain(pool, state, &directive, owner_id).await; }; // Create evaluation record let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); let evaluation = repository::create_directive_evaluation( pool, directive_id, chain_id, step.id, contract.id, "monitoring", Some("automated"), result.passed, result.overall_score, result.confidence_level.as_deref(), &result.criteria_results, &result.summary_feedback, result.rework_instructions.as_deref(), ) .await .map_err(|e| format!("Failed to create directive evaluation: {}", e))?; // Update step evaluation fields repository::update_step_evaluation_fields( pool, step.id, result.overall_score, result.confidence_level.as_deref(), evaluation.id, ) .await .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?; // Create event let event_data = serde_json::json!({ "passed": result.passed, "overallScore": result.overall_score, "confidenceLevel": result.confidence_level, "summaryFeedback": result.summary_feedback, }); let _ = repository::create_directive_event( pool, directive_id, Some(chain_id), Some(step.id), if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" }, "info", Some(&event_data), "system", None, ) .await; if result.passed { // Evaluation passed — mark step as passed tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, score = ?result.overall_score, "Step evaluation passed" ); repository::update_step_status(pool, step.id, "passed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; advance_chain(pool, state, &directive, owner_id).await } else { // Evaluation failed — check rework budget let max_rework = directive.max_rework_cycles.unwrap_or(3); if step.rework_count >= max_rework { tracing::warn!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, rework_count = step.rework_count, max_rework = max_rework, "Step evaluation failed, max rework cycles exceeded" ); repository::update_step_status(pool, step.id, "failed") .await .map_err(|e| format!("Failed to update step status: {}", e))?; advance_chain(pool, state, &directive, owner_id).await } else { tracing::info!( directive_id = %directive_id, step_id = %step.id, step_name = %step.name, rework_count = step.rework_count, "Step evaluation failed, scheduling rework" ); repository::increment_step_rework_count(pool, step.id) .await .map_err(|e| format!("Failed to increment rework count: {}", e))?; // Set step back to pending so advance_chain re-dispatches it repository::update_step_status(pool, step.id, "pending") .await .map_err(|e| format!("Failed to update step status: {}", e))?; advance_chain(pool, state, &directive, owner_id).await } } } /// Trigger a manual evaluation for a step. Public for use by handlers. pub async fn trigger_manual_evaluation( pool: &PgPool, _state: &SharedState, owner_id: Uuid, directive_id: Uuid, step_id: Uuid, ) -> Result { let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; // Get the step — find via chain steps let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?; let steps = repository::list_steps_for_chain(pool, chain_id) .await .map_err(|e| format!("Failed to list steps: {}", e))?; let step = steps .into_iter() .find(|s| s.id == step_id) .ok_or("Step not found in current chain")?; // Step must have a contract_id (must have been executed) let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?; let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get step contract: {}", e))? .ok_or("Step contract not found")?; // Set step to evaluating let updated_step = repository::update_step_status(pool, step.id, "evaluating") .await .map_err(|e| format!("Failed to update step status: {}", e))? .ok_or("Step not found after status update")?; let _ = repository::create_directive_event( pool, directive.id, directive.current_chain_id, Some(step.id), "manual_evaluation_triggered", "info", None, "user", None, ) .await; dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?; Ok(updated_step) }