diff options
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 1685 |
1 files changed, 0 insertions, 1685 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs deleted file mode 100644 index 46d9425..0000000 --- a/makima/src/orchestration/directive.rs +++ /dev/null @@ -1,1685 +0,0 @@ -//! Directive orchestration — init, planning completion, chain advancement. - -use serde::Deserialize; -use sqlx::PgPool; -use uuid::Uuid; - -use serde::Serialize; -use crate::db::models::{ - ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, - UpdateContractRequest, UpdateTaskRequest, -}; -use crate::db::repository; -use crate::server::state::{DaemonCommand, SharedState}; - -/// A single step in the chain plan produced by the planning supervisor. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "snake_case")] -struct ChainPlanStep { - name: String, - description: String, - #[serde(alias = "taskPlan")] - task_plan: String, - #[serde(default, alias = "dependsOn")] - depends_on: Vec<String>, // names of steps this depends on -} - -/// Wrapper for the plan JSON written by the planning supervisor. -#[derive(Debug, Deserialize)] -struct ChainPlan { - steps: Vec<ChainPlanStep>, -} - -/// Result written by the monitoring supervisor after evaluating a step. -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -struct MonitoringResult { - passed: bool, - overall_score: Option<f64>, - confidence_level: Option<String>, - #[serde(default)] - criteria_results: serde_json::Value, - #[serde(default)] - summary_feedback: String, - rework_instructions: Option<String>, -} - -/// Dispatch a task to an available daemon. Finds a connected daemon with capacity, -/// assigns the task, and sends a SpawnTask command. -async fn dispatch_task_to_daemon( - pool: &PgPool, - state: &SharedState, - task: &Task, - contract_local_only: bool, - contract_auto_merge_local: bool, - owner_id: Uuid, -) -> Result<(), String> { - // Find available daemons - let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[]) - .await - .map_err(|e| format!("Failed to get available daemons: {}", e))?; - - let available_daemon = daemons.iter().find(|d| { - d.current_task_count < d.max_concurrent_tasks - && state.daemon_connections.contains_key(&d.connection_id) - }); - - let daemon = match available_daemon { - Some(d) => d, - None => { - tracing::warn!( - task_id = %task.id, - "No daemon available to dispatch task — will be picked up by retry loop" - ); - return Ok(()); - } - }; - - // Assign task to daemon - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(task.version), - ..Default::default() - }; - - let updated = repository::update_task_for_owner(pool, task.id, owner_id, update_req) - .await - .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?; - - let Some(updated_task) = updated else { - return Err("Task not found when assigning to daemon".to_string()); - }; - - // Get repo URL from task or contract repositories - let repo_url = if let Some(url) = &updated_task.repository_url { - Some(url.clone()) - } else if let Some(contract_id) = updated_task.contract_id { - match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos - .iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), - Err(_) => None, - } - } else { - None - }; - - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url, - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, - autonomous_loop: updated_task.contract_id.is_some(), - resume_session: false, - conversation_history: None, - patch_data: None, - patch_base_sha: None, - local_only: contract_local_only, - auto_merge_local: contract_auto_merge_local, - supervisor_worktree_task_id: None, - }; - - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!( - task_id = %task.id, - daemon_id = %daemon.id, - error = %e, - "Failed to send spawn command — rolling back" - ); - let rollback = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback).await; - return Ok(()); // Non-fatal, retry loop will pick it up - } - - tracing::info!( - task_id = %task.id, - daemon_id = %daemon.id, - "Dispatched directive task to daemon" - ); - - Ok(()) -} - -/// Initialize a directive: create a planning contract and transition to "planning". -pub async fn init_directive( - pool: &PgPool, - state: &SharedState, - owner_id: Uuid, - directive_id: Uuid, -) -> Result<Directive, String> { - // 1. Get directive, verify status - let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - if directive.status != "draft" { - return Err(format!( - "Directive must be in 'draft' status to start, current status: '{}'", - directive.status - )); - } - - // 2. Create planning contract - let contract = repository::create_contract_for_owner( - pool, - owner_id, - CreateContractRequest { - name: format!("{} - Planning", directive.title), - description: Some(format!( - "Planning contract for directive: {}", - directive.title - )), - contract_type: Some("simple".to_string()), - template_id: None, - initial_phase: Some("plan".to_string()), - autonomous_loop: Some(true), - phase_guard: None, - local_only: Some(true), - auto_merge_local: None, - }, - ) - .await - .map_err(|e| format!("Failed to create planning contract: {}", e))?; - - // 3. Mark contract as directive orchestrator - repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) - .await - .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; - - // 4. Build planning prompt - let planning_prompt = build_planning_prompt(&directive); - - // 5. Create supervisor task - let supervisor_task = repository::create_task_for_owner( - pool, - owner_id, - CreateTaskRequest { - contract_id: Some(contract.id), - name: format!("{} - Planner", directive.title), - description: Some("Decompose directive goal into executable chain steps".to_string()), - plan: planning_prompt, - parent_task_id: None, - is_supervisor: true, - priority: 10, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - target_branch: None, - merge_mode: None, - target_repo_path: directive.local_path.clone(), - completion_action: None, - continue_from_task_id: None, - copy_files: None, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, - }, - ) - .await - .map_err(|e| format!("Failed to create supervisor task: {}", e))?; - - // 6. Link supervisor to contract - repository::update_contract_for_owner( - pool, - contract.id, - owner_id, - UpdateContractRequest { - supervisor_task_id: Some(supervisor_task.id), - ..Default::default() - }, - ) - .await - .map_err(|e| match e { - crate::db::repository::RepositoryError::Database(e) => { - format!("Failed to link supervisor to contract: {}", e) - } - other => format!("Failed to link supervisor to contract: {:?}", other), - })?; - - // 7. Set orchestrator_contract_id on directive - repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) - .await - .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; - - // 8. Transition directive to "planning" - let updated = repository::update_directive_status(pool, directive_id, "planning") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))? - .ok_or("Directive not found after status update")?; - - // 9. Copy repo config to contract if repository_url is set - if let Some(ref repo_url) = directive.repository_url { - let _ = repository::add_remote_repository( - pool, - contract.id, - "directive-repo", - repo_url, - true, - ) - .await; - } else if let Some(ref local_path) = directive.local_path { - let _ = repository::add_local_repository( - pool, - contract.id, - "directive-repo", - local_path, - true, - ) - .await; - } - - tracing::info!( - directive_id = %directive_id, - contract_id = %contract.id, - task_id = %supervisor_task.id, - "Directive started: planning contract created" - ); - - // 10. Dispatch planning task to an available daemon immediately - dispatch_task_to_daemon( - pool, state, &supervisor_task, - contract.local_only, contract.auto_merge_local, - owner_id, - ).await?; - - Ok(updated) -} - -/// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction). -pub async fn submit_plan( - pool: &PgPool, - state: &SharedState, - owner_id: Uuid, - directive_id: Uuid, - plan_json: &str, -) -> Result<Directive, String> { - // 1. Get directive, verify status - let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - if directive.status != "planning" { - return Err(format!( - "Directive must be in 'planning' status to submit a plan, current status: '{}'", - directive.status - )); - } - - // 2. Idempotency: if current_chain_id already set, return existing directive - if directive.current_chain_id.is_some() { - tracing::info!( - directive_id = %directive_id, - "Plan already submitted (current_chain_id set), returning existing directive" - ); - return Ok(directive); - } - - // 3. Parse the plan JSON - let chain_plan: ChainPlan = serde_json::from_str(plan_json) - .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?; - - if chain_plan.steps.is_empty() { - return Err("Chain plan has no steps".to_string()); - } - - // 4. Create chain and steps, transition to active - create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?; - - // 5. Re-fetch and return the updated directive - let updated = repository::get_directive(pool, directive_id) - .await - .map_err(|e| format!("Failed to re-fetch directive: {}", e))? - .ok_or("Directive not found after plan submission")?; - - tracing::info!( - directive_id = %directive_id, - step_count = chain_plan.steps.len(), - "Plan submitted via API, directive now active" - ); - - Ok(updated) -} - -/// Called when any task completes — checks if it's directive-related and advances. -/// Called when a contract's status is updated to "completed" via the API. -/// This is the primary entry point for directive orchestration because supervisor -/// tasks do not send TaskComplete messages — they complete via contract status updates. -pub async fn on_contract_completed( - pool: &PgPool, - state: &SharedState, - contract: &crate::db::models::Contract, - owner_id: Uuid, -) -> Result<(), String> { - if contract.status != "completed" { - return Ok(()); - } - - if contract.is_directive_orchestrator { - let directive = - repository::get_directive_by_orchestrator_contract(pool, contract.id) - .await - .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; - - if let Some(directive) = directive { - tracing::info!( - directive_id = %directive.id, - contract_id = %contract.id, - "Directive orchestrator contract completed, handling planning completion" - ); - handle_planning_completion(pool, state, &directive, owner_id).await?; - } else { - tracing::warn!( - contract_id = %contract.id, - "Directive orchestrator contract completed but no directive found" - ); - } - } else if let Some(directive_id) = contract.directive_id { - // Check if this is a monitoring contract - let monitoring_step = - repository::get_step_by_monitoring_contract_id(pool, contract.id) - .await - .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; - - if let Some(step) = monitoring_step { - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - contract_id = %contract.id, - "Monitoring contract completed" - ); - process_monitoring_result(pool, state, contract, &step, owner_id).await?; - } else { - // Step contract completed - let step = repository::get_step_by_contract_id(pool, contract.id) - .await - .map_err(|e| format!("Failed to get step by contract: {}", e))?; - - if let Some(step) = step { - // Idempotency: only dispatch monitoring if step is still "running" - // (on_step_completed may also fire via the task path) - if step.status != "running" { - tracing::info!( - step_id = %step.id, - status = %step.status, - "Skipping step contract completion: step no longer running" - ); - return Ok(()); - } - - let directive = repository::get_directive(pool, directive_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - contract_id = %contract.id, - "Step contract completed, dispatching monitoring" - ); - - // Step contract completed successfully — dispatch monitoring - repository::update_step_status(pool, step.id, "evaluating") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::create_directive_event( - pool, - directive.id, - directive.current_chain_id, - Some(step.id), - "step_evaluating", - "info", - None, - "system", - None, - ) - .await; - - dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?; - } - } - } - - Ok(()) -} - -pub async fn on_task_completed( - pool: &PgPool, - state: &SharedState, - task: &Task, - owner_id: Uuid, -) -> Result<(), String> { - let Some(contract_id) = task.contract_id else { - return Ok(()); - }; - - let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get contract: {}", e))?; - - let Some(contract) = contract else { - return Ok(()); - }; - - if contract.is_directive_orchestrator { - // This is a planning contract completion - let directive = - repository::get_directive_by_orchestrator_contract(pool, contract_id) - .await - .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; - - if let Some(directive) = directive { - on_planning_completed(pool, state, &directive, task, owner_id).await?; - } - } else if contract.directive_id.is_some() { - // Check if this is a monitoring contract completion - let monitoring_step = - repository::get_step_by_monitoring_contract_id(pool, contract_id) - .await - .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; - - if let Some(step) = monitoring_step { - on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?; - } else { - // This is a step contract completion - on_step_completed(pool, state, &contract, task, owner_id).await?; - } - } - - Ok(()) -} - -/// Handle planning task completion: parse chain plan, create steps, advance. -async fn on_planning_completed( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - task: &Task, - owner_id: Uuid, -) -> Result<(), String> { - // If task failed, fail the directive - if task.status == "failed" { - tracing::warn!( - directive_id = %directive.id, - task_id = %task.id, - "Planning task failed, marking directive as failed" - ); - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - } - - // Only process when the supervisor task itself is done - if task.status != "done" || !task.is_supervisor { - return Ok(()); - } - - handle_planning_completion(pool, state, directive, owner_id).await -} - -/// Handle planning contract/task completion. -/// Checks if a plan was submitted via the CLI; if not, retries or fails. -async fn handle_planning_completion( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - owner_id: Uuid, -) -> Result<(), String> { - // Re-fetch directive to check latest state - let current = repository::get_directive(pool, directive.id) - .await - .map_err(|e| format!("Failed to re-fetch directive: {}", e))? - .ok_or("Directive not found")?; - - // Idempotency: only process if still in "planning" status - if current.status != "planning" { - tracing::info!( - directive_id = %directive.id, - status = %current.status, - "Skipping handle_planning_completion: directive no longer in planning status" - ); - return Ok(()); - } - - // If plan was already submitted via CLI (current_chain_id is set), nothing to do - if current.current_chain_id.is_some() { - tracing::info!( - directive_id = %directive.id, - "Plan already submitted via CLI, skipping handle_planning_completion" - ); - return Ok(()); - } - - // No plan was submitted — check retry budget - let max_regenerations = current.max_chain_regenerations.unwrap_or(2); - if current.chain_generation_count < max_regenerations { - tracing::warn!( - directive_id = %directive.id, - attempt = current.chain_generation_count + 1, - max = max_regenerations, - "Planning completed without plan submission, retrying" - ); - - let _ = repository::create_directive_event( - pool, - directive.id, - None, - None, - "planning_retry", - "warn", - Some(&serde_json::json!({ - "attempt": current.chain_generation_count + 1, - "maxRegenerations": max_regenerations, - "reason": "Planning contract completed without submitting a plan" - })), - "system", - None, - ) - .await; - - // Increment generation count - repository::increment_chain_generation_count(pool, directive.id) - .await - .map_err(|e| format!("Failed to increment chain generation count: {}", e))?; - - // Reset to draft so init_directive can be called again - repository::update_directive_status(pool, directive.id, "draft") - .await - .map_err(|e| format!("Failed to reset directive status: {}", e))?; - - // Re-init planning - init_directive(pool, state, owner_id, directive.id).await?; - - Ok(()) - } else { - tracing::error!( - directive_id = %directive.id, - attempts = current.chain_generation_count, - max = max_regenerations, - "Planning failed: max regeneration attempts exhausted without plan submission" - ); - - let _ = repository::create_directive_event( - pool, - directive.id, - None, - None, - "planning_failed", - "error", - Some(&serde_json::json!({ - "attempts": current.chain_generation_count, - "maxRegenerations": max_regenerations, - "reason": "Max chain regeneration attempts exhausted without plan submission" - })), - "system", - None, - ) - .await; - - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - - Ok(()) - } -} - -/// Inner helper: create chain, steps, set current chain, transition to active, and advance. -/// Extracted so that `process_planning_result` can catch errors and mark the directive failed. -async fn create_chain_and_steps( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - chain_plan: &ChainPlan, - owner_id: Uuid, -) -> Result<(), String> { - // Create chain - let chain = repository::create_directive_chain( - pool, - directive.id, - &format!("{} - Chain", directive.title), - Some("Auto-generated from planning"), - None, - chain_plan.steps.len() as i32, - ) - .await - .map_err(|e| format!("Failed to create directive chain: {}", e))?; - - // Create steps (two passes: first create all, then resolve dependencies) - let mut step_ids: Vec<(String, Uuid)> = Vec::new(); - - for (i, plan_step) in chain_plan.steps.iter().enumerate() { - let step = repository::create_chain_step( - pool, - chain.id, - &plan_step.name, - Some(&plan_step.description), - "task", - "simple", - Some("plan"), - Some(&plan_step.task_plan), - None, // dependencies set in second pass - i as i32, - ) - .await - .map_err(|e| format!("Failed to create chain step: {}", e))?; - - step_ids.push((plan_step.name.clone(), step.id)); - } - - // Second pass: resolve name-based dependencies to UUIDs and update - for (i, plan_step) in chain_plan.steps.iter().enumerate() { - if plan_step.depends_on.is_empty() { - continue; - } - - let dep_uuids: Vec<Uuid> = plan_step - .depends_on - .iter() - .filter_map(|dep_name| { - step_ids - .iter() - .find(|(name, _)| name == dep_name) - .map(|(_, id)| *id) - }) - .collect(); - - if !dep_uuids.is_empty() { - let step_id = step_ids[i].1; - sqlx::query( - "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", - ) - .bind(step_id) - .bind(&dep_uuids) - .execute(pool) - .await - .map_err(|e| format!("Failed to update step dependencies: {}", e))?; - } - } - - // Set current chain on directive - repository::set_directive_current_chain(pool, directive.id, chain.id) - .await - .map_err(|e| format!("Failed to set current chain: {}", e))?; - - // Transition directive to active - let updated_directive = repository::update_directive_status(pool, directive.id, "active") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))? - .ok_or("Directive not found after status update")?; - - tracing::info!( - directive_id = %directive.id, - chain_id = %chain.id, - step_count = chain_plan.steps.len(), - "Chain plan created, advancing chain" - ); - - // Advance chain to dispatch ready steps - advance_chain(pool, state, &updated_directive, owner_id).await -} - -/// Handle a step contract task completion. -async fn on_step_completed( - pool: &PgPool, - state: &SharedState, - contract: &crate::db::models::Contract, - task: &Task, - owner_id: Uuid, -) -> Result<(), String> { - // Only process supervisor task completions - if !task.is_supervisor { - return Ok(()); - } - - let Some(directive_id) = contract.directive_id else { - return Ok(()); - }; - - // Find the step linked to this contract - let step = repository::get_step_by_contract_id(pool, contract.id) - .await - .map_err(|e| format!("Failed to get step by contract: {}", e))?; - - let Some(step) = step else { - return Ok(()); - }; - - // Idempotency: only process if step is still "running" - // (on_contract_completed may also fire via the contract path) - if step.status != "running" { - tracing::info!( - step_id = %step.id, - status = %step.status, - "Skipping on_step_completed: step no longer running" - ); - return Ok(()); - } - - // Get the directive for threshold info - let directive = repository::get_directive(pool, directive_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - if task.status == "done" { - // Step task succeeded — dispatch monitoring evaluation - repository::update_step_status(pool, step.id, "evaluating") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::create_directive_event( - pool, - directive.id, - directive.current_chain_id, - Some(step.id), - "step_evaluating", - "info", - None, - "system", - None, - ) - .await; - - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - "Step task done, dispatching monitoring evaluation" - ); - - dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await - } else { - // Step task failed — mark step failed and advance - repository::update_step_status(pool, step.id, "failed") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; - - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - "Step failed" - ); - - advance_chain(pool, state, &directive, owner_id).await - } -} - -/// Check chain progress and dispatch ready steps or mark directive complete. -async fn advance_chain( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - owner_id: Uuid, -) -> Result<(), String> { - let Some(chain_id) = directive.current_chain_id else { - return Ok(()); - }; - - let steps = repository::list_steps_for_chain(pool, chain_id) - .await - .map_err(|e| format!("Failed to list steps: {}", e))?; - - // Check if all steps passed - let all_passed = steps.iter().all(|s| s.status == "passed"); - if all_passed && !steps.is_empty() { - repository::update_chain_status(pool, chain_id, "completed") - .await - .map_err(|e| format!("Failed to update chain status: {}", e))?; - repository::update_directive_status(pool, directive.id, "completed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); - return Ok(()); - } - - // Check if any step failed - let any_failed = steps.iter().any(|s| s.status == "failed"); - if any_failed { - repository::update_chain_status(pool, chain_id, "failed") - .await - .map_err(|e| format!("Failed to update chain status: {}", e))?; - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); - return Ok(()); - } - - // Find and dispatch ready steps - let ready_steps = repository::find_ready_steps(pool, chain_id) - .await - .map_err(|e| format!("Failed to find ready steps: {}", e))?; - - for step in ready_steps { - if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await { - tracing::error!( - step_id = %step.id, - step_name = %step.name, - error = %e, - "Failed to dispatch step" - ); - } - } - - Ok(()) -} - -/// Dispatch a single chain step as a new contract with supervisor. -async fn dispatch_step( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - step: &crate::db::models::ChainStep, - owner_id: Uuid, -) -> Result<(), String> { - // Mark step as running - repository::update_step_status(pool, step.id, "running") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - // Create contract for this step. - // Step contracts use the directive's repository config — not local_only, - // so they can branch and merge to share work across steps. - let has_repo = directive.repository_url.is_some() || directive.local_path.is_some(); - let contract = repository::create_contract_for_owner( - pool, - owner_id, - CreateContractRequest { - name: step.name.clone(), - description: step.description.clone(), - contract_type: Some(step.contract_type.clone()), - template_id: None, - initial_phase: step.initial_phase.clone(), - autonomous_loop: Some(true), - phase_guard: None, - local_only: Some(!has_repo), - auto_merge_local: if has_repo { Some(true) } else { None }, - }, - ) - .await - .map_err(|e| format!("Failed to create step contract: {}", e))?; - - // Set directive_id on contract - repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) - .await - .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; - - // Build the task plan, prepending rework instructions if this is a rework cycle - let mut task_plan = step - .task_plan - .clone() - .unwrap_or_else(|| format!("Execute step: {}", step.name)); - - if let Some(eval_id) = step.last_evaluation_id { - if let Ok(Some(evaluation)) = repository::get_directive_evaluation(pool, eval_id).await { - if let Some(ref rework) = evaluation.rework_instructions { - task_plan = format!( - "IMPORTANT — REWORK REQUIRED (attempt #{}):\n\ - The previous attempt was evaluated and did NOT pass.\n\ - Feedback: {}\n\ - Rework instructions: {}\n\n\ - ---\n\n\ - Original task plan:\n{}", - step.rework_count + 1, - evaluation.summary_feedback, - rework, - task_plan, - ); - } - } - } - - // Create supervisor task - let supervisor_task = repository::create_task_for_owner( - pool, - owner_id, - CreateTaskRequest { - contract_id: Some(contract.id), - name: format!("{} Supervisor", step.name), - description: step.description.clone(), - plan: task_plan, - parent_task_id: None, - is_supervisor: true, - priority: 5, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - target_branch: None, - merge_mode: None, - target_repo_path: directive.local_path.clone(), - completion_action: None, - continue_from_task_id: None, - copy_files: None, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, - }, - ) - .await - .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; - - // Link supervisor to contract - repository::update_contract_for_owner( - pool, - contract.id, - owner_id, - UpdateContractRequest { - supervisor_task_id: Some(supervisor_task.id), - ..Default::default() - }, - ) - .await - .map_err(|e| match e { - crate::db::repository::RepositoryError::Database(e) => { - format!("Failed to link supervisor to step contract: {}", e) - } - other => format!("Failed to link supervisor to step contract: {:?}", other), - })?; - - // Link step to contract/task - repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) - .await - .map_err(|e| format!("Failed to update step contract link: {}", e))?; - - // Copy repo config from directive to step contract - if let Some(ref repo_url) = directive.repository_url { - let _ = repository::add_remote_repository( - pool, - contract.id, - "directive-repo", - repo_url, - true, - ) - .await; - } else if let Some(ref local_path) = directive.local_path { - let _ = repository::add_local_repository( - pool, - contract.id, - "directive-repo", - local_path, - true, - ) - .await; - } - - tracing::info!( - directive_id = %directive.id, - step_id = %step.id, - step_name = %step.name, - contract_id = %contract.id, - task_id = %supervisor_task.id, - "Step dispatched" - ); - - // Dispatch step task to an available daemon immediately - dispatch_task_to_daemon( - pool, state, &supervisor_task, - contract.local_only, contract.auto_merge_local, - owner_id, - ).await?; - - Ok(()) -} - -/// Build the planning supervisor prompt from a directive. -fn build_planning_prompt(directive: &Directive) -> String { - format!( - r#"You are planning the execution of a directive. - -DIRECTIVE: {title} -GOAL: {goal} -REQUIREMENTS: {requirements} -ACCEPTANCE CRITERIA: {acceptance_criteria} -CONSTRAINTS: {constraints} - -Your job is to decompose this goal into a sequence of executable steps. -Each step will become a separate contract with its own supervisor. - -The JSON format: -{{ - "steps": [ - {{ - "name": "Step name", - "description": "What this step accomplishes", - "task_plan": "Detailed instructions for the step's supervisor", - "depends_on": [] - }} - ] -}} - -Rules: -- Steps with no dependencies (empty depends_on array) will run in parallel. -- Steps that depend on other steps will wait until those complete. -- The depends_on array contains names of steps this step depends on. -- Each step should be a self-contained unit of work. -- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. -- Keep the number of steps reasonable (3-10 typically). - -Submit your plan by piping the JSON to stdin: - echo '<your_json_plan>' | makima directive submit-plan --directive-id {directive_id} - -After submitting the plan, mark the contract as complete: - makima supervisor complete"#, - title = directive.title, - goal = directive.goal, - requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), - acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), - constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), - directive_id = directive.id, - ) -} - -/// Extract JSON from file body elements. -fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> { - use crate::db::models::BodyElement; - - for element in body { - match element { - BodyElement::Code { content, .. } => { - // Try to parse as JSON - let trimmed = content.trim(); - if trimmed.starts_with('{') || trimmed.starts_with('[') { - if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { - return Some(trimmed.to_string()); - } - } - } - BodyElement::Paragraph { text } => { - let trimmed = text.trim(); - if trimmed.starts_with('{') || trimmed.starts_with('[') { - if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { - return Some(trimmed.to_string()); - } - } - } - BodyElement::Markdown { content } => { - // Try to find JSON in markdown content - let trimmed = content.trim(); - if trimmed.starts_with('{') || trimmed.starts_with('[') { - if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { - return Some(trimmed.to_string()); - } - } - // Try to find JSON in code blocks within markdown - if let Some(json_start) = trimmed.find("```json") { - let after = &trimmed[json_start + 7..]; - if let Some(json_end) = after.find("```") { - let json_str = after[..json_end].trim(); - if serde_json::from_str::<serde_json::Value>(json_str).is_ok() { - return Some(json_str.to_string()); - } - } - } - } - _ => {} - } - } - - // Fallback: concatenate all text content and try to find JSON - let all_text: String = body - .iter() - .map(|el| match el { - BodyElement::Code { content, .. } => content.clone(), - BodyElement::Paragraph { text } => text.clone(), - BodyElement::Markdown { content } => content.clone(), - _ => String::new(), - }) - .collect::<Vec<_>>() - .join("\n"); - - let trimmed = all_text.trim(); - if let Some(start) = trimmed.find('{') { - // Find matching closing brace - let substr = &trimmed[start..]; - if serde_json::from_str::<serde_json::Value>(substr).is_ok() { - return Some(substr.to_string()); - } - } - - None -} - -/// Dispatch a monitoring contract to evaluate a completed step. -async fn dispatch_monitoring( - pool: &PgPool, - state: &SharedState, - directive: &Directive, - step: &ChainStep, - step_contract: &crate::db::models::Contract, - owner_id: Uuid, -) -> Result<(), String> { - // Create monitoring contract - let contract = repository::create_contract_for_owner( - pool, - owner_id, - CreateContractRequest { - name: format!("{} - Monitor", step.name), - description: Some(format!("Monitoring evaluation for step: {}", step.name)), - contract_type: Some("monitoring".to_string()), - template_id: None, - initial_phase: Some("plan".to_string()), - autonomous_loop: Some(true), - phase_guard: None, - local_only: Some(true), - auto_merge_local: None, - }, - ) - .await - .map_err(|e| format!("Failed to create monitoring contract: {}", e))?; - - // Mark contract as directive-related (not orchestrator) - repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) - .await - .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?; - - // Build evaluation prompt - let prompt = build_monitoring_prompt(directive, step, step_contract); - - // Create monitoring task (NOT a supervisor — regular task that exits when done, - // which triggers on_task_completed → on_monitoring_completed automatically) - let supervisor_task = repository::create_task_for_owner( - pool, - owner_id, - CreateTaskRequest { - contract_id: Some(contract.id), - name: format!("{} - Evaluator", step.name), - description: Some("Evaluate step output against directive criteria".to_string()), - plan: prompt, - parent_task_id: None, - is_supervisor: false, - priority: 8, - repository_url: directive.repository_url.clone(), - base_branch: directive.base_branch.clone(), - target_branch: None, - merge_mode: None, - target_repo_path: directive.local_path.clone(), - completion_action: None, - continue_from_task_id: None, - copy_files: None, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, - }, - ) - .await - .map_err(|e| format!("Failed to create monitoring task: {}", e))?; - - // Link monitoring task to contract - repository::update_contract_for_owner( - pool, - contract.id, - owner_id, - UpdateContractRequest { - supervisor_task_id: Some(supervisor_task.id), - ..Default::default() - }, - ) - .await - .map_err(|e| match e { - crate::db::repository::RepositoryError::Database(e) => { - format!("Failed to link task to monitoring contract: {}", e) - } - other => format!("Failed to link task to monitoring contract: {:?}", other), - })?; - - // Link step to monitoring contract/task - repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id) - .await - .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?; - - // Copy repo config from directive to monitoring contract - if let Some(ref repo_url) = directive.repository_url { - let _ = repository::add_remote_repository( - pool, - contract.id, - "directive-repo", - repo_url, - true, - ) - .await; - } else if let Some(ref local_path) = directive.local_path { - let _ = repository::add_local_repository( - pool, - contract.id, - "directive-repo", - local_path, - true, - ) - .await; - } - - tracing::info!( - directive_id = %directive.id, - step_id = %step.id, - step_name = %step.name, - monitoring_contract_id = %contract.id, - monitoring_task_id = %supervisor_task.id, - "Monitoring evaluation dispatched" - ); - - // Dispatch monitoring task to an available daemon immediately - dispatch_task_to_daemon( - pool, state, &supervisor_task, - contract.local_only, contract.auto_merge_local, - owner_id, - ).await?; - - Ok(()) -} - -/// Build the monitoring supervisor prompt. -fn build_monitoring_prompt( - directive: &Directive, - step: &ChainStep, - step_contract: &crate::db::models::Contract, -) -> String { - format!( - r#"You are evaluating the output of a completed step in a directive chain. - -DIRECTIVE: {title} -GOAL: {goal} -REQUIREMENTS: {requirements} -ACCEPTANCE CRITERIA: {acceptance_criteria} -CONSTRAINTS: {constraints} - -STEP: {step_name} -STEP DESCRIPTION: {step_description} -STEP TASK PLAN: {task_plan} -STEP CONTRACT ID: {step_contract_id} - -CONFIDENCE THRESHOLDS: -- Green (pass): >= {threshold_green} -- Yellow (marginal): >= {threshold_yellow} -- Red (fail): < {threshold_yellow} - -INSTRUCTIONS: -1. Read the step contract's files to understand what was delivered: - makima contract files --contract-id {step_contract_id} - makima contract file <file_id> --contract-id {step_contract_id} - -2. Evaluate whether the step's output meets the directive's requirements and the step's task plan. - -3. Write your evaluation as a JSON file to this monitoring contract. Create a file called - evaluation.json with the JSON content first, then upload it: - - cat > /tmp/eval-result.json << 'EVALEOF' - {{ - "passed": true, - "overallScore": 0.85, - "confidenceLevel": "green", - "criteriaResults": [ - {{ - "criterion": "Example criterion", - "passed": true, - "score": 0.9, - "evidence": "What was found" - }} - ], - "summaryFeedback": "Summary of evaluation", - "reworkInstructions": null - }} - EVALEOF - makima contract create-file evaluation-result < /tmp/eval-result.json - - Replace the example values with your actual evaluation results. - -Scoring guidelines: -- overallScore >= {threshold_green}: confidenceLevel = "green", passed = true -- overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment -- overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false -- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. -- Set reworkInstructions to null if the step passed. - -You are done after writing the evaluation file."#, - title = directive.title, - goal = directive.goal, - requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), - acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), - constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), - step_name = step.name, - step_description = step.description.as_deref().unwrap_or("N/A"), - task_plan = step.task_plan.as_deref().unwrap_or("N/A"), - step_contract_id = step_contract.id, - threshold_green = directive.confidence_threshold_green, - threshold_yellow = directive.confidence_threshold_yellow, - ) -} - -/// Handle monitoring contract task completion — parse evaluation and decide step outcome. -async fn on_monitoring_completed( - pool: &PgPool, - state: &SharedState, - contract: &crate::db::models::Contract, - step: &ChainStep, - task: &Task, - owner_id: Uuid, -) -> Result<(), String> { - let Some(directive_id) = contract.directive_id else { - return Ok(()); - }; - - let directive = repository::get_directive(pool, directive_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - // If monitoring task itself failed, fail-open: mark step as passed - if task.status == "failed" { - tracing::warn!( - directive_id = %directive_id, - step_id = %step.id, - "Monitoring task failed, fail-open: marking step as passed" - ); - - repository::update_step_status(pool, step.id, "passed") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; - - let _ = repository::create_directive_event( - pool, - directive_id, - directive.current_chain_id, - Some(step.id), - "monitoring_failed_open", - "warn", - None, - "system", - None, - ) - .await; - - return advance_chain(pool, state, &directive, owner_id).await; - } - - if task.status != "done" { - return Ok(()); - } - - process_monitoring_result(pool, state, contract, step, owner_id).await -} - -/// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework. -/// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path). -async fn process_monitoring_result( - pool: &PgPool, - state: &SharedState, - contract: &crate::db::models::Contract, - step: &ChainStep, - owner_id: Uuid, -) -> Result<(), String> { - let Some(directive_id) = contract.directive_id else { - return Ok(()); - }; - - // Idempotency guard: re-fetch step and only process if still "evaluating". - let current_step = repository::get_chain_step(pool, step.id) - .await - .map_err(|e| format!("Failed to re-fetch step: {}", e))?; - if let Some(ref s) = current_step { - if s.status != "evaluating" { - tracing::info!( - step_id = %step.id, - status = %s.status, - "Skipping process_monitoring_result: step no longer in evaluating status" - ); - return Ok(()); - } - } - - let directive = repository::get_directive(pool, directive_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - // Read evaluation result from monitoring contract files - let files = repository::list_files_in_contract(pool, contract.id, owner_id) - .await - .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?; - - let eval_file = files.iter().find(|f| { - let name_lower = f.name.to_lowercase(); - name_lower.contains("evaluation") || name_lower.contains("eval") - }); - - let eval_file = eval_file.or_else(|| files.first()); - - let monitoring_result = if let Some(eval_file) = eval_file { - let full_file = repository::get_file(pool, eval_file.id) - .await - .map_err(|e| format!("Failed to get evaluation file: {}", e))?; - - if let Some(full_file) = full_file { - let json_str = extract_plan_json(&full_file.body); - json_str.and_then(|s| serde_json::from_str::<MonitoringResult>(&s).ok()) - } else { - None - } - } else { - None - }; - - // If we couldn't parse the result, fail-open - let Some(result) = monitoring_result else { - tracing::warn!( - directive_id = %directive_id, - step_id = %step.id, - "Could not parse monitoring result, fail-open: marking step as passed" - ); - - repository::update_step_status(pool, step.id, "passed") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; - - let _ = repository::create_directive_event( - pool, - directive_id, - directive.current_chain_id, - Some(step.id), - "monitoring_parse_failed_open", - "warn", - None, - "system", - None, - ) - .await; - - return advance_chain(pool, state, &directive, owner_id).await; - }; - - // Create evaluation record - let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); - let evaluation = repository::create_directive_evaluation( - pool, - directive_id, - chain_id, - step.id, - contract.id, - "monitoring", - Some("automated"), - result.passed, - result.overall_score, - result.confidence_level.as_deref(), - &result.criteria_results, - &result.summary_feedback, - result.rework_instructions.as_deref(), - ) - .await - .map_err(|e| format!("Failed to create directive evaluation: {}", e))?; - - // Update step evaluation fields - repository::update_step_evaluation_fields( - pool, - step.id, - result.overall_score, - result.confidence_level.as_deref(), - evaluation.id, - ) - .await - .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?; - - // Create event - let event_data = serde_json::json!({ - "passed": result.passed, - "overallScore": result.overall_score, - "confidenceLevel": result.confidence_level, - "summaryFeedback": result.summary_feedback, - }); - let _ = repository::create_directive_event( - pool, - directive_id, - Some(chain_id), - Some(step.id), - if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" }, - "info", - Some(&event_data), - "system", - None, - ) - .await; - - if result.passed { - // Evaluation passed — mark step as passed - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - score = ?result.overall_score, - "Step evaluation passed" - ); - - repository::update_step_status(pool, step.id, "passed") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; - - advance_chain(pool, state, &directive, owner_id).await - } else { - // Evaluation failed — check rework budget - let max_rework = directive.max_rework_cycles.unwrap_or(3); - if step.rework_count >= max_rework { - tracing::warn!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - rework_count = step.rework_count, - max_rework = max_rework, - "Step evaluation failed, max rework cycles exceeded" - ); - - repository::update_step_status(pool, step.id, "failed") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; - - advance_chain(pool, state, &directive, owner_id).await - } else { - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - rework_count = step.rework_count, - "Step evaluation failed, scheduling rework" - ); - - repository::increment_step_rework_count(pool, step.id) - .await - .map_err(|e| format!("Failed to increment rework count: {}", e))?; - - // Set step back to pending so advance_chain re-dispatches it - repository::update_step_status(pool, step.id, "pending") - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - advance_chain(pool, state, &directive, owner_id).await - } - } -} - -/// Trigger a manual evaluation for a step. Public for use by handlers. -pub async fn trigger_manual_evaluation( - pool: &PgPool, - state: &SharedState, - owner_id: Uuid, - directive_id: Uuid, - step_id: Uuid, -) -> Result<ChainStep, String> { - let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) - .await - .map_err(|e| format!("Failed to get directive: {}", e))? - .ok_or("Directive not found")?; - - // Get the step — find via chain steps - let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?; - let steps = repository::list_steps_for_chain(pool, chain_id) - .await - .map_err(|e| format!("Failed to list steps: {}", e))?; - - let step = steps - .into_iter() - .find(|s| s.id == step_id) - .ok_or("Step not found in current chain")?; - - // Step must have a contract_id (must have been executed) - let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?; - - let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get step contract: {}", e))? - .ok_or("Step contract not found")?; - - // Set step to evaluating - let updated_step = repository::update_step_status(pool, step.id, "evaluating") - .await - .map_err(|e| format!("Failed to update step status: {}", e))? - .ok_or("Step not found after status update")?; - - let _ = repository::create_directive_event( - pool, - directive.id, - directive.current_chain_id, - Some(step.id), - "manual_evaluation_triggered", - "info", - None, - "user", - None, - ) - .await; - - dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?; - - Ok(updated_step) -} |
