summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/engine.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/engine.rs')
-rw-r--r--makima/src/orchestration/engine.rs1335
1 files changed, 0 insertions, 1335 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs
deleted file mode 100644
index 9f7c3b1..0000000
--- a/makima/src/orchestration/engine.rs
+++ /dev/null
@@ -1,1335 +0,0 @@
-//! Directive orchestration engine.
-//!
-//! Manages the lifecycle of directives:
-//! - Starts directives and generates initial chains
-//! - Monitors step execution and triggers evaluations
-//! - Handles rework, escalation, and chain regeneration
-//! - Enforces circuit breakers (cost, time, rework limits)
-
-use std::collections::HashMap;
-
-use sqlx::PgPool;
-use thiserror::Error;
-use tokio::sync::broadcast;
-use uuid::Uuid;
-
-use crate::db::models::{
- AddStepRequest, ChainStep, CreateContractRequest, CreateTaskRequest, Directive,
- DirectiveEvent, UpdateStepRequest,
-};
-use crate::db::repository::{self, RepositoryError};
-
-use super::planner::{ChainPlanner, GeneratedChain, PlannerError};
-use super::verifier::{
- auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult,
- VerificationContext,
-};
-
-/// Error type for engine operations.
-#[derive(Error, Debug)]
-pub enum EngineError {
- #[error("Database error: {0}")]
- Database(#[from] sqlx::Error),
-
- #[error("Repository error: {0}")]
- Repository(#[from] RepositoryError),
-
- #[error("Planner error: {0}")]
- Planner(#[from] PlannerError),
-
- #[error("Directive not found: {0}")]
- DirectiveNotFound(Uuid),
-
- #[error("Chain not found for directive: {0}")]
- ChainNotFound(Uuid),
-
- #[error("Step not found: {0}")]
- StepNotFound(Uuid),
-
- #[error("Invalid state transition: {from} -> {to}")]
- InvalidStateTransition { from: String, to: String },
-
- #[error("Circuit breaker triggered: {0}")]
- CircuitBreaker(String),
-
- #[error("Directive is paused")]
- DirectivePaused,
-
- #[error("Contract creation failed: {0}")]
- ContractCreation(String),
-
- #[error("LLM error: {0}")]
- LlmError(String),
-}
-
-/// Event emitted by the engine for UI updates.
-#[derive(Debug, Clone)]
-pub enum EngineEvent {
- /// Directive status changed
- DirectiveStatusChanged {
- directive_id: Uuid,
- old_status: String,
- new_status: String,
- },
- /// Step status changed
- StepStatusChanged {
- directive_id: Uuid,
- step_id: Uuid,
- old_status: String,
- new_status: String,
- },
- /// Evaluation completed
- EvaluationCompleted {
- directive_id: Uuid,
- step_id: Uuid,
- passed: bool,
- confidence_level: ConfidenceLevel,
- },
- /// Approval required
- ApprovalRequired {
- directive_id: Uuid,
- approval_id: Uuid,
- approval_type: String,
- },
- /// Chain regenerated
- ChainRegenerated {
- directive_id: Uuid,
- old_chain_id: Uuid,
- new_chain_id: Uuid,
- },
-}
-
-/// Result from starting a directive, containing info needed for auto-start.
-pub struct PlanningStartResult {
- /// The planning task ID that needs to be started on a daemon
- pub task_id: Uuid,
- /// The owner ID for finding available daemons
- pub owner_id: Uuid,
- /// The planning task details needed for the SpawnTask command
- pub task_name: String,
- pub plan: String,
- pub contract_id: Uuid,
- pub repository_url: Option<String>,
- pub base_branch: Option<String>,
-}
-
-/// Main orchestration engine for directives.
-pub struct DirectiveEngine {
- pool: PgPool,
- planner: ChainPlanner,
- event_tx: Option<broadcast::Sender<EngineEvent>>,
-}
-
-impl DirectiveEngine {
- /// Create a new directive engine.
- pub fn new(pool: PgPool) -> Self {
- Self {
- planner: ChainPlanner::new(pool.clone()),
- pool,
- event_tx: None,
- }
- }
-
- /// Set the event broadcast channel for UI updates.
- pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self {
- self.event_tx = Some(tx);
- self
- }
-
- /// Emit an event if channel is configured.
- fn emit_event(&self, event: EngineEvent) {
- if let Some(tx) = &self.event_tx {
- let _ = tx.send(event);
- }
- }
-
- // ========================================================================
- // Directive Lifecycle
- // ========================================================================
-
- /// Start a directive: spawn a planning contract+task to generate the chain.
- /// Returns a `PlanningStartResult` so the caller can auto-start the task on a daemon.
- pub async fn start_directive(&self, directive_id: Uuid) -> Result<PlanningStartResult, EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- // Validate current state
- if directive.status != "draft" && directive.status != "paused" {
- return Err(EngineError::InvalidStateTransition {
- from: directive.status,
- to: "planning".to_string(),
- });
- }
-
- // Update status to planning
- repository::update_directive_status(&self.pool, directive_id, "planning").await?;
- self.emit_directive_event(
- directive_id,
- "status_changed",
- "info",
- serde_json::json!({"old_status": directive.status, "new_status": "planning"}),
- "system",
- )
- .await?;
-
- // Create an empty chain for the planning task to populate
- let chain_name = format!(
- "{}-chain",
- directive.title.to_lowercase().replace(' ', "-")
- );
- let _db_chain = repository::create_directive_chain(
- &self.pool,
- directive_id,
- &chain_name,
- Some(&format!("Execution plan for: {}", directive.goal)),
- None, // rationale
- None, // planning_model
- )
- .await?;
-
- // Create a planning contract (type "execute", no phase guard)
- let contract = repository::create_contract_for_owner(
- &self.pool,
- directive.owner_id,
- CreateContractRequest {
- name: format!("{} - Planning", directive.title),
- description: Some(format!(
- "Planning contract for directive: {}",
- directive.goal
- )),
- contract_type: Some("execute".to_string()),
- template_id: None,
- initial_phase: Some("execute".to_string()),
- autonomous_loop: Some(true),
- phase_guard: Some(false),
- local_only: Some(false),
- auto_merge_local: None,
- },
- )
- .await
- .map_err(|e| {
- EngineError::ContractCreation(format!("Failed to create planning contract: {}", e))
- })?;
-
- // Build instructions for the planning task
- let plan = self.build_planning_task_instructions(&directive);
-
- // Create the planning task
- let task_name = format!("{} - Planning", directive.title);
- let task = repository::create_task_for_owner(
- &self.pool,
- directive.owner_id,
- CreateTaskRequest {
- contract_id: Some(contract.id),
- name: task_name.clone(),
- description: Some(format!(
- "Plan the execution chain for directive: {}",
- directive.goal
- )),
- plan: plan.clone(),
- parent_task_id: None,
- is_supervisor: true,
- priority: 5,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- target_branch: None,
- merge_mode: None,
- target_repo_path: None,
- completion_action: Some("none".to_string()),
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- },
- )
- .await
- .map_err(|e| {
- EngineError::ContractCreation(format!("Failed to create planning task: {}", e))
- })?;
-
- // Link the supervisor task to the contract
- if let Err(e) = repository::update_contract_supervisor(
- &self.pool,
- contract.id,
- task.id,
- )
- .await
- {
- tracing::warn!(
- contract_id = %contract.id,
- task_id = %task.id,
- error = %e,
- "Failed to link supervisor task to planning contract"
- );
- }
-
- // Link the planning contract to the directive
- repository::set_directive_orchestrator_contract(
- &self.pool,
- directive_id,
- contract.id,
- )
- .await?;
-
- self.emit_directive_event(
- directive_id,
- "planning_started",
- "info",
- serde_json::json!({
- "contract_id": contract.id,
- "task_id": task.id,
- "message": "Planning task spawned, waiting for chain generation",
- }),
- "system",
- )
- .await?;
-
- Ok(PlanningStartResult {
- task_id: task.id,
- owner_id: directive.owner_id,
- task_name,
- plan,
- contract_id: contract.id,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- })
- }
-
- /// Pause a directive.
- pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- if directive.status != "active" {
- return Err(EngineError::InvalidStateTransition {
- from: directive.status,
- to: "paused".to_string(),
- });
- }
-
- repository::update_directive_status(&self.pool, directive_id, "paused").await?;
- self.emit_event(EngineEvent::DirectiveStatusChanged {
- directive_id,
- old_status: "active".to_string(),
- new_status: "paused".to_string(),
- });
-
- Ok(())
- }
-
- /// Resume a paused directive.
- pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- if directive.status != "paused" {
- return Err(EngineError::InvalidStateTransition {
- from: directive.status,
- to: "active".to_string(),
- });
- }
-
- repository::update_directive_status(&self.pool, directive_id, "active").await?;
- self.emit_event(EngineEvent::DirectiveStatusChanged {
- directive_id,
- old_status: "paused".to_string(),
- new_status: "active".to_string(),
- });
-
- // Continue execution
- self.advance_chain(directive_id).await?;
-
- Ok(())
- }
-
- /// Stop a directive (cannot be resumed).
- pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- if directive.status == "completed" || directive.status == "failed" {
- return Err(EngineError::InvalidStateTransition {
- from: directive.status,
- to: "failed".to_string(),
- });
- }
-
- repository::update_directive_status(&self.pool, directive_id, "failed").await?;
- self.emit_event(EngineEvent::DirectiveStatusChanged {
- directive_id,
- old_status: directive.status,
- new_status: "failed".to_string(),
- });
-
- Ok(())
- }
-
- // ========================================================================
- // Chain Management
- // ========================================================================
-
- /// Build a default chain as a fallback.
- fn build_default_chain(&self, directive: &Directive) -> GeneratedChain {
- GeneratedChain {
- name: format!(
- "{}-chain",
- directive.title.to_lowercase().replace(' ', "-")
- ),
- description: format!("Execution plan for: {}", directive.goal),
- steps: vec![
- super::planner::GeneratedStep {
- name: "research".to_string(),
- step_type: "research".to_string(),
- description: format!(
- "Research and understand the requirements for: {}",
- directive.goal
- ),
- depends_on: vec![],
- requirement_ids: vec![],
- contract_template: None,
- },
- super::planner::GeneratedStep {
- name: "implement".to_string(),
- step_type: "implement".to_string(),
- description: format!("Implement the solution for: {}", directive.goal),
- depends_on: vec!["research".to_string()],
- requirement_ids: vec![],
- contract_template: None,
- },
- super::planner::GeneratedStep {
- name: "test".to_string(),
- step_type: "test".to_string(),
- description: "Test and verify the implementation".to_string(),
- depends_on: vec!["implement".to_string()],
- requirement_ids: vec![],
- contract_template: None,
- },
- ],
- }
- }
-
- /// Create database steps from a generated chain.
- async fn create_steps_from_chain(
- &self,
- chain_id: &Uuid,
- chain: &GeneratedChain,
- ) -> Result<(), EngineError> {
- // First pass: create all steps and build name-to-id map
- let mut step_id_map: HashMap<String, Uuid> = HashMap::new();
-
- // Get editor positions
- let positions = self.planner.compute_editor_positions(chain);
-
- for step in &chain.steps {
- let (editor_x, editor_y) = positions
- .get(&step.name)
- .copied()
- .unwrap_or((100.0, 100.0));
-
- let task_plan = step
- .contract_template
- .as_ref()
- .and_then(|t| t.tasks.first())
- .map(|t| t.plan.clone())
- .or_else(|| Some(step.description.clone()));
-
- let request = AddStepRequest {
- name: step.name.clone(),
- description: Some(step.description.clone()),
- step_type: Some(step.step_type.clone()),
- contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()),
- initial_phase: Some("plan".to_string()),
- task_plan,
- phases: step.contract_template.as_ref().map(|t| t.phases.clone()),
- depends_on: None, // Will update in second pass
- parallel_group: None,
- requirement_ids: Some(step.requirement_ids.clone()),
- acceptance_criteria_ids: None,
- verifier_config: None,
- editor_x: Some(editor_x),
- editor_y: Some(editor_y),
- };
-
- let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?;
- step_id_map.insert(step.name.clone(), db_step.id);
- }
-
- // Second pass: update dependencies
- for step in &chain.steps {
- if step.depends_on.is_empty() {
- continue;
- }
-
- let step_id = step_id_map.get(&step.name).unwrap();
- let dep_ids: Vec<Uuid> = step
- .depends_on
- .iter()
- .filter_map(|name| step_id_map.get(name))
- .copied()
- .collect();
-
- // Update step with proper dependencies
- let update = UpdateStepRequest {
- name: None,
- description: None,
- task_plan: None,
- depends_on: Some(dep_ids),
- requirement_ids: None,
- acceptance_criteria_ids: None,
- verifier_config: None,
- editor_x: None,
- editor_y: None,
- };
-
- repository::update_chain_step(&self.pool, *step_id, update).await?;
- }
-
- Ok(())
- }
-
- /// Regenerate chain while preserving completed steps.
- pub async fn regenerate_chain(
- &self,
- directive_id: Uuid,
- reason: &str,
- ) -> Result<Uuid, EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- let current_chain = repository::get_current_chain(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::ChainNotFound(directive_id))?;
-
- // Use default chain for regeneration
- // (planning contract handles initial generation; regeneration uses fallback)
- let new_chain = self.build_default_chain(&directive);
-
- // Supersede old chain
- repository::supersede_chain(&self.pool, current_chain.id).await?;
-
- // Create new chain
- let db_chain = repository::create_directive_chain(
- &self.pool,
- directive_id,
- &new_chain.name,
- Some(&new_chain.description),
- Some(reason), // rationale
- None, // planning_model
- )
- .await?;
-
- // Create steps
- self.create_steps_from_chain(&db_chain.id, &new_chain).await?;
-
- self.emit_event(EngineEvent::ChainRegenerated {
- directive_id,
- old_chain_id: current_chain.id,
- new_chain_id: db_chain.id,
- });
-
- // Continue execution
- self.advance_chain(directive_id).await?;
-
- Ok(db_chain.id)
- }
-
- // ========================================================================
- // Step Execution
- // ========================================================================
-
- /// Advance chain execution: find ready steps and start them.
- pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> {
- let directive = repository::get_directive(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(directive_id))?;
-
- // Check if directive is active
- if directive.status == "paused" {
- return Err(EngineError::DirectivePaused);
- }
- if directive.status != "active" {
- return Ok(()); // Not an error, just nothing to do
- }
-
- // Check circuit breakers
- self.check_circuit_breakers(&directive).await?;
-
- // Get current chain
- let chain = repository::get_current_chain(&self.pool, directive_id)
- .await?
- .ok_or(EngineError::ChainNotFound(directive_id))?;
-
- // Find ready steps (dependencies met, status=pending)
- let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?;
-
- // Start each ready step
- for step in ready_steps {
- self.start_step(&directive, &step).await?;
- }
-
- // Check if chain is complete
- let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?;
- let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped");
- let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed");
-
- if all_passed && !all_steps.is_empty() {
- // Complete the directive
- self.complete_directive(directive_id).await?;
- } else if any_blocked {
- // Check if we should regenerate or fail
- let failed_count = all_steps.iter().filter(|s| s.status == "failed").count();
- if failed_count > 3 {
- // Too many failures, fail the directive
- repository::update_directive_status(&self.pool, directive_id, "failed").await?;
- }
- }
-
- Ok(())
- }
-
- /// Start a step by creating its contract and supervisor task.
- async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> {
- // Update step status to ready
- repository::update_step_status(&self.pool, step.id, "ready").await?;
- self.emit_event(EngineEvent::StepStatusChanged {
- directive_id: directive.id,
- step_id: step.id,
- old_status: "pending".to_string(),
- new_status: "ready".to_string(),
- });
-
- // Get contract details from step template
- let (name, description, contract_type, initial_phase) =
- self.get_contract_details(directive, step);
-
- // Create contract for this step
- let contract = repository::create_contract_for_owner(
- &self.pool,
- directive.owner_id,
- CreateContractRequest {
- name: name.clone(),
- description: description.clone(),
- contract_type: Some(contract_type),
- template_id: None,
- initial_phase: Some(initial_phase),
- autonomous_loop: Some(directive.autonomy_level == "full_auto"),
- phase_guard: Some(true),
- local_only: Some(false),
- auto_merge_local: None,
- },
- )
- .await
- .map_err(|e| EngineError::ContractCreation(format!("Failed to create contract: {}", e)))?;
-
- // Build task plan from step description and task_plan
- let task_plan = step
- .task_plan
- .clone()
- .unwrap_or_else(|| {
- format!(
- "## Step: {}\n\n{}\n\n## Directive Goal\n{}",
- step.name,
- description.as_deref().unwrap_or("Complete this step."),
- directive.goal,
- )
- });
-
- // Create supervisor task linked to the contract
- let task = repository::create_task_for_owner(
- &self.pool,
- directive.owner_id,
- CreateTaskRequest {
- contract_id: Some(contract.id),
- name: name.clone(),
- description: description.clone(),
- plan: task_plan,
- parent_task_id: None,
- is_supervisor: true,
- priority: 5,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- target_branch: None,
- merge_mode: Some("pr".to_string()),
- target_repo_path: None,
- completion_action: Some("pr".to_string()),
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- },
- )
- .await
- .map_err(|e| EngineError::ContractCreation(format!("Failed to create task: {}", e)))?;
-
- // Link contract and task to step
- repository::update_step_contract(&self.pool, step.id, contract.id, Some(task.id)).await?;
-
- // Update step status to running
- repository::update_step_status(&self.pool, step.id, "running").await?;
- self.emit_event(EngineEvent::StepStatusChanged {
- directive_id: directive.id,
- step_id: step.id,
- old_status: "ready".to_string(),
- new_status: "running".to_string(),
- });
-
- self.emit_directive_event(
- directive.id,
- "step_started",
- "info",
- serde_json::json!({
- "step_id": step.id,
- "step_name": step.name,
- "contract_id": contract.id,
- "task_id": task.id,
- }),
- "system",
- )
- .await?;
-
- Ok(())
- }
-
- /// Build contract details from a step.
- /// Returns (name, description, contract_type, initial_phase)
- fn get_contract_details(
- &self,
- directive: &Directive,
- step: &ChainStep,
- ) -> (String, Option<String>, String, String) {
- let name = format!("{} - {}", directive.title, step.name);
- let description = step.description.clone();
- let contract_type = step.contract_type.clone();
- let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string());
-
- (name, description, contract_type, initial_phase)
- }
-
- // ========================================================================
- // Evaluation
- // ========================================================================
-
- /// Handle contract completion: evaluate the step.
- pub async fn on_contract_completed(
- &self,
- contract_id: Uuid,
- ) -> Result<(), EngineError> {
- // Find the step for this contract
- let step = repository::get_step_by_contract_id(&self.pool, contract_id)
- .await?
- .ok_or(EngineError::StepNotFound(contract_id))?;
-
- // Get directive
- let chain = repository::get_directive_chain(&self.pool, step.chain_id)
- .await?
- .ok_or(EngineError::ChainNotFound(step.chain_id))?;
-
- let directive = repository::get_directive(&self.pool, chain.directive_id)
- .await?
- .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?;
-
- // Update step status to evaluating
- repository::update_step_status(&self.pool, step.id, "evaluating").await?;
- self.emit_event(EngineEvent::StepStatusChanged {
- directive_id: directive.id,
- step_id: step.id,
- old_status: "running".to_string(),
- new_status: "evaluating".to_string(),
- });
-
- // Run evaluation
- let result = self.evaluate_step(&directive, &step).await?;
-
- // Record evaluation
- let programmatic_results = result
- .verifier_results
- .iter()
- .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm)
- .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
- .collect::<Vec<_>>();
-
- let llm_results = result
- .verifier_results
- .iter()
- .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm)
- .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
- .collect::<Vec<_>>();
-
- // Get chain_id from step
- let chain_id = step.chain_id;
-
- let _evaluation = repository::create_directive_evaluation(
- &self.pool,
- directive.id,
- Some(chain_id),
- Some(step.id),
- step.contract_id,
- "composite",
- Some("orchestration_engine"),
- result.passed,
- Some(result.composite_score),
- Some(result.confidence_level.as_str()),
- serde_json::Value::Array(programmatic_results),
- serde_json::Value::Array(llm_results),
- serde_json::Value::Null, // criteria_results
- &result.summary,
- result.rework_instructions.as_deref(),
- )
- .await?;
-
- // Update step based on result
- let new_status = match result.confidence_level {
- ConfidenceLevel::Green => "passed",
- ConfidenceLevel::Yellow => {
- // Check autonomy level
- if directive.autonomy_level == "full_auto" {
- "passed" // Accept yellow in full auto mode
- } else {
- // Create approval request
- self.request_approval(
- &directive,
- &step,
- "step_review",
- &format!(
- "Step '{}' completed with yellow confidence ({:.0}%). Review required.",
- step.name,
- result.composite_score * 100.0
- ),
- )
- .await?;
- "evaluating" // Wait for approval
- }
- }
- ConfidenceLevel::Red => {
- // Initiate rework
- self.initiate_rework(&directive, &step, &result).await?;
- "rework"
- }
- };
-
- repository::update_step_status(&self.pool, step.id, new_status).await?;
- repository::update_step_confidence(
- &self.pool,
- step.id,
- result.composite_score,
- result.confidence_level.as_str(),
- result.id,
- )
- .await?;
-
- self.emit_event(EngineEvent::EvaluationCompleted {
- directive_id: directive.id,
- step_id: step.id,
- passed: result.passed,
- confidence_level: result.confidence_level,
- });
-
- // If passed, continue chain execution
- if new_status == "passed" {
- self.advance_chain(directive.id).await?;
- }
-
- Ok(())
- }
-
- /// Evaluate a step using tiered verification.
- async fn evaluate_step(
- &self,
- directive: &Directive,
- step: &ChainStep,
- ) -> Result<EvaluationResult, EngineError> {
- // Get repository path
- let repo_path = directive
- .local_path
- .as_ref()
- .map(std::path::PathBuf::from)
- .unwrap_or_else(|| std::path::PathBuf::from("."));
-
- // Auto-detect verifiers
- let verifiers = auto_detect_verifiers(&repo_path).await;
-
- // Build verification context
- let context = VerificationContext {
- step_id: step.id,
- contract_id: step.contract_id,
- modified_files: vec![], // TODO: Get from contract/git
- step_description: step.description.clone().unwrap_or_default(),
- acceptance_criteria: vec![], // TODO: Get from directive
- directive_context: directive.goal.clone(),
- };
-
- // Run composite evaluation
- let evaluator = CompositeEvaluator::new(verifiers)
- .with_thresholds(
- directive.confidence_threshold_green,
- directive.confidence_threshold_yellow,
- );
-
- Ok(evaluator.evaluate(&repo_path, &context).await)
- }
-
- /// Initiate rework for a failed step.
- async fn initiate_rework(
- &self,
- directive: &Directive,
- step: &ChainStep,
- result: &EvaluationResult,
- ) -> Result<(), EngineError> {
- // Increment rework count
- let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?;
-
- // Check rework limit
- let max_rework = directive.max_rework_cycles.unwrap_or(3);
- if updated_step.rework_count >= max_rework {
- // Too many rework attempts, mark as blocked
- repository::update_step_status(&self.pool, step.id, "blocked").await?;
- self.emit_directive_event(
- directive.id,
- "step_blocked",
- "warning",
- serde_json::json!({
- "step_id": step.id,
- "step_name": step.name,
- "reason": "Max rework attempts reached",
- }),
- "system",
- )
- .await?;
- return Ok(());
- }
-
- // Log rework event
- self.emit_directive_event(
- directive.id,
- "step_rework",
- "info",
- serde_json::json!({
- "step_id": step.id,
- "step_name": step.name,
- "rework_count": updated_step.rework_count,
- "instructions": result.rework_instructions,
- }),
- "system",
- )
- .await?;
-
- // TODO: Send rework instructions to supervisor task
- // This would involve:
- // 1. Reset contract phase to 'plan'
- // 2. Send message to supervisor with rework instructions
- // 3. Update step status to 'running'
-
- Ok(())
- }
-
- /// Request human approval for a step.
- async fn request_approval(
- &self,
- directive: &Directive,
- step: &ChainStep,
- approval_type: &str,
- description: &str,
- ) -> Result<Uuid, EngineError> {
- let context = serde_json::json!({
- "step_id": step.id,
- "step_name": step.name,
- "confidence_score": step.confidence_score,
- });
-
- let approval = repository::create_approval_request(
- &self.pool,
- directive.id,
- Some(step.id),
- approval_type,
- description,
- Some(context),
- "medium",
- None, // expires_at
- )
- .await?;
-
- self.emit_event(EngineEvent::ApprovalRequired {
- directive_id: directive.id,
- approval_id: approval.id,
- approval_type: approval_type.to_string(),
- });
-
- Ok(approval.id)
- }
-
- /// Handle approval resolution.
- pub async fn on_approval_resolved(
- &self,
- approval_id: Uuid,
- approved: bool,
- responded_by: Uuid,
- ) -> Result<(), EngineError> {
- let status = if approved { "approved" } else { "denied" };
- let approval = repository::resolve_approval(
- &self.pool,
- approval_id,
- status,
- None,
- responded_by,
- )
- .await?;
-
- if let Some(step_id) = approval.step_id {
- let step = repository::get_chain_step(&self.pool, step_id)
- .await?
- .ok_or(EngineError::StepNotFound(step_id))?;
-
- let chain = repository::get_directive_chain(&self.pool, step.chain_id)
- .await?
- .ok_or(EngineError::ChainNotFound(step.chain_id))?;
-
- if approved {
- // Mark step as passed and continue
- repository::update_step_status(&self.pool, step_id, "passed").await?;
- self.advance_chain(chain.directive_id).await?;
- } else {
- // Mark step as failed/blocked
- repository::update_step_status(&self.pool, step_id, "blocked").await?;
- }
- }
-
- Ok(())
- }
-
- // ========================================================================
- // Planning
- // ========================================================================
-
- /// Build the task instructions for the planning task.
- fn build_planning_task_instructions(&self, directive: &Directive) -> String {
- let requirements: Vec<String> = directive
- .requirements
- .as_array()
- .map(|arr| {
- arr.iter()
- .filter_map(|v| v.as_object())
- .map(|obj| {
- let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?");
- let desc = obj
- .get("description")
- .and_then(|v| v.as_str())
- .unwrap_or("?");
- format!("- {}: {}", id, desc)
- })
- .collect()
- })
- .unwrap_or_default();
-
- let criteria: Vec<String> = directive
- .acceptance_criteria
- .as_array()
- .map(|arr| {
- arr.iter()
- .filter_map(|v| v.as_object())
- .map(|obj| {
- let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?");
- let criterion = obj
- .get("criterion")
- .and_then(|v| v.as_str())
- .unwrap_or("?");
- format!("- {}: {}", id, criterion)
- })
- .collect()
- })
- .unwrap_or_default();
-
- let constraints: Vec<String> = directive
- .constraints
- .as_array()
- .map(|arr| {
- arr.iter()
- .filter_map(|v| v.as_str())
- .map(|s| format!("- {}", s))
- .collect()
- })
- .unwrap_or_default();
-
- let repo_info = directive
- .repository_url
- .as_deref()
- .unwrap_or("(not specified)");
-
- format!(
- r#"You are planning an execution chain for a directive.
-
-## Directive: {title}
-## Goal
-{goal}
-
-## Requirements
-{requirements}
-
-## Acceptance Criteria
-{criteria}
-
-## Constraints
-{constraints}
-
-## Repository: {repo}
-
-## Your Task
-
-Analyze the repository and create a chain of execution steps.
-For each step, add it via the API:
-
-```bash
-curl -s -X POST "$MAKIMA_URL/api/v1/directives/{directive_id}/chain/steps" \
- -H "Authorization: Bearer $MAKIMA_API_KEY" \
- -H "Content-Type: application/json" \
- -d '{{
- "name": "step-name",
- "description": "What this step accomplishes",
- "step_type": "implement",
- "depends_on": [],
- "contract_type": "execute",
- "initial_phase": "execute",
- "task_plan": "Detailed instructions for the step executor"
- }}'
-```
-
-### Step types
-Use these step types: research, design, implement, test, review, document
-
-### Dependencies
-Each step can depend on other steps by name. Use the `depends_on` field with an array of step names.
-Steps with no dependencies will run in parallel.
-
-### Guidelines
-1. Break the work into logical, independently executable steps
-2. Each step should be completable by a single Claude Code session
-3. Use dependencies to enforce ordering where needed
-4. Include a "test" or "verify" step at the end
-5. Keep step names in kebab-case
-6. The `task_plan` field should contain detailed instructions for the agent that will execute the step
-
-When you have added all steps, your task is complete."#,
- title = directive.title,
- goal = directive.goal,
- requirements = if requirements.is_empty() {
- "(none)".to_string()
- } else {
- requirements.join("\n")
- },
- criteria = if criteria.is_empty() {
- "(none)".to_string()
- } else {
- criteria.join("\n")
- },
- constraints = if constraints.is_empty() {
- "(none)".to_string()
- } else {
- constraints.join("\n")
- },
- repo = repo_info,
- directive_id = directive.id,
- )
- }
-
-    /// Finish the planning phase of a directive once its planning task has ended.
-    ///
-    /// Returns `Ok(())` without doing anything when the directive is no longer
-    /// in the `planning` state. Otherwise, if the task reported failure or the
-    /// current chain has no steps, the default chain is written into the
-    /// current chain. The directive is then switched to `active`, the status
-    /// change is broadcast and logged, and `advance_chain` is called.
-    ///
-    /// # Errors
-    ///
-    /// Returns [`EngineError::DirectiveNotFound`] or
-    /// [`EngineError::ChainNotFound`] when the corresponding lookup yields
-    /// nothing, and propagates any error from the repository or from the
-    /// engine helpers called along the way.
-    pub async fn on_planning_complete(
-        &self,
-        directive_id: Uuid,
-        success: bool,
-    ) -> Result<(), EngineError> {
-        let directive = match repository::get_directive(&self.pool, directive_id).await? {
-            Some(found) => found,
-            None => return Err(EngineError::DirectiveNotFound(directive_id)),
-        };
-
-        // A completion signal is only meaningful while the directive is planning.
-        if directive.status != "planning" {
-            tracing::warn!(
-                "Directive {} is in state '{}', not 'planning'. Skipping planning completion.",
-                directive_id,
-                directive.status
-            );
-            return Ok(());
-        }
-
-        let chain = match repository::get_current_chain(&self.pool, directive_id).await? {
-            Some(current) => current,
-            None => return Err(EngineError::ChainNotFound(directive_id)),
-        };
-
-        let planned_steps = repository::list_chain_steps(&self.pool, chain.id).await?;
-
-        if !success || planned_steps.is_empty() {
-            // The planner did not deliver a usable chain: fall back to the default one.
-            let reason = match success {
-                false => "Planning task failed",
-                true => "Planning task produced no steps",
-            };
-            tracing::warn!(
-                "{} for directive {}, using default chain",
-                reason,
-                directive_id
-            );
-
-            let fallback_chain = self.build_default_chain(&directive);
-            self.create_steps_from_chain(&chain.id, &fallback_chain)
-                .await?;
-        } else {
-            tracing::info!(
-                "Planning completed successfully for directive {} with {} steps",
-                directive_id,
-                planned_steps.len()
-            );
-        }
-
-        // Move the directive from planning to active and announce the change.
-        repository::update_directive_status(&self.pool, directive_id, "active").await?;
-        let status_change = EngineEvent::DirectiveStatusChanged {
-            directive_id,
-            old_status: String::from("planning"),
-            new_status: String::from("active"),
-        };
-        self.emit_event(status_change);
-
-        let payload = serde_json::json!({"success": success});
-        self.emit_directive_event(directive_id, "planning_completed", "info", payload, "system")
-            .await?;
-
-        // Kick off whatever steps are ready to run.
-        self.advance_chain(directive_id).await?;
-
-        Ok(())
-    }
-
- // ========================================================================
- // Circuit Breakers
- // ========================================================================
-
-    /// Verify that a directive is still within its configured limits.
-    ///
-    /// Three optional limits are checked in order: total cost, wall-clock
-    /// minutes elapsed since `started_at`, and chain generation count. A limit
-    /// set to `None` is skipped; the time limit is also skipped when
-    /// `started_at` is `None`.
-    ///
-    /// # Errors
-    ///
-    /// Returns [`EngineError::CircuitBreaker`] describing the first limit whose
-    /// current value is greater than or equal to its configured maximum.
-    async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> {
-        // Every tripped breaker is reported through the same error variant.
-        let trip = |message: String| -> Result<(), EngineError> {
-            Err(EngineError::CircuitBreaker(message))
-        };
-
-        // Cost budget.
-        match directive.max_total_cost_usd {
-            Some(cost_cap) if directive.total_cost_usd >= cost_cap => {
-                return trip(format!(
-                    "Cost limit exceeded: ${:.2} >= ${:.2}",
-                    directive.total_cost_usd, cost_cap
-                ));
-            }
-            _ => {}
-        }
-
-        // Wall-clock budget (stored in minutes); only applies once the directive has a start time.
-        if let (Some(minute_cap), Some(began_at)) =
-            (directive.max_wall_time_minutes, directive.started_at)
-        {
-            let minutes_run = chrono::Utc::now()
-                .signed_duration_since(began_at)
-                .num_minutes();
-            if minutes_run >= minute_cap as i64 {
-                return trip(format!(
-                    "Time limit exceeded: {} min >= {} min",
-                    minutes_run, minute_cap
-                ));
-            }
-        }
-
-        // Chain generation budget.
-        match directive.max_chain_regenerations {
-            Some(generation_cap) if directive.chain_generation_count >= generation_cap => {
-                return trip(format!(
-                    "Chain generation limit exceeded: {} >= {}",
-                    directive.chain_generation_count, generation_cap
-                ));
-            }
-            _ => {}
-        }
-
-        Ok(())
-    }
-
- // ========================================================================
- // Completion
- // ========================================================================
-
-    /// Mark a directive as completed after all steps pass.
-    ///
-    /// Persists the `completed` status, broadcasts the `active` -> `completed`
-    /// transition, and records a `directive_completed` event with an empty
-    /// payload.
-    ///
-    /// # Errors
-    ///
-    /// Propagates any failure from the status update or from recording the
-    /// event.
-    async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> {
-        const PREVIOUS_STATUS: &str = "active";
-        const FINAL_STATUS: &str = "completed";
-
-        // Run final evaluation (optional)
-        // TODO: LLM evaluation of overall directive completion
-
-        repository::update_directive_status(&self.pool, directive_id, FINAL_STATUS).await?;
-
-        let status_change = EngineEvent::DirectiveStatusChanged {
-            directive_id,
-            old_status: String::from(PREVIOUS_STATUS),
-            new_status: String::from(FINAL_STATUS),
-        };
-        self.emit_event(status_change);
-
-        // The stored event itself is not needed by callers, only success or failure.
-        let empty_payload = serde_json::json!({});
-        self.emit_directive_event(
-            directive_id,
-            "directive_completed",
-            "info",
-            empty_payload,
-            "system",
-        )
-        .await
-        .map(|_recorded| ())
-    }
-
- // ========================================================================
- // Event Logging
- // ========================================================================
-
-    /// Record a directive-level event through the repository layer.
-    ///
-    /// The event is stored without a chain id, step id, or actor id;
-    /// `event_data` is always passed along as `Some(..)`.
-    ///
-    /// # Errors
-    ///
-    /// Propagates the repository error, converted into [`EngineError`].
-    async fn emit_directive_event(
-        &self,
-        directive_id: Uuid,
-        event_type: &str,
-        severity: &str,
-        event_data: serde_json::Value,
-        actor_type: &str,
-    ) -> Result<DirectiveEvent, EngineError> {
-        // Name the positional "not applicable" arguments so the call reads clearly.
-        let chain_id = None;
-        let step_id = None;
-        let actor_id = None;
-        let payload = Some(event_data);
-
-        let recorded = repository::emit_directive_event(
-            &self.pool,
-            directive_id,
-            chain_id,
-            step_id,
-            event_type,
-            severity,
-            payload,
-            actor_type,
-            actor_id,
-        )
-        .await?;
-
-        Ok(recorded)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    /// Each confidence level must map to its lowercase string label.
-    ///
-    /// Only the labels are pinned here; how the engine reacts to each level
-    /// (pass, autonomy-dependent handling, rework) is not exercised.
-    #[test]
-    fn test_confidence_level_decision() {
-        let expectations = [
-            (ConfidenceLevel::Green, "green"),
-            (ConfidenceLevel::Yellow, "yellow"),
-            (ConfidenceLevel::Red, "red"),
-        ];
-
-        for (level, label) in expectations {
-            assert_eq!(level.as_str(), label);
-        }
-    }
-}