//! Directive orchestration engine. //! //! Manages the lifecycle of directives: //! - Starts directives and generates initial chains //! - Monitors step execution and triggers evaluations //! - Handles rework, escalation, and chain regeneration //! - Enforces circuit breakers (cost, time, rework limits) use std::collections::HashMap; use sqlx::PgPool; use thiserror::Error; use tokio::sync::broadcast; use uuid::Uuid; use crate::db::models::{ AddStepRequest, ChainStep, CreateContractRequest, CreateTaskRequest, Directive, DirectiveEvent, UpdateStepRequest, }; use crate::db::repository::{self, RepositoryError}; use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; use super::verifier::{ auto_detect_verifiers, CompositeEvaluator, ConfidenceLevel, EvaluationResult, VerificationContext, }; /// Error type for engine operations. #[derive(Error, Debug)] pub enum EngineError { #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Repository error: {0}")] Repository(#[from] RepositoryError), #[error("Planner error: {0}")] Planner(#[from] PlannerError), #[error("Directive not found: {0}")] DirectiveNotFound(Uuid), #[error("Chain not found for directive: {0}")] ChainNotFound(Uuid), #[error("Step not found: {0}")] StepNotFound(Uuid), #[error("Invalid state transition: {from} -> {to}")] InvalidStateTransition { from: String, to: String }, #[error("Circuit breaker triggered: {0}")] CircuitBreaker(String), #[error("Directive is paused")] DirectivePaused, #[error("Contract creation failed: {0}")] ContractCreation(String), #[error("LLM error: {0}")] LlmError(String), } /// Event emitted by the engine for UI updates. 
#[derive(Debug, Clone)]
pub enum EngineEvent {
    /// Directive status changed
    DirectiveStatusChanged {
        directive_id: Uuid,
        old_status: String,
        new_status: String,
    },
    /// Step status changed
    StepStatusChanged {
        directive_id: Uuid,
        step_id: Uuid,
        old_status: String,
        new_status: String,
    },
    /// Evaluation completed
    EvaluationCompleted {
        directive_id: Uuid,
        step_id: Uuid,
        passed: bool,
        confidence_level: ConfidenceLevel,
    },
    /// Approval required
    ApprovalRequired {
        directive_id: Uuid,
        approval_id: Uuid,
        approval_type: String,
    },
    /// Chain regenerated
    ChainRegenerated {
        directive_id: Uuid,
        old_chain_id: Uuid,
        new_chain_id: Uuid,
    },
}

/// Result from starting a directive, containing info needed for auto-start.
#[derive(Debug, Clone)]
pub struct PlanningStartResult {
    /// The planning task ID that needs to be started on a daemon
    pub task_id: Uuid,
    /// The owner ID for finding available daemons
    pub owner_id: Uuid,
    /// The planning task details needed for the SpawnTask command
    pub task_name: String,
    /// Full instructions handed to the planning task.
    pub plan: String,
    /// Planning contract the task belongs to.
    pub contract_id: Uuid,
    /// Repository URL copied from the directive, if any.
    pub repository_url: Option<String>,
    /// Base branch copied from the directive, if any.
    pub base_branch: Option<String>,
}

/// Main orchestration engine for directives.
pub struct DirectiveEngine {
    /// Connection pool used for every repository call.
    pool: PgPool,
    /// Planner helper (used here for editor layout of generated steps).
    planner: ChainPlanner,
    /// Optional broadcast channel for UI events; `None` disables event emission.
    event_tx: Option<broadcast::Sender<EngineEvent>>,
}

impl DirectiveEngine {
    /// Create a new directive engine.
    ///
    /// The engine starts without an event channel; attach one with
    /// [`DirectiveEngine::with_event_channel`].
    pub fn new(pool: PgPool) -> Self {
        Self {
            planner: ChainPlanner::new(pool.clone()),
            pool,
            event_tx: None,
        }
    }

