//! Directive orchestration engine. //! //! Manages the lifecycle of directives: //! - Starts directives and generates initial chains //! - Monitors step execution and triggers evaluations //! - Handles rework, escalation, and chain regeneration //! - Enforces circuit breakers (cost, time, rework limits) use std::collections::HashMap; use sqlx::PgPool; use thiserror::Error; use tokio::sync::broadcast; use uuid::Uuid; use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest}; use crate::db::repository::{self, RepositoryError}; use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; use super::verifier::{ auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, VerificationContext, }; /// Error type for engine operations. #[derive(Error, Debug)] pub enum EngineError { #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Repository error: {0}")] Repository(#[from] RepositoryError), #[error("Planner error: {0}")] Planner(#[from] PlannerError), #[error("Directive not found: {0}")] DirectiveNotFound(Uuid), #[error("Chain not found for directive: {0}")] ChainNotFound(Uuid), #[error("Step not found: {0}")] StepNotFound(Uuid), #[error("Invalid state transition: {from} -> {to}")] InvalidStateTransition { from: String, to: String }, #[error("Circuit breaker triggered: {0}")] CircuitBreaker(String), #[error("Directive is paused")] DirectivePaused, #[error("Contract creation failed: {0}")] ContractCreation(String), #[error("LLM error: {0}")] LlmError(String), } /// Event emitted by the engine for UI updates. 
///
/// Events are delivered through the optional broadcast channel configured
/// with `DirectiveEngine::with_event_channel`; they are separate from the
/// directive events persisted to the database.
#[derive(Debug, Clone)]
pub enum EngineEvent {
    /// Directive status changed
    DirectiveStatusChanged {
        directive_id: Uuid,
        old_status: String,
        new_status: String,
    },
    /// Step status changed
    StepStatusChanged {
        directive_id: Uuid,
        step_id: Uuid,
        old_status: String,
        new_status: String,
    },
    /// Evaluation completed
    EvaluationCompleted {
        directive_id: Uuid,
        step_id: Uuid,
        passed: bool,
        confidence_level: ConfidenceLevel,
    },
    /// Approval required
    ApprovalRequired {
        directive_id: Uuid,
        approval_id: Uuid,
        approval_type: String,
    },
    /// Chain regenerated
    ChainRegenerated {
        directive_id: Uuid,
        old_chain_id: Uuid,
        new_chain_id: Uuid,
    },
}

/// Main orchestration engine for directives.
pub struct DirectiveEngine {
    /// Connection pool handed to every repository call.
    pool: PgPool,
    /// Builds planning prompts, validates chains and lays out editor positions.
    planner: ChainPlanner,
    /// Optional UI event channel; `None` disables event broadcasting.
    event_tx: Option<broadcast::Sender<EngineEvent>>,
}

impl DirectiveEngine {
    /// Create a new directive engine.
    ///
    /// The engine starts without an event channel; attach one with
    /// [`DirectiveEngine::with_event_channel`].
    pub fn new(pool: PgPool) -> Self {
        Self {
            pool,
            planner: ChainPlanner::new(),
            event_tx: None,
        }
    }

    /// Set the event broadcast channel for UI updates.
    pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    /// Emit an event if channel is configured.
    ///
    /// Delivery is best-effort: a send error is deliberately ignored so that
    /// UI notification problems never fail an engine operation.
    fn emit_event(&self, event: EngineEvent) {
        if let Some(tx) = &self.event_tx {
            let _ = tx.send(event);
        }
    }

    // ========================================================================
    // Directive Lifecycle
    // ========================================================================

    /// Start a directive: generate chain and begin execution.
    ///
    /// Allowed only from the `draft` or `paused` status. The directive moves
    /// to `planning`, a chain and its steps are created, the directive moves
    /// to `active`, and ready steps are started via `advance_chain`.
    ///
    /// # Errors
    ///
    /// * [`EngineError::DirectiveNotFound`] if the directive does not exist.
    /// * [`EngineError::InvalidStateTransition`] if the status is neither
    ///   `draft` nor `paused`.
    /// * Any repository, planner or `advance_chain` error.
    ///
    /// NOTE(review): if planning or step creation fails, the directive is
    /// left in `planning`; nothing here rolls the status back.
    pub async fn start_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
        let directive = repository::get_directive(&self.pool, directive_id)
            .await?
            .ok_or(EngineError::DirectiveNotFound(directive_id))?;

        // Validate current state
        if directive.status != "draft" && directive.status != "paused" {
            return Err(EngineError::InvalidStateTransition {
                from: directive.status,
                to: "planning".to_string(),
            });
        }

