diff options
Diffstat (limited to 'makima/src/orchestration/engine.rs')
| -rw-r--r-- | makima/src/orchestration/engine.rs | 976 |
1 files changed, 976 insertions, 0 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs new file mode 100644 index 0000000..5bbb99f --- /dev/null +++ b/makima/src/orchestration/engine.rs @@ -0,0 +1,976 @@ +//! Directive orchestration engine. +//! +//! Manages the lifecycle of directives: +//! - Starts directives and generates initial chains +//! - Monitors step execution and triggers evaluations +//! - Handles rework, escalation, and chain regeneration +//! - Enforces circuit breakers (cost, time, rework limits) + +use std::collections::HashMap; + +use sqlx::PgPool; +use thiserror::Error; +use tokio::sync::broadcast; +use uuid::Uuid; + +use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest}; +use crate::db::repository::{self, RepositoryError}; + +use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; +use super::verifier::{ + auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, + VerificationContext, +}; + +/// Error type for engine operations. +#[derive(Error, Debug)] +pub enum EngineError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Repository error: {0}")] + Repository(#[from] RepositoryError), + + #[error("Planner error: {0}")] + Planner(#[from] PlannerError), + + #[error("Directive not found: {0}")] + DirectiveNotFound(Uuid), + + #[error("Chain not found for directive: {0}")] + ChainNotFound(Uuid), + + #[error("Step not found: {0}")] + StepNotFound(Uuid), + + #[error("Invalid state transition: {from} -> {to}")] + InvalidStateTransition { from: String, to: String }, + + #[error("Circuit breaker triggered: {0}")] + CircuitBreaker(String), + + #[error("Directive is paused")] + DirectivePaused, + + #[error("Contract creation failed: {0}")] + ContractCreation(String), + + #[error("LLM error: {0}")] + LlmError(String), +} + +/// Event emitted by the engine for UI updates. +#[derive(Debug, Clone)] +pub enum EngineEvent { + /// Directive status changed + DirectiveStatusChanged { + directive_id: Uuid, + old_status: String, + new_status: String, + }, + /// Step status changed + StepStatusChanged { + directive_id: Uuid, + step_id: Uuid, + old_status: String, + new_status: String, + }, + /// Evaluation completed + EvaluationCompleted { + directive_id: Uuid, + step_id: Uuid, + passed: bool, + confidence_level: ConfidenceLevel, + }, + /// Approval required + ApprovalRequired { + directive_id: Uuid, + approval_id: Uuid, + approval_type: String, + }, + /// Chain regenerated + ChainRegenerated { + directive_id: Uuid, + old_chain_id: Uuid, + new_chain_id: Uuid, + }, +} + +/// Main orchestration engine for directives. +pub struct DirectiveEngine { + pool: PgPool, + planner: ChainPlanner, + event_tx: Option<broadcast::Sender<EngineEvent>>, +} + +impl DirectiveEngine { + /// Create a new directive engine. + pub fn new(pool: PgPool) -> Self { + Self { + pool, + planner: ChainPlanner::new(), + event_tx: None, + } + } + + /// Set the event broadcast channel for UI updates. + pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self { + self.event_tx = Some(tx); + self + } + + /// Emit an event if channel is configured. + fn emit_event(&self, event: EngineEvent) { + if let Some(tx) = &self.event_tx { + let _ = tx.send(event); + } + } + + // ======================================================================== + // Directive Lifecycle + // ======================================================================== + + /// Start a directive: generate chain and begin execution. + pub async fn start_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + // Validate current state + if directive.status != "draft" && directive.status != "paused" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "planning".to_string(), + }); + } + + // Update status to planning + repository::update_directive_status(&self.pool, directive_id, "planning").await?; + self.emit_directive_event( + directive_id, + "status_changed", + "info", + serde_json::json!({"old_status": directive.status, "new_status": "planning"}), + "system", + ) + .await?; + + // Generate chain (placeholder - actual LLM call would go here) + let chain = self.generate_initial_chain(&directive).await?; + + // Create chain in database + let db_chain = repository::create_directive_chain( + &self.pool, + directive_id, + &chain.name, + Some(&chain.description), + None, // rationale + None, // planning_model + ) + .await?; + + // Create steps + self.create_steps_from_chain(&db_chain.id, &chain).await?; + + // Update directive to active + repository::update_directive_status(&self.pool, directive_id, "active").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "planning".to_string(), + new_status: "active".to_string(), + }); + + // Start ready steps + self.advance_chain(directive_id).await?; + + Ok(()) + } + + /// Pause a directive. + pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status != "active" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "paused".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "paused").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "active".to_string(), + new_status: "paused".to_string(), + }); + + Ok(()) + } + + /// Resume a paused directive. + pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status != "paused" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "active".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "active").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "paused".to_string(), + new_status: "active".to_string(), + }); + + // Continue execution + self.advance_chain(directive_id).await?; + + Ok(()) + } + + /// Stop a directive (cannot be resumed). + pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status == "completed" || directive.status == "failed" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "failed".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "failed").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: directive.status, + new_status: "failed".to_string(), + }); + + Ok(()) + } + + // ======================================================================== + // Chain Management + // ======================================================================== + + /// Generate initial chain from directive. + async fn generate_initial_chain( + &self, + directive: &Directive, + ) -> Result<GeneratedChain, EngineError> { + // Build planning prompt + let _prompt = self.planner.build_planning_prompt(directive); + + // TODO: Call LLM to generate chain + // For now, return a simple placeholder chain + let chain = GeneratedChain { + name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")), + description: format!("Execution plan for: {}", directive.goal), + steps: vec![ + super::planner::GeneratedStep { + name: "research".to_string(), + step_type: "research".to_string(), + description: "Research and understand the requirements".to_string(), + depends_on: vec![], + requirement_ids: vec![], + contract_template: None, + }, + super::planner::GeneratedStep { + name: "implement".to_string(), + step_type: "implement".to_string(), + description: "Implement the solution".to_string(), + depends_on: vec!["research".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + super::planner::GeneratedStep { + name: "test".to_string(), + step_type: "test".to_string(), + description: "Test and verify the implementation".to_string(), + depends_on: vec!["implement".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + ], + }; + + // Validate the chain + self.planner.validate_chain(&chain)?; + + Ok(chain) + } + + /// Create database steps from a generated chain. + async fn create_steps_from_chain( + &self, + chain_id: &Uuid, + chain: &GeneratedChain, + ) -> Result<(), EngineError> { + // First pass: create all steps and build name-to-id map + let mut step_id_map: HashMap<String, Uuid> = HashMap::new(); + + // Get editor positions + let positions = self.planner.compute_editor_positions(chain); + + for step in &chain.steps { + let (editor_x, editor_y) = positions + .get(&step.name) + .copied() + .unwrap_or((100.0, 100.0)); + + let task_plan = step + .contract_template + .as_ref() + .and_then(|t| t.tasks.first()) + .map(|t| t.plan.clone()) + .or_else(|| Some(step.description.clone())); + + let request = AddStepRequest { + name: step.name.clone(), + description: Some(step.description.clone()), + step_type: Some(step.step_type.clone()), + contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), + initial_phase: Some("plan".to_string()), + task_plan, + phases: step.contract_template.as_ref().map(|t| t.phases.clone()), + depends_on: None, // Will update in second pass + parallel_group: None, + requirement_ids: Some(step.requirement_ids.clone()), + acceptance_criteria_ids: None, + verifier_config: None, + editor_x: Some(editor_x), + editor_y: Some(editor_y), + }; + + let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?; + step_id_map.insert(step.name.clone(), db_step.id); + } + + // Second pass: update dependencies + for step in &chain.steps { + if step.depends_on.is_empty() { + continue; + } + + let step_id = step_id_map.get(&step.name).unwrap(); + let dep_ids: Vec<Uuid> = step + .depends_on + .iter() + .filter_map(|name| step_id_map.get(name)) + .copied() + .collect(); + + // Update step with proper dependencies + let update = UpdateStepRequest { + name: None, + description: None, + task_plan: None, + depends_on: Some(dep_ids), + requirement_ids: None, + acceptance_criteria_ids: None, + verifier_config: None, + editor_x: None, + editor_y: None, + }; + + repository::update_chain_step(&self.pool, *step_id, update).await?; + } + + Ok(()) + } + + /// Regenerate chain while preserving completed steps. + pub async fn regenerate_chain( + &self, + directive_id: Uuid, + reason: &str, + ) -> Result<Uuid, EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + let current_chain = repository::get_current_chain(&self.pool, directive_id) + .await? + .ok_or(EngineError::ChainNotFound(directive_id))?; + + // Get completed and failed steps + let steps = repository::list_chain_steps(&self.pool, current_chain.id).await?; + let completed_steps: Vec<_> = steps.iter().filter(|s| s.status == "passed").collect(); + let failed_step = steps.iter().find(|s| s.status == "failed"); + + // Build replan prompt + let _prompt = self.planner.build_replan_prompt( + &directive, + &completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(), + failed_step.map(|s| &*s), + reason, + ); + + // TODO: Call LLM to regenerate chain + // For now, just create a new chain with similar structure + let new_chain = self.generate_initial_chain(&directive).await?; + + // Supersede old chain + repository::supersede_chain(&self.pool, current_chain.id).await?; + + // Create new chain + let db_chain = repository::create_directive_chain( + &self.pool, + directive_id, + &new_chain.name, + Some(&new_chain.description), + Some(reason), // rationale + None, // planning_model + ) + .await?; + + // Create steps + self.create_steps_from_chain(&db_chain.id, &new_chain).await?; + + self.emit_event(EngineEvent::ChainRegenerated { + directive_id, + old_chain_id: current_chain.id, + new_chain_id: db_chain.id, + }); + + // Continue execution + self.advance_chain(directive_id).await?; + + Ok(db_chain.id) + } + + // ======================================================================== + // Step Execution + // ======================================================================== + + /// Advance chain execution: find ready steps and start them. + pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + // Check if directive is active + if directive.status == "paused" { + return Err(EngineError::DirectivePaused); + } + if directive.status != "active" { + return Ok(()); // Not an error, just nothing to do + } + + // Check circuit breakers + self.check_circuit_breakers(&directive).await?; + + // Get current chain + let chain = repository::get_current_chain(&self.pool, directive_id) + .await? + .ok_or(EngineError::ChainNotFound(directive_id))?; + + // Find ready steps (dependencies met, status=pending) + let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?; + + // Start each ready step + for step in ready_steps { + self.start_step(&directive, &step).await?; + } + + // Check if chain is complete + let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?; + let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped"); + let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed"); + + if all_passed && !all_steps.is_empty() { + // Complete the directive + self.complete_directive(directive_id).await?; + } else if any_blocked { + // Check if we should regenerate or fail + let failed_count = all_steps.iter().filter(|s| s.status == "failed").count(); + if failed_count > 3 { + // Too many failures, fail the directive + repository::update_directive_status(&self.pool, directive_id, "failed").await?; + } + } + + Ok(()) + } + + /// Start a step by creating its contract and supervisor task. + async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> { + // Update step status to ready + repository::update_step_status(&self.pool, step.id, "ready").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "pending".to_string(), + new_status: "ready".to_string(), + }); + + // Get contract details from step template + let (_name, _description, _contract_type, _initial_phase) = + self.get_contract_details(directive, step); + + // TODO: Actually create the contract via the contracts handler + // For now, just update the step status to running + // In a full implementation, this would: + // 1. Create contract via POST /api/v1/contracts + // 2. Create supervisor task via POST /api/v1/tasks + // 3. Link contract and task to step + // 4. Update step status to running + + // Placeholder: mark step as running + repository::update_step_status(&self.pool, step.id, "running").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "ready".to_string(), + new_status: "running".to_string(), + }); + + self.emit_directive_event( + directive.id, + "step_started", + "info", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + }), + "system", + ) + .await?; + + Ok(()) + } + + /// Build contract details from a step. + /// Returns (name, description, contract_type, initial_phase) + fn get_contract_details( + &self, + directive: &Directive, + step: &ChainStep, + ) -> (String, Option<String>, String, String) { + let name = format!("{} - {}", directive.title, step.name); + let description = step.description.clone(); + let contract_type = step.contract_type.clone(); + let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string()); + + (name, description, contract_type, initial_phase) + } + + // ======================================================================== + // Evaluation + // ======================================================================== + + /// Handle contract completion: evaluate the step. + pub async fn on_contract_completed( + &self, + contract_id: Uuid, + ) -> Result<(), EngineError> { + // Find the step for this contract + let step = repository::get_step_by_contract_id(&self.pool, contract_id) + .await? + .ok_or(EngineError::StepNotFound(contract_id))?; + + // Get directive + let chain = repository::get_directive_chain(&self.pool, step.chain_id) + .await? + .ok_or(EngineError::ChainNotFound(step.chain_id))?; + + let directive = repository::get_directive(&self.pool, chain.directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?; + + // Update step status to evaluating + repository::update_step_status(&self.pool, step.id, "evaluating").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "running".to_string(), + new_status: "evaluating".to_string(), + }); + + // Run evaluation + let result = self.evaluate_step(&directive, &step).await?; + + // Record evaluation + let programmatic_results = result + .verifier_results + .iter() + .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm) + .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) + .collect::<Vec<_>>(); + + let llm_results = result + .verifier_results + .iter() + .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm) + .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) + .collect::<Vec<_>>(); + + // Get chain_id from step + let chain_id = step.chain_id; + + let _evaluation = repository::create_directive_evaluation( + &self.pool, + directive.id, + Some(chain_id), + Some(step.id), + step.contract_id, + "composite", + Some("orchestration_engine"), + result.passed, + Some(result.composite_score), + Some(result.confidence_level.as_str()), + serde_json::Value::Array(programmatic_results), + serde_json::Value::Array(llm_results), + serde_json::Value::Null, // criteria_results + &result.summary, + result.rework_instructions.as_deref(), + ) + .await?; + + // Update step based on result + let new_status = match result.confidence_level { + ConfidenceLevel::Green => "passed", + ConfidenceLevel::Yellow => { + // Check autonomy level + if directive.autonomy_level == "full_auto" { + "passed" // Accept yellow in full auto mode + } else { + // Create approval request + self.request_approval( + &directive, + &step, + "step_review", + &format!( + "Step '{}' completed with yellow confidence ({:.0}%). Review required.", + step.name, + result.composite_score * 100.0 + ), + ) + .await?; + "evaluating" // Wait for approval + } + } + ConfidenceLevel::Red => { + // Initiate rework + self.initiate_rework(&directive, &step, &result).await?; + "rework" + } + }; + + repository::update_step_status(&self.pool, step.id, new_status).await?; + repository::update_step_confidence( + &self.pool, + step.id, + result.composite_score, + result.confidence_level.as_str(), + result.id, + ) + .await?; + + self.emit_event(EngineEvent::EvaluationCompleted { + directive_id: directive.id, + step_id: step.id, + passed: result.passed, + confidence_level: result.confidence_level, + }); + + // If passed, continue chain execution + if new_status == "passed" { + self.advance_chain(directive.id).await?; + } + + Ok(()) + } + + /// Evaluate a step using tiered verification. + async fn evaluate_step( + &self, + directive: &Directive, + step: &ChainStep, + ) -> Result<EvaluationResult, EngineError> { + // Get repository path + let repo_path = directive + .local_path + .as_ref() + .map(std::path::PathBuf::from) + .unwrap_or_else(|| std::path::PathBuf::from(".")); + + // Auto-detect verifiers + let verifiers = auto_detect_verifiers(&repo_path).await; + + // Build verification context + let context = VerificationContext { + step_id: step.id, + contract_id: step.contract_id, + modified_files: vec![], // TODO: Get from contract/git + step_description: step.description.clone().unwrap_or_default(), + acceptance_criteria: vec![], // TODO: Get from directive + directive_context: directive.goal.clone(), + }; + + // Run composite evaluation + let evaluator = CompositeEvaluator::new(verifiers) + .with_thresholds( + directive.confidence_threshold_green, + directive.confidence_threshold_yellow, + ); + + Ok(evaluator.evaluate(&repo_path, &context).await) + } + + /// Initiate rework for a failed step. + async fn initiate_rework( + &self, + directive: &Directive, + step: &ChainStep, + result: &EvaluationResult, + ) -> Result<(), EngineError> { + // Increment rework count + let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?; + + // Check rework limit + let max_rework = directive.max_rework_cycles.unwrap_or(3); + if updated_step.rework_count >= max_rework { + // Too many rework attempts, mark as blocked + repository::update_step_status(&self.pool, step.id, "blocked").await?; + self.emit_directive_event( + directive.id, + "step_blocked", + "warning", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "reason": "Max rework attempts reached", + }), + "system", + ) + .await?; + return Ok(()); + } + + // Log rework event + self.emit_directive_event( + directive.id, + "step_rework", + "info", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "rework_count": updated_step.rework_count, + "instructions": result.rework_instructions, + }), + "system", + ) + .await?; + + // TODO: Send rework instructions to supervisor task + // This would involve: + // 1. Reset contract phase to 'plan' + // 2. Send message to supervisor with rework instructions + // 3. Update step status to 'running' + + Ok(()) + } + + /// Request human approval for a step. + async fn request_approval( + &self, + directive: &Directive, + step: &ChainStep, + approval_type: &str, + description: &str, + ) -> Result<Uuid, EngineError> { + let context = serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "confidence_score": step.confidence_score, + }); + + let approval = repository::create_approval_request( + &self.pool, + directive.id, + Some(step.id), + approval_type, + description, + Some(context), + "medium", + None, // expires_at + ) + .await?; + + self.emit_event(EngineEvent::ApprovalRequired { + directive_id: directive.id, + approval_id: approval.id, + approval_type: approval_type.to_string(), + }); + + Ok(approval.id) + } + + /// Handle approval resolution. + pub async fn on_approval_resolved( + &self, + approval_id: Uuid, + approved: bool, + responded_by: Uuid, + ) -> Result<(), EngineError> { + let status = if approved { "approved" } else { "denied" }; + let approval = repository::resolve_approval( + &self.pool, + approval_id, + status, + None, + responded_by, + ) + .await?; + + if let Some(step_id) = approval.step_id { + let step = repository::get_chain_step(&self.pool, step_id) + .await? + .ok_or(EngineError::StepNotFound(step_id))?; + + let chain = repository::get_directive_chain(&self.pool, step.chain_id) + .await? + .ok_or(EngineError::ChainNotFound(step.chain_id))?; + + if approved { + // Mark step as passed and continue + repository::update_step_status(&self.pool, step_id, "passed").await?; + self.advance_chain(chain.directive_id).await?; + } else { + // Mark step as failed/blocked + repository::update_step_status(&self.pool, step_id, "blocked").await?; + } + } + + Ok(()) + } + + // ======================================================================== + // Circuit Breakers + // ======================================================================== + + /// Check circuit breakers for a directive. + async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> { + // Check cost limit + if let Some(max_cost) = directive.max_total_cost_usd { + let current_cost = directive.total_cost_usd; + if current_cost >= max_cost { + return Err(EngineError::CircuitBreaker(format!( + "Cost limit exceeded: ${:.2} >= ${:.2}", + current_cost, max_cost + ))); + } + } + + // Check time limit (stored in minutes) + if let Some(max_minutes) = directive.max_wall_time_minutes { + if let Some(started_at) = directive.started_at { + let elapsed = chrono::Utc::now().signed_duration_since(started_at); + let elapsed_minutes = elapsed.num_minutes(); + if elapsed_minutes >= max_minutes as i64 { + return Err(EngineError::CircuitBreaker(format!( + "Time limit exceeded: {} min >= {} min", + elapsed_minutes, max_minutes + ))); + } + } + } + + // Check chain generation limit + if let Some(max_gen) = directive.max_chain_regenerations { + let current_gen = directive.chain_generation_count; + if current_gen >= max_gen { + return Err(EngineError::CircuitBreaker(format!( + "Chain generation limit exceeded: {} >= {}", + current_gen, max_gen + ))); + } + } + + Ok(()) + } + + // ======================================================================== + // Completion + // ======================================================================== + + /// Complete a directive after all steps pass. + async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + // Run final evaluation (optional) + // TODO: LLM evaluation of overall directive completion + + // Update directive status + repository::update_directive_status(&self.pool, directive_id, "completed").await?; + + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "active".to_string(), + new_status: "completed".to_string(), + }); + + self.emit_directive_event( + directive_id, + "directive_completed", + "info", + serde_json::json!({}), + "system", + ) + .await?; + + Ok(()) + } + + // ======================================================================== + // Event Logging + // ======================================================================== + + /// Emit a directive event to the database. + async fn emit_directive_event( + &self, + directive_id: Uuid, + event_type: &str, + severity: &str, + event_data: serde_json::Value, + actor_type: &str, + ) -> Result<DirectiveEvent, EngineError> { + Ok(repository::emit_directive_event( + &self.pool, + directive_id, + None, // chain_id + None, // step_id + event_type, + severity, + Some(event_data), + actor_type, + None, // actor_id + ) + .await?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_confidence_level_decision() { + // Green confidence should pass in all modes + assert_eq!(ConfidenceLevel::Green.as_str(), "green"); + + // Yellow confidence behavior depends on autonomy level + assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow"); + + // Red confidence should always trigger rework + assert_eq!(ConfidenceLevel::Red.as_str(), "red"); + } +} |