    /// Set the event broadcast channel for UI updates.
    pub fn with_event_channel(mut self, tx: broadcast::Sender<EngineEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    /// Emit an event if channel is configured.
    fn emit_event(&self, event: EngineEvent) {
        if let Some(tx) = &self.event_tx {
            // Best effort: a broadcast send only fails when nobody is currently
            // subscribed, which must not fail engine operations.
            let _ = tx.send(event);
        }
    }

    // ========================================================================
    // Directive Lifecycle
    // ========================================================================

    /// Start a directive: spawn a planning contract+task to generate the chain.
/// Returns a `PlanningStartResult` so the caller can auto-start the task on a daemon. pub async fn start_directive(&self, directive_id: Uuid) -> Result { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; // Validate current state if directive.status != "draft" && directive.status != "paused" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "planning".to_string(), }); } // Update status to planning repository::update_directive_status(&self.pool, directive_id, "planning").await?; self.emit_directive_event( directive_id, "status_changed", "info", serde_json::json!({"old_status": directive.status, "new_status": "planning"}), "system", ) .await?; // Create an empty chain for the planning task to populate let chain_name = format!( "{}-chain", directive.title.to_lowercase().replace(' ', "-") ); let _db_chain = repository::create_directive_chain( &self.pool, directive_id, &chain_name, Some(&format!("Execution plan for: {}", directive.goal)), None, // rationale None, // planning_model ) .await?; // Create a planning contract (type "execute", no phase guard) let contract = repository::create_contract_for_owner( &self.pool, directive.owner_id, CreateContractRequest { name: format!("{} - Planning", directive.title), description: Some(format!( "Planning contract for directive: {}", directive.goal )), contract_type: Some("execute".to_string()), template_id: None, initial_phase: Some("execute".to_string()), autonomous_loop: Some(true), phase_guard: Some(false), local_only: Some(false), auto_merge_local: None, }, ) .await .map_err(|e| { EngineError::ContractCreation(format!("Failed to create planning contract: {}", e)) })?; // Build instructions for the planning task let plan = self.build_planning_task_instructions(&directive); // Create the planning task let task_name = format!("{} - Planning", directive.title); let task = repository::create_task_for_owner( &self.pool, 
directive.owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: task_name.clone(), description: Some(format!( "Plan the execution chain for directive: {}", directive.goal )), plan: plan.clone(), parent_task_id: None, is_supervisor: true, priority: 5, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: Some("none".to_string()), continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await .map_err(|e| { EngineError::ContractCreation(format!("Failed to create planning task: {}", e)) })?; // Link the supervisor task to the contract if let Err(e) = repository::update_contract_supervisor( &self.pool, contract.id, task.id, ) .await { tracing::warn!( contract_id = %contract.id, task_id = %task.id, error = %e, "Failed to link supervisor task to planning contract" ); } // Link the planning contract to the directive repository::set_directive_orchestrator_contract( &self.pool, directive_id, contract.id, ) .await?; self.emit_directive_event( directive_id, "planning_started", "info", serde_json::json!({ "contract_id": contract.id, "task_id": task.id, "message": "Planning task spawned, waiting for chain generation", }), "system", ) .await?; Ok(PlanningStartResult { task_id: task.id, owner_id: directive.owner_id, task_name, plan, contract_id: contract.id, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), }) } /// Pause a directive. pub async fn pause_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? 
.ok_or(EngineError::DirectiveNotFound(directive_id))?; if directive.status != "active" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "paused".to_string(), }); } repository::update_directive_status(&self.pool, directive_id, "paused").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: "active".to_string(), new_status: "paused".to_string(), }); Ok(()) } /// Resume a paused directive. pub async fn resume_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; if directive.status != "paused" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "active".to_string(), }); } repository::update_directive_status(&self.pool, directive_id, "active").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: "paused".to_string(), new_status: "active".to_string(), }); // Continue execution self.advance_chain(directive_id).await?; Ok(()) } /// Stop a directive (cannot be resumed). pub async fn stop_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; if directive.status == "completed" || directive.status == "failed" { return Err(EngineError::InvalidStateTransition { from: directive.status, to: "failed".to_string(), }); } repository::update_directive_status(&self.pool, directive_id, "failed").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: directive.status, new_status: "failed".to_string(), }); Ok(()) } // ======================================================================== // Chain Management // ======================================================================== /// Build a default chain as a fallback. 
///
/// The fallback is a fixed linear pipeline of three steps,
/// `research` -> `implement` -> `test`, whose descriptions are derived from
/// the directive's goal. The chain name is the lower-cased title with spaces
/// replaced by dashes plus a `-chain` suffix.
fn build_default_chain(&self, directive: &Directive) -> GeneratedChain {
    use super::planner::GeneratedStep;

    let goal = &directive.goal;
    let slug = directive.title.to_lowercase().replace(' ', "-");

    // Step 1: no prerequisites.
    let research = GeneratedStep {
        name: String::from("research"),
        step_type: String::from("research"),
        description: format!("Research and understand the requirements for: {}", goal),
        depends_on: Vec::new(),
        requirement_ids: Vec::new(),
        contract_template: None,
    };

    // Step 2: runs once research is done.
    let implement = GeneratedStep {
        name: String::from("implement"),
        step_type: String::from("implement"),
        description: format!("Implement the solution for: {}", goal),
        depends_on: vec![String::from("research")],
        requirement_ids: Vec::new(),
        contract_template: None,
    };

    // Step 3: runs once the implementation is done.
    let test = GeneratedStep {
        name: String::from("test"),
        step_type: String::from("test"),
        description: String::from("Test and verify the implementation"),
        depends_on: vec![String::from("implement")],
        requirement_ids: Vec::new(),
        contract_template: None,
    };

    GeneratedChain {
        name: format!("{}-chain", slug),
        description: format!("Execution plan for: {}", goal),
        steps: vec![research, implement, test],
    }
}