        // Update status to planning
        repository::update_directive_status(&self.pool, directive_id, "planning").await?;

        // Broadcast the transition so subscribers see every status change,
        // not only the later planning -> active one.
        self.emit_event(EngineEvent::DirectiveStatusChanged {
            directive_id,
            old_status: directive.status.clone(),
            new_status: "planning".to_string(),
        });

        self.emit_directive_event(
            directive_id,
            "status_changed",
            "info",
            serde_json::json!({"old_status": directive.status, "new_status": "planning"}),
            "system",
        )
        .await?;

        // Generate chain (placeholder - actual LLM call would go here)
        let chain = self.generate_initial_chain(&directive).await?;

        // Create chain in database
        let db_chain = repository::create_directive_chain(
            &self.pool,
            directive_id,
            &chain.name,
            Some(&chain.description),
            None, // rationale
            None, // planning_model
        )
        .await?;

        // Create steps
        self.create_steps_from_chain(&db_chain.id, &chain).await?;

        // Update directive to active
        repository::update_directive_status(&self.pool, directive_id, "active").await?;

        self.emit_event(EngineEvent::DirectiveStatusChanged {
            directive_id,
            old_status: "planning".to_string(),
            new_status: "active".to_string(),
        });

        // Start ready steps
        self.advance_chain(directive_id).await?;

        Ok(())
    }

    /// Pause a directive.
    ///
    /// Allowed only from the `active` status.
    ///
    /// # Errors
    ///
    /// * [`EngineError::DirectiveNotFound`] if the directive does not exist.
    /// * [`EngineError::InvalidStateTransition`] if it is not `active`.
    pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
        let directive = repository::get_directive(&self.pool, directive_id)
            .await?
            .ok_or(EngineError::DirectiveNotFound(directive_id))?;

        if directive.status != "active" {
            return Err(EngineError::InvalidStateTransition {
                from: directive.status,
                to: "paused".to_string(),
            });
        }

        repository::update_directive_status(&self.pool, directive_id, "paused").await?;

        self.emit_event(EngineEvent::DirectiveStatusChanged {
            directive_id,
            old_status: "active".to_string(),
            new_status: "paused".to_string(),
        });

        Ok(())
    }

    /// Resume a paused directive.
pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; if directive.status != "paused" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "active".to_string(), }); } repository::update_directive_status(&self.pool, directive_id, "active").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: "paused".to_string(), new_status: "active".to_string(), }); // Continue execution self.advance_chain(directive_id).await?; Ok(()) } /// Stop a directive (cannot be resumed). pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; if directive.status == "completed" || directive.status == "failed" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "failed".to_string(), }); } repository::update_directive_status(&self.pool, directive_id, "failed").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: directive.status, new_status: "failed".to_string(), }); Ok(()) } // ======================================================================== // Chain Management // ======================================================================== /// Generate initial chain from directive. 
async fn generate_initial_chain( &self, directive: &Directive, ) -> Result { // Build planning prompt let _prompt = self.planner.build_planning_prompt(directive); // TODO: Call LLM to generate chain // For now, return a simple placeholder chain let chain = GeneratedChain { name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")), description: format!("Execution plan for: {}", directive.goal), steps: vec![ super::planner::GeneratedStep { name: "research".to_string(), step_type: "research".to_string(), description: "Research and understand the requirements".to_string(), depends_on: vec![], requirement_ids: vec![], contract_template: None, }, super::planner::GeneratedStep { name: "implement".to_string(), step_type: "implement".to_string(), description: "Implement the solution".to_string(), depends_on: vec!["research".to_string()], requirement_ids: vec![], contract_template: None, }, super::planner::GeneratedStep { name: "test".to_string(), step_type: "test".to_string(), description: "Test and verify the implementation".to_string(), depends_on: vec!["implement".to_string()], requirement_ids: vec![], contract_template: None, }, ], }; // Validate the chain self.planner.validate_chain(&chain)?; Ok(chain) } /// Create database steps from a generated chain. 
async fn create_steps_from_chain( &self, chain_id: &Uuid, chain: &GeneratedChain, ) -> Result<(), EngineError> { // First pass: create all steps and build name-to-id map let mut step_id_map: HashMap = HashMap::new(); // Get editor positions let positions = self.planner.compute_editor_positions(chain); for step in &chain.steps { let (editor_x, editor_y) = positions .get(&step.name) .copied() .unwrap_or((100.0, 100.0)); let task_plan = step .contract_template .as_ref() .and_then(|t| t.tasks.first()) .map(|t| t.plan.clone()) .or_else(|| Some(step.description.clone())); let request = AddStepRequest { name: step.name.clone(), description: Some(step.description.clone()), step_type: Some(step.step_type.clone()), contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), initial_phase: Some("plan".to_string()), task_plan, phases: step.contract_template.as_ref().map(|t| t.phases.clone()), depends_on: None, // Will update in second pass parallel_group: None, requirement_ids: Some(step.requirement_ids.clone()), acceptance_criteria_ids: None, verifier_config: None, editor_x: Some(editor_x), editor_y: Some(editor_y), }; let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?; step_id_map.insert(step.name.clone(), db_step.id); } // Second pass: update dependencies for step in &chain.steps { if step.depends_on.is_empty() { continue; } let step_id = step_id_map.get(&step.name).unwrap(); let dep_ids: Vec = step .depends_on .iter() .filter_map(|name| step_id_map.get(name)) .copied() .collect(); // Update step with proper dependencies let update = UpdateStepRequest { name: None, description: None, task_plan: None, depends_on: Some(dep_ids), requirement_ids: None, acceptance_criteria_ids: None, verifier_config: None, editor_x: None, editor_y: None, }; repository::update_chain_step(&self.pool, *step_id, update).await?; } Ok(()) } /// Regenerate chain while preserving completed steps. 
pub async fn regenerate_chain( &self, directive_id: Uuid, reason: &str, ) -> Result { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; let current_chain = repository::get_current_chain(&self.pool, directive_id) .await? .ok_or(EngineError::ChainNotFound(directive_id))?; // Get completed and failed steps let steps = repository::list_chain_steps(&self.pool, current_chain.id).await?; let completed_steps: Vec<_> = steps.iter().filter(|s| s.status == "passed").collect(); let failed_step = steps.iter().find(|s| s.status == "failed"); // Build replan prompt let _prompt = self.planner.build_replan_prompt( &directive, &completed_steps.iter().map(|s| (*s).clone()).collect::>(), failed_step.map(|s| &*s), reason, ); // TODO: Call LLM to regenerate chain // For now, just create a new chain with similar structure let new_chain = self.generate_initial_chain(&directive).await?; // Supersede old chain repository::supersede_chain(&self.pool, current_chain.id).await?; // Create new chain let db_chain = repository::create_directive_chain( &self.pool, directive_id, &new_chain.name, Some(&new_chain.description), Some(reason), // rationale None, // planning_model ) .await?; // Create steps self.create_steps_from_chain(&db_chain.id, &new_chain).await?; self.emit_event(EngineEvent::ChainRegenerated { directive_id, old_chain_id: current_chain.id, new_chain_id: db_chain.id, }); // Continue execution self.advance_chain(directive_id).await?; Ok(db_chain.id) } // ======================================================================== // Step Execution // ======================================================================== /// Advance chain execution: find ready steps and start them. pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? 
.ok_or(EngineError::DirectiveNotFound(directive_id))?; // Check if directive is active if directive.status == "paused" { return Err(EngineError::DirectivePaused); } if directive.status != "active" { return Ok(()); // Not an error, just nothing to do } // Check circuit breakers self.check_circuit_breakers(&directive).await?; // Get current chain let chain = repository::get_current_chain(&self.pool, directive_id) .await? .ok_or(EngineError::ChainNotFound(directive_id))?; // Find ready steps (dependencies met, status=pending) let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?; // Start each ready step for step in ready_steps { self.start_step(&directive, &step).await?; } // Check if chain is complete let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?; let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped"); let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed"); if all_passed && !all_steps.is_empty() { // Complete the directive self.complete_directive(directive_id).await?; } else if any_blocked { // Check if we should regenerate or fail let failed_count = all_steps.iter().filter(|s| s.status == "failed").count(); if failed_count > 3 { // Too many failures, fail the directive repository::update_directive_status(&self.pool, directive_id, "failed").await?; } } Ok(()) } /// Start a step by creating its contract and supervisor task. 
async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> { // Update step status to ready repository::update_step_status(&self.pool, step.id, "ready").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "pending".to_string(), new_status: "ready".to_string(), }); // Get contract details from step template let (_name, _description, _contract_type, _initial_phase) = self.get_contract_details(directive, step); // TODO: Actually create the contract via the contracts handler // For now, just update the step status to running // In a full implementation, this would: // 1. Create contract via POST /api/v1/contracts // 2. Create supervisor task via POST /api/v1/tasks // 3. Link contract and task to step // 4. Update step status to running // Placeholder: mark step as running repository::update_step_status(&self.pool, step.id, "running").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "ready".to_string(), new_status: "running".to_string(), }); self.emit_directive_event( directive.id, "step_started", "info", serde_json::json!({ "step_id": step.id, "step_name": step.name, }), "system", ) .await?; Ok(()) } /// Build contract details from a step. /// Returns (name, description, contract_type, initial_phase) fn get_contract_details( &self, directive: &Directive, step: &ChainStep, ) -> (String, Option, String, String) { let name = format!("{} - {}", directive.title, step.name); let description = step.description.clone(); let contract_type = step.contract_type.clone(); let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string()); (name, description, contract_type, initial_phase) } // ======================================================================== // Evaluation // ======================================================================== /// Handle contract completion: evaluate the step. 
pub async fn on_contract_completed( &self, contract_id: Uuid, ) -> Result<(), EngineError> { // Find the step for this contract let step = repository::get_step_by_contract_id(&self.pool, contract_id) .await? .ok_or(EngineError::StepNotFound(contract_id))?; // Get directive let chain = repository::get_directive_chain(&self.pool, step.chain_id) .await? .ok_or(EngineError::ChainNotFound(step.chain_id))?; let directive = repository::get_directive(&self.pool, chain.directive_id) .await? .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?; // Update step status to evaluating repository::update_step_status(&self.pool, step.id, "evaluating").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "running".to_string(), new_status: "evaluating".to_string(), }); // Run evaluation let result = self.evaluate_step(&directive, &step).await?; // Record evaluation let programmatic_results = result .verifier_results .iter() .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm) .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) .collect::>(); let llm_results = result .verifier_results .iter() .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm) .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) .collect::>(); // Get chain_id from step let chain_id = step.chain_id; let _evaluation = repository::create_directive_evaluation( &self.pool, directive.id, Some(chain_id), Some(step.id), step.contract_id, "composite", Some("orchestration_engine"), result.passed, Some(result.composite_score), Some(result.confidence_level.as_str()), serde_json::Value::Array(programmatic_results), serde_json::Value::Array(llm_results), serde_json::Value::Null, // criteria_results &result.summary, result.rework_instructions.as_deref(), ) .await?; // Update step based on result let new_status = match result.confidence_level { ConfidenceLevel::Green => "passed", ConfidenceLevel::Yellow 
=> { // Check autonomy level if directive.autonomy_level == "full_auto" { "passed" // Accept yellow in full auto mode } else { // Create approval request self.request_approval( &directive, &step, "step_review", &format!( "Step '{}' completed with yellow confidence ({:.0}%). Review required.", step.name, result.composite_score * 100.0 ), ) .await?; "evaluating" // Wait for approval } } ConfidenceLevel::Red => { // Initiate rework self.initiate_rework(&directive, &step, &result).await?; "rework" } }; repository::update_step_status(&self.pool, step.id, new_status).await?; repository::update_step_confidence( &self.pool, step.id, result.composite_score, result.confidence_level.as_str(), result.id, ) .await?; self.emit_event(EngineEvent::EvaluationCompleted { directive_id: directive.id, step_id: step.id, passed: result.passed, confidence_level: result.confidence_level, }); // If passed, continue chain execution if new_status == "passed" { self.advance_chain(directive.id).await?; } Ok(()) } /// Evaluate a step using tiered verification. async fn evaluate_step( &self, directive: &Directive, step: &ChainStep, ) -> Result { // Get repository path let repo_path = directive .local_path .as_ref() .map(std::path::PathBuf::from) .unwrap_or_else(|| std::path::PathBuf::from(".")); // Auto-detect verifiers let verifiers = auto_detect_verifiers(&repo_path).await; // Build verification context let context = VerificationContext { step_id: step.id, contract_id: step.contract_id, modified_files: vec![], // TODO: Get from contract/git step_description: step.description.clone().unwrap_or_default(), acceptance_criteria: vec![], // TODO: Get from directive directive_context: directive.goal.clone(), }; // Run composite evaluation let evaluator = CompositeEvaluator::new(verifiers) .with_thresholds( directive.confidence_threshold_green, directive.confidence_threshold_yellow, ); Ok(evaluator.evaluate(&repo_path, &context).await) } /// Initiate rework for a failed step. 
async fn initiate_rework( &self, directive: &Directive, step: &ChainStep, result: &EvaluationResult, ) -> Result<(), EngineError> { // Increment rework count let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?; // Check rework limit let max_rework = directive.max_rework_cycles.unwrap_or(3); if updated_step.rework_count >= max_rework { // Too many rework attempts, mark as blocked repository::update_step_status(&self.pool, step.id, "blocked").await?; self.emit_directive_event( directive.id, "step_blocked", "warning", serde_json::json!({ "step_id": step.id, "step_name": step.name, "reason": "Max rework attempts reached", }), "system", ) .await?; return Ok(()); } // Log rework event self.emit_directive_event( directive.id, "step_rework", "info", serde_json::json!({ "step_id": step.id, "step_name": step.name, "rework_count": updated_step.rework_count, "instructions": result.rework_instructions, }), "system", ) .await?; // TODO: Send rework instructions to supervisor task // This would involve: // 1. Reset contract phase to 'plan' // 2. Send message to supervisor with rework instructions // 3. Update step status to 'running' Ok(()) } /// Request human approval for a step. async fn request_approval( &self, directive: &Directive, step: &ChainStep, approval_type: &str, description: &str, ) -> Result { let context = serde_json::json!({ "step_id": step.id, "step_name": step.name, "confidence_score": step.confidence_score, }); let approval = repository::create_approval_request( &self.pool, directive.id, Some(step.id), approval_type, description, Some(context), "medium", None, // expires_at ) .await?; self.emit_event(EngineEvent::ApprovalRequired { directive_id: directive.id, approval_id: approval.id, approval_type: approval_type.to_string(), }); Ok(approval.id) } /// Handle approval resolution. 
pub async fn on_approval_resolved( &self, approval_id: Uuid, approved: bool, responded_by: Uuid, ) -> Result<(), EngineError> { let status = if approved { "approved" } else { "denied" }; let approval = repository::resolve_approval( &self.pool, approval_id, status, None, responded_by, ) .await?; if let Some(step_id) = approval.step_id { let step = repository::get_chain_step(&self.pool, step_id) .await? .ok_or(EngineError::StepNotFound(step_id))?; let chain = repository::get_directive_chain(&self.pool, step.chain_id) .await? .ok_or(EngineError::ChainNotFound(step.chain_id))?; if approved { // Mark step as passed and continue repository::update_step_status(&self.pool, step_id, "passed").await?; self.advance_chain(chain.directive_id).await?; } else { // Mark step as failed/blocked repository::update_step_status(&self.pool, step_id, "blocked").await?; } } Ok(()) } // ======================================================================== // Circuit Breakers // ======================================================================== /// Check circuit breakers for a directive. 
async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> { // Check cost limit if let Some(max_cost) = directive.max_total_cost_usd { let current_cost = directive.total_cost_usd; if current_cost >= max_cost { return Err(EngineError::CircuitBreaker(format!( "Cost limit exceeded: ${:.2} >= ${:.2}", current_cost, max_cost ))); } } // Check time limit (stored in minutes) if let Some(max_minutes) = directive.max_wall_time_minutes { if let Some(started_at) = directive.started_at { let elapsed = chrono::Utc::now().signed_duration_since(started_at); let elapsed_minutes = elapsed.num_minutes(); if elapsed_minutes >= max_minutes as i64 { return Err(EngineError::CircuitBreaker(format!( "Time limit exceeded: {} min >= {} min", elapsed_minutes, max_minutes ))); } } } // Check chain generation limit if let Some(max_gen) = directive.max_chain_regenerations { let current_gen = directive.chain_generation_count; if current_gen >= max_gen { return Err(EngineError::CircuitBreaker(format!( "Chain generation limit exceeded: {} >= {}", current_gen, max_gen ))); } } Ok(()) } // ======================================================================== // Completion // ======================================================================== /// Complete a directive after all steps pass. 
async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { // Run final evaluation (optional) // TODO: LLM evaluation of overall directive completion // Update directive status repository::update_directive_status(&self.pool, directive_id, "completed").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: "active".to_string(), new_status: "completed".to_string(), }); self.emit_directive_event( directive_id, "directive_completed", "info", serde_json::json!({}), "system", ) .await?; Ok(()) } // ======================================================================== // Event Logging // ======================================================================== /// Emit a directive event to the database. async fn emit_directive_event( &self, directive_id: Uuid, event_type: &str, severity: &str, event_data: serde_json::Value, actor_type: &str, ) -> Result { Ok(repository::emit_directive_event( &self.pool, directive_id, None, // chain_id None, // step_id event_type, severity, Some(event_data), actor_type, None, // actor_id ) .await?) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_confidence_level_decision() { // Green confidence should pass in all modes assert_eq!(ConfidenceLevel::Green.as_str(), "green"); // Yellow confidence behavior depends on autonomy level assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow"); // Red confidence should always trigger rework assert_eq!(ConfidenceLevel::Red.as_str(), "red"); } }