diff options
| author | soryu <soryu@soryu.co> | 2026-02-07 18:27:54 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-07 18:27:54 +0000 |
| commit | 97e21c8296ec5f91912d56980ebf3b18a1ca3507 (patch) | |
| tree | 3650e2eb62ab5b387006563ce64139aa7688da5f /makima/src/orchestration | |
| parent | 8f757f561eeb397aaea70d7c10d41445cc5e50b5 (diff) | |
| download | soryu-97e21c8296ec5f91912d56980ebf3b18a1ca3507.tar.gz soryu-97e21c8296ec5f91912d56980ebf3b18a1ca3507.zip | |
Add directive monitor contracts
Diffstat (limited to 'makima/src/orchestration')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 562 |
1 files changed, 538 insertions, 24 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index d17deeb..e779c18 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -4,8 +4,9 @@ use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; +use serde::Serialize; use crate::db::models::{ - CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, + ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, }; use crate::db::repository; use crate::server::state::SharedState; @@ -26,6 +27,20 @@ struct ChainPlan { steps: Vec<ChainPlanStep>, } +/// Result written by the monitoring supervisor after evaluating a step. +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +struct MonitoringResult { + passed: bool, + overall_score: Option<f64>, + confidence_level: Option<String>, + #[serde(default)] + criteria_results: serde_json::Value, + #[serde(default)] + summary_feedback: String, + rework_instructions: Option<String>, +} + /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, @@ -195,8 +210,18 @@ pub async fn on_task_completed( on_planning_completed(pool, state, &directive, task, owner_id).await?; } } else if contract.directive_id.is_some() { - // This is a step contract completion - on_step_completed(pool, state, &contract, task, owner_id).await?; + // Check if this is a monitoring contract completion + let monitoring_step = + repository::get_step_by_monitoring_contract_id(pool, contract_id) + .await + .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; + + if let Some(step) = monitoring_step { + on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?; + } else { + // This is a step contract completion + on_step_completed(pool, state, &contract, task, owner_id).await?; + } } Ok(()) @@ -403,32 +428,54 @@ async fn on_step_completed( return Ok(()); }; - // Update step status based on task outcome - let new_status = if task.status == "done" { - "passed" - } else { - "failed" - }; - - repository::update_step_status(pool, step.id, new_status) - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - new_status = new_status, - "Step completed" - ); - - // Get the directive and advance + // Get the directive for threshold info let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; - advance_chain(pool, state, &directive, owner_id).await + if task.status == "done" { + // Step task succeeded — dispatch monitoring evaluation + repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "step_evaluating", + "info", + None, + "system", + None, + ) + .await; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + "Step task done, dispatching monitoring evaluation" + ); + + dispatch_monitoring(pool, &directive, &step, contract, owner_id).await + } else { + // Step task failed — mark step failed and advance + repository::update_step_status(pool, step.id, "failed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + "Step failed" + ); + + advance_chain(pool, state, &directive, owner_id).await + } } /// Check chain progress and dispatch ready steps or mark directive complete. @@ -734,3 +781,470 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> None } + +/// Dispatch a monitoring contract to evaluate a completed step. +async fn dispatch_monitoring( + pool: &PgPool, + directive: &Directive, + step: &ChainStep, + step_contract: &crate::db::models::Contract, + owner_id: Uuid, +) -> Result<(), String> { + // Create monitoring contract + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: format!("{} - Monitor", step.name), + description: Some(format!("Monitoring evaluation for step: {}", step.name)), + contract_type: Some("monitoring".to_string()), + template_id: None, + initial_phase: Some("plan".to_string()), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create monitoring contract: {}", e))?; + + // Mark contract as directive-related (not orchestrator) + repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) + .await + .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?; + + // Build evaluation prompt + let prompt = build_monitoring_prompt(directive, step, step_contract); + + // Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} - Evaluator", step.name), + description: Some("Evaluate step output against directive criteria".to_string()), + plan: prompt, + parent_task_id: None, + is_supervisor: true, + priority: 8, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create monitoring supervisor task: {}", e))?; + + // Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to monitoring contract: {}", e) + } + other => format!("Failed to link supervisor to monitoring contract: {:?}", other), + })?; + + // Link step to monitoring contract/task + repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id) + .await + .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?; + + // Copy repo config from directive to monitoring contract + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive.id, + step_id = %step.id, + step_name = %step.name, + monitoring_contract_id = %contract.id, + monitoring_task_id = %supervisor_task.id, + "Monitoring evaluation dispatched" + ); + + Ok(()) +} + +/// Build the monitoring supervisor prompt. +fn build_monitoring_prompt( + directive: &Directive, + step: &ChainStep, + step_contract: &crate::db::models::Contract, +) -> String { + format!( + r#"You are evaluating the output of a completed step in a directive chain. + +DIRECTIVE: {title} +GOAL: {goal} +REQUIREMENTS: {requirements} +ACCEPTANCE CRITERIA: {acceptance_criteria} +CONSTRAINTS: {constraints} + +STEP: {step_name} +STEP DESCRIPTION: {step_description} +STEP TASK PLAN: {task_plan} +STEP CONTRACT ID: {step_contract_id} + +CONFIDENCE THRESHOLDS: +- Green (pass): >= {threshold_green} +- Yellow (marginal): >= {threshold_yellow} +- Red (fail): < {threshold_yellow} + +Your job: +1. Read the step contract's files to understand what was delivered: + makima contract files --contract-id {step_contract_id} + makima contract file <file_id> --contract-id {step_contract_id} + +2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan. + +3. Write your evaluation result as a JSON file named "evaluation-result" to this contract: + makima contract create-file "evaluation-result" < evaluation.json + +The JSON format: +{{ + "passed": true/false, + "overallScore": 0.0-1.0, + "confidenceLevel": "green" | "yellow" | "red", + "criteriaResults": [ + {{ + "criterion": "Description of what was checked", + "passed": true/false, + "score": 0.0-1.0, + "evidence": "Evidence supporting the assessment" + }} + ], + "summaryFeedback": "Brief summary of the evaluation", + "reworkInstructions": "If failed, specific instructions for rework (null if passed)" +}} + +Scoring guidelines: +- Score >= {threshold_green}: confidenceLevel = "green", passed = true +- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed +- Score < {threshold_yellow}: confidenceLevel = "red", passed = false +- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. + +After writing the evaluation file, mark the contract as complete: + makima supervisor complete"#, + title = directive.title, + goal = directive.goal, + requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + step_name = step.name, + step_description = step.description.as_deref().unwrap_or("N/A"), + task_plan = step.task_plan.as_deref().unwrap_or("N/A"), + step_contract_id = step_contract.id, + threshold_green = directive.confidence_threshold_green, + threshold_yellow = directive.confidence_threshold_yellow, + ) +} + +/// Handle monitoring contract task completion — parse evaluation and decide step outcome. +async fn on_monitoring_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + step: &ChainStep, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // Only process supervisor task completions + if !task.is_supervisor { + return Ok(()); + } + + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + // If monitoring task itself failed, fail-open: mark step as passed + if task.status == "failed" { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + "Monitoring task failed, fail-open: marking step as passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive_id, + directive.current_chain_id, + Some(step.id), + "monitoring_failed_open", + "warn", + None, + "system", + None, + ) + .await; + + return advance_chain(pool, state, &directive, owner_id).await; + } + + if task.status != "done" { + return Ok(()); + } + + // Read evaluation result from monitoring contract files + let files = repository::list_files_in_contract(pool, contract.id, owner_id) + .await + .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?; + + let eval_file = files.iter().find(|f| { + let name_lower = f.name.to_lowercase(); + name_lower.contains("evaluation") || name_lower.contains("eval") + }); + + let eval_file = eval_file.or_else(|| files.first()); + + let monitoring_result = if let Some(eval_file) = eval_file { + let full_file = repository::get_file(pool, eval_file.id) + .await + .map_err(|e| format!("Failed to get evaluation file: {}", e))?; + + if let Some(full_file) = full_file { + let json_str = extract_plan_json(&full_file.body); + json_str.and_then(|s| serde_json::from_str::<MonitoringResult>(&s).ok()) + } else { + None + } + } else { + None + }; + + // If we couldn't parse the result, fail-open + let Some(result) = monitoring_result else { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + "Could not parse monitoring result, fail-open: marking step as passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive_id, + directive.current_chain_id, + Some(step.id), + "monitoring_parse_failed_open", + "warn", + None, + "system", + None, + ) + .await; + + return advance_chain(pool, state, &directive, owner_id).await; + }; + + // Create evaluation record + let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); + let evaluation = repository::create_directive_evaluation( + pool, + directive_id, + chain_id, + step.id, + contract.id, + "monitoring", + Some("automated"), + result.passed, + result.overall_score, + result.confidence_level.as_deref(), + &result.criteria_results, + &result.summary_feedback, + result.rework_instructions.as_deref(), + ) + .await + .map_err(|e| format!("Failed to create directive evaluation: {}", e))?; + + // Update step evaluation fields + repository::update_step_evaluation_fields( + pool, + step.id, + result.overall_score, + result.confidence_level.as_deref(), + evaluation.id, + ) + .await + .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?; + + // Create event + let event_data = serde_json::json!({ + "passed": result.passed, + "overallScore": result.overall_score, + "confidenceLevel": result.confidence_level, + "summaryFeedback": result.summary_feedback, + }); + let _ = repository::create_directive_event( + pool, + directive_id, + Some(chain_id), + Some(step.id), + if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" }, + "info", + Some(&event_data), + "system", + None, + ) + .await; + + if result.passed { + // Evaluation passed — mark step as passed + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + score = ?result.overall_score, + "Step evaluation passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } else { + // Evaluation failed — check rework budget + let max_rework = directive.max_rework_cycles.unwrap_or(3); + if step.rework_count >= max_rework { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + rework_count = step.rework_count, + max_rework = max_rework, + "Step evaluation failed, max rework cycles exceeded" + ); + + repository::update_step_status(pool, step.id, "failed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } else { + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + rework_count = step.rework_count, + "Step evaluation failed, scheduling rework" + ); + + repository::increment_step_rework_count(pool, step.id) + .await + .map_err(|e| format!("Failed to increment rework count: {}", e))?; + + // Set step back to pending so advance_chain re-dispatches it + repository::update_step_status(pool, step.id, "pending") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } + } +} + +/// Trigger a manual evaluation for a step. Public for use by handlers. +pub async fn trigger_manual_evaluation( + pool: &PgPool, + _state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, + step_id: Uuid, +) -> Result<ChainStep, String> { + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + // Get the step — find via chain steps + let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?; + let steps = repository::list_steps_for_chain(pool, chain_id) + .await + .map_err(|e| format!("Failed to list steps: {}", e))?; + + let step = steps + .into_iter() + .find(|s| s.id == step_id) + .ok_or("Step not found in current chain")?; + + // Step must have a contract_id (must have been executed) + let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?; + + let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to get step contract: {}", e))? + .ok_or("Step contract not found")?; + + // Set step to evaluating + let updated_step = repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))? + .ok_or("Step not found after status update")?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "manual_evaluation_triggered", + "info", + None, + "user", + None, + ) + .await; + + dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?; + + Ok(updated_step) +} |
