diff options
Diffstat (limited to 'makima/src/orchestration')
| -rw-r--r-- | makima/src/orchestration/engine.rs | 976 | ||||
| -rw-r--r-- | makima/src/orchestration/mod.rs | 26 | ||||
| -rw-r--r-- | makima/src/orchestration/planner.rs | 742 | ||||
| -rw-r--r-- | makima/src/orchestration/verifier.rs | 806 |
4 files changed, 2550 insertions, 0 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs new file mode 100644 index 0000000..5bbb99f --- /dev/null +++ b/makima/src/orchestration/engine.rs @@ -0,0 +1,976 @@ +//! Directive orchestration engine. +//! +//! Manages the lifecycle of directives: +//! - Starts directives and generates initial chains +//! - Monitors step execution and triggers evaluations +//! - Handles rework, escalation, and chain regeneration +//! - Enforces circuit breakers (cost, time, rework limits) + +use std::collections::HashMap; + +use sqlx::PgPool; +use thiserror::Error; +use tokio::sync::broadcast; +use uuid::Uuid; + +use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest}; +use crate::db::repository::{self, RepositoryError}; + +use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; +use super::verifier::{ + auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, + VerificationContext, +}; + +/// Error type for engine operations. +#[derive(Error, Debug)] +pub enum EngineError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Repository error: {0}")] + Repository(#[from] RepositoryError), + + #[error("Planner error: {0}")] + Planner(#[from] PlannerError), + + #[error("Directive not found: {0}")] + DirectiveNotFound(Uuid), + + #[error("Chain not found for directive: {0}")] + ChainNotFound(Uuid), + + #[error("Step not found: {0}")] + StepNotFound(Uuid), + + #[error("Invalid state transition: {from} -> {to}")] + InvalidStateTransition { from: String, to: String }, + + #[error("Circuit breaker triggered: {0}")] + CircuitBreaker(String), + + #[error("Directive is paused")] + DirectivePaused, + + #[error("Contract creation failed: {0}")] + ContractCreation(String), + + #[error("LLM error: {0}")] + LlmError(String), +} + +/// Event emitted by the engine for UI updates. +#[derive(Debug, Clone)] +pub enum EngineEvent { + /// Directive status changed + DirectiveStatusChanged { + directive_id: Uuid, + old_status: String, + new_status: String, + }, + /// Step status changed + StepStatusChanged { + directive_id: Uuid, + step_id: Uuid, + old_status: String, + new_status: String, + }, + /// Evaluation completed + EvaluationCompleted { + directive_id: Uuid, + step_id: Uuid, + passed: bool, + confidence_level: ConfidenceLevel, + }, + /// Approval required + ApprovalRequired { + directive_id: Uuid, + approval_id: Uuid, + approval_type: String, + }, + /// Chain regenerated + ChainRegenerated { + directive_id: Uuid, + old_chain_id: Uuid, + new_chain_id: Uuid, + }, +} + +/// Main orchestration engine for directives. +pub struct DirectiveEngine { + pool: PgPool, + planner: ChainPlanner, + event_tx: Option<broadcast::Sender<EngineEvent>>, +} + +impl DirectiveEngine { + /// Create a new directive engine. + pub fn new(pool: PgPool) -> Self { + Self { + pool, + planner: ChainPlanner::new(), + event_tx: None, + } + } + + /// Set the event broadcast channel for UI updates. + pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self { + self.event_tx = Some(tx); + self + } + + /// Emit an event if channel is configured. + fn emit_event(&self, event: EngineEvent) { + if let Some(tx) = &self.event_tx { + let _ = tx.send(event); + } + } + + // ======================================================================== + // Directive Lifecycle + // ======================================================================== + + /// Start a directive: generate chain and begin execution. + pub async fn start_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + // Validate current state + if directive.status != "draft" && directive.status != "paused" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "planning".to_string(), + }); + } + + // Update status to planning + repository::update_directive_status(&self.pool, directive_id, "planning").await?; + self.emit_directive_event( + directive_id, + "status_changed", + "info", + serde_json::json!({"old_status": directive.status, "new_status": "planning"}), + "system", + ) + .await?; + + // Generate chain (placeholder - actual LLM call would go here) + let chain = self.generate_initial_chain(&directive).await?; + + // Create chain in database + let db_chain = repository::create_directive_chain( + &self.pool, + directive_id, + &chain.name, + Some(&chain.description), + None, // rationale + None, // planning_model + ) + .await?; + + // Create steps + self.create_steps_from_chain(&db_chain.id, &chain).await?; + + // Update directive to active + repository::update_directive_status(&self.pool, directive_id, "active").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "planning".to_string(), + new_status: "active".to_string(), + }); + + // Start ready steps + self.advance_chain(directive_id).await?; + + Ok(()) + } + + /// Pause a directive. + pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status != "active" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "paused".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "paused").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "active".to_string(), + new_status: "paused".to_string(), + }); + + Ok(()) + } + + /// Resume a paused directive. + pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status != "paused" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "active".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "active").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "paused".to_string(), + new_status: "active".to_string(), + }); + + // Continue execution + self.advance_chain(directive_id).await?; + + Ok(()) + } + + /// Stop a directive (cannot be resumed). + pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + if directive.status == "completed" || directive.status == "failed" { + return Err(EngineError::InvalidStateTransition { + from: directive.status, + to: "failed".to_string(), + }); + } + + repository::update_directive_status(&self.pool, directive_id, "failed").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: directive.status, + new_status: "failed".to_string(), + }); + + Ok(()) + } + + // ======================================================================== + // Chain Management + // ======================================================================== + + /// Generate initial chain from directive. + async fn generate_initial_chain( + &self, + directive: &Directive, + ) -> Result<GeneratedChain, EngineError> { + // Build planning prompt + let _prompt = self.planner.build_planning_prompt(directive); + + // TODO: Call LLM to generate chain + // For now, return a simple placeholder chain + let chain = GeneratedChain { + name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")), + description: format!("Execution plan for: {}", directive.goal), + steps: vec![ + super::planner::GeneratedStep { + name: "research".to_string(), + step_type: "research".to_string(), + description: "Research and understand the requirements".to_string(), + depends_on: vec![], + requirement_ids: vec![], + contract_template: None, + }, + super::planner::GeneratedStep { + name: "implement".to_string(), + step_type: "implement".to_string(), + description: "Implement the solution".to_string(), + depends_on: vec!["research".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + super::planner::GeneratedStep { + name: "test".to_string(), + step_type: "test".to_string(), + description: "Test and verify the implementation".to_string(), + depends_on: vec!["implement".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + ], + }; + + // Validate the chain + self.planner.validate_chain(&chain)?; + + Ok(chain) + } + + /// Create database steps from a generated chain. + async fn create_steps_from_chain( + &self, + chain_id: &Uuid, + chain: &GeneratedChain, + ) -> Result<(), EngineError> { + // First pass: create all steps and build name-to-id map + let mut step_id_map: HashMap<String, Uuid> = HashMap::new(); + + // Get editor positions + let positions = self.planner.compute_editor_positions(chain); + + for step in &chain.steps { + let (editor_x, editor_y) = positions + .get(&step.name) + .copied() + .unwrap_or((100.0, 100.0)); + + let task_plan = step + .contract_template + .as_ref() + .and_then(|t| t.tasks.first()) + .map(|t| t.plan.clone()) + .or_else(|| Some(step.description.clone())); + + let request = AddStepRequest { + name: step.name.clone(), + description: Some(step.description.clone()), + step_type: Some(step.step_type.clone()), + contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), + initial_phase: Some("plan".to_string()), + task_plan, + phases: step.contract_template.as_ref().map(|t| t.phases.clone()), + depends_on: None, // Will update in second pass + parallel_group: None, + requirement_ids: Some(step.requirement_ids.clone()), + acceptance_criteria_ids: None, + verifier_config: None, + editor_x: Some(editor_x), + editor_y: Some(editor_y), + }; + + let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?; + step_id_map.insert(step.name.clone(), db_step.id); + } + + // Second pass: update dependencies + for step in &chain.steps { + if step.depends_on.is_empty() { + continue; + } + + let step_id = step_id_map.get(&step.name).unwrap(); + let dep_ids: Vec<Uuid> = step + .depends_on + .iter() + .filter_map(|name| step_id_map.get(name)) + .copied() + .collect(); + + // Update step with proper dependencies + let update = UpdateStepRequest { + name: None, + description: None, + task_plan: None, + depends_on: Some(dep_ids), + requirement_ids: None, + acceptance_criteria_ids: None, + verifier_config: None, + editor_x: None, + editor_y: None, + }; + + repository::update_chain_step(&self.pool, *step_id, update).await?; + } + + Ok(()) + } + + /// Regenerate chain while preserving completed steps. + pub async fn regenerate_chain( + &self, + directive_id: Uuid, + reason: &str, + ) -> Result<Uuid, EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + let current_chain = repository::get_current_chain(&self.pool, directive_id) + .await? + .ok_or(EngineError::ChainNotFound(directive_id))?; + + // Get completed and failed steps + let steps = repository::list_chain_steps(&self.pool, current_chain.id).await?; + let completed_steps: Vec<_> = steps.iter().filter(|s| s.status == "passed").collect(); + let failed_step = steps.iter().find(|s| s.status == "failed"); + + // Build replan prompt + let _prompt = self.planner.build_replan_prompt( + &directive, + &completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(), + failed_step.map(|s| &*s), + reason, + ); + + // TODO: Call LLM to regenerate chain + // For now, just create a new chain with similar structure + let new_chain = self.generate_initial_chain(&directive).await?; + + // Supersede old chain + repository::supersede_chain(&self.pool, current_chain.id).await?; + + // Create new chain + let db_chain = repository::create_directive_chain( + &self.pool, + directive_id, + &new_chain.name, + Some(&new_chain.description), + Some(reason), // rationale + None, // planning_model + ) + .await?; + + // Create steps + self.create_steps_from_chain(&db_chain.id, &new_chain).await?; + + self.emit_event(EngineEvent::ChainRegenerated { + directive_id, + old_chain_id: current_chain.id, + new_chain_id: db_chain.id, + }); + + // Continue execution + self.advance_chain(directive_id).await?; + + Ok(db_chain.id) + } + + // ======================================================================== + // Step Execution + // ======================================================================== + + /// Advance chain execution: find ready steps and start them. + pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + // Check if directive is active + if directive.status == "paused" { + return Err(EngineError::DirectivePaused); + } + if directive.status != "active" { + return Ok(()); // Not an error, just nothing to do + } + + // Check circuit breakers + self.check_circuit_breakers(&directive).await?; + + // Get current chain + let chain = repository::get_current_chain(&self.pool, directive_id) + .await? + .ok_or(EngineError::ChainNotFound(directive_id))?; + + // Find ready steps (dependencies met, status=pending) + let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?; + + // Start each ready step + for step in ready_steps { + self.start_step(&directive, &step).await?; + } + + // Check if chain is complete + let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?; + let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped"); + let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed"); + + if all_passed && !all_steps.is_empty() { + // Complete the directive + self.complete_directive(directive_id).await?; + } else if any_blocked { + // Check if we should regenerate or fail + let failed_count = all_steps.iter().filter(|s| s.status == "failed").count(); + if failed_count > 3 { + // Too many failures, fail the directive + repository::update_directive_status(&self.pool, directive_id, "failed").await?; + } + } + + Ok(()) + } + + /// Start a step by creating its contract and supervisor task. + async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> { + // Update step status to ready + repository::update_step_status(&self.pool, step.id, "ready").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "pending".to_string(), + new_status: "ready".to_string(), + }); + + // Get contract details from step template + let (_name, _description, _contract_type, _initial_phase) = + self.get_contract_details(directive, step); + + // TODO: Actually create the contract via the contracts handler + // For now, just update the step status to running + // In a full implementation, this would: + // 1. Create contract via POST /api/v1/contracts + // 2. Create supervisor task via POST /api/v1/tasks + // 3. Link contract and task to step + // 4. Update step status to running + + // Placeholder: mark step as running + repository::update_step_status(&self.pool, step.id, "running").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "ready".to_string(), + new_status: "running".to_string(), + }); + + self.emit_directive_event( + directive.id, + "step_started", + "info", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + }), + "system", + ) + .await?; + + Ok(()) + } + + /// Build contract details from a step. + /// Returns (name, description, contract_type, initial_phase) + fn get_contract_details( + &self, + directive: &Directive, + step: &ChainStep, + ) -> (String, Option<String>, String, String) { + let name = format!("{} - {}", directive.title, step.name); + let description = step.description.clone(); + let contract_type = step.contract_type.clone(); + let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string()); + + (name, description, contract_type, initial_phase) + } + + // ======================================================================== + // Evaluation + // ======================================================================== + + /// Handle contract completion: evaluate the step. + pub async fn on_contract_completed( + &self, + contract_id: Uuid, + ) -> Result<(), EngineError> { + // Find the step for this contract + let step = repository::get_step_by_contract_id(&self.pool, contract_id) + .await? + .ok_or(EngineError::StepNotFound(contract_id))?; + + // Get directive + let chain = repository::get_directive_chain(&self.pool, step.chain_id) + .await? + .ok_or(EngineError::ChainNotFound(step.chain_id))?; + + let directive = repository::get_directive(&self.pool, chain.directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?; + + // Update step status to evaluating + repository::update_step_status(&self.pool, step.id, "evaluating").await?; + self.emit_event(EngineEvent::StepStatusChanged { + directive_id: directive.id, + step_id: step.id, + old_status: "running".to_string(), + new_status: "evaluating".to_string(), + }); + + // Run evaluation + let result = self.evaluate_step(&directive, &step).await?; + + // Record evaluation + let programmatic_results = result + .verifier_results + .iter() + .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm) + .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) + .collect::<Vec<_>>(); + + let llm_results = result + .verifier_results + .iter() + .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm) + .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) + .collect::<Vec<_>>(); + + // Get chain_id from step + let chain_id = step.chain_id; + + let _evaluation = repository::create_directive_evaluation( + &self.pool, + directive.id, + Some(chain_id), + Some(step.id), + step.contract_id, + "composite", + Some("orchestration_engine"), + result.passed, + Some(result.composite_score), + Some(result.confidence_level.as_str()), + serde_json::Value::Array(programmatic_results), + serde_json::Value::Array(llm_results), + serde_json::Value::Null, // criteria_results + &result.summary, + result.rework_instructions.as_deref(), + ) + .await?; + + // Update step based on result + let new_status = match result.confidence_level { + ConfidenceLevel::Green => "passed", + ConfidenceLevel::Yellow => { + // Check autonomy level + if directive.autonomy_level == "full_auto" { + "passed" // Accept yellow in full auto mode + } else { + // Create approval request + self.request_approval( + &directive, + &step, + "step_review", + &format!( + "Step '{}' completed with yellow confidence ({:.0}%). Review required.", + step.name, + result.composite_score * 100.0 + ), + ) + .await?; + "evaluating" // Wait for approval + } + } + ConfidenceLevel::Red => { + // Initiate rework + self.initiate_rework(&directive, &step, &result).await?; + "rework" + } + }; + + repository::update_step_status(&self.pool, step.id, new_status).await?; + repository::update_step_confidence( + &self.pool, + step.id, + result.composite_score, + result.confidence_level.as_str(), + result.id, + ) + .await?; + + self.emit_event(EngineEvent::EvaluationCompleted { + directive_id: directive.id, + step_id: step.id, + passed: result.passed, + confidence_level: result.confidence_level, + }); + + // If passed, continue chain execution + if new_status == "passed" { + self.advance_chain(directive.id).await?; + } + + Ok(()) + } + + /// Evaluate a step using tiered verification. + async fn evaluate_step( + &self, + directive: &Directive, + step: &ChainStep, + ) -> Result<EvaluationResult, EngineError> { + // Get repository path + let repo_path = directive + .local_path + .as_ref() + .map(std::path::PathBuf::from) + .unwrap_or_else(|| std::path::PathBuf::from(".")); + + // Auto-detect verifiers + let verifiers = auto_detect_verifiers(&repo_path).await; + + // Build verification context + let context = VerificationContext { + step_id: step.id, + contract_id: step.contract_id, + modified_files: vec![], // TODO: Get from contract/git + step_description: step.description.clone().unwrap_or_default(), + acceptance_criteria: vec![], // TODO: Get from directive + directive_context: directive.goal.clone(), + }; + + // Run composite evaluation + let evaluator = CompositeEvaluator::new(verifiers) + .with_thresholds( + directive.confidence_threshold_green, + directive.confidence_threshold_yellow, + ); + + Ok(evaluator.evaluate(&repo_path, &context).await) + } + + /// Initiate rework for a failed step. + async fn initiate_rework( + &self, + directive: &Directive, + step: &ChainStep, + result: &EvaluationResult, + ) -> Result<(), EngineError> { + // Increment rework count + let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?; + + // Check rework limit + let max_rework = directive.max_rework_cycles.unwrap_or(3); + if updated_step.rework_count >= max_rework { + // Too many rework attempts, mark as blocked + repository::update_step_status(&self.pool, step.id, "blocked").await?; + self.emit_directive_event( + directive.id, + "step_blocked", + "warning", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "reason": "Max rework attempts reached", + }), + "system", + ) + .await?; + return Ok(()); + } + + // Log rework event + self.emit_directive_event( + directive.id, + "step_rework", + "info", + serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "rework_count": updated_step.rework_count, + "instructions": result.rework_instructions, + }), + "system", + ) + .await?; + + // TODO: Send rework instructions to supervisor task + // This would involve: + // 1. Reset contract phase to 'plan' + // 2. Send message to supervisor with rework instructions + // 3. Update step status to 'running' + + Ok(()) + } + + /// Request human approval for a step. + async fn request_approval( + &self, + directive: &Directive, + step: &ChainStep, + approval_type: &str, + description: &str, + ) -> Result<Uuid, EngineError> { + let context = serde_json::json!({ + "step_id": step.id, + "step_name": step.name, + "confidence_score": step.confidence_score, + }); + + let approval = repository::create_approval_request( + &self.pool, + directive.id, + Some(step.id), + approval_type, + description, + Some(context), + "medium", + None, // expires_at + ) + .await?; + + self.emit_event(EngineEvent::ApprovalRequired { + directive_id: directive.id, + approval_id: approval.id, + approval_type: approval_type.to_string(), + }); + + Ok(approval.id) + } + + /// Handle approval resolution. + pub async fn on_approval_resolved( + &self, + approval_id: Uuid, + approved: bool, + responded_by: Uuid, + ) -> Result<(), EngineError> { + let status = if approved { "approved" } else { "denied" }; + let approval = repository::resolve_approval( + &self.pool, + approval_id, + status, + None, + responded_by, + ) + .await?; + + if let Some(step_id) = approval.step_id { + let step = repository::get_chain_step(&self.pool, step_id) + .await? + .ok_or(EngineError::StepNotFound(step_id))?; + + let chain = repository::get_directive_chain(&self.pool, step.chain_id) + .await? + .ok_or(EngineError::ChainNotFound(step.chain_id))?; + + if approved { + // Mark step as passed and continue + repository::update_step_status(&self.pool, step_id, "passed").await?; + self.advance_chain(chain.directive_id).await?; + } else { + // Mark step as failed/blocked + repository::update_step_status(&self.pool, step_id, "blocked").await?; + } + } + + Ok(()) + } + + // ======================================================================== + // Circuit Breakers + // ======================================================================== + + /// Check circuit breakers for a directive. + async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> { + // Check cost limit + if let Some(max_cost) = directive.max_total_cost_usd { + let current_cost = directive.total_cost_usd; + if current_cost >= max_cost { + return Err(EngineError::CircuitBreaker(format!( + "Cost limit exceeded: ${:.2} >= ${:.2}", + current_cost, max_cost + ))); + } + } + + // Check time limit (stored in minutes) + if let Some(max_minutes) = directive.max_wall_time_minutes { + if let Some(started_at) = directive.started_at { + let elapsed = chrono::Utc::now().signed_duration_since(started_at); + let elapsed_minutes = elapsed.num_minutes(); + if elapsed_minutes >= max_minutes as i64 { + return Err(EngineError::CircuitBreaker(format!( + "Time limit exceeded: {} min >= {} min", + elapsed_minutes, max_minutes + ))); + } + } + } + + // Check chain generation limit + if let Some(max_gen) = directive.max_chain_regenerations { + let current_gen = directive.chain_generation_count; + if current_gen >= max_gen { + return Err(EngineError::CircuitBreaker(format!( + "Chain generation limit exceeded: {} >= {}", + current_gen, max_gen + ))); + } + } + + Ok(()) + } + + // ======================================================================== + // Completion + // ======================================================================== + + /// Complete a directive after all steps pass. + async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { + // Run final evaluation (optional) + // TODO: LLM evaluation of overall directive completion + + // Update directive status + repository::update_directive_status(&self.pool, directive_id, "completed").await?; + + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "active".to_string(), + new_status: "completed".to_string(), + }); + + self.emit_directive_event( + directive_id, + "directive_completed", + "info", + serde_json::json!({}), + "system", + ) + .await?; + + Ok(()) + } + + // ======================================================================== + // Event Logging + // ======================================================================== + + /// Emit a directive event to the database. + async fn emit_directive_event( + &self, + directive_id: Uuid, + event_type: &str, + severity: &str, + event_data: serde_json::Value, + actor_type: &str, + ) -> Result<DirectiveEvent, EngineError> { + Ok(repository::emit_directive_event( + &self.pool, + directive_id, + None, // chain_id + None, // step_id + event_type, + severity, + Some(event_data), + actor_type, + None, // actor_id + ) + .await?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_confidence_level_decision() { + // Green confidence should pass in all modes + assert_eq!(ConfidenceLevel::Green.as_str(), "green"); + + // Yellow confidence behavior depends on autonomy level + assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow"); + + // Red confidence should always trigger rework + assert_eq!(ConfidenceLevel::Red.as_str(), "red"); + } +} diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs new file mode 100644 index 0000000..41913ca --- /dev/null +++ b/makima/src/orchestration/mod.rs @@ -0,0 +1,26 @@ +//! Orchestration engine for directive-driven autonomous execution. +//! +//! This module provides the core orchestration capabilities: +//! - [`DirectiveEngine`]: Main orchestration loop that manages directive lifecycle +//! - [`ChainPlanner`]: LLM-based chain generation from directive goals +//! - [`Verifier`]: Pluggable verification system for step validation +//! +//! # Architecture +//! +//! The orchestration system follows a directive-first approach: +//! 1. Directives define goals, requirements, and acceptance criteria +//! 2. Chains are generated execution plans (DAGs of steps) +//! 3. Steps map to contracts that are created and monitored +//! 4. Tiered verification (programmatic first, then LLM) determines confidence +//! 5. Confidence scoring (green/yellow/red) drives autonomy decisions + +mod engine; +mod planner; +mod verifier; + +pub use engine::{DirectiveEngine, EngineError}; +pub use planner::{ChainPlanner, PlannerError}; +pub use verifier::{ + auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, Verifier, + VerifierError, VerifierResult, VerifierType, +}; diff --git a/makima/src/orchestration/planner.rs b/makima/src/orchestration/planner.rs new file mode 100644 index 0000000..cdca8a0 --- /dev/null +++ b/makima/src/orchestration/planner.rs @@ -0,0 +1,742 @@ +//! Chain planner for LLM-based execution plan generation. +//! +//! Generates chains (DAGs of steps) from directive goals and requirements. +//! Supports both initial plan generation and replanning while preserving +//! completed work. + +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use thiserror::Error; +use uuid::Uuid; + +use crate::db::models::{AddStepRequest, ChainStep, Directive}; + +/// Error type for planner operations. +#[derive(Error, Debug)] +pub enum PlannerError { + #[error("Cycle detected in DAG: {0}")] + CycleDetected(String), + + #[error("Invalid dependency: step '{step}' depends on unknown step '{dependency}'")] + InvalidDependency { step: String, dependency: String }, + + #[error("LLM generation failed: {0}")] + LlmError(String), + + #[error("Requirement not covered: {0}")] + RequirementNotCovered(String), + + #[error("Invalid plan: {0}")] + InvalidPlan(String), + + #[error("Empty plan generated")] + EmptyPlan, +} + +/// Generated step from LLM planning. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GeneratedStep { + /// Unique name within the chain + pub name: String, + /// Type of step (e.g., "research", "implement", "test", "review") + pub step_type: String, + /// Description of what this step accomplishes + pub description: String, + /// Names of steps this depends on + pub depends_on: Vec<String>, + /// IDs of requirements this step addresses + pub requirement_ids: Vec<String>, + /// Contract template fields + pub contract_template: Option<ContractTemplate>, +} + +/// Template for contract creation from step. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContractTemplate { + /// Contract name + pub name: String, + /// Contract description + pub description: String, + /// Contract type (e.g., "simple", "agentic") + pub contract_type: String, + /// Phases for the contract + pub phases: Vec<String>, + /// Tasks within the contract + pub tasks: Vec<TaskTemplate>, + /// Deliverables expected + pub deliverables: Vec<DeliverableTemplate>, +} + +/// Template for task within contract. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskTemplate { + pub name: String, + pub plan: String, +} + +/// Template for deliverable. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeliverableTemplate { + pub id: String, + pub name: String, + pub priority: String, +} + +/// Generated chain from planning. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GeneratedChain { + /// Name for the chain + pub name: String, + /// Description of the execution plan + pub description: String, + /// Steps in the chain + pub steps: Vec<GeneratedStep>, +} + +/// Chain planner for LLM-based plan generation. +pub struct ChainPlanner { + /// Default step types to suggest (reserved for future use) + #[allow(dead_code)] + default_step_types: Vec<String>, +} + +impl Default for ChainPlanner { + fn default() -> Self { + Self::new() + } +} + +impl ChainPlanner { + /// Create a new chain planner. + pub fn new() -> Self { + Self { + default_step_types: vec![ + "research".to_string(), + "design".to_string(), + "implement".to_string(), + "test".to_string(), + "review".to_string(), + "document".to_string(), + ], + } + } + + /// Build a planning prompt for the LLM. + pub fn build_planning_prompt(&self, directive: &Directive) -> String { + let requirements: Vec<String> = directive + .requirements + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_object()) + .map(|obj| { + let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); + let desc = obj + .get("description") + .and_then(|v| v.as_str()) + .unwrap_or("?"); + format!("- {}: {}", id, desc) + }) + .collect() + }) + .unwrap_or_default(); + + let criteria: Vec<String> = directive + .acceptance_criteria + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_object()) + .map(|obj| { + let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); + let criterion = obj + .get("criterion") + .and_then(|v| v.as_str()) + .unwrap_or("?"); + format!("- {}: {}", id, criterion) + }) + .collect() + }) + .unwrap_or_default(); + + let constraints: Vec<String> = directive + .constraints + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str()) + .map(|s| format!("- {}", s)) + .collect() + }) + .unwrap_or_default(); + + format!( + r#"You are a software architect planning an execution chain for a coding task. + +## Directive Goal +{goal} + +## Requirements +{requirements} + +## Acceptance Criteria +{criteria} + +## Constraints +{constraints} + +## Instructions + +Create an execution plan as a chain of steps. Each step should: +1. Have a unique, descriptive name (kebab-case) +2. Specify its type (research, design, implement, test, review, document) +3. Declare dependencies on prior steps (if any) +4. Map to specific requirement IDs it addresses +5. Include a contract template with tasks and deliverables + +The chain should form a valid DAG (no cycles). Steps can run in parallel if they don't depend on each other. + +Respond with a JSON object in this format: +```json +{{ + "name": "chain-name", + "description": "Brief description of the plan", + "steps": [ + {{ + "name": "step-name", + "step_type": "implement", + "description": "What this step does", + "depends_on": ["prior-step-name"], + "requirement_ids": ["REQ-001"], + "contract_template": {{ + "name": "Contract Name", + "description": "Contract description", + "contract_type": "simple", + "phases": ["plan", "execute"], + "tasks": [ + {{"name": "Task 1", "plan": "Detailed plan for this task"}} + ], + "deliverables": [ + {{"id": "del-1", "name": "Deliverable 1", "priority": "required"}} + ] + }} + }} + ] +}} +``` + +Generate the optimal execution plan now."#, + goal = directive.goal, + requirements = requirements.join("\n"), + criteria = criteria.join("\n"), + constraints = constraints.join("\n"), + ) + } + + /// Parse LLM response into a generated chain. + pub fn parse_plan_response(&self, response: &str) -> Result<GeneratedChain, PlannerError> { + // Extract JSON from response (may be wrapped in markdown code blocks) + let json_str = extract_json_from_response(response)?; + + let chain: GeneratedChain = serde_json::from_str(&json_str) + .map_err(|e| PlannerError::InvalidPlan(format!("JSON parse error: {}", e)))?; + + if chain.steps.is_empty() { + return Err(PlannerError::EmptyPlan); + } + + // Validate the chain + self.validate_chain(&chain)?; + + Ok(chain) + } + + /// Validate a generated chain. + pub fn validate_chain(&self, chain: &GeneratedChain) -> Result<(), PlannerError> { + // Build step name set + let step_names: HashSet<&str> = chain.steps.iter().map(|s| s.name.as_str()).collect(); + + // Check for duplicate names + if step_names.len() != chain.steps.len() { + return Err(PlannerError::InvalidPlan( + "Duplicate step names detected".to_string(), + )); + } + + // Validate dependencies exist + for step in &chain.steps { + for dep in &step.depends_on { + if !step_names.contains(dep.as_str()) { + return Err(PlannerError::InvalidDependency { + step: step.name.clone(), + dependency: dep.clone(), + }); + } + } + } + + // Check for cycles using DFS + self.detect_cycles(chain)?; + + Ok(()) + } + + /// Detect cycles in the chain DAG using DFS. + fn detect_cycles(&self, chain: &GeneratedChain) -> Result<(), PlannerError> { + let mut visited = HashSet::new(); + let mut rec_stack = HashSet::new(); + + // Build adjacency map + let adj: HashMap<&str, Vec<&str>> = chain + .steps + .iter() + .map(|s| (s.name.as_str(), s.depends_on.iter().map(|d| d.as_str()).collect())) + .collect(); + + for step in &chain.steps { + if !visited.contains(step.name.as_str()) { + if self.has_cycle(&step.name, &adj, &mut visited, &mut rec_stack) { + return Err(PlannerError::CycleDetected(step.name.clone())); + } + } + } + + Ok(()) + } + + fn has_cycle<'a>( + &self, + node: &'a str, + adj: &HashMap<&'a str, Vec<&'a str>>, + visited: &mut HashSet<&'a str>, + rec_stack: &mut HashSet<&'a str>, + ) -> bool { + visited.insert(node); + rec_stack.insert(node); + + if let Some(deps) = adj.get(node) { + for &dep in deps { + if !visited.contains(dep) { + if self.has_cycle(dep, adj, visited, rec_stack) { + return true; + } + } else if rec_stack.contains(dep) { + return true; + } + } + } + + rec_stack.remove(node); + false + } + + /// Check that all requirements are covered by at least one step. + pub fn check_requirement_coverage( + &self, + chain: &GeneratedChain, + directive: &Directive, + ) -> Result<(), PlannerError> { + let required_ids: HashSet<String> = directive + .requirements + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.get("id").and_then(|id| id.as_str())) + .map(|s| s.to_string()) + .collect() + }) + .unwrap_or_default(); + + let covered_ids: HashSet<String> = chain + .steps + .iter() + .flat_map(|s| s.requirement_ids.clone()) + .collect(); + + for req_id in required_ids { + if !covered_ids.contains(&req_id) { + return Err(PlannerError::RequirementNotCovered(req_id)); + } + } + + Ok(()) + } + + /// Get topological order of steps. + pub fn topological_sort<'a>( + &self, + chain: &'a GeneratedChain, + ) -> Result<Vec<&'a str>, PlannerError> { + let mut in_degree: HashMap<&str, usize> = HashMap::new(); + let mut adj: HashMap<&str, Vec<&str>> = HashMap::new(); + + // Initialize + for step in &chain.steps { + in_degree.entry(step.name.as_str()).or_insert(0); + adj.entry(step.name.as_str()).or_insert_with(Vec::new); + } + + // Build graph (reversed - edges from dependency to dependent) + for step in &chain.steps { + for dep in &step.depends_on { + adj.entry(dep.as_str()) + .or_insert_with(Vec::new) + .push(step.name.as_str()); + *in_degree.entry(step.name.as_str()).or_insert(0) += 1; + } + } + + // Kahn's algorithm + let mut queue: Vec<&str> = in_degree + .iter() + .filter(|&(_, deg)| *deg == 0) + .map(|(&name, _)| name) + .collect(); + + let mut result = Vec::new(); + + while let Some(node) = queue.pop() { + result.push(node); + + if let Some(neighbors) = adj.get(node) { + for &neighbor in neighbors { + let deg = in_degree.get_mut(neighbor).unwrap(); + *deg -= 1; + if *deg == 0 { + queue.push(neighbor); + } + } + } + } + + if result.len() != chain.steps.len() { + return Err(PlannerError::CycleDetected( + "Cycle detected during topological sort".to_string(), + )); + } + + Ok(result) + } + + /// Convert generated steps to AddStepRequest for database insertion. + pub fn steps_to_requests( + &self, + chain: &GeneratedChain, + step_id_map: &HashMap<String, Uuid>, + ) -> Vec<AddStepRequest> { + chain + .steps + .iter() + .map(|step| { + let depends_on: Vec<Uuid> = step + .depends_on + .iter() + .filter_map(|name| step_id_map.get(name)) + .copied() + .collect(); + + let task_plan = step + .contract_template + .as_ref() + .and_then(|t| t.tasks.first()) + .map(|t| t.plan.clone()); + + AddStepRequest { + name: step.name.clone(), + description: Some(step.description.clone()), + step_type: Some(step.step_type.clone()), + contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), + initial_phase: Some("plan".to_string()), + task_plan, + phases: step.contract_template.as_ref().map(|t| t.phases.clone()), + depends_on: Some(depends_on), + parallel_group: None, + requirement_ids: Some(step.requirement_ids.clone()), + acceptance_criteria_ids: None, + verifier_config: None, + editor_x: None, + editor_y: None, + } + }) + .collect() + } + + /// Compute editor positions for steps based on DAG layout. + pub fn compute_editor_positions( + &self, + chain: &GeneratedChain, + ) -> HashMap<String, (f64, f64)> { + let depths = self.get_step_depths(chain); + let mut positions: HashMap<String, (f64, f64)> = HashMap::new(); + + // Group by depth + let mut by_depth: HashMap<usize, Vec<&str>> = HashMap::new(); + for step in &chain.steps { + let depth = depths.get(&step.name).copied().unwrap_or(0); + by_depth.entry(depth).or_default().push(&step.name); + } + + // Compute positions: x based on depth, y based on index within depth + let x_spacing = 250.0; + let y_spacing = 150.0; + + for (depth, steps) in &by_depth { + let x = (*depth as f64) * x_spacing + 100.0; + for (i, name) in steps.iter().enumerate() { + let y = (i as f64) * y_spacing + 100.0; + positions.insert(name.to_string(), (x, y)); + } + } + + positions + } + + /// Get depth of each step in the DAG. + fn get_step_depths(&self, chain: &GeneratedChain) -> HashMap<String, usize> { + let mut depths: HashMap<String, usize> = HashMap::new(); + + // Build dependency map + let deps: HashMap<String, Vec<String>> = chain + .steps + .iter() + .map(|s| (s.name.clone(), s.depends_on.clone())) + .collect(); + + fn compute_depth( + name: &str, + deps: &HashMap<String, Vec<String>>, + depths: &mut HashMap<String, usize>, + ) -> usize { + if let Some(&d) = depths.get(name) { + return d; + } + + let depth = deps + .get(name) + .map(|dep_list| { + dep_list + .iter() + .map(|d| compute_depth(d, deps, depths) + 1) + .max() + .unwrap_or(0) + }) + .unwrap_or(0); + + depths.insert(name.to_string(), depth); + depth + } + + for step in &chain.steps { + compute_depth(&step.name, &deps, &mut depths); + } + + depths + } + + /// Build a replanning prompt that preserves completed steps. + pub fn build_replan_prompt( + &self, + directive: &Directive, + completed_steps: &[ChainStep], + failed_step: Option<&ChainStep>, + reason: &str, + ) -> String { + let completed_summary: Vec<String> = completed_steps + .iter() + .map(|s| format!("- {} ({}): completed", s.name, s.step_type)) + .collect(); + + let failed_summary = failed_step + .map(|s| format!("Failed step: {} - {}", s.name, s.description.as_deref().unwrap_or(""))) + .unwrap_or_default(); + + format!( + r#"You are a software architect replanning an execution chain. + +## Original Goal +{goal} + +## Completed Steps (preserve these) +{completed} + +## Failure Information +{failed} +Reason: {reason} + +## Instructions +Generate a new execution plan that: +1. Preserves all completed work +2. Addresses the failure +3. Continues toward the original goal + +Use the same JSON format as before. Do not include already completed steps."#, + goal = directive.goal, + completed = completed_summary.join("\n"), + failed = failed_summary, + reason = reason, + ) + } +} + +/// Extract JSON from LLM response (handles markdown code blocks). +fn extract_json_from_response(response: &str) -> Result<String, PlannerError> { + // Try to find JSON in code block + if let Some(start) = response.find("```json") { + let json_start = start + 7; + if let Some(end) = response[json_start..].find("```") { + return Ok(response[json_start..json_start + end].trim().to_string()); + } + } + + // Try to find JSON in generic code block + if let Some(start) = response.find("```") { + let block_start = start + 3; + // Skip language identifier if present + let json_start = response[block_start..] + .find('\n') + .map(|i| block_start + i + 1) + .unwrap_or(block_start); + if let Some(end) = response[json_start..].find("```") { + return Ok(response[json_start..json_start + end].trim().to_string()); + } + } + + // Try to parse the whole thing as JSON + if response.trim().starts_with('{') { + return Ok(response.trim().to_string()); + } + + Err(PlannerError::InvalidPlan( + "Could not extract JSON from response".to_string(), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_test_chain() -> GeneratedChain { + GeneratedChain { + name: "test-chain".to_string(), + description: "Test chain".to_string(), + steps: vec![ + GeneratedStep { + name: "step-a".to_string(), + step_type: "research".to_string(), + description: "Research step".to_string(), + depends_on: vec![], + requirement_ids: vec!["REQ-001".to_string()], + contract_template: None, + }, + GeneratedStep { + name: "step-b".to_string(), + step_type: "implement".to_string(), + description: "Implementation step".to_string(), + depends_on: vec!["step-a".to_string()], + requirement_ids: vec!["REQ-002".to_string()], + contract_template: None, + }, + GeneratedStep { + name: "step-c".to_string(), + step_type: "test".to_string(), + description: "Test step".to_string(), + depends_on: vec!["step-b".to_string()], + requirement_ids: vec!["REQ-001".to_string()], + contract_template: None, + }, + ], + } + } + + #[test] + fn test_validate_chain_valid() { + let planner = ChainPlanner::new(); + let chain = make_test_chain(); + assert!(planner.validate_chain(&chain).is_ok()); + } + + #[test] + fn test_validate_chain_invalid_dependency() { + let planner = ChainPlanner::new(); + let mut chain = make_test_chain(); + chain.steps[1].depends_on = vec!["nonexistent".to_string()]; + + let result = planner.validate_chain(&chain); + assert!(matches!(result, Err(PlannerError::InvalidDependency { .. }))); + } + + #[test] + fn test_validate_chain_cycle() { + let planner = ChainPlanner::new(); + let chain = GeneratedChain { + name: "cyclic".to_string(), + description: "Has cycle".to_string(), + steps: vec![ + GeneratedStep { + name: "a".to_string(), + step_type: "research".to_string(), + description: "A".to_string(), + depends_on: vec!["c".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + GeneratedStep { + name: "b".to_string(), + step_type: "implement".to_string(), + description: "B".to_string(), + depends_on: vec!["a".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + GeneratedStep { + name: "c".to_string(), + step_type: "test".to_string(), + description: "C".to_string(), + depends_on: vec!["b".to_string()], + requirement_ids: vec![], + contract_template: None, + }, + ], + }; + + let result = planner.validate_chain(&chain); + assert!(matches!(result, Err(PlannerError::CycleDetected(_)))); + } + + #[test] + fn test_topological_sort() { + let planner = ChainPlanner::new(); + let chain = make_test_chain(); + let order = planner.topological_sort(&chain).unwrap(); + + // step-a must come before step-b, step-b before step-c + let pos_a = order.iter().position(|&n| n == "step-a").unwrap(); + let pos_b = order.iter().position(|&n| n == "step-b").unwrap(); + let pos_c = order.iter().position(|&n| n == "step-c").unwrap(); + + assert!(pos_a < pos_b); + assert!(pos_b < pos_c); + } + + #[test] + fn test_extract_json_from_code_block() { + let response = r#" +Here's the plan: + +```json +{"name": "test"} +``` + +That's it! +"#; + let json = extract_json_from_response(response).unwrap(); + assert_eq!(json, r#"{"name": "test"}"#); + } + + #[test] + fn test_extract_json_raw() { + let response = r#"{"name": "test"}"#; + let json = extract_json_from_response(response).unwrap(); + assert_eq!(json, r#"{"name": "test"}"#); + } +} diff --git a/makima/src/orchestration/verifier.rs b/makima/src/orchestration/verifier.rs new file mode 100644 index 0000000..e98da50 --- /dev/null +++ b/makima/src/orchestration/verifier.rs @@ -0,0 +1,806 @@ +//! Verification system for directive step evaluation. +//! +//! Provides tiered verification: programmatic verifiers run first, +//! then LLM evaluation if programmatic checks pass. Composite scoring +//! combines results with configurable weights. + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use std::path::Path; +use thiserror::Error; +use uuid::Uuid; + +/// Confidence level based on composite score and thresholds. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ConfidenceLevel { + /// High confidence (score >= green threshold) + Green, + /// Medium confidence (score >= yellow threshold but < green) + Yellow, + /// Low confidence (score < yellow threshold) + Red, +} + +impl ConfidenceLevel { + /// Compute confidence level from score and thresholds. + pub fn from_score(score: f64, green_threshold: f64, yellow_threshold: f64) -> Self { + if score >= green_threshold { + ConfidenceLevel::Green + } else if score >= yellow_threshold { + ConfidenceLevel::Yellow + } else { + ConfidenceLevel::Red + } + } + + /// Convert to string for database storage. + pub fn as_str(&self) -> &'static str { + match self { + ConfidenceLevel::Green => "green", + ConfidenceLevel::Yellow => "yellow", + ConfidenceLevel::Red => "red", + } + } +} + +impl std::fmt::Display for ConfidenceLevel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +/// Type of verifier for categorization. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum VerifierType { + /// Run test suite (npm test, cargo test, pytest, etc.) + TestRunner, + /// Run linter (eslint, clippy, ruff, etc.) + Linter, + /// Run type checker (tsc, mypy, etc.) + TypeChecker, + /// Run build command (npm build, cargo build, etc.) + Build, + /// Custom command verifier + Custom, + /// LLM-based semantic evaluation + Llm, +} + +impl VerifierType { + pub fn as_str(&self) -> &'static str { + match self { + VerifierType::TestRunner => "test_runner", + VerifierType::Linter => "linter", + VerifierType::TypeChecker => "type_checker", + VerifierType::Build => "build", + VerifierType::Custom => "custom", + VerifierType::Llm => "llm", + } + } +} + +/// Result of a single verifier run. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VerifierResult { + /// Name of the verifier + pub name: String, + /// Type of verifier + pub verifier_type: VerifierType, + /// Whether the verification passed + pub passed: bool, + /// Score from 0.0 to 1.0 (1.0 = perfect, 0.0 = complete failure) + pub score: f64, + /// Weight for composite scoring (default 1.0 for programmatic, 2.0 for LLM) + pub weight: f64, + /// Whether this verifier is required (failure = automatic red confidence) + pub required: bool, + /// Human-readable output/feedback + pub output: String, + /// Structured details (test counts, lint errors, etc.) + pub details: Option<JsonValue>, + /// Execution time in milliseconds + pub duration_ms: u64, +} + +impl VerifierResult { + /// Create a passed result with full score. + pub fn passed(name: String, verifier_type: VerifierType, output: String) -> Self { + Self { + name, + verifier_type, + passed: true, + score: 1.0, + weight: 1.0, + required: false, + output, + details: None, + duration_ms: 0, + } + } + + /// Create a failed result with zero score. + pub fn failed(name: String, verifier_type: VerifierType, output: String) -> Self { + Self { + name, + verifier_type, + passed: false, + score: 0.0, + weight: 1.0, + required: false, + output, + details: None, + duration_ms: 0, + } + } + + /// Set the weight for this result. + pub fn with_weight(mut self, weight: f64) -> Self { + self.weight = weight; + self + } + + /// Mark this verifier as required. + pub fn as_required(mut self) -> Self { + self.required = true; + self + } + + /// Set the score explicitly. + pub fn with_score(mut self, score: f64) -> Self { + self.score = score.clamp(0.0, 1.0); + self + } + + /// Set structured details. + pub fn with_details(mut self, details: JsonValue) -> Self { + self.details = Some(details); + self + } + + /// Set execution duration. + pub fn with_duration(mut self, duration_ms: u64) -> Self { + self.duration_ms = duration_ms; + self + } +} + +/// Composite evaluation result combining multiple verifier results. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EvaluationResult { + /// Unique ID for this evaluation + pub id: Uuid, + /// Step ID being evaluated + pub step_id: Uuid, + /// Whether all required verifiers passed + pub passed: bool, + /// Weighted composite score (0.0-1.0) + pub composite_score: f64, + /// Confidence level derived from score + pub confidence_level: ConfidenceLevel, + /// Individual verifier results + pub verifier_results: Vec<VerifierResult>, + /// Summary feedback for the step + pub summary: String, + /// Rework instructions if failed + pub rework_instructions: Option<String>, + /// Total evaluation duration in milliseconds + pub total_duration_ms: u64, +} + +impl EvaluationResult { + /// Create a new evaluation result from verifier results. + pub fn from_verifiers( + step_id: Uuid, + results: Vec<VerifierResult>, + green_threshold: f64, + yellow_threshold: f64, + ) -> Self { + let id = Uuid::new_v4(); + + // Check if any required verifier failed + let any_required_failed = results.iter().any(|r| r.required && !r.passed); + + // Calculate weighted composite score + let (total_weighted_score, total_weight) = + results + .iter() + .fold((0.0, 0.0), |(score_acc, weight_acc), r| { + (score_acc + r.score * r.weight, weight_acc + r.weight) + }); + + let composite_score = if total_weight > 0.0 { + total_weighted_score / total_weight + } else { + 0.0 + }; + + // Override confidence to red if any required verifier failed + let confidence_level = if any_required_failed { + ConfidenceLevel::Red + } else { + ConfidenceLevel::from_score(composite_score, green_threshold, yellow_threshold) + }; + + let passed = !any_required_failed && confidence_level != ConfidenceLevel::Red; + + // Generate summary + let passed_count = results.iter().filter(|r| r.passed).count(); + let total_count = results.len(); + let summary = format!( + "{}/{} verifiers passed, composite score: {:.2}, confidence: {}", + passed_count, total_count, composite_score, confidence_level + ); + + // Generate rework instructions if failed + let rework_instructions = if !passed { + let failed_verifiers: Vec<&str> = results + .iter() + .filter(|r| !r.passed) + .map(|r| r.name.as_str()) + .collect(); + Some(format!( + "Fix issues identified by: {}", + failed_verifiers.join(", ") + )) + } else { + None + }; + + let total_duration_ms = results.iter().map(|r| r.duration_ms).sum(); + + Self { + id, + step_id, + passed, + composite_score, + confidence_level, + verifier_results: results, + summary, + rework_instructions, + total_duration_ms, + } + } +} + +/// Error type for verification operations. +#[derive(Error, Debug)] +pub enum VerifierError { + #[error("Command execution failed: {0}")] + CommandFailed(String), + + #[error("Command timed out after {0}ms")] + Timeout(u64), + + #[error("Working directory not found: {0}")] + WorkingDirectoryNotFound(String), + + #[error("Verifier not configured: {0}")] + NotConfigured(String), + + #[error("Parse error: {0}")] + ParseError(String), + + #[error("LLM error: {0}")] + LlmError(String), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} + +/// Verifier trait for pluggable verification implementations. +#[async_trait] +pub trait Verifier: Send + Sync { + /// Get the name of this verifier. + fn name(&self) -> &str; + + /// Get the type of this verifier. + fn verifier_type(&self) -> VerifierType; + + /// Check if this verifier is applicable to the given repository. + async fn is_applicable(&self, repo_path: &Path) -> bool; + + /// Run verification and return result. + async fn verify(&self, repo_path: &Path, context: &VerificationContext) + -> Result<VerifierResult, VerifierError>; +} + +/// Context provided to verifiers during execution. +#[derive(Debug, Clone)] +pub struct VerificationContext { + /// Step ID being verified + pub step_id: Uuid, + /// Contract ID if step has been instantiated + pub contract_id: Option<Uuid>, + /// Files that were modified in this step + pub modified_files: Vec<String>, + /// Step description for LLM context + pub step_description: String, + /// Acceptance criteria for LLM evaluation + pub acceptance_criteria: Vec<String>, + /// Additional context from directive + pub directive_context: String, +} + +/// Command-based verifier for running shell commands. +pub struct CommandVerifier { + name: String, + verifier_type: VerifierType, + command: String, + #[allow(dead_code)] + working_dir: Option<String>, + #[allow(dead_code)] + timeout_ms: u64, + required: bool, + /// Files/patterns that indicate this verifier is applicable + applicable_patterns: Vec<String>, +} + +impl CommandVerifier { + /// Create a new command verifier. + pub fn new( + name: impl Into<String>, + verifier_type: VerifierType, + command: impl Into<String>, + ) -> Self { + Self { + name: name.into(), + verifier_type, + command: command.into(), + working_dir: None, + timeout_ms: 300_000, // 5 minute default + required: false, + applicable_patterns: Vec::new(), + } + } + + /// Set the working directory. + #[allow(dead_code)] + pub fn with_working_dir(mut self, dir: impl Into<String>) -> Self { + self.working_dir = Some(dir.into()); + self + } + + /// Set the timeout in milliseconds. + #[allow(dead_code)] + pub fn with_timeout(mut self, timeout_ms: u64) -> Self { + self.timeout_ms = timeout_ms; + self + } + + /// Mark as required verifier. + pub fn as_required(mut self) -> Self { + self.required = true; + self + } + + /// Add applicability patterns (files that must exist). + pub fn with_patterns(mut self, patterns: Vec<String>) -> Self { + self.applicable_patterns = patterns; + self + } +} + +#[async_trait] +impl Verifier for CommandVerifier { + fn name(&self) -> &str { + &self.name + } + + fn verifier_type(&self) -> VerifierType { + self.verifier_type.clone() + } + + async fn is_applicable(&self, repo_path: &Path) -> bool { + if self.applicable_patterns.is_empty() { + return true; + } + + for pattern in &self.applicable_patterns { + let check_path = repo_path.join(pattern); + if check_path.exists() { + return true; + } + } + false + } + + async fn verify( + &self, + repo_path: &Path, + _context: &VerificationContext, + ) -> Result<VerifierResult, VerifierError> { + let start = std::time::Instant::now(); + + let work_dir = self + .working_dir + .as_ref() + .map(|d| repo_path.join(d)) + .unwrap_or_else(|| repo_path.to_path_buf()); + + if !work_dir.exists() { + return Err(VerifierError::WorkingDirectoryNotFound( + work_dir.display().to_string(), + )); + } + + // Parse command into program and args + let parts: Vec<&str> = self.command.split_whitespace().collect(); + if parts.is_empty() { + return Err(VerifierError::CommandFailed( + "Empty command".to_string(), + )); + } + + let program = parts[0]; + let args = &parts[1..]; + + // Execute command + let output = tokio::process::Command::new(program) + .args(args) + .current_dir(&work_dir) + .output() + .await?; + + let duration_ms = start.elapsed().as_millis() as u64; + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let combined_output = format!("{}\n{}", stdout, stderr); + + let passed = output.status.success(); + let score = if passed { 1.0 } else { 0.0 }; + + let mut result = VerifierResult { + name: self.name.clone(), + verifier_type: self.verifier_type.clone(), + passed, + score, + weight: 1.0, + required: self.required, + output: combined_output, + details: Some(serde_json::json!({ + "exit_code": output.status.code(), + "command": self.command, + "working_dir": work_dir.display().to_string(), + })), + duration_ms, + }; + + // Try to extract more detailed scoring from output + result = self.enhance_result(result, &stdout); + + Ok(result) + } +} + +impl CommandVerifier { + /// Enhance result with parsed details from output. + fn enhance_result(&self, mut result: VerifierResult, stdout: &str) -> VerifierResult { + match self.verifier_type { + VerifierType::TestRunner => { + // Try to parse test counts from common formats + if let Some((passed, failed, total)) = parse_test_output(stdout) { + result.details = Some(serde_json::json!({ + "tests_passed": passed, + "tests_failed": failed, + "tests_total": total, + "command": self.command, + })); + if total > 0 { + result.score = passed as f64 / total as f64; + } + } + } + VerifierType::Linter => { + // Try to parse lint error counts + if let Some(error_count) = parse_lint_output(stdout) { + result.details = Some(serde_json::json!({ + "errors": error_count, + "command": self.command, + })); + // Score decreases with more errors (up to 10 errors = 0) + result.score = (1.0 - (error_count as f64 / 10.0)).max(0.0); + } + } + _ => {} + } + result + } +} + +/// Parse test output for common formats (Jest, pytest, cargo test). +fn parse_test_output(output: &str) -> Option<(u32, u32, u32)> { + // Jest format: "Tests: X passed, Y failed, Z total" + if let Some(caps) = regex::Regex::new(r"Tests:\s*(\d+)\s*passed,\s*(\d+)\s*failed,\s*(\d+)\s*total") + .ok()? + .captures(output) + { + let passed: u32 = caps.get(1)?.as_str().parse().ok()?; + let failed: u32 = caps.get(2)?.as_str().parse().ok()?; + let total: u32 = caps.get(3)?.as_str().parse().ok()?; + return Some((passed, failed, total)); + } + + // pytest format: "X passed, Y failed" + if let Some(caps) = regex::Regex::new(r"(\d+)\s*passed(?:,\s*(\d+)\s*failed)?") + .ok()? + .captures(output) + { + let passed: u32 = caps.get(1)?.as_str().parse().ok()?; + let failed: u32 = caps.get(2).map(|m| m.as_str().parse().ok()).flatten().unwrap_or(0); + let total = passed + failed; + return Some((passed, failed, total)); + } + + // cargo test format: "test result: ok. X passed; Y failed;" + if let Some(caps) = regex::Regex::new(r"test result:.*?(\d+)\s*passed;\s*(\d+)\s*failed") + .ok()? + .captures(output) + { + let passed: u32 = caps.get(1)?.as_str().parse().ok()?; + let failed: u32 = caps.get(2)?.as_str().parse().ok()?; + let total = passed + failed; + return Some((passed, failed, total)); + } + + None +} + +/// Parse lint output for error counts. +fn parse_lint_output(output: &str) -> Option<u32> { + // ESLint format: "X problems (Y errors, Z warnings)" + if let Some(caps) = regex::Regex::new(r"(\d+)\s*problems?\s*\((\d+)\s*errors?") + .ok()? + .captures(output) + { + return caps.get(2)?.as_str().parse().ok(); + } + + // Clippy format: "warning: X warnings emitted" + if let Some(caps) = regex::Regex::new(r"warning:\s*(\d+)\s*warnings?\s*emitted") + .ok()? + .captures(output) + { + return caps.get(1)?.as_str().parse().ok(); + } + + None +} + +/// Auto-detect applicable verifiers for a repository. +pub async fn auto_detect_verifiers(repo_path: &Path) -> Vec<Box<dyn Verifier>> { + let mut verifiers: Vec<Box<dyn Verifier>> = Vec::new(); + + // Check for package.json (Node.js) + let package_json = repo_path.join("package.json"); + if package_json.exists() { + if let Ok(content) = tokio::fs::read_to_string(&package_json).await { + if let Ok(pkg) = serde_json::from_str::<serde_json::Value>(&content) { + if let Some(scripts) = pkg.get("scripts").and_then(|s| s.as_object()) { + // Test runner + if scripts.contains_key("test") { + verifiers.push(Box::new( + CommandVerifier::new("npm-test", VerifierType::TestRunner, "npm test") + .with_patterns(vec!["package.json".to_string()]) + .as_required(), + )); + } + + // Linter + if scripts.contains_key("lint") { + verifiers.push(Box::new( + CommandVerifier::new("npm-lint", VerifierType::Linter, "npm run lint") + .with_patterns(vec!["package.json".to_string()]), + )); + } + + // Build + if scripts.contains_key("build") { + verifiers.push(Box::new( + CommandVerifier::new("npm-build", VerifierType::Build, "npm run build") + .with_patterns(vec!["package.json".to_string()]) + .as_required(), + )); + } + + // Type check (for TypeScript projects) + if scripts.contains_key("typecheck") || scripts.contains_key("type-check") { + let cmd = if scripts.contains_key("typecheck") { + "npm run typecheck" + } else { + "npm run type-check" + }; + verifiers.push(Box::new( + CommandVerifier::new("npm-typecheck", VerifierType::TypeChecker, cmd) + .with_patterns(vec!["tsconfig.json".to_string()]), + )); + } + } + } + } + } + + // Check for Cargo.toml (Rust) + let cargo_toml = repo_path.join("Cargo.toml"); + if cargo_toml.exists() { + verifiers.push(Box::new( + CommandVerifier::new("cargo-test", VerifierType::TestRunner, "cargo test") + .with_patterns(vec!["Cargo.toml".to_string()]) + .as_required(), + )); + + verifiers.push(Box::new( + CommandVerifier::new("cargo-clippy", VerifierType::Linter, "cargo clippy -- -D warnings") + .with_patterns(vec!["Cargo.toml".to_string()]), + )); + + verifiers.push(Box::new( + CommandVerifier::new("cargo-build", VerifierType::Build, "cargo build") + .with_patterns(vec!["Cargo.toml".to_string()]) + .as_required(), + )); + } + + // Check for pyproject.toml or setup.py (Python) + let pyproject = repo_path.join("pyproject.toml"); + let setup_py = repo_path.join("setup.py"); + if pyproject.exists() || setup_py.exists() { + verifiers.push(Box::new( + CommandVerifier::new("pytest", VerifierType::TestRunner, "pytest") + .with_patterns(vec![ + "pyproject.toml".to_string(), + "setup.py".to_string(), + ]) + .as_required(), + )); + + verifiers.push(Box::new( + CommandVerifier::new("ruff", VerifierType::Linter, "ruff check .") + .with_patterns(vec!["pyproject.toml".to_string()]), + )); + } + + verifiers +} + +/// Composite evaluator that runs multiple verifiers and combines results. +pub struct CompositeEvaluator { + verifiers: Vec<Box<dyn Verifier>>, + green_threshold: f64, + yellow_threshold: f64, +} + +impl CompositeEvaluator { + /// Create a new composite evaluator with default thresholds. + pub fn new(verifiers: Vec<Box<dyn Verifier>>) -> Self { + Self { + verifiers, + green_threshold: 0.8, + yellow_threshold: 0.5, + } + } + + /// Set confidence thresholds. + pub fn with_thresholds(mut self, green: f64, yellow: f64) -> Self { + self.green_threshold = green; + self.yellow_threshold = yellow; + self + } + + /// Add a verifier. + pub fn add_verifier(mut self, verifier: Box<dyn Verifier>) -> Self { + self.verifiers.push(verifier); + self + } + + /// Run all applicable verifiers and return composite result. + pub async fn evaluate( + &self, + repo_path: &Path, + context: &VerificationContext, + ) -> EvaluationResult { + let mut results = Vec::new(); + + for verifier in &self.verifiers { + if !verifier.is_applicable(repo_path).await { + continue; + } + + match verifier.verify(repo_path, context).await { + Ok(result) => results.push(result), + Err(e) => { + // Convert error to failed result + results.push(VerifierResult::failed( + verifier.name().to_string(), + verifier.verifier_type(), + format!("Verifier error: {}", e), + )); + } + } + } + + EvaluationResult::from_verifiers( + context.step_id, + results, + self.green_threshold, + self.yellow_threshold, + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_confidence_level_from_score() { + assert_eq!( + ConfidenceLevel::from_score(0.9, 0.8, 0.5), + ConfidenceLevel::Green + ); + assert_eq!( + ConfidenceLevel::from_score(0.8, 0.8, 0.5), + ConfidenceLevel::Green + ); + assert_eq!( + ConfidenceLevel::from_score(0.6, 0.8, 0.5), + ConfidenceLevel::Yellow + ); + assert_eq!( + ConfidenceLevel::from_score(0.5, 0.8, 0.5), + ConfidenceLevel::Yellow + ); + assert_eq!( + ConfidenceLevel::from_score(0.4, 0.8, 0.5), + ConfidenceLevel::Red + ); + } + + #[test] + fn test_evaluation_result_composite_score() { + let results = vec![ + VerifierResult::passed("test1".into(), VerifierType::TestRunner, "OK".into()) + .with_weight(1.0), + VerifierResult::failed("test2".into(), VerifierType::Linter, "Failed".into()) + .with_weight(1.0), + ]; + + let eval = EvaluationResult::from_verifiers(Uuid::new_v4(), results, 0.8, 0.5); + assert!((eval.composite_score - 0.5).abs() < 0.001); + assert_eq!(eval.confidence_level, ConfidenceLevel::Yellow); + } + + #[test] + fn test_required_verifier_override() { + let results = vec![ + VerifierResult::passed("test1".into(), VerifierType::TestRunner, "OK".into()), + VerifierResult::failed("build".into(), VerifierType::Build, "Failed".into()) + .as_required(), + ]; + + let eval = EvaluationResult::from_verifiers(Uuid::new_v4(), results, 0.8, 0.5); + // Even though composite score is 0.5, required failure overrides to red + assert_eq!(eval.confidence_level, ConfidenceLevel::Red); + assert!(!eval.passed); + } + + #[test] + fn test_parse_test_output_jest() { + let output = "Tests: 10 passed, 2 failed, 12 total"; + let (passed, failed, total) = parse_test_output(output).unwrap(); + assert_eq!(passed, 10); + assert_eq!(failed, 2); + assert_eq!(total, 12); + } + + #[test] + fn test_parse_test_output_cargo() { + let output = "test result: ok. 25 passed; 0 failed;"; + let (passed, failed, total) = parse_test_output(output).unwrap(); + assert_eq!(passed, 25); + assert_eq!(failed, 0); + assert_eq!(total, 25); + } +} |