/// Create database steps from a generated chain.
async fn create_steps_from_chain( &self, chain_id: &Uuid, chain: &GeneratedChain, ) -> Result<(), EngineError> { // First pass: create all steps and build name-to-id map let mut step_id_map: HashMap = HashMap::new(); // Get editor positions let positions = self.planner.compute_editor_positions(chain); for step in &chain.steps { let (editor_x, editor_y) = positions .get(&step.name) .copied() .unwrap_or((100.0, 100.0)); let task_plan = step .contract_template .as_ref() .and_then(|t| t.tasks.first()) .map(|t| t.plan.clone()) .or_else(|| Some(step.description.clone())); let request = AddStepRequest { name: step.name.clone(), description: Some(step.description.clone()), step_type: Some(step.step_type.clone()), contract_type: step.contract_template.as_ref().map(|t| t.contract_type.clone()), initial_phase: Some("plan".to_string()), task_plan, phases: step.contract_template.as_ref().map(|t| t.phases.clone()), depends_on: None, // Will update in second pass parallel_group: None, requirement_ids: Some(step.requirement_ids.clone()), acceptance_criteria_ids: None, verifier_config: None, editor_x: Some(editor_x), editor_y: Some(editor_y), }; let db_step = repository::create_chain_step(&self.pool, *chain_id, request).await?; step_id_map.insert(step.name.clone(), db_step.id); } // Second pass: update dependencies for step in &chain.steps { if step.depends_on.is_empty() { continue; } let step_id = step_id_map.get(&step.name).unwrap(); let dep_ids: Vec = step .depends_on .iter() .filter_map(|name| step_id_map.get(name)) .copied() .collect(); // Update step with proper dependencies let update = UpdateStepRequest { name: None, description: None, task_plan: None, depends_on: Some(dep_ids), requirement_ids: None, acceptance_criteria_ids: None, verifier_config: None, editor_x: None, editor_y: None, }; repository::update_chain_step(&self.pool, *step_id, update).await?; } Ok(()) } /// Regenerate chain while preserving completed steps. 
pub async fn regenerate_chain( &self, directive_id: Uuid, reason: &str, ) -> Result { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; let current_chain = repository::get_current_chain(&self.pool, directive_id) .await? .ok_or(EngineError::ChainNotFound(directive_id))?; // Use default chain for regeneration // (planning contract handles initial generation; regeneration uses fallback) let new_chain = self.build_default_chain(&directive); // Supersede old chain repository::supersede_chain(&self.pool, current_chain.id).await?; // Create new chain let db_chain = repository::create_directive_chain( &self.pool, directive_id, &new_chain.name, Some(&new_chain.description), Some(reason), // rationale None, // planning_model ) .await?; // Create steps self.create_steps_from_chain(&db_chain.id, &new_chain).await?; self.emit_event(EngineEvent::ChainRegenerated { directive_id, old_chain_id: current_chain.id, new_chain_id: db_chain.id, }); // Continue execution self.advance_chain(directive_id).await?; Ok(db_chain.id) } // ======================================================================== // Step Execution // ======================================================================== /// Advance chain execution: find ready steps and start them. pub async fn advance_chain(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? .ok_or(EngineError::DirectiveNotFound(directive_id))?; // Check if directive is active if directive.status == "paused" { return Err(EngineError::DirectivePaused); } if directive.status != "active" { return Ok(()); // Not an error, just nothing to do } // Check circuit breakers self.check_circuit_breakers(&directive).await?; // Get current chain let chain = repository::get_current_chain(&self.pool, directive_id) .await? 
.ok_or(EngineError::ChainNotFound(directive_id))?; // Find ready steps (dependencies met, status=pending) let ready_steps = repository::find_ready_steps(&self.pool, chain.id).await?; // Start each ready step for step in ready_steps { self.start_step(&directive, &step).await?; } // Check if chain is complete let all_steps = repository::list_chain_steps(&self.pool, chain.id).await?; let all_passed = all_steps.iter().all(|s| s.status == "passed" || s.status == "skipped"); let any_blocked = all_steps.iter().any(|s| s.status == "blocked" || s.status == "failed"); if all_passed && !all_steps.is_empty() { // Complete the directive self.complete_directive(directive_id).await?; } else if any_blocked { // Check if we should regenerate or fail let failed_count = all_steps.iter().filter(|s| s.status == "failed").count(); if failed_count > 3 { // Too many failures, fail the directive repository::update_directive_status(&self.pool, directive_id, "failed").await?; } } Ok(()) } /// Start a step by creating its contract and supervisor task. 
async fn start_step(&self, directive: &Directive, step: &ChainStep) -> Result<(), EngineError> { // Update step status to ready repository::update_step_status(&self.pool, step.id, "ready").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "pending".to_string(), new_status: "ready".to_string(), }); // Get contract details from step template let (name, description, contract_type, initial_phase) = self.get_contract_details(directive, step); // Create contract for this step let contract = repository::create_contract_for_owner( &self.pool, directive.owner_id, CreateContractRequest { name: name.clone(), description: description.clone(), contract_type: Some(contract_type), template_id: None, initial_phase: Some(initial_phase), autonomous_loop: Some(directive.autonomy_level == "full_auto"), phase_guard: Some(true), local_only: Some(false), auto_merge_local: None, }, ) .await .map_err(|e| EngineError::ContractCreation(format!("Failed to create contract: {}", e)))?; // Build task plan from step description and task_plan let task_plan = step .task_plan .clone() .unwrap_or_else(|| { format!( "## Step: {}\n\n{}\n\n## Directive Goal\n{}", step.name, description.as_deref().unwrap_or("Complete this step."), directive.goal, ) }); // Create supervisor task linked to the contract let task = repository::create_task_for_owner( &self.pool, directive.owner_id, CreateTaskRequest { contract_id: Some(contract.id), name: name.clone(), description: description.clone(), plan: task_plan, parent_task_id: None, is_supervisor: true, priority: 5, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: Some("pr".to_string()), target_repo_path: None, completion_action: Some("pr".to_string()), continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, }, ) .await 
.map_err(|e| EngineError::ContractCreation(format!("Failed to create task: {}", e)))?; // Link contract and task to step repository::update_step_contract(&self.pool, step.id, contract.id, Some(task.id)).await?; // Update step status to running repository::update_step_status(&self.pool, step.id, "running").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "ready".to_string(), new_status: "running".to_string(), }); self.emit_directive_event( directive.id, "step_started", "info", serde_json::json!({ "step_id": step.id, "step_name": step.name, "contract_id": contract.id, "task_id": task.id, }), "system", ) .await?; Ok(()) } /// Build contract details from a step. /// Returns (name, description, contract_type, initial_phase) fn get_contract_details( &self, directive: &Directive, step: &ChainStep, ) -> (String, Option, String, String) { let name = format!("{} - {}", directive.title, step.name); let description = step.description.clone(); let contract_type = step.contract_type.clone(); let initial_phase = step.initial_phase.clone().unwrap_or_else(|| "plan".to_string()); (name, description, contract_type, initial_phase) } // ======================================================================== // Evaluation // ======================================================================== /// Handle contract completion: evaluate the step. pub async fn on_contract_completed( &self, contract_id: Uuid, ) -> Result<(), EngineError> { // Find the step for this contract let step = repository::get_step_by_contract_id(&self.pool, contract_id) .await? .ok_or(EngineError::StepNotFound(contract_id))?; // Get directive let chain = repository::get_directive_chain(&self.pool, step.chain_id) .await? .ok_or(EngineError::ChainNotFound(step.chain_id))?; let directive = repository::get_directive(&self.pool, chain.directive_id) .await? 
.ok_or(EngineError::DirectiveNotFound(chain.directive_id))?; // Update step status to evaluating repository::update_step_status(&self.pool, step.id, "evaluating").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, step_id: step.id, old_status: "running".to_string(), new_status: "evaluating".to_string(), }); // Run evaluation let result = self.evaluate_step(&directive, &step).await?; // Record evaluation let programmatic_results = result .verifier_results .iter() .filter(|r| r.verifier_type != super::verifier::VerifierType::Llm) .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) .collect::>(); let llm_results = result .verifier_results .iter() .filter(|r| r.verifier_type == super::verifier::VerifierType::Llm) .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)) .collect::>(); // Get chain_id from step let chain_id = step.chain_id; let _evaluation = repository::create_directive_evaluation( &self.pool, directive.id, Some(chain_id), Some(step.id), step.contract_id, "composite", Some("orchestration_engine"), result.passed, Some(result.composite_score), Some(result.confidence_level.as_str()), serde_json::Value::Array(programmatic_results), serde_json::Value::Array(llm_results), serde_json::Value::Null, // criteria_results &result.summary, result.rework_instructions.as_deref(), ) .await?; // Update step based on result let new_status = match result.confidence_level { ConfidenceLevel::Green => "passed", ConfidenceLevel::Yellow => { // Check autonomy level if directive.autonomy_level == "full_auto" { "passed" // Accept yellow in full auto mode } else { // Create approval request self.request_approval( &directive, &step, "step_review", &format!( "Step '{}' completed with yellow confidence ({:.0}%). 
Review required.", step.name, result.composite_score * 100.0 ), ) .await?; "evaluating" // Wait for approval } } ConfidenceLevel::Red => { // Initiate rework self.initiate_rework(&directive, &step, &result).await?; "rework" } }; repository::update_step_status(&self.pool, step.id, new_status).await?; repository::update_step_confidence( &self.pool, step.id, result.composite_score, result.confidence_level.as_str(), result.id, ) .await?; self.emit_event(EngineEvent::EvaluationCompleted { directive_id: directive.id, step_id: step.id, passed: result.passed, confidence_level: result.confidence_level, }); // If passed, continue chain execution if new_status == "passed" { self.advance_chain(directive.id).await?; } Ok(()) } /// Evaluate a step using tiered verification. async fn evaluate_step( &self, directive: &Directive, step: &ChainStep, ) -> Result { // Get repository path let repo_path = directive .local_path .as_ref() .map(std::path::PathBuf::from) .unwrap_or_else(|| std::path::PathBuf::from(".")); // Auto-detect verifiers let verifiers = auto_detect_verifiers(&repo_path).await; // Build verification context let context = VerificationContext { step_id: step.id, contract_id: step.contract_id, modified_files: vec![], // TODO: Get from contract/git step_description: step.description.clone().unwrap_or_default(), acceptance_criteria: vec![], // TODO: Get from directive directive_context: directive.goal.clone(), }; // Run composite evaluation let evaluator = CompositeEvaluator::new(verifiers) .with_thresholds( directive.confidence_threshold_green, directive.confidence_threshold_yellow, ); Ok(evaluator.evaluate(&repo_path, &context).await) } /// Initiate rework for a failed step. 
async fn initiate_rework( &self, directive: &Directive, step: &ChainStep, result: &EvaluationResult, ) -> Result<(), EngineError> { // Increment rework count let updated_step = repository::increment_step_rework_count(&self.pool, step.id).await?; // Check rework limit let max_rework = directive.max_rework_cycles.unwrap_or(3); if updated_step.rework_count >= max_rework { // Too many rework attempts, mark as blocked repository::update_step_status(&self.pool, step.id, "blocked").await?; self.emit_directive_event( directive.id, "step_blocked", "warning", serde_json::json!({ "step_id": step.id, "step_name": step.name, "reason": "Max rework attempts reached", }), "system", ) .await?; return Ok(()); } // Log rework event self.emit_directive_event( directive.id, "step_rework", "info", serde_json::json!({ "step_id": step.id, "step_name": step.name, "rework_count": updated_step.rework_count, "instructions": result.rework_instructions, }), "system", ) .await?; // TODO: Send rework instructions to supervisor task // This would involve: // 1. Reset contract phase to 'plan' // 2. Send message to supervisor with rework instructions // 3. Update step status to 'running' Ok(()) } /// Request human approval for a step. async fn request_approval( &self, directive: &Directive, step: &ChainStep, approval_type: &str, description: &str, ) -> Result { let context = serde_json::json!({ "step_id": step.id, "step_name": step.name, "confidence_score": step.confidence_score, }); let approval = repository::create_approval_request( &self.pool, directive.id, Some(step.id), approval_type, description, Some(context), "medium", None, // expires_at ) .await?; self.emit_event(EngineEvent::ApprovalRequired { directive_id: directive.id, approval_id: approval.id, approval_type: approval_type.to_string(), }); Ok(approval.id) } /// Handle approval resolution. 
pub async fn on_approval_resolved( &self, approval_id: Uuid, approved: bool, responded_by: Uuid, ) -> Result<(), EngineError> { let status = if approved { "approved" } else { "denied" }; let approval = repository::resolve_approval( &self.pool, approval_id, status, None, responded_by, ) .await?; if let Some(step_id) = approval.step_id { let step = repository::get_chain_step(&self.pool, step_id) .await? .ok_or(EngineError::StepNotFound(step_id))?; let chain = repository::get_directive_chain(&self.pool, step.chain_id) .await? .ok_or(EngineError::ChainNotFound(step.chain_id))?; if approved { // Mark step as passed and continue repository::update_step_status(&self.pool, step_id, "passed").await?; self.advance_chain(chain.directive_id).await?; } else { // Mark step as failed/blocked repository::update_step_status(&self.pool, step_id, "blocked").await?; } } Ok(()) } // ======================================================================== // Planning // ======================================================================== /// Build the task instructions for the planning task. 
///
/// The result is a markdown prompt containing the directive's title, goal,
/// requirements, acceptance criteria, constraints and repository, followed by
/// instructions for adding chain steps through the HTTP API.
///
/// Requirements and acceptance criteria are read as JSON arrays of objects
/// (`id` plus `description` / `criterion`); missing fields render as `?`.
/// Constraints are read as a JSON array of strings. Entries of any other
/// shape are skipped, and an empty section renders as `(none)`.
fn build_planning_task_instructions(&self, directive: &Directive) -> String {
    let requirements: Vec<String> = directive
        .requirements
        .as_array()
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_object())
                .map(|obj| {
                    let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?");
                    let desc = obj
                        .get("description")
                        .and_then(|v| v.as_str())
                        .unwrap_or("?");
                    format!("- {}: {}", id, desc)
                })
                .collect()
        })
        .unwrap_or_default();

    let criteria: Vec<String> = directive
        .acceptance_criteria
        .as_array()
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_object())
                .map(|obj| {
                    let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?");
                    let criterion = obj
                        .get("criterion")
                        .and_then(|v| v.as_str())
                        .unwrap_or("?");
                    format!("- {}: {}", id, criterion)
                })
                .collect()
        })
        .unwrap_or_default();

    let constraints: Vec<String> = directive
        .constraints
        .as_array()
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str())
                .map(|s| format!("- {}", s))
                .collect()
        })
        .unwrap_or_default();

    let repo_info = directive
        .repository_url
        .as_deref()
        .unwrap_or("(not specified)");

    // The template is markdown: headings and code fences must start a line,
    // and the trailing backslashes in the curl example are shell line
    // continuations, so the line breaks below are significant.
    format!(
        r#"You are planning an execution chain for a directive.

## Directive: {title}

## Goal
{goal}

## Requirements
{requirements}

## Acceptance Criteria
{criteria}

## Constraints
{constraints}

## Repository: {repo}

## Your Task

Analyze the repository and create a chain of execution steps. For each step, add it via the API:

```bash
curl -s -X POST "$MAKIMA_URL/api/v1/directives/{directive_id}/chain/steps" \
  -H "Authorization: Bearer $MAKIMA_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{{
    "name": "step-name",
    "description": "What this step accomplishes",
    "step_type": "implement",
    "depends_on": [],
    "contract_type": "execute",
    "initial_phase": "execute",
    "task_plan": "Detailed instructions for the step executor"
  }}'
```

### Step types
Use these step types: research, design, implement, test, review, document

### Dependencies
Each step can depend on other steps by name. Use the `depends_on` field with an array of step names.
Steps with no dependencies will run in parallel.

### Guidelines
1. Break the work into logical, independently executable steps
2. Each step should be completable by a single Claude Code session
3. Use dependencies to enforce ordering where needed
4. Include a "test" or "verify" step at the end
5. Keep step names in kebab-case
6. The `task_plan` field should contain detailed instructions for the agent that will execute the step

When you have added all steps, your task is complete."#,
        title = directive.title,
        goal = directive.goal,
        requirements = if requirements.is_empty() {
            "(none)".to_string()
        } else {
            requirements.join("\n")
        },
        criteria = if criteria.is_empty() {
            "(none)".to_string()
        } else {
            criteria.join("\n")
        },
        constraints = if constraints.is_empty() {
            "(none)".to_string()
        } else {
            constraints.join("\n")
        },
        repo = repo_info,
        directive_id = directive.id,
    )
}

