From 97e21c8296ec5f91912d56980ebf3b18a1ca3507 Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 7 Feb 2026 18:27:54 +0000 Subject: Add directive monitor contracts --- makima/src/bin/makima.rs | 14 + makima/src/daemon/api/directive.rs | 26 ++ makima/src/daemon/cli/directive.rs | 20 ++ makima/src/daemon/cli/mod.rs | 6 + makima/src/daemon/skills/directive.md | 20 ++ makima/src/db/models.rs | 86 ++++- makima/src/db/repository.rs | 214 +++++++++++- makima/src/orchestration/directive.rs | 562 +++++++++++++++++++++++++++++-- makima/src/server/handlers/directives.rs | 223 +++++++++++- makima/src/server/mod.rs | 2 + makima/src/server/openapi.rs | 18 +- 11 files changed, 1152 insertions(+), 39 deletions(-) (limited to 'makima/src') diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 9d7f847..92fdae6 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -773,6 +773,20 @@ async fn run_directive( let result = client.directive_start(args.directive_id).await?; println!("{}", serde_json::to_string(&result.0)?); } + DirectiveCommand::Evaluate(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + let result = client + .directive_evaluate_step(args.common.directive_id, args.step_id) + .await?; + println!("{}", serde_json::to_string(&result.0)?); + } + DirectiveCommand::Evaluations(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + let result = client + .directive_evaluations(args.common.directive_id, args.step_id) + .await?; + println!("{}", serde_json::to_string(&result.0)?); + } } Ok(()) diff --git a/makima/src/daemon/api/directive.rs b/makima/src/daemon/api/directive.rs index 42f6f45..3589e78 100644 --- a/makima/src/daemon/api/directive.rs +++ b/makima/src/daemon/api/directive.rs @@ -57,4 +57,30 @@ impl ApiClient { self.post_empty(&format!("/api/v1/directives/{}/start", directive_id)) .await } + + /// Trigger a manual evaluation for a step. + pub async fn directive_evaluate_step( + &self, + directive_id: Uuid, + step_id: Uuid, + ) -> Result { + self.post_empty(&format!( + "/api/v1/directives/{}/steps/{}/evaluate", + directive_id, step_id + )) + .await + } + + /// List evaluations for a step. + pub async fn directive_evaluations( + &self, + directive_id: Uuid, + step_id: Uuid, + ) -> Result { + self.get(&format!( + "/api/v1/directives/{}/steps/{}/evaluations", + directive_id, step_id + )) + .await + } } diff --git a/makima/src/daemon/cli/directive.rs b/makima/src/daemon/cli/directive.rs index 5ce88c5..4c29c14 100644 --- a/makima/src/daemon/cli/directive.rs +++ b/makima/src/daemon/cli/directive.rs @@ -38,3 +38,23 @@ pub struct UpdateStatusArgs { /// New status (draft, planning, active, paused, completed, archived, failed) pub status: String, } + +/// Arguments for evaluate command (trigger manual evaluation). +#[derive(Args, Debug)] +pub struct EvaluateArgs { + #[command(flatten)] + pub common: DirectiveArgs, + + /// Step ID to evaluate + pub step_id: Uuid, +} + +/// Arguments for evaluations command (list evaluation history). +#[derive(Args, Debug)] +pub struct EvaluationsArgs { + #[command(flatten)] + pub common: DirectiveArgs, + + /// Step ID to list evaluations for + pub step_id: Uuid, +} diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index b07ab5a..c9a8c6f 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -225,6 +225,12 @@ pub enum DirectiveCommand { /// Start a directive (create planning contract and begin orchestration) Start(DirectiveArgs), + + /// Trigger a manual evaluation for a step + Evaluate(directive::EvaluateArgs), + + /// List evaluation history for a step + Evaluations(directive::EvaluationsArgs), } impl Cli { diff --git a/makima/src/daemon/skills/directive.md b/makima/src/daemon/skills/directive.md index cdfdaa2..0d1e9d6 100644 --- a/makima/src/daemon/skills/directive.md +++ b/makima/src/daemon/skills/directive.md @@ -47,6 +47,20 @@ makima directive update-status ``` Updates the directive status. Valid statuses: `draft`, `planning`, `active`, `paused`, `completed`, `archived`, `failed`. +## Evaluation + +### Trigger manual evaluation for a step +```bash +makima directive evaluate +``` +Triggers a monitoring evaluation for the specified step. The step must have been executed (have a contract). Sets the step to "evaluating" and dispatches a monitoring contract. + +### List evaluations for a step +```bash +makima directive evaluations +``` +Returns the evaluation history for a step, ordered by evaluation number. + ## Output Format All commands output JSON to stdout. @@ -63,6 +77,12 @@ makima directive chains # Get details of a specific chain makima directive chain +# Trigger manual evaluation of a step +makima directive evaluate + +# Check evaluation history +makima directive evaluations + # Update status to active makima directive update-status active ``` diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index bc90942..eff2df0 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2849,12 +2849,37 @@ pub struct UpdateDirectiveRequest { pub version: Option, } +/// Lightweight contract summary attached to a chain step. +#[derive(Debug, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct StepContractSummary { + pub id: Uuid, + pub name: String, + pub contract_type: String, + pub phase: String, + pub status: String, + pub task_count: i64, + pub tasks_done: i64, + pub tasks_running: i64, + pub tasks_failed: i64, +} + +/// Chain step enriched with optional contract summary. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainStepWithContract { + #[serde(flatten)] + pub step: ChainStep, + pub contract_summary: Option, +} + /// Directive with its chains and steps for detail view. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct DirectiveWithChains { #[serde(flatten)] pub directive: Directive, + pub orchestrator_contract_summary: Option, pub chains: Vec, } @@ -2903,6 +2928,8 @@ pub struct ChainStep { pub status: String, pub contract_id: Option, pub supervisor_task_id: Option, + pub monitoring_contract_id: Option, + pub monitoring_task_id: Option, pub confidence_score: Option, pub confidence_level: Option, pub evaluation_count: i32, @@ -2922,5 +2949,62 @@ pub struct ChainStep { pub struct ChainWithSteps { #[serde(flatten)] pub chain: DirectiveChain, - pub steps: Vec, + pub steps: Vec, +} + +/// Full row from directive_evaluations table. +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveEvaluation { + pub id: Uuid, + pub directive_id: Uuid, + pub chain_id: Option, + pub step_id: Option, + pub contract_id: Option, + pub evaluation_type: String, + pub evaluation_number: i32, + pub evaluator: Option, + pub passed: bool, + pub overall_score: Option, + pub confidence_level: Option, + #[sqlx(json)] + pub programmatic_results: serde_json::Value, + #[sqlx(json)] + pub llm_results: serde_json::Value, + #[sqlx(json)] + pub criteria_results: serde_json::Value, + pub summary_feedback: String, + pub rework_instructions: Option, + #[sqlx(json)] + pub directive_snapshot: Option, + #[sqlx(json)] + pub deliverables_snapshot: Option, + pub started_at: DateTime, + pub completed_at: Option>, + pub created_at: DateTime, +} + +/// Full row from directive_events table. +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveEvent { + pub id: Uuid, + pub directive_id: Uuid, + pub chain_id: Option, + pub step_id: Option, + pub event_type: String, + pub severity: String, + #[sqlx(json)] + pub event_data: Option, + pub actor_type: String, + pub actor_id: Option, + pub created_at: DateTime, +} + +/// Response for evaluation list endpoint. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct EvaluationListResponse { + pub evaluations: Vec, + pub total: i64, } diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index e072eb8..d50ef61 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -11,12 +11,13 @@ use super::models::{ ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateDirectiveRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, Directive, DirectiveChain, DirectiveSummary, + DeliverableDefinition, Directive, DirectiveChain, DirectiveEvaluation, DirectiveEvent, + DirectiveSummary, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, - PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateDirectiveRequest, - UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, + PhaseDefinition, StepContractSummary, SupervisorHeartbeatRecord, SupervisorState, + Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, + UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. @@ -5184,6 +5185,29 @@ pub async fn list_steps_for_chain( .await } +/// Batch-fetch lightweight contract summaries for a set of contract IDs. +pub async fn get_contract_summaries_batch( + pool: &PgPool, + contract_ids: &[Uuid], +) -> Result, sqlx::Error> { + sqlx::query_as::<_, StepContractSummary>( + r#" + SELECT c.id, c.name, c.contract_type, c.phase, c.status, + COUNT(t.id) as task_count, + COUNT(t.id) FILTER (WHERE t.status IN ('done','merged')) as tasks_done, + COUNT(t.id) FILTER (WHERE t.status IN ('running','initializing','starting')) as tasks_running, + COUNT(t.id) FILTER (WHERE t.status = 'failed') as tasks_failed + FROM contracts c + LEFT JOIN tasks t ON t.contract_id = c.id + WHERE c.id = ANY($1) + GROUP BY c.id, c.name, c.contract_type, c.phase, c.status + "#, + ) + .bind(contract_ids) + .fetch_all(pool) + .await +} + // ── Directive orchestration functions ─────────────────────────────────────── /// Update directive status with automatic timestamp management. @@ -5479,3 +5503,185 @@ pub async fn update_chain_status( .fetch_optional(pool) .await } + +// ── Directive monitoring / evaluation functions ───────────────────────────── + +/// Create a directive evaluation record. evaluation_number is auto-incremented per step. +pub async fn create_directive_evaluation( + pool: &PgPool, + directive_id: Uuid, + chain_id: Uuid, + step_id: Uuid, + contract_id: Uuid, + evaluation_type: &str, + evaluator: Option<&str>, + passed: bool, + overall_score: Option, + confidence_level: Option<&str>, + criteria_results: &serde_json::Value, + summary_feedback: &str, + rework_instructions: Option<&str>, +) -> Result { + sqlx::query_as::<_, DirectiveEvaluation>( + r#" + INSERT INTO directive_evaluations ( + directive_id, chain_id, step_id, contract_id, + evaluation_type, evaluation_number, evaluator, + passed, overall_score, confidence_level, + criteria_results, summary_feedback, rework_instructions, + completed_at + ) + VALUES ( + $1, $2, $3, $4, + $5, COALESCE((SELECT MAX(evaluation_number) FROM directive_evaluations WHERE step_id = $3), 0) + 1, $6, + $7, $8, $9, + $10, $11, $12, + NOW() + ) + RETURNING * + "#, + ) + .bind(directive_id) + .bind(chain_id) + .bind(step_id) + .bind(contract_id) + .bind(evaluation_type) + .bind(evaluator) + .bind(passed) + .bind(overall_score) + .bind(confidence_level) + .bind(criteria_results) + .bind(summary_feedback) + .bind(rework_instructions) + .fetch_one(pool) + .await +} + +/// List evaluations for a step, ordered by evaluation_number. +pub async fn list_evaluations_for_step( + pool: &PgPool, + step_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, DirectiveEvaluation>( + r#" + SELECT * FROM directive_evaluations + WHERE step_id = $1 + ORDER BY evaluation_number ASC + "#, + ) + .bind(step_id) + .fetch_all(pool) + .await +} + +/// Create a directive event. +pub async fn create_directive_event( + pool: &PgPool, + directive_id: Uuid, + chain_id: Option, + step_id: Option, + event_type: &str, + severity: &str, + event_data: Option<&serde_json::Value>, + actor_type: &str, + actor_id: Option, +) -> Result { + sqlx::query_as::<_, DirectiveEvent>( + r#" + INSERT INTO directive_events (directive_id, chain_id, step_id, event_type, severity, event_data, actor_type, actor_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING * + "#, + ) + .bind(directive_id) + .bind(chain_id) + .bind(step_id) + .bind(event_type) + .bind(severity) + .bind(event_data) + .bind(actor_type) + .bind(actor_id) + .fetch_one(pool) + .await +} + +/// Update step evaluation fields after an evaluation completes. +pub async fn update_step_evaluation_fields( + pool: &PgPool, + step_id: Uuid, + confidence_score: Option, + confidence_level: Option<&str>, + last_evaluation_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET confidence_score = $2, + confidence_level = $3, + evaluation_count = evaluation_count + 1, + last_evaluation_id = $4 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(confidence_score) + .bind(confidence_level) + .bind(last_evaluation_id) + .fetch_optional(pool) + .await +} + +/// Update step monitoring contract/task references. +pub async fn update_step_monitoring_contract( + pool: &PgPool, + step_id: Uuid, + monitoring_contract_id: Uuid, + monitoring_task_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET monitoring_contract_id = $2, + monitoring_task_id = $3 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(monitoring_contract_id) + .bind(monitoring_task_id) + .fetch_optional(pool) + .await +} + +/// Increment step rework_count. +pub async fn increment_step_rework_count( + pool: &PgPool, + step_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET rework_count = rework_count + 1 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .fetch_optional(pool) + .await +} + +/// Get a chain step by its monitoring contract ID. +pub async fn get_step_by_monitoring_contract_id( + pool: &PgPool, + contract_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#"SELECT * FROM chain_steps WHERE monitoring_contract_id = $1"#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index d17deeb..e779c18 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -4,8 +4,9 @@ use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; +use serde::Serialize; use crate::db::models::{ - CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, + ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, }; use crate::db::repository; use crate::server::state::SharedState; @@ -26,6 +27,20 @@ struct ChainPlan { steps: Vec, } +/// Result written by the monitoring supervisor after evaluating a step. +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +struct MonitoringResult { + passed: bool, + overall_score: Option, + confidence_level: Option, + #[serde(default)] + criteria_results: serde_json::Value, + #[serde(default)] + summary_feedback: String, + rework_instructions: Option, +} + /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, @@ -195,8 +210,18 @@ pub async fn on_task_completed( on_planning_completed(pool, state, &directive, task, owner_id).await?; } } else if contract.directive_id.is_some() { - // This is a step contract completion - on_step_completed(pool, state, &contract, task, owner_id).await?; + // Check if this is a monitoring contract completion + let monitoring_step = + repository::get_step_by_monitoring_contract_id(pool, contract_id) + .await + .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; + + if let Some(step) = monitoring_step { + on_monitoring_completed(pool, state, &contract, &step, task, owner_id).await?; + } else { + // This is a step contract completion + on_step_completed(pool, state, &contract, task, owner_id).await?; + } } Ok(()) @@ -403,32 +428,54 @@ async fn on_step_completed( return Ok(()); }; - // Update step status based on task outcome - let new_status = if task.status == "done" { - "passed" - } else { - "failed" - }; - - repository::update_step_status(pool, step.id, new_status) - .await - .map_err(|e| format!("Failed to update step status: {}", e))?; - - tracing::info!( - directive_id = %directive_id, - step_id = %step.id, - step_name = %step.name, - new_status = new_status, - "Step completed" - ); - - // Get the directive and advance + // Get the directive for threshold info let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? .ok_or("Directive not found")?; - advance_chain(pool, state, &directive, owner_id).await + if task.status == "done" { + // Step task succeeded — dispatch monitoring evaluation + repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "step_evaluating", + "info", + None, + "system", + None, + ) + .await; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + "Step task done, dispatching monitoring evaluation" + ); + + dispatch_monitoring(pool, &directive, &step, contract, owner_id).await + } else { + // Step task failed — mark step failed and advance + repository::update_step_status(pool, step.id, "failed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + "Step failed" + ); + + advance_chain(pool, state, &directive, owner_id).await + } } /// Check chain progress and dispatch ready steps or mark directive complete. @@ -734,3 +781,470 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option None } + +/// Dispatch a monitoring contract to evaluate a completed step. +async fn dispatch_monitoring( + pool: &PgPool, + directive: &Directive, + step: &ChainStep, + step_contract: &crate::db::models::Contract, + owner_id: Uuid, +) -> Result<(), String> { + // Create monitoring contract + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: format!("{} - Monitor", step.name), + description: Some(format!("Monitoring evaluation for step: {}", step.name)), + contract_type: Some("monitoring".to_string()), + template_id: None, + initial_phase: Some("plan".to_string()), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create monitoring contract: {}", e))?; + + // Mark contract as directive-related (not orchestrator) + repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) + .await + .map_err(|e| format!("Failed to set monitoring contract directive fields: {}", e))?; + + // Build evaluation prompt + let prompt = build_monitoring_prompt(directive, step, step_contract); + + // Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} - Evaluator", step.name), + description: Some("Evaluate step output against directive criteria".to_string()), + plan: prompt, + parent_task_id: None, + is_supervisor: true, + priority: 8, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create monitoring supervisor task: {}", e))?; + + // Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to monitoring contract: {}", e) + } + other => format!("Failed to link supervisor to monitoring contract: {:?}", other), + })?; + + // Link step to monitoring contract/task + repository::update_step_monitoring_contract(pool, step.id, contract.id, supervisor_task.id) + .await + .map_err(|e| format!("Failed to update step monitoring contract link: {}", e))?; + + // Copy repo config from directive to monitoring contract + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive.id, + step_id = %step.id, + step_name = %step.name, + monitoring_contract_id = %contract.id, + monitoring_task_id = %supervisor_task.id, + "Monitoring evaluation dispatched" + ); + + Ok(()) +} + +/// Build the monitoring supervisor prompt. +fn build_monitoring_prompt( + directive: &Directive, + step: &ChainStep, + step_contract: &crate::db::models::Contract, +) -> String { + format!( + r#"You are evaluating the output of a completed step in a directive chain. + +DIRECTIVE: {title} +GOAL: {goal} +REQUIREMENTS: {requirements} +ACCEPTANCE CRITERIA: {acceptance_criteria} +CONSTRAINTS: {constraints} + +STEP: {step_name} +STEP DESCRIPTION: {step_description} +STEP TASK PLAN: {task_plan} +STEP CONTRACT ID: {step_contract_id} + +CONFIDENCE THRESHOLDS: +- Green (pass): >= {threshold_green} +- Yellow (marginal): >= {threshold_yellow} +- Red (fail): < {threshold_yellow} + +Your job: +1. Read the step contract's files to understand what was delivered: + makima contract files --contract-id {step_contract_id} + makima contract file --contract-id {step_contract_id} + +2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan. + +3. Write your evaluation result as a JSON file named "evaluation-result" to this contract: + makima contract create-file "evaluation-result" < evaluation.json + +The JSON format: +{{ + "passed": true/false, + "overallScore": 0.0-1.0, + "confidenceLevel": "green" | "yellow" | "red", + "criteriaResults": [ + {{ + "criterion": "Description of what was checked", + "passed": true/false, + "score": 0.0-1.0, + "evidence": "Evidence supporting the assessment" + }} + ], + "summaryFeedback": "Brief summary of the evaluation", + "reworkInstructions": "If failed, specific instructions for rework (null if passed)" +}} + +Scoring guidelines: +- Score >= {threshold_green}: confidenceLevel = "green", passed = true +- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed +- Score < {threshold_yellow}: confidenceLevel = "red", passed = false +- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. + +After writing the evaluation file, mark the contract as complete: + makima supervisor complete"#, + title = directive.title, + goal = directive.goal, + requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + step_name = step.name, + step_description = step.description.as_deref().unwrap_or("N/A"), + task_plan = step.task_plan.as_deref().unwrap_or("N/A"), + step_contract_id = step_contract.id, + threshold_green = directive.confidence_threshold_green, + threshold_yellow = directive.confidence_threshold_yellow, + ) +} + +/// Handle monitoring contract task completion — parse evaluation and decide step outcome. +async fn on_monitoring_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + step: &ChainStep, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // Only process supervisor task completions + if !task.is_supervisor { + return Ok(()); + } + + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + // If monitoring task itself failed, fail-open: mark step as passed + if task.status == "failed" { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + "Monitoring task failed, fail-open: marking step as passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive_id, + directive.current_chain_id, + Some(step.id), + "monitoring_failed_open", + "warn", + None, + "system", + None, + ) + .await; + + return advance_chain(pool, state, &directive, owner_id).await; + } + + if task.status != "done" { + return Ok(()); + } + + // Read evaluation result from monitoring contract files + let files = repository::list_files_in_contract(pool, contract.id, owner_id) + .await + .map_err(|e| format!("Failed to list monitoring contract files: {}", e))?; + + let eval_file = files.iter().find(|f| { + let name_lower = f.name.to_lowercase(); + name_lower.contains("evaluation") || name_lower.contains("eval") + }); + + let eval_file = eval_file.or_else(|| files.first()); + + let monitoring_result = if let Some(eval_file) = eval_file { + let full_file = repository::get_file(pool, eval_file.id) + .await + .map_err(|e| format!("Failed to get evaluation file: {}", e))?; + + if let Some(full_file) = full_file { + let json_str = extract_plan_json(&full_file.body); + json_str.and_then(|s| serde_json::from_str::(&s).ok()) + } else { + None + } + } else { + None + }; + + // If we couldn't parse the result, fail-open + let Some(result) = monitoring_result else { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + "Could not parse monitoring result, fail-open: marking step as passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive_id, + directive.current_chain_id, + Some(step.id), + "monitoring_parse_failed_open", + "warn", + None, + "system", + None, + ) + .await; + + return advance_chain(pool, state, &directive, owner_id).await; + }; + + // Create evaluation record + let chain_id = directive.current_chain_id.unwrap_or(step.chain_id); + let evaluation = repository::create_directive_evaluation( + pool, + directive_id, + chain_id, + step.id, + contract.id, + "monitoring", + Some("automated"), + result.passed, + result.overall_score, + result.confidence_level.as_deref(), + &result.criteria_results, + &result.summary_feedback, + result.rework_instructions.as_deref(), + ) + .await + .map_err(|e| format!("Failed to create directive evaluation: {}", e))?; + + // Update step evaluation fields + repository::update_step_evaluation_fields( + pool, + step.id, + result.overall_score, + result.confidence_level.as_deref(), + evaluation.id, + ) + .await + .map_err(|e| format!("Failed to update step evaluation fields: {}", e))?; + + // Create event + let event_data = serde_json::json!({ + "passed": result.passed, + "overallScore": result.overall_score, + "confidenceLevel": result.confidence_level, + "summaryFeedback": result.summary_feedback, + }); + let _ = repository::create_directive_event( + pool, + directive_id, + Some(chain_id), + Some(step.id), + if result.passed { "step_evaluation_passed" } else { "step_evaluation_failed" }, + "info", + Some(&event_data), + "system", + None, + ) + .await; + + if result.passed { + // Evaluation passed — mark step as passed + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + score = ?result.overall_score, + "Step evaluation passed" + ); + + repository::update_step_status(pool, step.id, "passed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } else { + // Evaluation failed — check rework budget + let max_rework = directive.max_rework_cycles.unwrap_or(3); + if step.rework_count >= max_rework { + tracing::warn!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + rework_count = step.rework_count, + max_rework = max_rework, + "Step evaluation failed, max rework cycles exceeded" + ); + + repository::update_step_status(pool, step.id, "failed") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } else { + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + rework_count = step.rework_count, + "Step evaluation failed, scheduling rework" + ); + + repository::increment_step_rework_count(pool, step.id) + .await + .map_err(|e| format!("Failed to increment rework count: {}", e))?; + + // Set step back to pending so advance_chain re-dispatches it + repository::update_step_status(pool, step.id, "pending") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + advance_chain(pool, state, &directive, owner_id).await + } + } +} + +/// Trigger a manual evaluation for a step. Public for use by handlers. +pub async fn trigger_manual_evaluation( + pool: &PgPool, + _state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, + step_id: Uuid, +) -> Result { + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + // Get the step — find via chain steps + let chain_id = directive.current_chain_id.ok_or("Directive has no active chain")?; + let steps = repository::list_steps_for_chain(pool, chain_id) + .await + .map_err(|e| format!("Failed to list steps: {}", e))?; + + let step = steps + .into_iter() + .find(|s| s.id == step_id) + .ok_or("Step not found in current chain")?; + + // Step must have a contract_id (must have been executed) + let contract_id = step.contract_id.ok_or("Step has no contract — it hasn't been executed yet")?; + + let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to get step contract: {}", e))? + .ok_or("Step contract not found")?; + + // Set step to evaluating + let updated_step = repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))? + .ok_or("Step not found after status update")?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "manual_evaluation_triggered", + "info", + None, + "user", + None, + ) + .await; + + dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?; + + Ok(updated_step) +} diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index a877c6b..65f32d5 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -8,9 +8,12 @@ use axum::{ }; use uuid::Uuid; +use std::collections::HashMap; + use crate::db::models::{ - ChainStep, ChainWithSteps, CreateDirectiveRequest, Directive, DirectiveChain, - DirectiveListResponse, DirectiveWithChains, UpdateDirectiveRequest, + ChainStep, ChainStepWithContract, ChainWithSteps, CreateDirectiveRequest, Directive, + DirectiveChain, DirectiveListResponse, DirectiveWithChains, EvaluationListResponse, + StepContractSummary, UpdateDirectiveRequest, }; use crate::db::repository::{self, RepositoryError}; use crate::orchestration; @@ -123,8 +126,8 @@ pub async fn get_directive( }; // Build chains with steps - let mut chains_with_steps = Vec::new(); - for chain in chains { + let mut all_steps_by_chain = Vec::new(); + for chain in &chains { let steps = match repository::list_steps_for_chain(pool, chain.id).await { Ok(s) => s, Err(e) => { @@ -132,11 +135,61 @@ pub async fn get_directive( Vec::new() } }; - chains_with_steps.push(ChainWithSteps { chain, steps }); + all_steps_by_chain.push(steps); + } + + // Collect all contract IDs (from steps + orchestrator) + let mut contract_ids: Vec = all_steps_by_chain + .iter() + .flat_map(|steps| steps.iter().filter_map(|s| s.contract_id)) + .collect(); + if let Some(orch_id) = directive.orchestrator_contract_id { + contract_ids.push(orch_id); } + // Batch fetch contract summaries + let mut summary_map: HashMap = if contract_ids.is_empty() { + HashMap::new() + } else { + match repository::get_contract_summaries_batch(pool, &contract_ids).await { + Ok(summaries) => summaries.into_iter().map(|s| (s.id, s)).collect(), + Err(e) => { + tracing::warn!("Failed to fetch contract summaries: {}", e); + HashMap::new() + } + } + }; + + // Build enriched chains + let chains_with_steps: Vec = chains + .into_iter() + .zip(all_steps_by_chain.into_iter()) + .map(|(chain, steps)| { + let enriched_steps = steps + .into_iter() + .map(|step| { + let contract_summary = + step.contract_id.and_then(|id| summary_map.remove(&id)); + ChainStepWithContract { + step, + contract_summary, + } + }) + .collect(); + ChainWithSteps { + chain, + steps: enriched_steps, + } + }) + .collect(); + + let orchestrator_contract_summary = directive + .orchestrator_contract_id + .and_then(|id| summary_map.remove(&id)); + Json(DirectiveWithChains { directive, + orchestrator_contract_summary, chains: chains_with_steps, }) .into_response() @@ -454,7 +507,37 @@ pub async fn get_chain( } }; - Json(ChainWithSteps { chain, steps }).into_response() + // Collect contract IDs from steps + let contract_ids: Vec = steps.iter().filter_map(|s| s.contract_id).collect(); + + let mut summary_map: HashMap = if contract_ids.is_empty() { + HashMap::new() + } else { + match repository::get_contract_summaries_batch(pool, &contract_ids).await { + Ok(summaries) => summaries.into_iter().map(|s| (s.id, s)).collect(), + Err(e) => { + tracing::warn!("Failed to fetch contract summaries: {}", e); + HashMap::new() + } + } + }; + + let enriched_steps = steps + .into_iter() + .map(|step| { + let contract_summary = step.contract_id.and_then(|id| summary_map.remove(&id)); + ChainStepWithContract { + step, + contract_summary, + } + }) + .collect(); + + Json(ChainWithSteps { + chain, + steps: enriched_steps, + }) + .into_response() } /// Start a directive: create a planning contract and begin orchestration. @@ -513,3 +596,131 @@ pub async fn start_directive( } } } + +/// Trigger a manual evaluation for a step. +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/steps/{step_id}/evaluate", + params( + ("id" = Uuid, Path, description = "Directive ID"), + ("step_id" = Uuid, Path, description = "Step ID") + ), + responses( + (status = 200, description = "Evaluation triggered", body = ChainStep), + (status = 400, description = "Step cannot be evaluated", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Directives" +)] +pub async fn evaluate_step( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match orchestration::directive::trigger_manual_evaluation(pool, &state, auth.owner_id, id, step_id).await { + Ok(step) => Json(step).into_response(), + Err(e) if e.contains("not found") || e.contains("Not found") => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", e)), + ) + .into_response(), + Err(e) if e.contains("hasn't been executed") || e.contains("no active chain") => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_STATE", e)), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to trigger evaluation for step {}: {}", step_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("EVALUATION_FAILED", e)), + ) + .into_response() + } + } +} + +/// List evaluations for a step. +#[utoipa::path( + get, + path = "/api/v1/directives/{id}/steps/{step_id}/evaluations", + params( + ("id" = Uuid, Path, description = "Directive ID"), + ("step_id" = Uuid, Path, description = "Step ID") + ), + responses( + (status = 200, description = "List of evaluations", body = EvaluationListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Directives" +)] +pub async fn list_evaluations( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify directive exists and belongs to owner + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get directive {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::list_evaluations_for_step(pool, step_id).await { + Ok(evaluations) => { + let total = evaluations.len() as i64; + Json(EvaluationListResponse { evaluations, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list evaluations for step {}: {}", step_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 1a59e12..c8242ae 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -184,6 +184,8 @@ pub fn make_router(state: SharedState) -> Router { .route("/directives/{id}/start", post(directives::start_directive)) .route("/directives/{id}/chains", get(directives::list_chains)) .route("/directives/{id}/chains/{chain_id}", get(directives::get_chain)) + .route("/directives/{id}/steps/{step_id}/evaluate", post(directives::evaluate_step)) + .route("/directives/{id}/steps/{step_id}/evaluations", get(directives::list_evaluations)) // Contract supervisor resume endpoints .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 96c19e0..e680c07 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -4,17 +4,20 @@ use utoipa::OpenApi; use crate::db::models::{ AddLocalRepositoryRequest, AddRemoteRepositoryRequest, BranchInfo, BranchListResponse, - BranchTaskRequest, BranchTaskResponse, ChainStep, ChainWithSteps, ChangePhaseRequest, + BranchTaskRequest, BranchTaskResponse, ChainStep, ChainStepWithContract, ChainWithSteps, + ChangePhaseRequest, Contract, ContractChatHistoryResponse, ContractChatMessageRecord, ContractEvent, ContractListResponse, ContractRepository, ContractSummary, ContractWithRelations, CreateContractRequest, CreateDirectiveRequest, CreateFileRequest, CreateManagedRepositoryRequest, CreateTaskRequest, Daemon, DaemonDirectoriesResponse, - DaemonDirectory, DaemonListResponse, Directive, DirectiveChain, DirectiveListResponse, - DirectiveSummary, DirectiveWithChains, File, FileListResponse, FileSummary, + DaemonDirectory, DaemonListResponse, Directive, DirectiveChain, DirectiveEvaluation, + DirectiveEvent, DirectiveListResponse, DirectiveSummary, DirectiveWithChains, + EvaluationListResponse, File, FileListResponse, FileSummary, MergeCommitRequest, MergeCompleteCheckResponse, MergeResolveRequest, MergeResultResponse, MergeSkipRequest, MergeStartRequest, MergeStatusResponse, MeshChatConversation, MeshChatHistoryResponse, MeshChatMessageRecord, RepositoryHistoryEntry, - RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest, Task, + RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest, + StepContractSummary, Task, TaskEventListResponse, TaskListResponse, TaskSummary, TaskWithSubtasks, TranscriptEntry, UpdateContractRequest, UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest, }; @@ -114,6 +117,8 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage directives::start_directive, directives::list_chains, directives::get_chain, + directives::evaluate_step, + directives::list_evaluations, ), components( schemas( @@ -205,9 +210,14 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage DirectiveWithChains, DirectiveChain, ChainStep, + ChainStepWithContract, ChainWithSteps, + StepContractSummary, CreateDirectiveRequest, UpdateDirectiveRequest, + DirectiveEvaluation, + DirectiveEvent, + EvaluationListResponse, ) ), tags( -- cgit v1.2.3