diff options
| author | soryu <soryu@soryu.co> | 2026-02-08 19:50:20 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-08 19:50:20 +0000 |
| commit | 87fa8c4af66745bd30bc84b6c5ef657dd4dec002 (patch) | |
| tree | cf8216c349086819300b4400c2db27c5c97be62c /makima/src/orchestration/directive.rs | |
| parent | 2166befc8869dbb76008a1fe62f28a4936e77bce (diff) | |
| download | soryu-87fa8c4af66745bd30bc84b6c5ef657dd4dec002.tar.gz soryu-87fa8c4af66745bd30bc84b6c5ef657dd4dec002.zip | |
Fix directive evaluation and add to frontend
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 217 |
1 file changed, 180 insertions, 37 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 9a65035..46d9425 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -6,10 +6,11 @@ use uuid::Uuid; use serde::Serialize; use crate::db::models::{ - ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, + ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, + UpdateContractRequest, UpdateTaskRequest, }; use crate::db::repository; -use crate::server::state::SharedState; +use crate::server::state::{DaemonCommand, SharedState}; /// A single step in the chain plan produced by the planning supervisor. #[derive(Debug, Deserialize)] @@ -43,10 +44,124 @@ struct MonitoringResult { rework_instructions: Option<String>, } +/// Dispatch a task to an available daemon. Finds a connected daemon with capacity, +/// assigns the task, and sends a SpawnTask command. +async fn dispatch_task_to_daemon( + pool: &PgPool, + state: &SharedState, + task: &Task, + contract_local_only: bool, + contract_auto_merge_local: bool, + owner_id: Uuid, +) -> Result<(), String> { + // Find available daemons + let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[]) + .await + .map_err(|e| format!("Failed to get available daemons: {}", e))?; + + let available_daemon = daemons.iter().find(|d| { + d.current_task_count < d.max_concurrent_tasks + && state.daemon_connections.contains_key(&d.connection_id) + }); + + let daemon = match available_daemon { + Some(d) => d, + None => { + tracing::warn!( + task_id = %task.id, + "No daemon available to dispatch task — will be picked up by retry loop" + ); + return Ok(()); + } + }; + + // Assign task to daemon + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(task.version), + ..Default::default() + }; + + let updated = repository::update_task_for_owner(pool, task.id, owner_id, 
update_req) + .await + .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?; + + let Some(updated_task) = updated else { + return Err("Task not found when assigning to daemon".to_string()); + }; + + // Get repo URL from task or contract repositories + let repo_url = if let Some(url) = &updated_task.repository_url { + Some(url.clone()) + } else if let Some(contract_id) = updated_task.contract_id { + match repository::list_contract_repositories(pool, contract_id).await { + Ok(repos) => repos + .iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), + Err(_) => None, + } + } else { + None + }; + + let cmd = DaemonCommand::SpawnTask { + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, + is_supervisor: updated_task.is_supervisor, + autonomous_loop: updated_task.contract_id.is_some(), + resume_session: false, + conversation_history: None, + patch_data: None, + patch_base_sha: None, + local_only: contract_local_only, + auto_merge_local: contract_auto_merge_local, + supervisor_worktree_task_id: None, + }; + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!( + task_id = %task.id, + daemon_id = %daemon.id, + error = %e, + "Failed to send spawn command — rolling back" + ); + let rollback = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + 
..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback).await; + return Ok(()); // Non-fatal, retry loop will pick it up + } + + tracing::info!( + task_id = %task.id, + daemon_id = %daemon.id, + "Dispatched directive task to daemon" + ); + + Ok(()) +} + /// Initialize a directive: create a planning contract and transition to "planning". pub async fn init_directive( pool: &PgPool, - _state: &SharedState, + state: &SharedState, owner_id: Uuid, directive_id: Uuid, ) -> Result<Directive, String> { @@ -179,6 +294,13 @@ pub async fn init_directive( "Directive started: planning contract created" ); + // 10. Dispatch planning task to an available daemon immediately + dispatch_task_to_daemon( + pool, state, &supervisor_task, + contract.local_only, contract.auto_merge_local, + owner_id, + ).await?; + Ok(updated) } @@ -334,7 +456,7 @@ pub async fn on_contract_completed( ) .await; - dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?; + dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?; } } } @@ -421,7 +543,7 @@ async fn on_planning_completed( /// Checks if a plan was submitted via the CLI; if not, retries or fails. 
async fn handle_planning_completion( pool: &PgPool, - _state: &SharedState, + state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { @@ -488,7 +610,7 @@ async fn handle_planning_completion( .map_err(|e| format!("Failed to reset directive status: {}", e))?; // Re-init planning - init_directive(pool, _state, owner_id, directive.id).await?; + init_directive(pool, state, owner_id, directive.id).await?; Ok(()) } else { @@ -688,7 +810,7 @@ async fn on_step_completed( "Step task done, dispatching monitoring evaluation" ); - dispatch_monitoring(pool, &directive, &step, contract, owner_id).await + dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await } else { // Step task failed — mark step failed and advance repository::update_step_status(pool, step.id, "failed") @@ -711,7 +833,7 @@ async fn on_step_completed( /// Check chain progress and dispatch ready steps or mark directive complete. async fn advance_chain( pool: &PgPool, - _state: &SharedState, + state: &SharedState, directive: &Directive, owner_id: Uuid, ) -> Result<(), String> { @@ -755,7 +877,7 @@ async fn advance_chain( .map_err(|e| format!("Failed to find ready steps: {}", e))?; for step in ready_steps { - if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await { + if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await { tracing::error!( step_id = %step.id, step_name = %step.name, @@ -771,6 +893,7 @@ async fn advance_chain( /// Dispatch a single chain step as a new contract with supervisor. 
async fn dispatch_step( pool: &PgPool, + state: &SharedState, directive: &Directive, step: &crate::db::models::ChainStep, owner_id: Uuid, @@ -914,6 +1037,13 @@ async fn dispatch_step( "Step dispatched" ); + // Dispatch step task to an available daemon immediately + dispatch_task_to_daemon( + pool, state, &supervisor_task, + contract.local_only, contract.auto_merge_local, + owner_id, + ).await?; + Ok(()) } @@ -1038,6 +1168,7 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> /// Dispatch a monitoring contract to evaluate a completed step. async fn dispatch_monitoring( pool: &PgPool, + state: &SharedState, directive: &Directive, step: &ChainStep, step_contract: &crate::db::models::Contract, @@ -1153,6 +1284,13 @@ async fn dispatch_monitoring( "Monitoring evaluation dispatched" ); + // Dispatch monitoring task to an available daemon immediately + dispatch_task_to_daemon( + pool, state, &supervisor_task, + contract.local_only, contract.auto_merge_local, + owner_id, + ).await?; + Ok(()) } @@ -1181,40 +1319,45 @@ CONFIDENCE THRESHOLDS: - Yellow (marginal): >= {threshold_yellow} - Red (fail): < {threshold_yellow} -Your job: +INSTRUCTIONS: 1. Read the step contract's files to understand what was delivered: makima contract files --contract-id {step_contract_id} makima contract file <file_id> --contract-id {step_contract_id} -2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan. - -3. 
Write your evaluation result as a JSON file named "evaluation-result" to this contract: - makima contract create-file "evaluation-result" < evaluation.json - -The JSON format: -{{ - "passed": true/false, - "overallScore": 0.0-1.0, - "confidenceLevel": "green" | "yellow" | "red", - "criteriaResults": [ - {{ - "criterion": "Description of what was checked", - "passed": true/false, - "score": 0.0-1.0, - "evidence": "Evidence supporting the assessment" - }} - ], - "summaryFeedback": "Brief summary of the evaluation", - "reworkInstructions": "If failed, specific instructions for rework (null if passed)" -}} +2. Evaluate whether the step's output meets the directive's requirements and the step's task plan. + +3. Write your evaluation as a JSON file to this monitoring contract. Create a file called + evaluation.json with the JSON content first, then upload it: + + cat > /tmp/eval-result.json << 'EVALEOF' + {{ + "passed": true, + "overallScore": 0.85, + "confidenceLevel": "green", + "criteriaResults": [ + {{ + "criterion": "Example criterion", + "passed": true, + "score": 0.9, + "evidence": "What was found" + }} + ], + "summaryFeedback": "Summary of evaluation", + "reworkInstructions": null + }} + EVALEOF + makima contract create-file evaluation-result < /tmp/eval-result.json + + Replace the example values with your actual evaluation results. Scoring guidelines: -- Score >= {threshold_green}: confidenceLevel = "green", passed = true -- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed -- Score < {threshold_yellow}: confidenceLevel = "red", passed = false +- overallScore >= {threshold_green}: confidenceLevel = "green", passed = true +- overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment +- overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false - Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions. 
+- Set reworkInstructions to null if the step passed. -After writing the evaluation file, you are done. Simply exit."#, +You are done after writing the evaluation file."#, title = directive.title, goal = directive.goal, requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), @@ -1488,7 +1631,7 @@ async fn process_monitoring_result( /// Trigger a manual evaluation for a step. Public for use by handlers. pub async fn trigger_manual_evaluation( pool: &PgPool, - _state: &SharedState, + state: &SharedState, owner_id: Uuid, directive_id: Uuid, step_id: Uuid, @@ -1536,7 +1679,7 @@ pub async fn trigger_manual_evaluation( ) .await; - dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?; + dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?; Ok(updated_step) } |