/// Handle planning task completion.
///
/// Ignored (with a warning) unless the directive is still in `planning`.
/// If planning succeeded and produced steps they are used as-is; otherwise
/// the default chain's steps are created in the current chain. The directive
/// is then activated and its ready steps are started.
pub async fn on_planning_complete(
    &self,
    directive_id: Uuid,
    success: bool,
) -> Result<(), EngineError> {
    let directive = repository::get_directive(&self.pool, directive_id)
        .await?
        .ok_or(EngineError::DirectiveNotFound(directive_id))?;

    // Only process if directive is still in planning state
    if directive.status != "planning" {
        tracing::warn!(
            "Directive {} is in state '{}', not 'planning'. Skipping planning completion.",
            directive_id,
            directive.status
        );
        return Ok(());
    }

    // Get current chain
    let chain = repository::get_current_chain(&self.pool, directive_id)
        .await?
        .ok_or(EngineError::ChainNotFound(directive_id))?;

    // Check if chain has steps
    let steps = repository::list_chain_steps(&self.pool, chain.id).await?;

    if success && !steps.is_empty() {
        tracing::info!(
            "Planning completed successfully for directive {} with {} steps",
            directive_id,
            steps.len()
        );
    } else {
        // Fall back to default chain
        let reason = if !success {
            "Planning task failed"
        } else {
            "Planning task produced no steps"
        };
        tracing::warn!(
            "{} for directive {}, using default chain",
            reason,
            directive_id
        );

        // NOTE(review): if planning failed after adding some steps, the
        // default steps are added alongside them in the same chain —
        // confirm this is intended.
        let default_chain = self.build_default_chain(&directive);
        self.create_steps_from_chain(&chain.id, &default_chain)
            .await?;
    }

    // Activate the directive
    repository::update_directive_status(&self.pool, directive_id, "active").await?;

    self.emit_event(EngineEvent::DirectiveStatusChanged {
        directive_id,
        old_status: "planning".to_string(),
        new_status: "active".to_string(),
    });

    self.emit_directive_event(
        directive_id,
        "planning_completed",
        "info",
        serde_json::json!({"success": success}),
        "system",
    )
    .await?;

    // Start ready steps
    self.advance_chain(directive_id).await?;

    Ok(())
}

