summary refs log tree commit diff
path: root/makima/src/orchestration/directive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/directive.rs')
-rw-r--r-- makima/src/orchestration/directive.rs 562
1 files changed, 538 insertions, 24 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index d17deeb..e779c18 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -4,8 +4,9 @@ use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;
+use serde::Serialize;
use crate::db::models::{
- CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
+ ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
};
use crate::db::repository;
use crate::server::state::SharedState;
@@ -26,6 +27,20 @@ struct ChainPlan {
steps: Vec<ChainPlanStep>,
}
+/// Result written by the monitoring supervisor after evaluating a step; `on_monitoring_completed` parses it from the evaluation file's JSON.
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")] // JSON keys are camelCase, e.g. `overallScore`, matching the format given in `build_monitoring_prompt`
+struct MonitoringResult {
+ passed: bool, // no default and not an Option, so JSON lacking `passed` fails to deserialize
+ overall_score: Option<f64>, // the prompt asks for a value in 0.0-1.0; the range is not validated here
+ confidence_level: Option<String>, // the prompt asks for "green" | "yellow" | "red"; kept as a free-form string
+ #[serde(default)]
+ criteria_results: serde_json::Value, // `Value::default()` (JSON null) when the key is absent
+ #[serde(default)]
+ summary_feedback: String, // empty string when the key is absent
+ rework_instructions: Option<String>,
+}
+
/// Initialize a directive: create a planning contract and transition to "planning".
pub async fn init_directive(
pool: &PgPool,
@@ -195,8 +210,18 @@ pub async fn on_task_completed(
on_planning_completed(pool, state, &directive, task, owner_id).await?;
}
} else if contract.directive_id.is_some() {
- // This is a step contract completion
- on_step_completed(pool, state, &contract, task, owner_id).await?;
+ // Check if this is a monitoring contract completion
+ let monitoring_step =
+ repository::get_step_by_monitoring_contract_id(pool, contract_id)
+ .await
+ .map_err(|e| format!("Failed to check monitoring contract: {}", e))?;
+
+ if let Some(step) = monitoring_step {
+ on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?;
+ } else {
+ // This is a step contract completion
+ on_step_completed(pool, state, &contract, task, owner_id).await?;
+ }
}
Ok(())
@@ -403,32 +428,54 @@ async fn on_step_completed(
return Ok(());
};
- // Update step status based on task outcome
- let new_status = if task.status == "done" {
- "passed"
- } else {
- "failed"
- };
-
- repository::update_step_status(pool, step.id, new_status)
- .await
- .map_err(|e| format!("Failed to update step status: {}", e))?;
-
- tracing::info!(
- directive_id = %directive_id,
- step_id = %step.id,
- step_name = %step.name,
- new_status = new_status,
- "Step completed"
- );
-
- // Get the directive and advance
+ // Get the directive for threshold info
let directive = repository::get_directive(pool, directive_id)
.await
.map_err(|e| format!("Failed to get directive: {}", e))?
.ok_or("Directive not found")?;
- advance_chain(pool, state, &directive, owner_id).await
+ if task.status == "done" {
+ // Step task succeeded — dispatch monitoring evaluation
+ repository::update_step_status(pool, step.id, "evaluating")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ let _ = repository::create_directive_event(
+ pool,
+ directive.id,
+ directive.current_chain_id,
+ Some(step.id),
+ "step_evaluating",
+ "info",
+ None,
+ "system",
+ None,
+ )
+ .await;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ "Step task done, dispatching monitoring evaluation"
+ );
+
+ dispatch_monitoring(pool, &directive, &step, contract, owner_id).await
+ } else {
+ // Step task failed — mark step failed and advance
+ repository::update_step_status(pool, step.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ "Step failed"
+ );
+
+ advance_chain(pool, state, &directive, owner_id).await
+ }
}
/// Check chain progress and dispatch ready steps or mark directive complete.
@@ -734,3 +781,470 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String>
None
}
+
+/// Dispatch a monitoring contract to evaluate a completed step: creates a "monitoring" contract plus a supervisor task carrying the evaluation prompt, then links both to `step`.
+async fn dispatch_monitoring(
+ pool: &PgPool,
+ directive: &Directive,
+ step: &ChainStep,
+ step_contract: &crate::db::models::Contract, // contract of the step under evaluation; only handed to `build_monitoring_prompt`
+ owner_id: Uuid,
+) -> Result<(), String> { // Err carries a formatted message from whichever `?`-propagated repository call fails
+ // Create monitoring contract
+ let contract = repository::create_contract_for_owner(
+ pool,
+ owner_id,
+ CreateContractRequest {
+ name: format!("{} - Monitor", step.name),
+ description: Some(format!("Monitoring evaluation for step: {}", step.name)),
+ contract_type: Some("monitoring".to_string()),
+ template_id: None,
+ initial_phase: Some("plan".to_string()),
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: Some(true),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create monitoring contract: {}", e))?;
+
+ // Mark contract as directive-related (not orchestrator); presumably the trailing `false` is the orchestrator flag — confirm against the repository signature
+ repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
+ .await
+ .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?;
+
+ // Build evaluation prompt (pure string formatting, cannot fail)
+ let prompt = build_monitoring_prompt(directive, step, step_contract);
+
+ // Create supervisor task
+ let supervisor_task = repository::create_task_for_owner(
+ pool,
+ owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: format!("{} - Evaluator", step.name),
+ description: Some("Evaluate step output against directive criteria".to_string()),
+ plan: prompt, // the evaluation prompt is delivered as the task's plan
+ parent_task_id: None,
+ is_supervisor: true, // `on_monitoring_completed` ignores completions of non-supervisor tasks
+ priority: 8,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: directive.local_path.clone(),
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create monitoring supervisor task: {}", e))?;
+
+ // Link supervisor to contract
+ repository::update_contract_for_owner(
+ pool,
+ contract.id,
+ owner_id,
+ UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_err(|e| match e { // database errors are rendered with Display, every other variant with Debug
+ crate::db::repository::RepositoryError::Database(e) => {
+ format!("Failed to link supervisor to monitoring contract: {}", e)
+ }
+ other => format!("Failed to link supervisor to monitoring contract: {:?}", other),
+ })?;
+
+ // Link step to monitoring contract/task; presumably this is what `get_step_by_monitoring_contract_id` in `on_task_completed` later matches on — confirm in the repository layer
+ repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id)
+ .await
+ .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?;
+
+ // Copy repo config from directive to monitoring contract (best-effort: both results are discarded, so a failure here does not abort the dispatch)
+ if let Some(ref repo_url) = directive.repository_url {
+ let _ = repository::add_remote_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ repo_url,
+ true,
+ )
+ .await;
+ } else if let Some(ref local_path) = directive.local_path { // the local path is used only when no repository URL is set
+ let _ = repository::add_local_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ local_path,
+ true,
+ )
+ .await;
+ }
+
+ tracing::info!(
+ directive_id = %directive.id,
+ step_id = %step.id,
+ step_name = %step.name,
+ monitoring_contract_id = %contract.id,
+ monitoring_task_id = %supervisor_task.id,
+ "Monitoring evaluation dispatched"
+ );
+
+ Ok(())
+}
+
+/// Build the monitoring supervisor prompt: one `format!` over a raw-string template filled from the directive, the step, and the step contract's id. Literal JSON braces in the template are written `{{`/`}}` so `format!` emits them verbatim.
+fn build_monitoring_prompt(
+ directive: &Directive,
+ step: &ChainStep,
+ step_contract: &crate::db::models::Contract, // only `id` is read, so the evaluator can list that contract's files
+) -> String {
+ format!(
+ r#"You are evaluating the output of a completed step in a directive chain.
+
+DIRECTIVE: {title}
+GOAL: {goal}
+REQUIREMENTS: {requirements}
+ACCEPTANCE CRITERIA: {acceptance_criteria}
+CONSTRAINTS: {constraints}
+
+STEP: {step_name}
+STEP DESCRIPTION: {step_description}
+STEP TASK PLAN: {task_plan}
+STEP CONTRACT ID: {step_contract_id}
+
+CONFIDENCE THRESHOLDS:
+- Green (pass): >= {threshold_green}
+- Yellow (marginal): >= {threshold_yellow}
+- Red (fail): < {threshold_yellow}
+
+Your job:
+1. Read the step contract's files to understand what was delivered:
+ makima contract files --contract-id {step_contract_id}
+ makima contract file <file_id> --contract-id {step_contract_id}
+
+2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan.
+
+3. Write your evaluation result as a JSON file named "evaluation-result" to this contract:
+ makima contract create-file "evaluation-result" < evaluation.json
+
+The JSON format:
+{{
+ "passed": true/false,
+ "overallScore": 0.0-1.0,
+ "confidenceLevel": "green" | "yellow" | "red",
+ "criteriaResults": [
+ {{
+ "criterion": "Description of what was checked",
+ "passed": true/false,
+ "score": 0.0-1.0,
+ "evidence": "Evidence supporting the assessment"
+ }}
+ ],
+ "summaryFeedback": "Brief summary of the evaluation",
+ "reworkInstructions": "If failed, specific instructions for rework (null if passed)"
+}}
+
+Scoring guidelines:
+- Score >= {threshold_green}: confidenceLevel = "green", passed = true
+- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed
+- Score < {threshold_yellow}: confidenceLevel = "red", passed = false
+- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions.
+
+After writing the evaluation file, mark the contract as complete:
+ makima supervisor complete"#,
+ title = directive.title,
+ goal = directive.goal,
+ requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), // falls back to "" if serialization fails
+ acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
+ constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
+ step_name = step.name,
+ step_description = step.description.as_deref().unwrap_or("N/A"), // "N/A" when the step has no description
+ task_plan = step.task_plan.as_deref().unwrap_or("N/A"), // "N/A" when the step has no task plan
+ step_contract_id = step_contract.id,
+ threshold_green = directive.confidence_threshold_green, // thresholds are only interpolated into the prompt text; nothing in this function enforces them
+ threshold_yellow = directive.confidence_threshold_yellow,
+ )
+}
+
+/// Handle monitoring contract task completion — parse evaluation and decide step outcome: "passed", "failed" (rework budget spent), or back to "pending" for rework. Every path that changes the step status ends by calling `advance_chain`.
+async fn on_monitoring_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ contract: &crate::db::models::Contract, // the monitoring contract, not the step's own contract
+ step: &ChainStep, // the step being evaluated, as loaded by the caller
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Only process supervisor task completions; any other task on the monitoring contract is ignored
+ if !task.is_supervisor {
+ return Ok(());
+ }
+
+ let Some(directive_id) = contract.directive_id else {
+ return Ok(());
+ };
+
+ let directive = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ // If monitoring task itself failed, fail-open: mark step as passed and keep the chain moving
+ if task.status == "failed" {
+ tracing::warn!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ "Monitoring task failed, fail-open: marking step as passed"
+ );
+
+ repository::update_step_status(pool, step.id, "passed")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ let _ = repository::create_directive_event( // event logging is best-effort; its result is discarded
+ pool,
+ directive_id,
+ directive.current_chain_id,
+ Some(step.id),
+ "monitoring_failed_open",
+ "warn",
+ None,
+ "system",
+ None,
+ )
+ .await;
+
+ return advance_chain(pool, state, &directive, owner_id).await;
+ }
+
+ if task.status != "done" { // any status other than "failed" or "done" is ignored without touching the step
+ return Ok(());
+ }
+
+ // Read evaluation result from monitoring contract files
+ let files = repository::list_files_in_contract(pool, contract.id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?;
+
+ let eval_file = files.iter().find(|f| { // first file whose lower-cased name mentions an evaluation
+ let name_lower = f.name.to_lowercase();
+ name_lower.contains("evaluation") || name_lower.contains("eval") // NOTE(review): "eval" is a substring of "evaluation", so the first test is redundant
+ });
+
+ let eval_file = eval_file.or_else(|| files.first()); // no match by name: fall back to the first file listed, if any
+
+ let monitoring_result = if let Some(eval_file) = eval_file {
+ let full_file = repository::get_file(pool, eval_file.id)
+ .await
+ .map_err(|e| format!("Failed to get evaluation file: {}", e))?;
+
+ if let Some(full_file) = full_file {
+ let json_str = extract_plan_json(&full_file.body); // reuses the plan-JSON extractor on the file body
+ json_str.and_then(|s| serde_json::from_str::<MonitoringResult>(&s).ok()) // parse errors are dropped by `.ok()` and handled as fail-open below
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ // If we couldn't parse the result, fail-open (covers: no file, file not found, no JSON extracted, or JSON that does not deserialize)
+ let Some(result) = monitoring_result else {
+ tracing::warn!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ "Could not parse monitoring result, fail-open: marking step as passed"
+ );
+
+ repository::update_step_status(pool, step.id, "passed")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ let _ = repository::create_directive_event(
+ pool,
+ directive_id,
+ directive.current_chain_id,
+ Some(step.id),
+ "monitoring_parse_failed_open",
+ "warn",
+ None,
+ "system",
+ None,
+ )
+ .await;
+
+ return advance_chain(pool, state, &directive, owner_id).await;
+ };
+
+ // Create evaluation record
+ let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); // prefer the directive's current chain; otherwise the chain the step belongs to
+ let evaluation = repository::create_directive_evaluation(
+ pool,
+ directive_id,
+ chain_id,
+ step.id,
+ contract.id,
+ "monitoring",
+ Some("automated"),
+ result.passed,
+ result.overall_score,
+ result.confidence_level.as_deref(),
+ &result.criteria_results,
+ &result.summary_feedback,
+ result.rework_instructions.as_deref(),
+ )
+ .await
+ .map_err(|e| format!("Failed to create directive evaluation: {}", e))?;
+
+ // Update step evaluation fields (score, confidence level, and the id of the evaluation record just created)
+ repository::update_step_evaluation_fields(
+ pool,
+ step.id,
+ result.overall_score,
+ result.confidence_level.as_deref(),
+ evaluation.id,
+ )
+ .await
+ .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?;
+
+ // Create event (best-effort: the result of the call is discarded)
+ let event_data = serde_json::json!({
+ "passed": result.passed,
+ "overallScore": result.overall_score,
+ "confidenceLevel": result.confidence_level,
+ "summaryFeedback": result.summary_feedback,
+ });
+ let _ = repository::create_directive_event(
+ pool,
+ directive_id,
+ Some(chain_id),
+ Some(step.id),
+ if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" },
+ "info",
+ Some(&event_data),
+ "system",
+ None,
+ )
+ .await;
+
+ if result.passed {
+ // Evaluation passed — mark step as passed
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ score = ?result.overall_score,
+ "Step evaluation passed"
+ );
+
+ repository::update_step_status(pool, step.id, "passed")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ advance_chain(pool, state, &directive, owner_id).await
+ } else {
+ // Evaluation failed — check rework budget
+ let max_rework = directive.max_rework_cycles.unwrap_or(3); // 3 rework cycles when the directive sets no limit
+ if step.rework_count >= max_rework { // `step` is the caller's snapshot, so this is the count before any increment below
+ tracing::warn!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ rework_count = step.rework_count,
+ max_rework = max_rework,
+ "Step evaluation failed, max rework cycles exceeded"
+ );
+
+ repository::update_step_status(pool, step.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ advance_chain(pool, state, &directive, owner_id).await
+ } else {
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ rework_count = step.rework_count,
+ "Step evaluation failed, scheduling rework"
+ );
+
+ repository::increment_step_rework_count(pool, step.id)
+ .await
+ .map_err(|e| format!("Failed to increment rework count: {}", e))?;
+
+ // Set step back to pending so advance_chain re-dispatches it
+ repository::update_step_status(pool, step.id, "pending")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ advance_chain(pool, state, &directive, owner_id).await
+ }
+ }
+}
+
+/// Trigger a manual evaluation for a step. Public for use by handlers. Looks the step up in the directive's current chain, sets it to "evaluating", dispatches a monitoring contract, and returns the step as returned by the status update.
+pub async fn trigger_manual_evaluation(
+ pool: &PgPool,
+ _state: &SharedState, // unused in the body (underscore-prefixed)
+ owner_id: Uuid,
+ directive_id: Uuid,
+ step_id: Uuid,
+) -> Result<ChainStep, String> {
+ let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) // owner-scoped lookup
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ // Get the step — find via chain steps; only the directive's current chain is searched
+ let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?;
+ let steps = repository::list_steps_for_chain(pool, chain_id)
+ .await
+ .map_err(|e| format!("Failed to list steps: {}", e))?;
+
+ let step = steps
+ .into_iter()
+ .find(|s| s.id == step_id)
+ .ok_or("Step not found in current chain")?;
+
+ // Step must have a contract_id (must have been executed)
+ let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?;
+
+ let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get step contract: {}", e))?
+ .ok_or("Step contract not found")?;
+
+ // Set step to evaluating. NOTE(review): this happens before dispatch; if `dispatch_monitoring` below returns Err, nothing in this function reverts the status
+ let updated_step = repository::update_step_status(pool, step.id, "evaluating")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?
+ .ok_or("Step not found after status update")?;
+
+ let _ = repository::create_directive_event( // event logging is best-effort; its result is discarded
+ pool,
+ directive.id,
+ directive.current_chain_id,
+ Some(step.id),
+ "manual_evaluation_triggered",
+ "info",
+ None,
+ "user",
+ None,
+ )
+ .await;
+
+ dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?; // dispatch receives the pre-update `step` snapshot
+
+ Ok(updated_step)
+}