summaryrefslogtreecommitdiff
path: root/makima/src/orchestration
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-08 21:07:30 +0000
committersoryu <soryu@soryu.co>2026-02-08 21:07:30 +0000
commit3662b334dfd68cfdf00ed44ae88927c2e1b2aabe (patch)
treebff5ae1e189fb8bcc0211d97dab3b9acb4257038 /makima/src/orchestration
parent87fa8c4af66745bd30bc84b6c5ef657dd4dec002 (diff)
downloadsoryu-3662b334dfd68cfdf00ed44ae88927c2e1b2aabe.tar.gz
soryu-3662b334dfd68cfdf00ed44ae88927c2e1b2aabe.zip
Remove directive mechanism
Diffstat (limited to 'makima/src/orchestration')
-rw-r--r--makima/src/orchestration/directive.rs1685
-rw-r--r--makima/src/orchestration/mod.rs2
2 files changed, 1 insertions, 1686 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
deleted file mode 100644
index 46d9425..0000000
--- a/makima/src/orchestration/directive.rs
+++ /dev/null
@@ -1,1685 +0,0 @@
-//! Directive orchestration — init, planning completion, chain advancement.
-
-use serde::Deserialize;
-use sqlx::PgPool;
-use uuid::Uuid;
-
-use serde::Serialize;
-use crate::db::models::{
- ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task,
- UpdateContractRequest, UpdateTaskRequest,
-};
-use crate::db::repository;
-use crate::server::state::{DaemonCommand, SharedState};
-
-/// A single step in the chain plan produced by the planning supervisor.
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "snake_case")]
-struct ChainPlanStep {
- name: String,
- description: String,
- #[serde(alias = "taskPlan")]
- task_plan: String,
- #[serde(default, alias = "dependsOn")]
- depends_on: Vec<String>, // names of steps this depends on
-}
-
-/// Wrapper for the plan JSON written by the planning supervisor.
-#[derive(Debug, Deserialize)]
-struct ChainPlan {
- steps: Vec<ChainPlanStep>,
-}
-
-/// Result written by the monitoring supervisor after evaluating a step.
-#[derive(Debug, Deserialize, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct MonitoringResult {
- passed: bool,
- overall_score: Option<f64>,
- confidence_level: Option<String>,
- #[serde(default)]
- criteria_results: serde_json::Value,
- #[serde(default)]
- summary_feedback: String,
- rework_instructions: Option<String>,
-}
-
-/// Dispatch a task to an available daemon. Finds a connected daemon with capacity,
-/// assigns the task, and sends a SpawnTask command.
-async fn dispatch_task_to_daemon(
- pool: &PgPool,
- state: &SharedState,
- task: &Task,
- contract_local_only: bool,
- contract_auto_merge_local: bool,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Find available daemons
- let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[])
- .await
- .map_err(|e| format!("Failed to get available daemons: {}", e))?;
-
- let available_daemon = daemons.iter().find(|d| {
- d.current_task_count < d.max_concurrent_tasks
- && state.daemon_connections.contains_key(&d.connection_id)
- });
-
- let daemon = match available_daemon {
- Some(d) => d,
- None => {
- tracing::warn!(
- task_id = %task.id,
- "No daemon available to dispatch task — will be picked up by retry loop"
- );
- return Ok(());
- }
- };
-
- // Assign task to daemon
- let update_req = UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(daemon.id),
- version: Some(task.version),
- ..Default::default()
- };
-
- let updated = repository::update_task_for_owner(pool, task.id, owner_id, update_req)
- .await
- .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?;
-
- let Some(updated_task) = updated else {
- return Err("Task not found when assigning to daemon".to_string());
- };
-
- // Get repo URL from task or contract repositories
- let repo_url = if let Some(url) = &updated_task.repository_url {
- Some(url.clone())
- } else if let Some(contract_id) = updated_task.contract_id {
- match repository::list_contract_repositories(pool, contract_id).await {
- Ok(repos) => repos
- .iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
- Err(_) => None,
- }
- } else {
- None
- };
-
- let cmd = DaemonCommand::SpawnTask {
- task_id: updated_task.id,
- task_name: updated_task.name.clone(),
- plan: updated_task.plan.clone(),
- repo_url,
- base_branch: updated_task.base_branch.clone(),
- target_branch: updated_task.target_branch.clone(),
- parent_task_id: updated_task.parent_task_id,
- depth: updated_task.depth,
- is_orchestrator: false,
- target_repo_path: updated_task.target_repo_path.clone(),
- completion_action: updated_task.completion_action.clone(),
- continue_from_task_id: updated_task.continue_from_task_id,
- copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: updated_task.is_supervisor,
- autonomous_loop: updated_task.contract_id.is_some(),
- resume_session: false,
- conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
- local_only: contract_local_only,
- auto_merge_local: contract_auto_merge_local,
- supervisor_worktree_task_id: None,
- };
-
- if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
- tracing::warn!(
- task_id = %task.id,
- daemon_id = %daemon.id,
- error = %e,
- "Failed to send spawn command — rolling back"
- );
- let rollback = UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
- };
- let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback).await;
- return Ok(()); // Non-fatal, retry loop will pick it up
- }
-
- tracing::info!(
- task_id = %task.id,
- daemon_id = %daemon.id,
- "Dispatched directive task to daemon"
- );
-
- Ok(())
-}
-
-/// Initialize a directive: create a planning contract and transition to "planning".
-pub async fn init_directive(
- pool: &PgPool,
- state: &SharedState,
- owner_id: Uuid,
- directive_id: Uuid,
-) -> Result<Directive, String> {
- // 1. Get directive, verify status
- let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- if directive.status != "draft" {
- return Err(format!(
- "Directive must be in 'draft' status to start, current status: '{}'",
- directive.status
- ));
- }
-
- // 2. Create planning contract
- let contract = repository::create_contract_for_owner(
- pool,
- owner_id,
- CreateContractRequest {
- name: format!("{} - Planning", directive.title),
- description: Some(format!(
- "Planning contract for directive: {}",
- directive.title
- )),
- contract_type: Some("simple".to_string()),
- template_id: None,
- initial_phase: Some("plan".to_string()),
- autonomous_loop: Some(true),
- phase_guard: None,
- local_only: Some(true),
- auto_merge_local: None,
- },
- )
- .await
- .map_err(|e| format!("Failed to create planning contract: {}", e))?;
-
- // 3. Mark contract as directive orchestrator
- repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true)
- .await
- .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
-
- // 4. Build planning prompt
- let planning_prompt = build_planning_prompt(&directive);
-
- // 5. Create supervisor task
- let supervisor_task = repository::create_task_for_owner(
- pool,
- owner_id,
- CreateTaskRequest {
- contract_id: Some(contract.id),
- name: format!("{} - Planner", directive.title),
- description: Some("Decompose directive goal into executable chain steps".to_string()),
- plan: planning_prompt,
- parent_task_id: None,
- is_supervisor: true,
- priority: 10,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- target_branch: None,
- merge_mode: None,
- target_repo_path: directive.local_path.clone(),
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- },
- )
- .await
- .map_err(|e| format!("Failed to create supervisor task: {}", e))?;
-
- // 6. Link supervisor to contract
- repository::update_contract_for_owner(
- pool,
- contract.id,
- owner_id,
- UpdateContractRequest {
- supervisor_task_id: Some(supervisor_task.id),
- ..Default::default()
- },
- )
- .await
- .map_err(|e| match e {
- crate::db::repository::RepositoryError::Database(e) => {
- format!("Failed to link supervisor to contract: {}", e)
- }
- other => format!("Failed to link supervisor to contract: {:?}", other),
- })?;
-
- // 7. Set orchestrator_contract_id on directive
- repository::set_directive_orchestrator_contract(pool, directive_id, contract.id)
- .await
- .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?;
-
- // 8. Transition directive to "planning"
- let updated = repository::update_directive_status(pool, directive_id, "planning")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?
- .ok_or("Directive not found after status update")?;
-
- // 9. Copy repo config to contract if repository_url is set
- if let Some(ref repo_url) = directive.repository_url {
- let _ = repository::add_remote_repository(
- pool,
- contract.id,
- "directive-repo",
- repo_url,
- true,
- )
- .await;
- } else if let Some(ref local_path) = directive.local_path {
- let _ = repository::add_local_repository(
- pool,
- contract.id,
- "directive-repo",
- local_path,
- true,
- )
- .await;
- }
-
- tracing::info!(
- directive_id = %directive_id,
- contract_id = %contract.id,
- task_id = %supervisor_task.id,
- "Directive started: planning contract created"
- );
-
- // 10. Dispatch planning task to an available daemon immediately
- dispatch_task_to_daemon(
- pool, state, &supervisor_task,
- contract.local_only, contract.auto_merge_local,
- owner_id,
- ).await?;
-
- Ok(updated)
-}
-
-/// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction).
-pub async fn submit_plan(
- pool: &PgPool,
- state: &SharedState,
- owner_id: Uuid,
- directive_id: Uuid,
- plan_json: &str,
-) -> Result<Directive, String> {
- // 1. Get directive, verify status
- let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- if directive.status != "planning" {
- return Err(format!(
- "Directive must be in 'planning' status to submit a plan, current status: '{}'",
- directive.status
- ));
- }
-
- // 2. Idempotency: if current_chain_id already set, return existing directive
- if directive.current_chain_id.is_some() {
- tracing::info!(
- directive_id = %directive_id,
- "Plan already submitted (current_chain_id set), returning existing directive"
- );
- return Ok(directive);
- }
-
- // 3. Parse the plan JSON
- let chain_plan: ChainPlan = serde_json::from_str(plan_json)
- .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?;
-
- if chain_plan.steps.is_empty() {
- return Err("Chain plan has no steps".to_string());
- }
-
- // 4. Create chain and steps, transition to active
- create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?;
-
- // 5. Re-fetch and return the updated directive
- let updated = repository::get_directive(pool, directive_id)
- .await
- .map_err(|e| format!("Failed to re-fetch directive: {}", e))?
- .ok_or("Directive not found after plan submission")?;
-
- tracing::info!(
- directive_id = %directive_id,
- step_count = chain_plan.steps.len(),
- "Plan submitted via API, directive now active"
- );
-
- Ok(updated)
-}
-
-/// Called when any task completes — checks if it's directive-related and advances.
-/// Called when a contract's status is updated to "completed" via the API.
-/// This is the primary entry point for directive orchestration because supervisor
-/// tasks do not send TaskComplete messages — they complete via contract status updates.
-pub async fn on_contract_completed(
- pool: &PgPool,
- state: &SharedState,
- contract: &crate::db::models::Contract,
- owner_id: Uuid,
-) -> Result<(), String> {
- if contract.status != "completed" {
- return Ok(());
- }
-
- if contract.is_directive_orchestrator {
- let directive =
- repository::get_directive_by_orchestrator_contract(pool, contract.id)
- .await
- .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
-
- if let Some(directive) = directive {
- tracing::info!(
- directive_id = %directive.id,
- contract_id = %contract.id,
- "Directive orchestrator contract completed, handling planning completion"
- );
- handle_planning_completion(pool, state, &directive, owner_id).await?;
- } else {
- tracing::warn!(
- contract_id = %contract.id,
- "Directive orchestrator contract completed but no directive found"
- );
- }
- } else if let Some(directive_id) = contract.directive_id {
- // Check if this is a monitoring contract
- let monitoring_step =
- repository::get_step_by_monitoring_contract_id(pool, contract.id)
- .await
- .map_err(|e| format!("Failed to check monitoring contract: {}", e))?;
-
- if let Some(step) = monitoring_step {
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- contract_id = %contract.id,
- "Monitoring contract completed"
- );
- process_monitoring_result(pool, state, contract, &step, owner_id).await?;
- } else {
- // Step contract completed
- let step = repository::get_step_by_contract_id(pool, contract.id)
- .await
- .map_err(|e| format!("Failed to get step by contract: {}", e))?;
-
- if let Some(step) = step {
- // Idempotency: only dispatch monitoring if step is still "running"
- // (on_step_completed may also fire via the task path)
- if step.status != "running" {
- tracing::info!(
- step_id = %step.id,
- status = %step.status,
- "Skipping step contract completion: step no longer running"
- );
- return Ok(());
- }
-
- let directive = repository::get_directive(pool, directive_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- contract_id = %contract.id,
- "Step contract completed, dispatching monitoring"
- );
-
- // Step contract completed successfully — dispatch monitoring
- repository::update_step_status(pool, step.id, "evaluating")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::create_directive_event(
- pool,
- directive.id,
- directive.current_chain_id,
- Some(step.id),
- "step_evaluating",
- "info",
- None,
- "system",
- None,
- )
- .await;
-
- dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?;
- }
- }
- }
-
- Ok(())
-}
-
-pub async fn on_task_completed(
- pool: &PgPool,
- state: &SharedState,
- task: &Task,
- owner_id: Uuid,
-) -> Result<(), String> {
- let Some(contract_id) = task.contract_id else {
- return Ok(());
- };
-
- let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get contract: {}", e))?;
-
- let Some(contract) = contract else {
- return Ok(());
- };
-
- if contract.is_directive_orchestrator {
- // This is a planning contract completion
- let directive =
- repository::get_directive_by_orchestrator_contract(pool, contract_id)
- .await
- .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
-
- if let Some(directive) = directive {
- on_planning_completed(pool, state, &directive, task, owner_id).await?;
- }
- } else if contract.directive_id.is_some() {
- // Check if this is a monitoring contract completion
- let monitoring_step =
- repository::get_step_by_monitoring_contract_id(pool, contract_id)
- .await
- .map_err(|e| format!("Failed to check monitoring contract: {}", e))?;
-
- if let Some(step) = monitoring_step {
- on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?;
- } else {
- // This is a step contract completion
- on_step_completed(pool, state, &contract, task, owner_id).await?;
- }
- }
-
- Ok(())
-}
-
-/// Handle planning task completion: parse chain plan, create steps, advance.
-async fn on_planning_completed(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- task: &Task,
- owner_id: Uuid,
-) -> Result<(), String> {
- // If task failed, fail the directive
- if task.status == "failed" {
- tracing::warn!(
- directive_id = %directive.id,
- task_id = %task.id,
- "Planning task failed, marking directive as failed"
- );
- repository::update_directive_status(pool, directive.id, "failed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
- return Ok(());
- }
-
- // Only process when the supervisor task itself is done
- if task.status != "done" || !task.is_supervisor {
- return Ok(());
- }
-
- handle_planning_completion(pool, state, directive, owner_id).await
-}
-
-/// Handle planning contract/task completion.
-/// Checks if a plan was submitted via the CLI; if not, retries or fails.
-async fn handle_planning_completion(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Re-fetch directive to check latest state
- let current = repository::get_directive(pool, directive.id)
- .await
- .map_err(|e| format!("Failed to re-fetch directive: {}", e))?
- .ok_or("Directive not found")?;
-
- // Idempotency: only process if still in "planning" status
- if current.status != "planning" {
- tracing::info!(
- directive_id = %directive.id,
- status = %current.status,
- "Skipping handle_planning_completion: directive no longer in planning status"
- );
- return Ok(());
- }
-
- // If plan was already submitted via CLI (current_chain_id is set), nothing to do
- if current.current_chain_id.is_some() {
- tracing::info!(
- directive_id = %directive.id,
- "Plan already submitted via CLI, skipping handle_planning_completion"
- );
- return Ok(());
- }
-
- // No plan was submitted — check retry budget
- let max_regenerations = current.max_chain_regenerations.unwrap_or(2);
- if current.chain_generation_count < max_regenerations {
- tracing::warn!(
- directive_id = %directive.id,
- attempt = current.chain_generation_count + 1,
- max = max_regenerations,
- "Planning completed without plan submission, retrying"
- );
-
- let _ = repository::create_directive_event(
- pool,
- directive.id,
- None,
- None,
- "planning_retry",
- "warn",
- Some(&serde_json::json!({
- "attempt": current.chain_generation_count + 1,
- "maxRegenerations": max_regenerations,
- "reason": "Planning contract completed without submitting a plan"
- })),
- "system",
- None,
- )
- .await;
-
- // Increment generation count
- repository::increment_chain_generation_count(pool, directive.id)
- .await
- .map_err(|e| format!("Failed to increment chain generation count: {}", e))?;
-
- // Reset to draft so init_directive can be called again
- repository::update_directive_status(pool, directive.id, "draft")
- .await
- .map_err(|e| format!("Failed to reset directive status: {}", e))?;
-
- // Re-init planning
- init_directive(pool, state, owner_id, directive.id).await?;
-
- Ok(())
- } else {
- tracing::error!(
- directive_id = %directive.id,
- attempts = current.chain_generation_count,
- max = max_regenerations,
- "Planning failed: max regeneration attempts exhausted without plan submission"
- );
-
- let _ = repository::create_directive_event(
- pool,
- directive.id,
- None,
- None,
- "planning_failed",
- "error",
- Some(&serde_json::json!({
- "attempts": current.chain_generation_count,
- "maxRegenerations": max_regenerations,
- "reason": "Max chain regeneration attempts exhausted without plan submission"
- })),
- "system",
- None,
- )
- .await;
-
- repository::update_directive_status(pool, directive.id, "failed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
-
- Ok(())
- }
-}
-
-/// Inner helper: create chain, steps, set current chain, transition to active, and advance.
-/// Extracted so that `process_planning_result` can catch errors and mark the directive failed.
-async fn create_chain_and_steps(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- chain_plan: &ChainPlan,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Create chain
- let chain = repository::create_directive_chain(
- pool,
- directive.id,
- &format!("{} - Chain", directive.title),
- Some("Auto-generated from planning"),
- None,
- chain_plan.steps.len() as i32,
- )
- .await
- .map_err(|e| format!("Failed to create directive chain: {}", e))?;
-
- // Create steps (two passes: first create all, then resolve dependencies)
- let mut step_ids: Vec<(String, Uuid)> = Vec::new();
-
- for (i, plan_step) in chain_plan.steps.iter().enumerate() {
- let step = repository::create_chain_step(
- pool,
- chain.id,
- &plan_step.name,
- Some(&plan_step.description),
- "task",
- "simple",
- Some("plan"),
- Some(&plan_step.task_plan),
- None, // dependencies set in second pass
- i as i32,
- )
- .await
- .map_err(|e| format!("Failed to create chain step: {}", e))?;
-
- step_ids.push((plan_step.name.clone(), step.id));
- }
-
- // Second pass: resolve name-based dependencies to UUIDs and update
- for (i, plan_step) in chain_plan.steps.iter().enumerate() {
- if plan_step.depends_on.is_empty() {
- continue;
- }
-
- let dep_uuids: Vec<Uuid> = plan_step
- .depends_on
- .iter()
- .filter_map(|dep_name| {
- step_ids
- .iter()
- .find(|(name, _)| name == dep_name)
- .map(|(_, id)| *id)
- })
- .collect();
-
- if !dep_uuids.is_empty() {
- let step_id = step_ids[i].1;
- sqlx::query(
- "UPDATE chain_steps SET depends_on = $2 WHERE id = $1",
- )
- .bind(step_id)
- .bind(&dep_uuids)
- .execute(pool)
- .await
- .map_err(|e| format!("Failed to update step dependencies: {}", e))?;
- }
- }
-
- // Set current chain on directive
- repository::set_directive_current_chain(pool, directive.id, chain.id)
- .await
- .map_err(|e| format!("Failed to set current chain: {}", e))?;
-
- // Transition directive to active
- let updated_directive = repository::update_directive_status(pool, directive.id, "active")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?
- .ok_or("Directive not found after status update")?;
-
- tracing::info!(
- directive_id = %directive.id,
- chain_id = %chain.id,
- step_count = chain_plan.steps.len(),
- "Chain plan created, advancing chain"
- );
-
- // Advance chain to dispatch ready steps
- advance_chain(pool, state, &updated_directive, owner_id).await
-}
-
-/// Handle a step contract task completion.
-async fn on_step_completed(
- pool: &PgPool,
- state: &SharedState,
- contract: &crate::db::models::Contract,
- task: &Task,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Only process supervisor task completions
- if !task.is_supervisor {
- return Ok(());
- }
-
- let Some(directive_id) = contract.directive_id else {
- return Ok(());
- };
-
- // Find the step linked to this contract
- let step = repository::get_step_by_contract_id(pool, contract.id)
- .await
- .map_err(|e| format!("Failed to get step by contract: {}", e))?;
-
- let Some(step) = step else {
- return Ok(());
- };
-
- // Idempotency: only process if step is still "running"
- // (on_contract_completed may also fire via the contract path)
- if step.status != "running" {
- tracing::info!(
- step_id = %step.id,
- status = %step.status,
- "Skipping on_step_completed: step no longer running"
- );
- return Ok(());
- }
-
- // Get the directive for threshold info
- let directive = repository::get_directive(pool, directive_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- if task.status == "done" {
- // Step task succeeded — dispatch monitoring evaluation
- repository::update_step_status(pool, step.id, "evaluating")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::create_directive_event(
- pool,
- directive.id,
- directive.current_chain_id,
- Some(step.id),
- "step_evaluating",
- "info",
- None,
- "system",
- None,
- )
- .await;
-
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- "Step task done, dispatching monitoring evaluation"
- );
-
- dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await
- } else {
- // Step task failed — mark step failed and advance
- repository::update_step_status(pool, step.id, "failed")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await;
-
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- "Step failed"
- );
-
- advance_chain(pool, state, &directive, owner_id).await
- }
-}
-
-/// Check chain progress and dispatch ready steps or mark directive complete.
-async fn advance_chain(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- owner_id: Uuid,
-) -> Result<(), String> {
- let Some(chain_id) = directive.current_chain_id else {
- return Ok(());
- };
-
- let steps = repository::list_steps_for_chain(pool, chain_id)
- .await
- .map_err(|e| format!("Failed to list steps: {}", e))?;
-
- // Check if all steps passed
- let all_passed = steps.iter().all(|s| s.status == "passed");
- if all_passed && !steps.is_empty() {
- repository::update_chain_status(pool, chain_id, "completed")
- .await
- .map_err(|e| format!("Failed to update chain status: {}", e))?;
- repository::update_directive_status(pool, directive.id, "completed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
- tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed");
- return Ok(());
- }
-
- // Check if any step failed
- let any_failed = steps.iter().any(|s| s.status == "failed");
- if any_failed {
- repository::update_chain_status(pool, chain_id, "failed")
- .await
- .map_err(|e| format!("Failed to update chain status: {}", e))?;
- repository::update_directive_status(pool, directive.id, "failed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
- tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected");
- return Ok(());
- }
-
- // Find and dispatch ready steps
- let ready_steps = repository::find_ready_steps(pool, chain_id)
- .await
- .map_err(|e| format!("Failed to find ready steps: {}", e))?;
-
- for step in ready_steps {
- if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await {
- tracing::error!(
- step_id = %step.id,
- step_name = %step.name,
- error = %e,
- "Failed to dispatch step"
- );
- }
- }
-
- Ok(())
-}
-
-/// Dispatch a single chain step as a new contract with supervisor.
-async fn dispatch_step(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- step: &crate::db::models::ChainStep,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Mark step as running
- repository::update_step_status(pool, step.id, "running")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- // Create contract for this step.
- // Step contracts use the directive's repository config — not local_only,
- // so they can branch and merge to share work across steps.
- let has_repo = directive.repository_url.is_some() || directive.local_path.is_some();
- let contract = repository::create_contract_for_owner(
- pool,
- owner_id,
- CreateContractRequest {
- name: step.name.clone(),
- description: step.description.clone(),
- contract_type: Some(step.contract_type.clone()),
- template_id: None,
- initial_phase: step.initial_phase.clone(),
- autonomous_loop: Some(true),
- phase_guard: None,
- local_only: Some(!has_repo),
- auto_merge_local: if has_repo { Some(true) } else { None },
- },
- )
- .await
- .map_err(|e| format!("Failed to create step contract: {}", e))?;
-
- // Set directive_id on contract
- repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
- .await
- .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
-
- // Build the task plan, prepending rework instructions if this is a rework cycle
- let mut task_plan = step
- .task_plan
- .clone()
- .unwrap_or_else(|| format!("Execute step: {}", step.name));
-
- if let Some(eval_id) = step.last_evaluation_id {
- if let Ok(Some(evaluation)) = repository::get_directive_evaluation(pool, eval_id).await {
- if let Some(ref rework) = evaluation.rework_instructions {
- task_plan = format!(
- "IMPORTANT — REWORK REQUIRED (attempt #{}):\n\
- The previous attempt was evaluated and did NOT pass.\n\
- Feedback: {}\n\
- Rework instructions: {}\n\n\
- ---\n\n\
- Original task plan:\n{}",
- step.rework_count + 1,
- evaluation.summary_feedback,
- rework,
- task_plan,
- );
- }
- }
- }
-
- // Create supervisor task
- let supervisor_task = repository::create_task_for_owner(
- pool,
- owner_id,
- CreateTaskRequest {
- contract_id: Some(contract.id),
- name: format!("{} Supervisor", step.name),
- description: step.description.clone(),
- plan: task_plan,
- parent_task_id: None,
- is_supervisor: true,
- priority: 5,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- target_branch: None,
- merge_mode: None,
- target_repo_path: directive.local_path.clone(),
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- },
- )
- .await
- .map_err(|e| format!("Failed to create step supervisor task: {}", e))?;
-
- // Link supervisor to contract
- repository::update_contract_for_owner(
- pool,
- contract.id,
- owner_id,
- UpdateContractRequest {
- supervisor_task_id: Some(supervisor_task.id),
- ..Default::default()
- },
- )
- .await
- .map_err(|e| match e {
- crate::db::repository::RepositoryError::Database(e) => {
- format!("Failed to link supervisor to step contract: {}", e)
- }
- other => format!("Failed to link supervisor to step contract: {:?}", other),
- })?;
-
- // Link step to contract/task
- repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id)
- .await
- .map_err(|e| format!("Failed to update step contract link: {}", e))?;
-
- // Copy repo config from directive to step contract
- if let Some(ref repo_url) = directive.repository_url {
- let _ = repository::add_remote_repository(
- pool,
- contract.id,
- "directive-repo",
- repo_url,
- true,
- )
- .await;
- } else if let Some(ref local_path) = directive.local_path {
- let _ = repository::add_local_repository(
- pool,
- contract.id,
- "directive-repo",
- local_path,
- true,
- )
- .await;
- }
-
- tracing::info!(
- directive_id = %directive.id,
- step_id = %step.id,
- step_name = %step.name,
- contract_id = %contract.id,
- task_id = %supervisor_task.id,
- "Step dispatched"
- );
-
- // Dispatch step task to an available daemon immediately
- dispatch_task_to_daemon(
- pool, state, &supervisor_task,
- contract.local_only, contract.auto_merge_local,
- owner_id,
- ).await?;
-
- Ok(())
-}
-
-/// Build the planning supervisor prompt from a directive.
-fn build_planning_prompt(directive: &Directive) -> String {
- format!(
- r#"You are planning the execution of a directive.
-
-DIRECTIVE: {title}
-GOAL: {goal}
-REQUIREMENTS: {requirements}
-ACCEPTANCE CRITERIA: {acceptance_criteria}
-CONSTRAINTS: {constraints}
-
-Your job is to decompose this goal into a sequence of executable steps.
-Each step will become a separate contract with its own supervisor.
-
-The JSON format:
-{{
- "steps": [
- {{
- "name": "Step name",
- "description": "What this step accomplishes",
- "task_plan": "Detailed instructions for the step's supervisor",
- "depends_on": []
- }}
- ]
-}}
-
-Rules:
-- Steps with no dependencies (empty depends_on array) will run in parallel.
-- Steps that depend on other steps will wait until those complete.
-- The depends_on array contains names of steps this step depends on.
-- Each step should be a self-contained unit of work.
-- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible.
-- Keep the number of steps reasonable (3-10 typically).
-
-Submit your plan by piping the JSON to stdin:
- echo '<your_json_plan>' | makima directive submit-plan --directive-id {directive_id}
-
-After submitting the plan, mark the contract as complete:
- makima supervisor complete"#,
- title = directive.title,
- goal = directive.goal,
- requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
- acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
- constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
- directive_id = directive.id,
- )
-}
-
-/// Extract JSON from file body elements.
-fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> {
- use crate::db::models::BodyElement;
-
- for element in body {
- match element {
- BodyElement::Code { content, .. } => {
- // Try to parse as JSON
- let trimmed = content.trim();
- if trimmed.starts_with('{') || trimmed.starts_with('[') {
- if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
- return Some(trimmed.to_string());
- }
- }
- }
- BodyElement::Paragraph { text } => {
- let trimmed = text.trim();
- if trimmed.starts_with('{') || trimmed.starts_with('[') {
- if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
- return Some(trimmed.to_string());
- }
- }
- }
- BodyElement::Markdown { content } => {
- // Try to find JSON in markdown content
- let trimmed = content.trim();
- if trimmed.starts_with('{') || trimmed.starts_with('[') {
- if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
- return Some(trimmed.to_string());
- }
- }
- // Try to find JSON in code blocks within markdown
- if let Some(json_start) = trimmed.find("```json") {
- let after = &trimmed[json_start + 7..];
- if let Some(json_end) = after.find("```") {
- let json_str = after[..json_end].trim();
- if serde_json::from_str::<serde_json::Value>(json_str).is_ok() {
- return Some(json_str.to_string());
- }
- }
- }
- }
- _ => {}
- }
- }
-
- // Fallback: concatenate all text content and try to find JSON
- let all_text: String = body
- .iter()
- .map(|el| match el {
- BodyElement::Code { content, .. } => content.clone(),
- BodyElement::Paragraph { text } => text.clone(),
- BodyElement::Markdown { content } => content.clone(),
- _ => String::new(),
- })
- .collect::<Vec<_>>()
- .join("\n");
-
- let trimmed = all_text.trim();
- if let Some(start) = trimmed.find('{') {
- // Find matching closing brace
- let substr = &trimmed[start..];
- if serde_json::from_str::<serde_json::Value>(substr).is_ok() {
- return Some(substr.to_string());
- }
- }
-
- None
-}
-
-/// Dispatch a monitoring contract to evaluate a completed step.
-async fn dispatch_monitoring(
- pool: &PgPool,
- state: &SharedState,
- directive: &Directive,
- step: &ChainStep,
- step_contract: &crate::db::models::Contract,
- owner_id: Uuid,
-) -> Result<(), String> {
- // Create monitoring contract
- let contract = repository::create_contract_for_owner(
- pool,
- owner_id,
- CreateContractRequest {
- name: format!("{} - Monitor", step.name),
- description: Some(format!("Monitoring evaluation for step: {}", step.name)),
- contract_type: Some("monitoring".to_string()),
- template_id: None,
- initial_phase: Some("plan".to_string()),
- autonomous_loop: Some(true),
- phase_guard: None,
- local_only: Some(true),
- auto_merge_local: None,
- },
- )
- .await
- .map_err(|e| format!("Failed to create monitoring contract: {}", e))?;
-
- // Mark contract as directive-related (not orchestrator)
- repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
- .await
- .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?;
-
- // Build evaluation prompt
- let prompt = build_monitoring_prompt(directive, step, step_contract);
-
- // Create monitoring task (NOT a supervisor — regular task that exits when done,
- // which triggers on_task_completed → on_monitoring_completed automatically)
- let supervisor_task = repository::create_task_for_owner(
- pool,
- owner_id,
- CreateTaskRequest {
- contract_id: Some(contract.id),
- name: format!("{} - Evaluator", step.name),
- description: Some("Evaluate step output against directive criteria".to_string()),
- plan: prompt,
- parent_task_id: None,
- is_supervisor: false,
- priority: 8,
- repository_url: directive.repository_url.clone(),
- base_branch: directive.base_branch.clone(),
- target_branch: None,
- merge_mode: None,
- target_repo_path: directive.local_path.clone(),
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- },
- )
- .await
- .map_err(|e| format!("Failed to create monitoring task: {}", e))?;
-
- // Link monitoring task to contract
- repository::update_contract_for_owner(
- pool,
- contract.id,
- owner_id,
- UpdateContractRequest {
- supervisor_task_id: Some(supervisor_task.id),
- ..Default::default()
- },
- )
- .await
- .map_err(|e| match e {
- crate::db::repository::RepositoryError::Database(e) => {
- format!("Failed to link task to monitoring contract: {}", e)
- }
- other => format!("Failed to link task to monitoring contract: {:?}", other),
- })?;
-
- // Link step to monitoring contract/task
- repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id)
- .await
- .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?;
-
- // Copy repo config from directive to monitoring contract
- if let Some(ref repo_url) = directive.repository_url {
- let _ = repository::add_remote_repository(
- pool,
- contract.id,
- "directive-repo",
- repo_url,
- true,
- )
- .await;
- } else if let Some(ref local_path) = directive.local_path {
- let _ = repository::add_local_repository(
- pool,
- contract.id,
- "directive-repo",
- local_path,
- true,
- )
- .await;
- }
-
- tracing::info!(
- directive_id = %directive.id,
- step_id = %step.id,
- step_name = %step.name,
- monitoring_contract_id = %contract.id,
- monitoring_task_id = %supervisor_task.id,
- "Monitoring evaluation dispatched"
- );
-
- // Dispatch monitoring task to an available daemon immediately
- dispatch_task_to_daemon(
- pool, state, &supervisor_task,
- contract.local_only, contract.auto_merge_local,
- owner_id,
- ).await?;
-
- Ok(())
-}
-
-/// Build the monitoring supervisor prompt.
-fn build_monitoring_prompt(
- directive: &Directive,
- step: &ChainStep,
- step_contract: &crate::db::models::Contract,
-) -> String {
- format!(
- r#"You are evaluating the output of a completed step in a directive chain.
-
-DIRECTIVE: {title}
-GOAL: {goal}
-REQUIREMENTS: {requirements}
-ACCEPTANCE CRITERIA: {acceptance_criteria}
-CONSTRAINTS: {constraints}
-
-STEP: {step_name}
-STEP DESCRIPTION: {step_description}
-STEP TASK PLAN: {task_plan}
-STEP CONTRACT ID: {step_contract_id}
-
-CONFIDENCE THRESHOLDS:
-- Green (pass): >= {threshold_green}
-- Yellow (marginal): >= {threshold_yellow}
-- Red (fail): < {threshold_yellow}
-
-INSTRUCTIONS:
-1. Read the step contract's files to understand what was delivered:
- makima contract files --contract-id {step_contract_id}
- makima contract file <file_id> --contract-id {step_contract_id}
-
-2. Evaluate whether the step's output meets the directive's requirements and the step's task plan.
-
-3. Write your evaluation as a JSON file to this monitoring contract. Create a file called
- evaluation.json with the JSON content first, then upload it:
-
- cat > /tmp/eval-result.json << 'EVALEOF'
- {{
- "passed": true,
- "overallScore": 0.85,
- "confidenceLevel": "green",
- "criteriaResults": [
- {{
- "criterion": "Example criterion",
- "passed": true,
- "score": 0.9,
- "evidence": "What was found"
- }}
- ],
- "summaryFeedback": "Summary of evaluation",
- "reworkInstructions": null
- }}
- EVALEOF
- makima contract create-file evaluation-result < /tmp/eval-result.json
-
- Replace the example values with your actual evaluation results.
-
-Scoring guidelines:
-- overallScore >= {threshold_green}: confidenceLevel = "green", passed = true
-- overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment
-- overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false
-- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions.
-- Set reworkInstructions to null if the step passed.
-
-You are done after writing the evaluation file."#,
- title = directive.title,
- goal = directive.goal,
- requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
- acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
- constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
- step_name = step.name,
- step_description = step.description.as_deref().unwrap_or("N/A"),
- task_plan = step.task_plan.as_deref().unwrap_or("N/A"),
- step_contract_id = step_contract.id,
- threshold_green = directive.confidence_threshold_green,
- threshold_yellow = directive.confidence_threshold_yellow,
- )
-}
-
-/// Handle monitoring contract task completion — parse evaluation and decide step outcome.
-async fn on_monitoring_completed(
- pool: &PgPool,
- state: &SharedState,
- contract: &crate::db::models::Contract,
- step: &ChainStep,
- task: &Task,
- owner_id: Uuid,
-) -> Result<(), String> {
- let Some(directive_id) = contract.directive_id else {
- return Ok(());
- };
-
- let directive = repository::get_directive(pool, directive_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- // If monitoring task itself failed, fail-open: mark step as passed
- if task.status == "failed" {
- tracing::warn!(
- directive_id = %directive_id,
- step_id = %step.id,
- "Monitoring task failed, fail-open: marking step as passed"
- );
-
- repository::update_step_status(pool, step.id, "passed")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
-
- let _ = repository::create_directive_event(
- pool,
- directive_id,
- directive.current_chain_id,
- Some(step.id),
- "monitoring_failed_open",
- "warn",
- None,
- "system",
- None,
- )
- .await;
-
- return advance_chain(pool, state, &directive, owner_id).await;
- }
-
- if task.status != "done" {
- return Ok(());
- }
-
- process_monitoring_result(pool, state, contract, step, owner_id).await
-}
-
-/// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework.
-/// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path).
-async fn process_monitoring_result(
- pool: &PgPool,
- state: &SharedState,
- contract: &crate::db::models::Contract,
- step: &ChainStep,
- owner_id: Uuid,
-) -> Result<(), String> {
- let Some(directive_id) = contract.directive_id else {
- return Ok(());
- };
-
- // Idempotency guard: re-fetch step and only process if still "evaluating".
- let current_step = repository::get_chain_step(pool, step.id)
- .await
- .map_err(|e| format!("Failed to re-fetch step: {}", e))?;
- if let Some(ref s) = current_step {
- if s.status != "evaluating" {
- tracing::info!(
- step_id = %step.id,
- status = %s.status,
- "Skipping process_monitoring_result: step no longer in evaluating status"
- );
- return Ok(());
- }
- }
-
- let directive = repository::get_directive(pool, directive_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- // Read evaluation result from monitoring contract files
- let files = repository::list_files_in_contract(pool, contract.id, owner_id)
- .await
- .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?;
-
- let eval_file = files.iter().find(|f| {
- let name_lower = f.name.to_lowercase();
- name_lower.contains("evaluation") || name_lower.contains("eval")
- });
-
- let eval_file = eval_file.or_else(|| files.first());
-
- let monitoring_result = if let Some(eval_file) = eval_file {
- let full_file = repository::get_file(pool, eval_file.id)
- .await
- .map_err(|e| format!("Failed to get evaluation file: {}", e))?;
-
- if let Some(full_file) = full_file {
- let json_str = extract_plan_json(&full_file.body);
- json_str.and_then(|s| serde_json::from_str::<MonitoringResult>(&s).ok())
- } else {
- None
- }
- } else {
- None
- };
-
- // If we couldn't parse the result, fail-open
- let Some(result) = monitoring_result else {
- tracing::warn!(
- directive_id = %directive_id,
- step_id = %step.id,
- "Could not parse monitoring result, fail-open: marking step as passed"
- );
-
- repository::update_step_status(pool, step.id, "passed")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
-
- let _ = repository::create_directive_event(
- pool,
- directive_id,
- directive.current_chain_id,
- Some(step.id),
- "monitoring_parse_failed_open",
- "warn",
- None,
- "system",
- None,
- )
- .await;
-
- return advance_chain(pool, state, &directive, owner_id).await;
- };
-
- // Create evaluation record
- let chain_id = directive.current_chain_id.unwrap_or(step.chain_id);
- let evaluation = repository::create_directive_evaluation(
- pool,
- directive_id,
- chain_id,
- step.id,
- contract.id,
- "monitoring",
- Some("automated"),
- result.passed,
- result.overall_score,
- result.confidence_level.as_deref(),
- &result.criteria_results,
- &result.summary_feedback,
- result.rework_instructions.as_deref(),
- )
- .await
- .map_err(|e| format!("Failed to create directive evaluation: {}", e))?;
-
- // Update step evaluation fields
- repository::update_step_evaluation_fields(
- pool,
- step.id,
- result.overall_score,
- result.confidence_level.as_deref(),
- evaluation.id,
- )
- .await
- .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?;
-
- // Create event
- let event_data = serde_json::json!({
- "passed": result.passed,
- "overallScore": result.overall_score,
- "confidenceLevel": result.confidence_level,
- "summaryFeedback": result.summary_feedback,
- });
- let _ = repository::create_directive_event(
- pool,
- directive_id,
- Some(chain_id),
- Some(step.id),
- if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" },
- "info",
- Some(&event_data),
- "system",
- None,
- )
- .await;
-
- if result.passed {
- // Evaluation passed — mark step as passed
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- score = ?result.overall_score,
- "Step evaluation passed"
- );
-
- repository::update_step_status(pool, step.id, "passed")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
-
- advance_chain(pool, state, &directive, owner_id).await
- } else {
- // Evaluation failed — check rework budget
- let max_rework = directive.max_rework_cycles.unwrap_or(3);
- if step.rework_count >= max_rework {
- tracing::warn!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- rework_count = step.rework_count,
- max_rework = max_rework,
- "Step evaluation failed, max rework cycles exceeded"
- );
-
- repository::update_step_status(pool, step.id, "failed")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await;
-
- advance_chain(pool, state, &directive, owner_id).await
- } else {
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- rework_count = step.rework_count,
- "Step evaluation failed, scheduling rework"
- );
-
- repository::increment_step_rework_count(pool, step.id)
- .await
- .map_err(|e| format!("Failed to increment rework count: {}", e))?;
-
- // Set step back to pending so advance_chain re-dispatches it
- repository::update_step_status(pool, step.id, "pending")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- advance_chain(pool, state, &directive, owner_id).await
- }
- }
-}
-
-/// Trigger a manual evaluation for a step. Public for use by handlers.
-pub async fn trigger_manual_evaluation(
- pool: &PgPool,
- state: &SharedState,
- owner_id: Uuid,
- directive_id: Uuid,
- step_id: Uuid,
-) -> Result<ChainStep, String> {
- let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get directive: {}", e))?
- .ok_or("Directive not found")?;
-
- // Get the step — find via chain steps
- let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?;
- let steps = repository::list_steps_for_chain(pool, chain_id)
- .await
- .map_err(|e| format!("Failed to list steps: {}", e))?;
-
- let step = steps
- .into_iter()
- .find(|s| s.id == step_id)
- .ok_or("Step not found in current chain")?;
-
- // Step must have a contract_id (must have been executed)
- let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?;
-
- let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get step contract: {}", e))?
- .ok_or("Step contract not found")?;
-
- // Set step to evaluating
- let updated_step = repository::update_step_status(pool, step.id, "evaluating")
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?
- .ok_or("Step not found after status update")?;
-
- let _ = repository::create_directive_event(
- pool,
- directive.id,
- directive.current_chain_id,
- Some(step.id),
- "manual_evaluation_triggered",
- "info",
- None,
- "user",
- None,
- )
- .await;
-
- dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?;
-
- Ok(updated_step)
-}
diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs
index e7ffb70..8b13789 100644
--- a/makima/src/orchestration/mod.rs
+++ b/makima/src/orchestration/mod.rs
@@ -1 +1 @@
-pub mod directive;
+