// ========================================================================
// Circuit Breakers
// ========================================================================

/// Check circuit breakers for a directive.
async fn check_circuit_breakers(&self, directive: &Directive) -> Result<(), EngineError> { // Check cost limit if let Some(max_cost) = directive.max_total_cost_usd { let current_cost = directive.total_cost_usd; if current_cost >= max_cost { return Err(EngineError::CircuitBreaker(format!( "Cost limit exceeded: ${:.2} >= ${:.2}", current_cost, max_cost ))); } } // Check time limit (stored in minutes) if let Some(max_minutes) = directive.max_wall_time_minutes { if let Some(started_at) = directive.started_at { let elapsed = chrono::Utc::now().signed_duration_since(started_at); let elapsed_minutes = elapsed.num_minutes(); if elapsed_minutes >= max_minutes as i64 { return Err(EngineError::CircuitBreaker(format!( "Time limit exceeded: {} min >= {} min", elapsed_minutes, max_minutes ))); } } } // Check chain generation limit if let Some(max_gen) = directive.max_chain_regenerations { let current_gen = directive.chain_generation_count; if current_gen >= max_gen { return Err(EngineError::CircuitBreaker(format!( "Chain generation limit exceeded: {} >= {}", current_gen, max_gen ))); } } Ok(()) } // ======================================================================== // Completion // ======================================================================== /// Complete a directive after all steps pass. 
async fn complete_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { // Run final evaluation (optional) // TODO: LLM evaluation of overall directive completion // Update directive status repository::update_directive_status(&self.pool, directive_id, "completed").await?; self.emit_event(EngineEvent::DirectiveStatusChanged { directive_id, old_status: "active".to_string(), new_status: "completed".to_string(), }); self.emit_directive_event( directive_id, "directive_completed", "info", serde_json::json!({}), "system", ) .await?; Ok(()) } // ======================================================================== // Event Logging // ======================================================================== /// Emit a directive event to the database. async fn emit_directive_event( &self, directive_id: Uuid, event_type: &str, severity: &str, event_data: serde_json::Value, actor_type: &str, ) -> Result { Ok(repository::emit_directive_event( &self.pool, directive_id, None, // chain_id None, // step_id event_type, severity, Some(event_data), actor_type, None, // actor_id ) .await?) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_confidence_level_decision() { // Green confidence should pass in all modes assert_eq!(ConfidenceLevel::Green.as_str(), "green"); // Yellow confidence behavior depends on autonomy level assert_eq!(ConfidenceLevel::Yellow.as_str(), "yellow"); // Red confidence should always trigger rework assert_eq!(ConfidenceLevel::Red.as_str(), "red"); } }