summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/engine.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/engine.rs')
-rw-r--r--makima/src/orchestration/engine.rs976
1 files changed, 976 insertions, 0 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs
new file mode 100644
index 0000000..5bbb99f
--- /dev/null
+++ b/makima/src/orchestration/engine.rs
@@ -0,0 +1,976 @@
+//! Directive orchestration engine.
+//!
+//! Manages the lifecycle of directives:
+//! - Starts directives and generates initial chains
+//! - Monitors step execution and triggers evaluations
+//! - Handles rework, escalation, and chain regeneration
+//! - Enforces circuit breakers (cost, time, rework limits)
+
+use std::collections::HashMap;
+
+use sqlx::PgPool;
+use thiserror::Error;
+use tokio::sync::broadcast;
+use uuid::Uuid;
+
+use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest};
+use crate::db::repository::{self, RepositoryError};
+
+use super::planner::{ChainPlanner, GeneratedChain, PlannerError};
+use super::verifier::{
+ auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult,
+ VerificationContext,
+};
+
+/// Error type for engine operations.
+#[derive(Error, Debug)]
+pub enum EngineError {
+ #[error("Database error: {0}")]
+ Database(#[from] sqlx::Error),
+
+ #[error("Repository error: {0}")]
+ Repository(#[from] RepositoryError),
+
+ #[error("Planner error: {0}")]
+ Planner(#[from] PlannerError),
+
+ #[error("Directive not found: {0}")]
+ DirectiveNotFound(Uuid),
+
+ #[error("Chain not found for directive: {0}")]
+ ChainNotFound(Uuid),
+
+ #[error("Step not found: {0}")]
+ StepNotFound(Uuid),
+
+ #[error("Invalid state transition: {from} -> {to}")]
+ InvalidStateTransition { from: String, to: String },
+
+ #[error("Circuit breaker triggered: {0}")]
+ CircuitBreaker(String),
+
+ #[error("Directive is paused")]
+ DirectivePaused,
+
+ #[error("Contract creation failed: {0}")]
+ ContractCreation(String),
+
+ #[error("LLM error: {0}")]
+ LlmError(String),
+}
+
+/// Event emitted by the engine for UI updates.
+#[derive(Debug, Clone)]
+pub enum EngineEvent {
+ /// Directive status changed
+ DirectiveStatusChanged {
+ directive_id: Uuid,
+ old_status: String,
+ new_status: String,
+ },
+ /// Step status changed
+ StepStatusChanged {
+ directive_id: Uuid,
+ step_id: Uuid,
+ old_status: String,
+ new_status: String,
+ },
+ /// Evaluation completed
+ EvaluationCompleted {
+ directive_id: Uuid,
+ step_id: Uuid,
+ passed: bool,
+ confidence_level: ConfidenceLevel,
+ },
+ /// Approval required
+ ApprovalRequired {
+ directive_id: Uuid,
+ approval_id: Uuid,
+ approval_type: String,
+ },
+ /// Chain regenerated
+ ChainRegenerated {
+ directive_id: Uuid,
+ old_chain_id: Uuid,
+ new_chain_id: Uuid,
+ },
+}
+
+/// Main orchestration engine for directives.
+pub struct DirectiveEngine {
+ pool: PgPool,
+ planner: ChainPlanner,
+ event_tx: Option<broadcast::Sender<EngineEvent>>,
+}
+
+impl DirectiveEngine {
+ /// Create a new directive engine.
+ pub fn new(pool: PgPool) -> Self {
+ Self {
+ pool,
+ planner: ChainPlanner::new(),
+ event_tx: None,
+ }
+ }
+
+ /// Set the event broadcast channel for UI updates.
+ pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self {
+ self.event_tx = Some(tx);
+ self
+ }
+
+ /// Emit an event if channel is configured.
+ fn emit_event(&self, event: EngineEvent) {
+ if let Some(tx) = &self.event_tx {
+ let _ = tx.send(event);
+ }
+ }
+
+ // ========================================================================
+ // Directive Lifecycle
+ // ========================================================================
+
+ /// Start a directive: generate chain and begin execution.
+ pub async fn start_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ // Validate current state
+ if directive.status != "draft" && directive.status != "paused" {
+ return Err(EngineError::InvalidStateTransition {
+ from: directive.status,
+ to: "planning".to_string(),
+ });
+ }
+
+ // Update status to planning
+ repository::update_directive_status(&self.pool, directive_id, "planning").await?;
+ self.emit_directive_event(
+ directive_id,
+ "status_changed",
+ "info",
+ serde_json::json!({"old_status": directive.status, "new_status": "planning"}),
+ "system",
+ )
+ .await?;
+
+ // Generate chain (placeholder - actual LLM call would go here)
+ let chain = self.generate_initial_chain(&directive).await?;
+
+ // Create chain in database
+ let db_chain = repository::create_directive_chain(
+ &self.pool,
+ directive_id,
+ &chain.name,
+ Some(&chain.description),
+ None, // rationale
+ None, // planning_model
+ )
+ .await?;
+
+ // Create steps
+ self.create_steps_from_chain(&db_chain.id, &chain).await?;
+
+ // Update directive to active
+ repository::update_directive_status(&self.pool, directive_id, "active").await?;
+ self.emit_event(EngineEvent::DirectiveStatusChanged {
+ directive_id,
+ old_status: "planning".to_string(),
+ new_status: "active".to_string(),
+ });
+
+ // Start ready steps
+ self.advance_chain(directive_id).await?;
+
+ Ok(())
+ }
+
+ /// Pause a directive.
+ pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ if directive.status != "active" {
+ return Err(EngineError::InvalidStateTransition {
+ from: directive.status,
+ to: "paused".to_string(),
+ });
+ }
+
+ repository::update_directive_status(&self.pool, directive_id, "paused").await?;
+ self.emit_event(EngineEvent::DirectiveStatusChanged {
+ directive_id,
+ old_status: "active".to_string(),
+ new_status: "paused".to_string(),
+ });
+
+ Ok(())
+ }
+
+ /// Resume a paused directive.
+ pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ if directive.status != "paused" {
+ return Err(EngineError::InvalidStateTransition {
+ from: directive.status,
+ to: "active".to_string(),
+ });
+ }
+
+ repository::update_directive_status(&self.pool, directive_id, "active").await?;
+ self.emit_event(EngineEvent::DirectiveStatusChanged {
+ directive_id,
+ old_status: "paused".to_string(),
+ new_status: "active".to_string(),
+ });
+
+ // Continue execution
+ self.advance_chain(directive_id).await?;
+
+ Ok(())
+ }
+
+ /// Stop a directive (cannot be resumed).
+ pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ if directive.status == "completed" || directive.status == "failed" {
+ return Err(EngineError::InvalidStateTransition {
+ from: directive.status,
+ to: "failed".to_string(),
+ });
+ }
+
+ repository::update_directive_status(&self.pool, directive_id, "failed").await?;
+ self.emit_event(EngineEvent::DirectiveStatusChanged {
+ directive_id,
+ old_status: directive.status,
+ new_status: "failed".to_string(),
+ });
+
+ Ok(())
+ }
+
+ // ========================================================================
+ // Chain Management
+ // ========================================================================
+
+ /// Generate initial chain from directive.
+ async fn generate_initial_chain(
+ &self,
+ directive: &Directive,
+ ) -> Result<GeneratedChain, EngineError> {
+ // Build planning prompt
+ let _prompt = self.planner.build_planning_prompt(directive);
+
+ // TODO: Call LLM to generate chain
+ // For now, return a simple placeholder chain
+ let chain = GeneratedChain {
+ name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")),
+ description: format!("Execution plan for: {}", directive.goal),
+ steps: vec![
+ super::planner::GeneratedStep {
+ name: "research".to_string(),
+ step_type: "research".to_string(),
+ description: "Research and understand the requirements".to_string(),
+ depends_on: vec![],
+ requirement_ids: vec![],
+ contract_template: None,
+ },
+ super::planner::GeneratedStep {
+ name: "implement".to_string(),
+ step_type: "implement".to_string(),
+ description: "Implement the solution".to_string(),
+ depends_on: vec!["research".to_string()],
+ requirement_ids: vec![],
+ contract_template: None,
+ },
+ super::planner::GeneratedStep {
+ name: "test".to_string(),
+ step_type: "test".to_string(),
+ description: "Test and verify the implementation".to_string(),
+ depends_on: vec!["implement".to_string()],
+ requirement_ids: vec![],
+ contract_template: None,
+ },
+ ],
+ };
+
+ // Validate the chain
+ self.planner.validate_chain(&chain)?;
+
+ Ok(chain)
+ }
+
+ /// Create database steps from a generated chain.
+ async fn create_steps_from_chain(
+ &self,
+ chain_id: &Uuid,
+ chain: &GeneratedChain,
+ ) -> Result<(), EngineError> {
+ // First pass: create all steps and build name-to-id map
+ let mut step_id_map: HashMap<String, Uuid> = HashMap::new();
+
+ // Get editor positions
+ let positions = self.planner.compute_editor_positions(chain);
+
+ for step in &chain.steps {
+ let (editor_x, editor_y) = positions
+ .get(&step.name)
+ .copied()
+ .unwrap_or((100.0, 100.0));
+
+ let task_plan = step
+ .contract_template
+ .as_ref()
+ .and_then(|t| t.tasks.first())
+ .map(|t| t.plan.clone())
+ .or_else(|| Some(step.description.clone()));
+
+ let request = AddStepRequest {
+ name: step.name.clone(),
+ description: Some(step.description.clone()),
+ step_type: Some(step.step_type.clone()),
+ contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()),
+ initial_phase: Some("plan".to_string()),
+ task_plan,
+ phases: step.contract_template.as_ref().map(|t| t.phases.clone()),
+ depends_on: None, // Will update in second pass
+ parallel_group: None,
+ requirement_ids: Some(step.requirement_ids.clone()),
+ acceptance_criteria_ids: None,
+ verifier_config: None,
+ editor_x: Some(editor_x),
+ editor_y: Some(editor_y),
+ };
+
+ let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?;
+ step_id_map.insert(step.name.clone(), db_step.id);
+ }
+
+ // Second pass: update dependencies
+ for step in &chain.steps {
+ if step.depends_on.is_empty() {
+ continue;
+ }
+
+ let step_id = step_id_map.get(&step.name).unwrap();
+ let dep_ids: Vec<Uuid> = step
+ .depends_on
+ .iter()
+ .filter_map(|name| step_id_map.get(name))
+ .copied()
+ .collect();
+
+ // Update step with proper dependencies
+ let update = UpdateStepRequest {
+ name: None,
+ description: None,
+ task_plan: None,
+ depends_on: Some(dep_ids),
+ requirement_ids: None,
+ acceptance_criteria_ids: None,
+ verifier_config: None,
+ editor_x: None,
+ editor_y: None,
+ };
+
+ repository::update_chain_step(&self.pool, *step_id, update).await?;
+ }
+
+ Ok(())
+ }
+
+ /// Regenerate chain while preserving completed steps.
+ pub async fn regenerate_chain(
+ &self,
+ directive_id: Uuid,
+ reason: &str,
+ ) -> Result<Uuid, EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ let current_chain = repository::get_current_chain(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::ChainNotFound(directive_id))?;
+
+ // Get completed and failed steps
+ let steps = repository::list_chain_steps(&self.pool, current_chain.id).await?;
+ let completed_steps: Vec<_> = steps.iter().filter(|s| s.status == "passed").collect();
+ let failed_step = steps.iter().find(|s| s.status == "failed");
+
+ // Build replan prompt
+ let _prompt = self.planner.build_replan_prompt(
+ &directive,
+ &completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(),
+ failed_step.map(|s| &*s),
+ reason,
+ );
+
+ // TODO: Call LLM to regenerate chain
+ // For now, just create a new chain with similar structure
+ let new_chain = self.generate_initial_chain(&directive).await?;
+
+ // Supersede old chain
+ repository::supersede_chain(&self.pool, current_chain.id).await?;
+
+ // Create new chain
+ let db_chain = repository::create_directive_chain(
+ &self.pool,
+ directive_id,
+ &new_chain.name,
+ Some(&new_chain.description),
+ Some(reason), // rationale
+ None, // planning_model
+ )
+ .await?;
+
+ // Create steps
+ self.create_steps_from_chain(&db_chain.id, &new_chain).await?;
+
+ self.emit_event(EngineEvent::ChainRegenerated {
+ directive_id,
+ old_chain_id: current_chain.id,
+ new_chain_id: db_chain.id,
+ });
+
+ // Continue execution
+ self.advance_chain(directive_id).await?;
+
+ Ok(db_chain.id)
+ }
+
+ // ========================================================================
+ // Step Execution
+ // ========================================================================
+
+ /// Advance chain execution: find ready steps and start them.
+ pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ let directive = repository::get_directive(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(directive_id))?;
+
+ // Check if directive is active
+ if directive.status == "paused" {
+ return Err(EngineError::DirectivePaused);
+ }
+ if directive.status != "active" {
+ return Ok(()); // Not an error, just nothing to do
+ }
+
+ // Check circuit breakers
+ self.check_circuit_breakers(&directive).await?;
+
+ // Get current chain
+ let chain = repository::get_current_chain(&self.pool, directive_id)
+ .await?
+ .ok_or(EngineError::ChainNotFound(directive_id))?;
+
+ // Find ready steps (dependencies met, status=pending)
+ let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?;
+
+ // Start each ready step
+ for step in ready_steps {
+ self.start_step(&directive, &step).await?;
+ }
+
+ // Check if chain is complete
+ let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?;
+ let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped");
+ let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed");
+
+ if all_passed && !all_steps.is_empty() {
+ // Complete the directive
+ self.complete_directive(directive_id).await?;
+ } else if any_blocked {
+ // Check if we should regenerate or fail
+ let failed_count = all_steps.iter().filter(|s| s.status == "failed").count();
+ if failed_count > 3 {
+ // Too many failures, fail the directive
+ repository::update_directive_status(&self.pool, directive_id, "failed").await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Start a step by creating its contract and supervisor task.
+ async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> {
+ // Update step status to ready
+ repository::update_step_status(&self.pool, step.id, "ready").await?;
+ self.emit_event(EngineEvent::StepStatusChanged {
+ directive_id: directive.id,
+ step_id: step.id,
+ old_status: "pending".to_string(),
+ new_status: "ready".to_string(),
+ });
+
+ // Get contract details from step template
+ let (_name, _description, _contract_type, _initial_phase) =
+ self.get_contract_details(directive, step);
+
+ // TODO: Actually create the contract via the contracts handler
+ // For now, just update the step status to running
+ // In a full implementation, this would:
+ // 1. Create contract via POST /api/v1/contracts
+ // 2. Create supervisor task via POST /api/v1/tasks
+ // 3. Link contract and task to step
+ // 4. Update step status to running
+
+ // Placeholder: mark step as running
+ repository::update_step_status(&self.pool, step.id, "running").await?;
+ self.emit_event(EngineEvent::StepStatusChanged {
+ directive_id: directive.id,
+ step_id: step.id,
+ old_status: "ready".to_string(),
+ new_status: "running".to_string(),
+ });
+
+ self.emit_directive_event(
+ directive.id,
+ "step_started",
+ "info",
+ serde_json::json!({
+ "step_id": step.id,
+ "step_name": step.name,
+ }),
+ "system",
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ /// Build contract details from a step.
+ /// Returns (name, description, contract_type, initial_phase)
+ fn get_contract_details(
+ &self,
+ directive: &Directive,
+ step: &ChainStep,
+ ) -> (String, Option<String>, String, String) {
+ let name = format!("{} - {}", directive.title, step.name);
+ let description = step.description.clone();
+ let contract_type = step.contract_type.clone();
+ let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string());
+
+ (name, description, contract_type, initial_phase)
+ }
+
+ // ========================================================================
+ // Evaluation
+ // ========================================================================
+
+ /// Handle contract completion: evaluate the step.
+ pub async fn on_contract_completed(
+ &self,
+ contract_id: Uuid,
+ ) -> Result<(), EngineError> {
+ // Find the step for this contract
+ let step = repository::get_step_by_contract_id(&self.pool, contract_id)
+ .await?
+ .ok_or(EngineError::StepNotFound(contract_id))?;
+
+ // Get directive
+ let chain = repository::get_directive_chain(&self.pool, step.chain_id)
+ .await?
+ .ok_or(EngineError::ChainNotFound(step.chain_id))?;
+
+ let directive = repository::get_directive(&self.pool, chain.directive_id)
+ .await?
+ .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?;
+
+ // Update step status to evaluating
+ repository::update_step_status(&self.pool, step.id, "evaluating").await?;
+ self.emit_event(EngineEvent::StepStatusChanged {
+ directive_id: directive.id,
+ step_id: step.id,
+ old_status: "running".to_string(),
+ new_status: "evaluating".to_string(),
+ });
+
+ // Run evaluation
+ let result = self.evaluate_step(&directive, &step).await?;
+
+ // Record evaluation
+ let programmatic_results = result
+ .verifier_results
+ .iter()
+ .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm)
+ .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
+ .collect::<Vec<_>>();
+
+ let llm_results = result
+ .verifier_results
+ .iter()
+ .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm)
+ .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
+ .collect::<Vec<_>>();
+
+ // Get chain_id from step
+ let chain_id = step.chain_id;
+
+ let _evaluation = repository::create_directive_evaluation(
+ &self.pool,
+ directive.id,
+ Some(chain_id),
+ Some(step.id),
+ step.contract_id,
+ "composite",
+ Some("orchestration_engine"),
+ result.passed,
+ Some(result.composite_score),
+ Some(result.confidence_level.as_str()),
+ serde_json::Value::Array(programmatic_results),
+ serde_json::Value::Array(llm_results),
+ serde_json::Value::Null, // criteria_results
+ &result.summary,
+ result.rework_instructions.as_deref(),
+ )
+ .await?;
+
+ // Update step based on result
+ let new_status = match result.confidence_level {
+ ConfidenceLevel::Green => "passed",
+ ConfidenceLevel::Yellow => {
+ // Check autonomy level
+ if directive.autonomy_level == "full_auto" {
+ "passed" // Accept yellow in full auto mode
+ } else {
+ // Create approval request
+ self.request_approval(
+ &directive,
+ &step,
+ "step_review",
+ &format!(
+ "Step '{}' completed with yellow confidence ({:.0}%). Review required.",
+ step.name,
+ result.composite_score * 100.0
+ ),
+ )
+ .await?;
+ "evaluating" // Wait for approval
+ }
+ }
+ ConfidenceLevel::Red => {
+ // Initiate rework
+ self.initiate_rework(&directive, &step, &result).await?;
+ "rework"
+ }
+ };
+
+ repository::update_step_status(&self.pool, step.id, new_status).await?;
+ repository::update_step_confidence(
+ &self.pool,
+ step.id,
+ result.composite_score,
+ result.confidence_level.as_str(),
+ result.id,
+ )
+ .await?;
+
+ self.emit_event(EngineEvent::EvaluationCompleted {
+ directive_id: directive.id,
+ step_id: step.id,
+ passed: result.passed,
+ confidence_level: result.confidence_level,
+ });
+
+ // If passed, continue chain execution
+ if new_status == "passed" {
+ self.advance_chain(directive.id).await?;
+ }
+
+ Ok(())
+ }
+
+ /// Evaluate a step using tiered verification.
+ async fn evaluate_step(
+ &self,
+ directive: &Directive,
+ step: &ChainStep,
+ ) -> Result<EvaluationResult, EngineError> {
+ // Get repository path
+ let repo_path = directive
+ .local_path
+ .as_ref()
+ .map(std::path::PathBuf::from)
+ .unwrap_or_else(|| std::path::PathBuf::from("."));
+
+ // Auto-detect verifiers
+ let verifiers = auto_detect_verifiers(&repo_path).await;
+
+ // Build verification context
+ let context = VerificationContext {
+ step_id: step.id,
+ contract_id: step.contract_id,
+ modified_files: vec![], // TODO: Get from contract/git
+ step_description: step.description.clone().unwrap_or_default(),
+ acceptance_criteria: vec![], // TODO: Get from directive
+ directive_context: directive.goal.clone(),
+ };
+
+ // Run composite evaluation
+ let evaluator = CompositeEvaluator::new(verifiers)
+ .with_thresholds(
+ directive.confidence_threshold_green,
+ directive.confidence_threshold_yellow,
+ );
+
+ Ok(evaluator.evaluate(&repo_path, &context).await)
+ }
+
+ /// Initiate rework for a failed step.
+ async fn initiate_rework(
+ &self,
+ directive: &Directive,
+ step: &ChainStep,
+ result: &EvaluationResult,
+ ) -> Result<(), EngineError> {
+ // Increment rework count
+ let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?;
+
+ // Check rework limit
+ let max_rework = directive.max_rework_cycles.unwrap_or(3);
+ if updated_step.rework_count >= max_rework {
+ // Too many rework attempts, mark as blocked
+ repository::update_step_status(&self.pool, step.id, "blocked").await?;
+ self.emit_directive_event(
+ directive.id,
+ "step_blocked",
+ "warning",
+ serde_json::json!({
+ "step_id": step.id,
+ "step_name": step.name,
+ "reason": "Max rework attempts reached",
+ }),
+ "system",
+ )
+ .await?;
+ return Ok(());
+ }
+
+ // Log rework event
+ self.emit_directive_event(
+ directive.id,
+ "step_rework",
+ "info",
+ serde_json::json!({
+ "step_id": step.id,
+ "step_name": step.name,
+ "rework_count": updated_step.rework_count,
+ "instructions": result.rework_instructions,
+ }),
+ "system",
+ )
+ .await?;
+
+ // TODO: Send rework instructions to supervisor task
+ // This would involve:
+ // 1. Reset contract phase to 'plan'
+ // 2. Send message to supervisor with rework instructions
+ // 3. Update step status to 'running'
+
+ Ok(())
+ }
+
+ /// Request human approval for a step.
+ async fn request_approval(
+ &self,
+ directive: &Directive,
+ step: &ChainStep,
+ approval_type: &str,
+ description: &str,
+ ) -> Result<Uuid, EngineError> {
+ let context = serde_json::json!({
+ "step_id": step.id,
+ "step_name": step.name,
+ "confidence_score": step.confidence_score,
+ });
+
+ let approval = repository::create_approval_request(
+ &self.pool,
+ directive.id,
+ Some(step.id),
+ approval_type,
+ description,
+ Some(context),
+ "medium",
+ None, // expires_at
+ )
+ .await?;
+
+ self.emit_event(EngineEvent::ApprovalRequired {
+ directive_id: directive.id,
+ approval_id: approval.id,
+ approval_type: approval_type.to_string(),
+ });
+
+ Ok(approval.id)
+ }
+
+ /// Handle approval resolution.
+ pub async fn on_approval_resolved(
+ &self,
+ approval_id: Uuid,
+ approved: bool,
+ responded_by: Uuid,
+ ) -> Result<(), EngineError> {
+ let status = if approved { "approved" } else { "denied" };
+ let approval = repository::resolve_approval(
+ &self.pool,
+ approval_id,
+ status,
+ None,
+ responded_by,
+ )
+ .await?;
+
+ if let Some(step_id) = approval.step_id {
+ let step = repository::get_chain_step(&self.pool, step_id)
+ .await?
+ .ok_or(EngineError::StepNotFound(step_id))?;
+
+ let chain = repository::get_directive_chain(&self.pool, step.chain_id)
+ .await?
+ .ok_or(EngineError::ChainNotFound(step.chain_id))?;
+
+ if approved {
+ // Mark step as passed and continue
+ repository::update_step_status(&self.pool, step_id, "passed").await?;
+ self.advance_chain(chain.directive_id).await?;
+ } else {
+ // Mark step as failed/blocked
+ repository::update_step_status(&self.pool, step_id, "blocked").await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ // ========================================================================
+ // Circuit Breakers
+ // ========================================================================
+
+ /// Check circuit breakers for a directive.
+ async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> {
+ // Check cost limit
+ if let Some(max_cost) = directive.max_total_cost_usd {
+ let current_cost = directive.total_cost_usd;
+ if current_cost >= max_cost {
+ return Err(EngineError::CircuitBreaker(format!(
+ "Cost limit exceeded: ${:.2} >= ${:.2}",
+ current_cost, max_cost
+ )));
+ }
+ }
+
+ // Check time limit (stored in minutes)
+ if let Some(max_minutes) = directive.max_wall_time_minutes {
+ if let Some(started_at) = directive.started_at {
+ let elapsed = chrono::Utc::now().signed_duration_since(started_at);
+ let elapsed_minutes = elapsed.num_minutes();
+ if elapsed_minutes >= max_minutes as i64 {
+ return Err(EngineError::CircuitBreaker(format!(
+ "Time limit exceeded: {} min >= {} min",
+ elapsed_minutes, max_minutes
+ )));
+ }
+ }
+ }
+
+ // Check chain generation limit
+ if let Some(max_gen) = directive.max_chain_regenerations {
+ let current_gen = directive.chain_generation_count;
+ if current_gen >= max_gen {
+ return Err(EngineError::CircuitBreaker(format!(
+ "Chain generation limit exceeded: {} >= {}",
+ current_gen, max_gen
+ )));
+ }
+ }
+
+ Ok(())
+ }
+
+ // ========================================================================
+ // Completion
+ // ========================================================================
+
+ /// Complete a directive after all steps pass.
+ async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
+ // Run final evaluation (optional)
+ // TODO: LLM evaluation of overall directive completion
+
+ // Update directive status
+ repository::update_directive_status(&self.pool, directive_id, "completed").await?;
+
+ self.emit_event(EngineEvent::DirectiveStatusChanged {
+ directive_id,
+ old_status: "active".to_string(),
+ new_status: "completed".to_string(),
+ });
+
+ self.emit_directive_event(
+ directive_id,
+ "directive_completed",
+ "info",
+ serde_json::json!({}),
+ "system",
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ // ========================================================================
+ // Event Logging
+ // ========================================================================
+
+ /// Emit a directive event to the database.
+ async fn emit_directive_event(
+ &self,
+ directive_id: Uuid,
+ event_type: &str,
+ severity: &str,
+ event_data: serde_json::Value,
+ actor_type: &str,
+ ) -> Result<DirectiveEvent, EngineError> {
+ Ok(repository::emit_directive_event(
+ &self.pool,
+ directive_id,
+ None, // chain_id
+ None, // step_id
+ event_type,
+ severity,
+ Some(event_data),
+ actor_type,
+ None, // actor_id
+ )
+ .await?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_confidence_level_decision() {
+ // Green confidence should pass in all modes
+ assert_eq!(ConfidenceLevel::Green.as_str(), "green");
+
+ // Yellow confidence behavior depends on autonomy level
+ assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow");
+
+ // Red confidence should always trigger rework
+ assert_eq!(ConfidenceLevel::Red.as_str(), "red");
+ }
+}