diff options
Diffstat (limited to 'makima/src/orchestration/engine.rs')
| -rw-r--r-- | makima/src/orchestration/engine.rs | 1335 |
1 files changed, 0 insertions, 1335 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs deleted file mode 100644 index 9f7c3b1..0000000 --- a/makima/src/orchestration/engine.rs +++ /dev/null @@ -1,1335 +0,0 @@ -//! Directive orchestration engine. -//! -//! Manages the lifecycle of directives: -//! - Starts directives and generates initial chains -//! - Monitors step execution and triggers evaluations -//! - Handles rework, escalation, and chain regeneration -//! - Enforces circuit breakers (cost, time, rework limits) - -use std::collections::HashMap; - -use sqlx::PgPool; -use thiserror::Error; -use tokio::sync::broadcast; -use uuid::Uuid; - -use crate::db::models::{ - AddStepRequest, ChainStep, CreateContractRequest, CreateTaskRequest, Directive, - DirectiveEvent, UpdateStepRequest, -}; -use crate::db::repository::{self, RepositoryError}; - -use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; -use super::verifier::{ - auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, - VerificationContext, -}; - -/// Error type for engine operations. -#[derive(Error, Debug)] -pub enum EngineError { - #[error("Database error: {0}")] - Database(#[from] sqlx::Error), - - #[error("Repository error: {0}")] - Repository(#[from] RepositoryError), - - #[error("Planner error: {0}")] - Planner(#[from] PlannerError), - - #[error("Directive not found: {0}")] - DirectiveNotFound(Uuid), - - #[error("Chain not found for directive: {0}")] - ChainNotFound(Uuid), - - #[error("Step not found: {0}")] - StepNotFound(Uuid), - - #[error("Invalid state transition: {from} -> {to}")] - InvalidStateTransition { from: String, to: String }, - - #[error("Circuit breaker triggered: {0}")] - CircuitBreaker(String), - - #[error("Directive is paused")] - DirectivePaused, - - #[error("Contract creation failed: {0}")] - ContractCreation(String), - - #[error("LLM error: {0}")] - LlmError(String), -} - -/// Event emitted by the engine for UI updates. -#[derive(Debug, Clone)] -pub enum EngineEvent { - /// Directive status changed - DirectiveStatusChanged { - directive_id: Uuid, - old_status: String, - new_status: String, - }, - /// Step status changed - StepStatusChanged { - directive_id: Uuid, - step_id: Uuid, - old_status: String, - new_status: String, - }, - /// Evaluation completed - EvaluationCompleted { - directive_id: Uuid, - step_id: Uuid, - passed: bool, - confidence_level: ConfidenceLevel, - }, - /// Approval required - ApprovalRequired { - directive_id: Uuid, - approval_id: Uuid, - approval_type: String, - }, - /// Chain regenerated - ChainRegenerated { - directive_id: Uuid, - old_chain_id: Uuid, - new_chain_id: Uuid, - }, -} - -/// Result from starting a directive, containing info needed for auto-start. -pub struct PlanningStartResult { - /// The planning task ID that needs to be started on a daemon - pub task_id: Uuid, - /// The owner ID for finding available daemons - pub owner_id: Uuid, - /// The planning task details needed for the SpawnTask command - pub task_name: String, - pub plan: String, - pub contract_id: Uuid, - pub repository_url: Option<String>, - pub base_branch: Option<String>, -} - -/// Main orchestration engine for directives. -pub struct DirectiveEngine { - pool: PgPool, - planner: ChainPlanner, - event_tx: Option<broadcast::Sender<EngineEvent>>, -} - -impl DirectiveEngine { - /// Create a new directive engine. - pub fn new(pool: PgPool) -> Self { - Self { - planner: ChainPlanner::new(pool.clone()), - pool, - event_tx: None, - } - } - - /// Set the event broadcast channel for UI updates. - pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self { - self.event_tx = Some(tx); - self - } - - /// Emit an event if channel is configured. - fn emit_event(&self, event: EngineEvent) { - if let Some(tx) = &self.event_tx { - let _ = tx.send(event); - } - } - - // ======================================================================== - // Directive Lifecycle - // ======================================================================== - - /// Start a directive: spawn a planning contract+task to generate the chain. - /// Returns a `PlanningStartResult` so the caller can auto-start the task on a daemon. - pub async fn start_directive(&self, directive_id: Uuid) -> Result<PlanningStartResult, EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - // Validate current state - if directive.status != "draft" && directive.status != "paused" { - return Err(EngineError::InvalidStateTransition { - from: directive.status, - to: "planning".to_string(), - }); - } - - // Update status to planning - repository::update_directive_status(&self.pool, directive_id, "planning").await?; - self.emit_directive_event( - directive_id, - "status_changed", - "info", - serde_json::json!({"old_status": directive.status, "new_status": "planning"}), - "system", - ) - .await?; - - // Create an empty chain for the planning task to populate - let chain_name = format!( - "{}-chain", - directive.title.to_lowercase().replace(' ', "-") - ); - let _db_chain = repository::create_directive_chain( - &self.pool, - directive_id, - &chain_name, - Some(&format!("Execution plan for: {}", directive.goal)), - None, // rationale - None, // planning_model - ) - .await?; - - // Create a planning contract (type "execute", no phase guard) - let contract = repository::create_contract_for_owner( - &self.pool, - directive.owner_id, - CreateContractRequest { - name: format!("{} - Planning", directive.title), - description: Some(format!( - "Planning contract for directive: {}", - directive.goal - )), - contract_type: Some("execute".to_string()), - template_id: None, - initial_phase: Some("execute".to_string()), - autonomous_loop: Some(true), - phase_guard: Some(false), - local_only: Some(false), - auto_merge_local: None, - }, - ) - .await - .map_err(|e| { - EngineError::ContractCreation(format!("Failed to create planning contract: {}", e)) - })?; - - // Build instructions for the planning task - let plan = self.build_planning_task_instructions(&directive); - - // Create the planning task - let task_name = format!("{} - Planning", directive.title); - let task = repository::create_task_for_owner( - &self.pool, - directive.owner_id, - CreateTaskRequest { - contract_id: Some(contract.id), - name: task_name.clone(), - description: Some(format!( - "Plan the execution chain for directive: {}", - directive.goal - )), - plan: plan.clone(), - parent_task_id: None, - is_supervisor: true, - priority: 5, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - target_branch: None, - merge_mode: None, - target_repo_path: None, - completion_action: Some("none".to_string()), - continue_from_task_id: None, - copy_files: None, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, - }, - ) - .await - .map_err(|e| { - EngineError::ContractCreation(format!("Failed to create planning task: {}", e)) - })?; - - // Link the supervisor task to the contract - if let Err(e) = repository::update_contract_supervisor( - &self.pool, - contract.id, - task.id, - ) - .await - { - tracing::warn!( - contract_id = %contract.id, - task_id = %task.id, - error = %e, - "Failed to link supervisor task to planning contract" - ); - } - - // Link the planning contract to the directive - repository::set_directive_orchestrator_contract( - &self.pool, - directive_id, - contract.id, - ) - .await?; - - self.emit_directive_event( - directive_id, - "planning_started", - "info", - serde_json::json!({ - "contract_id": contract.id, - "task_id": task.id, - "message": "Planning task spawned, waiting for chain generation", - }), - "system", - ) - .await?; - - Ok(PlanningStartResult { - task_id: task.id, - owner_id: directive.owner_id, - task_name, - plan, - contract_id: contract.id, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - }) - } - - /// Pause a directive. - pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - if directive.status != "active" { - return Err(EngineError::InvalidStateTransition { - from: directive.status, - to: "paused".to_string(), - }); - } - - repository::update_directive_status(&self.pool, directive_id, "paused").await?; - self.emit_event(EngineEvent::DirectiveStatusChanged { - directive_id, - old_status: "active".to_string(), - new_status: "paused".to_string(), - }); - - Ok(()) - } - - /// Resume a paused directive. - pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - if directive.status != "paused" { - return Err(EngineError::InvalidStateTransition { - from: directive.status, - to: "active".to_string(), - }); - } - - repository::update_directive_status(&self.pool, directive_id, "active").await?; - self.emit_event(EngineEvent::DirectiveStatusChanged { - directive_id, - old_status: "paused".to_string(), - new_status: "active".to_string(), - }); - - // Continue execution - self.advance_chain(directive_id).await?; - - Ok(()) - } - - /// Stop a directive (cannot be resumed). - pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - if directive.status == "completed" || directive.status == "failed" { - return Err(EngineError::InvalidStateTransition { - from: directive.status, - to: "failed".to_string(), - }); - } - - repository::update_directive_status(&self.pool, directive_id, "failed").await?; - self.emit_event(EngineEvent::DirectiveStatusChanged { - directive_id, - old_status: directive.status, - new_status: "failed".to_string(), - }); - - Ok(()) - } - - // ======================================================================== - // Chain Management - // ======================================================================== - - /// Build a default chain as a fallback. - fn build_default_chain(&self, directive: &Directive) -> GeneratedChain { - GeneratedChain { - name: format!( - "{}-chain", - directive.title.to_lowercase().replace(' ', "-") - ), - description: format!("Execution plan for: {}", directive.goal), - steps: vec![ - super::planner::GeneratedStep { - name: "research".to_string(), - step_type: "research".to_string(), - description: format!( - "Research and understand the requirements for: {}", - directive.goal - ), - depends_on: vec![], - requirement_ids: vec![], - contract_template: None, - }, - super::planner::GeneratedStep { - name: "implement".to_string(), - step_type: "implement".to_string(), - description: format!("Implement the solution for: {}", directive.goal), - depends_on: vec!["research".to_string()], - requirement_ids: vec![], - contract_template: None, - }, - super::planner::GeneratedStep { - name: "test".to_string(), - step_type: "test".to_string(), - description: "Test and verify the implementation".to_string(), - depends_on: vec!["implement".to_string()], - requirement_ids: vec![], - contract_template: None, - }, - ], - } - } - - /// Create database steps from a generated chain. - async fn create_steps_from_chain( - &self, - chain_id: &Uuid, - chain: &GeneratedChain, - ) -> Result<(), EngineError> { - // First pass: create all steps and build name-to-id map - let mut step_id_map: HashMap<String, Uuid> = HashMap::new(); - - // Get editor positions - let positions = self.planner.compute_editor_positions(chain); - - for step in &chain.steps { - let (editor_x, editor_y) = positions - .get(&step.name) - .copied() - .unwrap_or((100.0, 100.0)); - - let task_plan = step - .contract_template - .as_ref() - .and_then(|t| t.tasks.first()) - .map(|t| t.plan.clone()) - .or_else(|| Some(step.description.clone())); - - let request = AddStepRequest { - name: step.name.clone(), - description: Some(step.description.clone()), - step_type: Some(step.step_type.clone()), - contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), - initial_phase: Some("plan".to_string()), - task_plan, - phases: step.contract_template.as_ref().map(|t| t.phases.clone()), - depends_on: None, // Will update in second pass - parallel_group: None, - requirement_ids: Some(step.requirement_ids.clone()), - acceptance_criteria_ids: None, - verifier_config: None, - editor_x: Some(editor_x), - editor_y: Some(editor_y), - }; - - let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?; - step_id_map.insert(step.name.clone(), db_step.id); - } - - // Second pass: update dependencies - for step in &chain.steps { - if step.depends_on.is_empty() { - continue; - } - - let step_id = step_id_map.get(&step.name).unwrap(); - let dep_ids: Vec<Uuid> = step - .depends_on - .iter() - .filter_map(|name| step_id_map.get(name)) - .copied() - .collect(); - - // Update step with proper dependencies - let update = UpdateStepRequest { - name: None, - description: None, - task_plan: None, - depends_on: Some(dep_ids), - requirement_ids: None, - acceptance_criteria_ids: None, - verifier_config: None, - editor_x: None, - editor_y: None, - }; - - repository::update_chain_step(&self.pool, *step_id, update).await?; - } - - Ok(()) - } - - /// Regenerate chain while preserving completed steps. - pub async fn regenerate_chain( - &self, - directive_id: Uuid, - reason: &str, - ) -> Result<Uuid, EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - let current_chain = repository::get_current_chain(&self.pool, directive_id) - .await? - .ok_or(EngineError::ChainNotFound(directive_id))?; - - // Use default chain for regeneration - // (planning contract handles initial generation; regeneration uses fallback) - let new_chain = self.build_default_chain(&directive); - - // Supersede old chain - repository::supersede_chain(&self.pool, current_chain.id).await?; - - // Create new chain - let db_chain = repository::create_directive_chain( - &self.pool, - directive_id, - &new_chain.name, - Some(&new_chain.description), - Some(reason), // rationale - None, // planning_model - ) - .await?; - - // Create steps - self.create_steps_from_chain(&db_chain.id, &new_chain).await?; - - self.emit_event(EngineEvent::ChainRegenerated { - directive_id, - old_chain_id: current_chain.id, - new_chain_id: db_chain.id, - }); - - // Continue execution - self.advance_chain(directive_id).await?; - - Ok(db_chain.id) - } - - // ======================================================================== - // Step Execution - // ======================================================================== - - /// Advance chain execution: find ready steps and start them. - pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - // Check if directive is active - if directive.status == "paused" { - return Err(EngineError::DirectivePaused); - } - if directive.status != "active" { - return Ok(()); // Not an error, just nothing to do - } - - // Check circuit breakers - self.check_circuit_breakers(&directive).await?; - - // Get current chain - let chain = repository::get_current_chain(&self.pool, directive_id) - .await? - .ok_or(EngineError::ChainNotFound(directive_id))?; - - // Find ready steps (dependencies met, status=pending) - let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?; - - // Start each ready step - for step in ready_steps { - self.start_step(&directive, &step).await?; - } - - // Check if chain is complete - let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?; - let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped"); - let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed"); - - if all_passed && !all_steps.is_empty() { - // Complete the directive - self.complete_directive(directive_id).await?; - } else if any_blocked { - // Check if we should regenerate or fail - let failed_count = all_steps.iter().filter(|s| s.status == "failed").count(); - if failed_count > 3 { - // Too many failures, fail the directive - repository::update_directive_status(&self.pool, directive_id, "failed").await?; - } - } - - Ok(()) - } - - /// Start a step by creating its contract and supervisor task. - async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> { - // Update step status to ready - repository::update_step_status(&self.pool, step.id, "ready").await?; - self.emit_event(EngineEvent::StepStatusChanged { - directive_id: directive.id, - step_id: step.id, - old_status: "pending".to_string(), - new_status: "ready".to_string(), - }); - - // Get contract details from step template - let (name, description, contract_type, initial_phase) = - self.get_contract_details(directive, step); - - // Create contract for this step - let contract = repository::create_contract_for_owner( - &self.pool, - directive.owner_id, - CreateContractRequest { - name: name.clone(), - description: description.clone(), - contract_type: Some(contract_type), - template_id: None, - initial_phase: Some(initial_phase), - autonomous_loop: Some(directive.autonomy_level == "full_auto"), - phase_guard: Some(true), - local_only: Some(false), - auto_merge_local: None, - }, - ) - .await - .map_err(|e| EngineError::ContractCreation(format!("Failed to create contract: {}", e)))?; - - // Build task plan from step description and task_plan - let task_plan = step - .task_plan - .clone() - .unwrap_or_else(|| { - format!( - "## Step: {}\n\n{}\n\n## Directive Goal\n{}", - step.name, - description.as_deref().unwrap_or("Complete this step."), - directive.goal, - ) - }); - - // Create supervisor task linked to the contract - let task = repository::create_task_for_owner( - &self.pool, - directive.owner_id, - CreateTaskRequest { - contract_id: Some(contract.id), - name: name.clone(), - description: description.clone(), - plan: task_plan, - parent_task_id: None, - is_supervisor: true, - priority: 5, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - target_branch: None, - merge_mode: Some("pr".to_string()), - target_repo_path: None, - completion_action: Some("pr".to_string()), - continue_from_task_id: None, - copy_files: None, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, - }, - ) - .await - .map_err(|e| EngineError::ContractCreation(format!("Failed to create task: {}", e)))?; - - // Link contract and task to step - repository::update_step_contract(&self.pool, step.id, contract.id, Some(task.id)).await?; - - // Update step status to running - repository::update_step_status(&self.pool, step.id, "running").await?; - self.emit_event(EngineEvent::StepStatusChanged { - directive_id: directive.id, - step_id: step.id, - old_status: "ready".to_string(), - new_status: "running".to_string(), - }); - - self.emit_directive_event( - directive.id, - "step_started", - "info", - serde_json::json!({ - "step_id": step.id, - "step_name": step.name, - "contract_id": contract.id, - "task_id": task.id, - }), - "system", - ) - .await?; - - Ok(()) - } - - /// Build contract details from a step. - /// Returns (name, description, contract_type, initial_phase) - fn get_contract_details( - &self, - directive: &Directive, - step: &ChainStep, - ) -> (String, Option<String>, String, String) { - let name = format!("{} - {}", directive.title, step.name); - let description = step.description.clone(); - let contract_type = step.contract_type.clone(); - let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string()); - - (name, description, contract_type, initial_phase) - } - - // ======================================================================== - // Evaluation - // ======================================================================== - - /// Handle contract completion: evaluate the step. - pub async fn on_contract_completed( - &self, - contract_id: Uuid, - ) -> Result<(), EngineError> { - // Find the step for this contract - let step = repository::get_step_by_contract_id(&self.pool, contract_id) - .await? - .ok_or(EngineError::StepNotFound(contract_id))?; - - // Get directive - let chain = repository::get_directive_chain(&self.pool, step.chain_id) - .await? - .ok_or(EngineError::ChainNotFound(step.chain_id))?; - - let directive = repository::get_directive(&self.pool, chain.directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(chain.directive_id))?; - - // Update step status to evaluating - repository::update_step_status(&self.pool, step.id, "evaluating").await?; - self.emit_event(EngineEvent::StepStatusChanged { - directive_id: directive.id, - step_id: step.id, - old_status: "running".to_string(), - new_status: "evaluating".to_string(), - }); - - // Run evaluation - let result = self.evaluate_step(&directive, &step).await?; - - // Record evaluation - let programmatic_results = result - .verifier_results - .iter() - .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm) - .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) - .collect::<Vec<_>>(); - - let llm_results = result - .verifier_results - .iter() - .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm) - .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) - .collect::<Vec<_>>(); - - // Get chain_id from step - let chain_id = step.chain_id; - - let _evaluation = repository::create_directive_evaluation( - &self.pool, - directive.id, - Some(chain_id), - Some(step.id), - step.contract_id, - "composite", - Some("orchestration_engine"), - result.passed, - Some(result.composite_score), - Some(result.confidence_level.as_str()), - serde_json::Value::Array(programmatic_results), - serde_json::Value::Array(llm_results), - serde_json::Value::Null, // criteria_results - &result.summary, - result.rework_instructions.as_deref(), - ) - .await?; - - // Update step based on result - let new_status = match result.confidence_level { - ConfidenceLevel::Green => "passed", - ConfidenceLevel::Yellow => { - // Check autonomy level - if directive.autonomy_level == "full_auto" { - "passed" // Accept yellow in full auto mode - } else { - // Create approval request - self.request_approval( - &directive, - &step, - "step_review", - &format!( - "Step '{}' completed with yellow confidence ({:.0}%). Review required.", - step.name, - result.composite_score * 100.0 - ), - ) - .await?; - "evaluating" // Wait for approval - } - } - ConfidenceLevel::Red => { - // Initiate rework - self.initiate_rework(&directive, &step, &result).await?; - "rework" - } - }; - - repository::update_step_status(&self.pool, step.id, new_status).await?; - repository::update_step_confidence( - &self.pool, - step.id, - result.composite_score, - result.confidence_level.as_str(), - result.id, - ) - .await?; - - self.emit_event(EngineEvent::EvaluationCompleted { - directive_id: directive.id, - step_id: step.id, - passed: result.passed, - confidence_level: result.confidence_level, - }); - - // If passed, continue chain execution - if new_status == "passed" { - self.advance_chain(directive.id).await?; - } - - Ok(()) - } - - /// Evaluate a step using tiered verification. - async fn evaluate_step( - &self, - directive: &Directive, - step: &ChainStep, - ) -> Result<EvaluationResult, EngineError> { - // Get repository path - let repo_path = directive - .local_path - .as_ref() - .map(std::path::PathBuf::from) - .unwrap_or_else(|| std::path::PathBuf::from(".")); - - // Auto-detect verifiers - let verifiers = auto_detect_verifiers(&repo_path).await; - - // Build verification context - let context = VerificationContext { - step_id: step.id, - contract_id: step.contract_id, - modified_files: vec![], // TODO: Get from contract/git - step_description: step.description.clone().unwrap_or_default(), - acceptance_criteria: vec![], // TODO: Get from directive - directive_context: directive.goal.clone(), - }; - - // Run composite evaluation - let evaluator = CompositeEvaluator::new(verifiers) - .with_thresholds( - directive.confidence_threshold_green, - directive.confidence_threshold_yellow, - ); - - Ok(evaluator.evaluate(&repo_path, &context).await) - } - - /// Initiate rework for a failed step. - async fn initiate_rework( - &self, - directive: &Directive, - step: &ChainStep, - result: &EvaluationResult, - ) -> Result<(), EngineError> { - // Increment rework count - let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?; - - // Check rework limit - let max_rework = directive.max_rework_cycles.unwrap_or(3); - if updated_step.rework_count >= max_rework { - // Too many rework attempts, mark as blocked - repository::update_step_status(&self.pool, step.id, "blocked").await?; - self.emit_directive_event( - directive.id, - "step_blocked", - "warning", - serde_json::json!({ - "step_id": step.id, - "step_name": step.name, - "reason": "Max rework attempts reached", - }), - "system", - ) - .await?; - return Ok(()); - } - - // Log rework event - self.emit_directive_event( - directive.id, - "step_rework", - "info", - serde_json::json!({ - "step_id": step.id, - "step_name": step.name, - "rework_count": updated_step.rework_count, - "instructions": result.rework_instructions, - }), - "system", - ) - .await?; - - // TODO: Send rework instructions to supervisor task - // This would involve: - // 1. Reset contract phase to 'plan' - // 2. Send message to supervisor with rework instructions - // 3. Update step status to 'running' - - Ok(()) - } - - /// Request human approval for a step. - async fn request_approval( - &self, - directive: &Directive, - step: &ChainStep, - approval_type: &str, - description: &str, - ) -> Result<Uuid, EngineError> { - let context = serde_json::json!({ - "step_id": step.id, - "step_name": step.name, - "confidence_score": step.confidence_score, - }); - - let approval = repository::create_approval_request( - &self.pool, - directive.id, - Some(step.id), - approval_type, - description, - Some(context), - "medium", - None, // expires_at - ) - .await?; - - self.emit_event(EngineEvent::ApprovalRequired { - directive_id: directive.id, - approval_id: approval.id, - approval_type: approval_type.to_string(), - }); - - Ok(approval.id) - } - - /// Handle approval resolution. - pub async fn on_approval_resolved( - &self, - approval_id: Uuid, - approved: bool, - responded_by: Uuid, - ) -> Result<(), EngineError> { - let status = if approved { "approved" } else { "denied" }; - let approval = repository::resolve_approval( - &self.pool, - approval_id, - status, - None, - responded_by, - ) - .await?; - - if let Some(step_id) = approval.step_id { - let step = repository::get_chain_step(&self.pool, step_id) - .await? - .ok_or(EngineError::StepNotFound(step_id))?; - - let chain = repository::get_directive_chain(&self.pool, step.chain_id) - .await? - .ok_or(EngineError::ChainNotFound(step.chain_id))?; - - if approved { - // Mark step as passed and continue - repository::update_step_status(&self.pool, step_id, "passed").await?; - self.advance_chain(chain.directive_id).await?; - } else { - // Mark step as failed/blocked - repository::update_step_status(&self.pool, step_id, "blocked").await?; - } - } - - Ok(()) - } - - // ======================================================================== - // Planning - // ======================================================================== - - /// Build the task instructions for the planning task. - fn build_planning_task_instructions(&self, directive: &Directive) -> String { - let requirements: Vec<String> = directive - .requirements - .as_array() - .map(|arr| { - arr.iter() - .filter_map(|v| v.as_object()) - .map(|obj| { - let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); - let desc = obj - .get("description") - .and_then(|v| v.as_str()) - .unwrap_or("?"); - format!("- {}: {}", id, desc) - }) - .collect() - }) - .unwrap_or_default(); - - let criteria: Vec<String> = directive - .acceptance_criteria - .as_array() - .map(|arr| { - arr.iter() - .filter_map(|v| v.as_object()) - .map(|obj| { - let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); - let criterion = obj - .get("criterion") - .and_then(|v| v.as_str()) - .unwrap_or("?"); - format!("- {}: {}", id, criterion) - }) - .collect() - }) - .unwrap_or_default(); - - let constraints: Vec<String> = directive - .constraints - .as_array() - .map(|arr| { - arr.iter() - .filter_map(|v| v.as_str()) - .map(|s| format!("- {}", s)) - .collect() - }) - .unwrap_or_default(); - - let repo_info = directive - .repository_url - .as_deref() - .unwrap_or("(not specified)"); - - format!( - r#"You are planning an execution chain for a directive. - -## Directive: {title} -## Goal -{goal} - -## Requirements -{requirements} - -## Acceptance Criteria -{criteria} - -## Constraints -{constraints} - -## Repository: {repo} - -## Your Task - -Analyze the repository and create a chain of execution steps. -For each step, add it via the API: - -```bash -curl -s -X POST "$MAKIMA_URL/api/v1/directives/{directive_id}/chain/steps" \ - -H "Authorization: Bearer $MAKIMA_API_KEY" \ - -H "Content-Type: application/json" \ - -d '{{ - "name": "step-name", - "description": "What this step accomplishes", - "step_type": "implement", - "depends_on": [], - "contract_type": "execute", - "initial_phase": "execute", - "task_plan": "Detailed instructions for the step executor" - }}' -``` - -### Step types -Use these step types: research, design, implement, test, review, document - -### Dependencies -Each step can depend on other steps by name. Use the `depends_on` field with an array of step names. -Steps with no dependencies will run in parallel. - -### Guidelines -1. Break the work into logical, independently executable steps -2. Each step should be completable by a single Claude Code session -3. Use dependencies to enforce ordering where needed -4. Include a "test" or "verify" step at the end -5. Keep step names in kebab-case -6. The `task_plan` field should contain detailed instructions for the agent that will execute the step - -When you have added all steps, your task is complete."#, - title = directive.title, - goal = directive.goal, - requirements = if requirements.is_empty() { - "(none)".to_string() - } else { - requirements.join("\n") - }, - criteria = if criteria.is_empty() { - "(none)".to_string() - } else { - criteria.join("\n") - }, - constraints = if constraints.is_empty() { - "(none)".to_string() - } else { - constraints.join("\n") - }, - repo = repo_info, - directive_id = directive.id, - ) - } - - /// Handle planning task completion. - pub async fn on_planning_complete( - &self, - directive_id: Uuid, - success: bool, - ) -> Result<(), EngineError> { - let directive = repository::get_directive(&self.pool, directive_id) - .await? - .ok_or(EngineError::DirectiveNotFound(directive_id))?; - - // Only process if directive is still in planning state - if directive.status != "planning" { - tracing::warn!( - "Directive {} is in state '{}', not 'planning'. Skipping planning completion.", - directive_id, - directive.status - ); - return Ok(()); - } - - // Get current chain - let chain = repository::get_current_chain(&self.pool, directive_id) - .await? - .ok_or(EngineError::ChainNotFound(directive_id))?; - - // Check if chain has steps - let steps = repository::list_chain_steps(&self.pool, chain.id).await?; - - if success && !steps.is_empty() { - tracing::info!( - "Planning completed successfully for directive {} with {} steps", - directive_id, - steps.len() - ); - } else { - // Fall back to default chain - let reason = if !success { - "Planning task failed" - } else { - "Planning task produced no steps" - }; - tracing::warn!( - "{} for directive {}, using default chain", - reason, - directive_id - ); - - let default_chain = self.build_default_chain(&directive); - self.create_steps_from_chain(&chain.id, &default_chain) - .await?; - } - - // Activate the directive - repository::update_directive_status(&self.pool, directive_id, "active").await?; - self.emit_event(EngineEvent::DirectiveStatusChanged { - directive_id, - old_status: "planning".to_string(), - new_status: "active".to_string(), - }); - - self.emit_directive_event( - directive_id, - "planning_completed", - "info", - serde_json::json!({"success": success}), - "system", - ) - .await?; - - // Start ready steps - self.advance_chain(directive_id).await?; - - Ok(()) - } - - // ======================================================================== - // Circuit Breakers - // ======================================================================== - - /// Check circuit breakers for a directive. - async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> { - // Check cost limit - if let Some(max_cost) = directive.max_total_cost_usd { - let current_cost = directive.total_cost_usd; - if current_cost >= max_cost { - return Err(EngineError::CircuitBreaker(format!( - "Cost limit exceeded: ${:.2} >= ${:.2}", - current_cost, max_cost - ))); - } - } - - // Check time limit (stored in minutes) - if let Some(max_minutes) = directive.max_wall_time_minutes { - if let Some(started_at) = directive.started_at { - let elapsed = chrono::Utc::now().signed_duration_since(started_at); - let elapsed_minutes = elapsed.num_minutes(); - if elapsed_minutes >= max_minutes as i64 { - return Err(EngineError::CircuitBreaker(format!( - "Time limit exceeded: {} min >= {} min", - elapsed_minutes, max_minutes - ))); - } - } - } - - // Check chain generation limit - if let Some(max_gen) = directive.max_chain_regenerations { - let current_gen = directive.chain_generation_count; - if current_gen >= max_gen { - return Err(EngineError::CircuitBreaker(format!( - "Chain generation limit exceeded: {} >= {}", - current_gen, max_gen - ))); - } - } - - Ok(()) - } - - // ======================================================================== - // Completion - // ======================================================================== - - /// Complete a directive after all steps pass. - async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { - // Run final evaluation (optional) - // TODO: LLM evaluation of overall directive completion - - // Update directive status - repository::update_directive_status(&self.pool, directive_id, "completed").await?; - - self.emit_event(EngineEvent::DirectiveStatusChanged { - directive_id, - old_status: "active".to_string(), - new_status: "completed".to_string(), - }); - - self.emit_directive_event( - directive_id, - "directive_completed", - "info", - serde_json::json!({}), - "system", - ) - .await?; - - Ok(()) - } - - // ======================================================================== - // Event Logging - // ======================================================================== - - /// Emit a directive event to the database. - async fn emit_directive_event( - &self, - directive_id: Uuid, - event_type: &str, - severity: &str, - event_data: serde_json::Value, - actor_type: &str, - ) -> Result<DirectiveEvent, EngineError> { - Ok(repository::emit_directive_event( - &self.pool, - directive_id, - None, // chain_id - None, // step_id - event_type, - severity, - Some(event_data), - actor_type, - None, // actor_id - ) - .await?) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_confidence_level_decision() { - // Green confidence should pass in all modes - assert_eq!(ConfidenceLevel::Green.as_str(), "green"); - - // Yellow confidence behavior depends on autonomy level - assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow"); - - // Red confidence should always trigger rework - assert_eq!(ConfidenceLevel::Red.as_str(), "red"); - } -} |
