summaryrefslogtreecommitdiff
path: root/makima/src/orchestration
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-08 19:50:20 +0000
committersoryu <soryu@soryu.co>2026-02-08 19:50:20 +0000
commit87fa8c4af66745bd30bc84b6c5ef657dd4dec002 (patch)
treecf8216c349086819300b4400c2db27c5c97be62c /makima/src/orchestration
parent2166befc8869dbb76008a1fe62f28a4936e77bce (diff)
downloadsoryu-87fa8c4af66745bd30bc84b6c5ef657dd4dec002.tar.gz
soryu-87fa8c4af66745bd30bc84b6c5ef657dd4dec002.zip
Fix directive evaluation and add to frontend
Diffstat (limited to 'makima/src/orchestration')
-rw-r--r--makima/src/orchestration/directive.rs217
1 files changed, 180 insertions, 37 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 9a65035..46d9425 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -6,10 +6,11 @@ use uuid::Uuid;
use serde::Serialize;
use crate::db::models::{
- ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
+ ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task,
+ UpdateContractRequest, UpdateTaskRequest,
};
use crate::db::repository;
-use crate::server::state::SharedState;
+use crate::server::state::{DaemonCommand, SharedState};
/// A single step in the chain plan produced by the planning supervisor.
#[derive(Debug, Deserialize)]
@@ -43,10 +44,124 @@ struct MonitoringResult {
rework_instructions: Option<String>,
}
+/// Dispatch a task to an available daemon.
+///
+/// Looks up the owner's available daemons, picks the first one that still has
+/// spare capacity and an entry in `state.daemon_connections`, marks the task as
+/// `"starting"` on that daemon, and sends it a [`DaemonCommand::SpawnTask`].
+///
+/// Dispatch is best-effort. Two situations are logged and reported as `Ok(())`
+/// rather than as an error (the log messages state that a retry loop is expected
+/// to pick the task up later):
+///
+/// * no daemon currently qualifies — the task is left untouched;
+/// * the spawn command cannot be sent — the task is reset to `"pending"` and its
+///   daemon assignment is cleared first.
+///
+/// # Parameters
+///
+/// * `task` — the task to dispatch; its `version` is passed along with the
+///   assignment update.
+/// * `contract_local_only` / `contract_auto_merge_local` — forwarded verbatim as
+///   the `local_only` / `auto_merge_local` fields of the spawn command.
+/// * `owner_id` — scopes both the daemon lookup and the task updates.
+///
+/// # Errors
+///
+/// Returns `Err` when the daemon lookup fails, when the assignment update fails,
+/// or when the assignment update reports that the task does not exist.
+async fn dispatch_task_to_daemon(
+    pool: &PgPool,
+    state: &SharedState,
+    task: &Task,
+    contract_local_only: bool,
+    contract_auto_merge_local: bool,
+    owner_id: Uuid,
+) -> Result<(), String> {
+    // Find available daemons
+    let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[])
+        .await
+        .map_err(|e| format!("Failed to get available daemons: {}", e))?;
+
+    // A daemon qualifies only if it has free capacity AND is currently connected.
+    let Some(daemon) = daemons.iter().find(|d| {
+        d.current_task_count < d.max_concurrent_tasks
+            && state.daemon_connections.contains_key(&d.connection_id)
+    }) else {
+        tracing::warn!(
+            task_id = %task.id,
+            "No daemon available to dispatch task — will be picked up by retry loop"
+        );
+        return Ok(());
+    };
+
+    // Assign task to daemon
+    let update_req = UpdateTaskRequest {
+        status: Some("starting".to_string()),
+        daemon_id: Some(daemon.id),
+        version: Some(task.version),
+        ..Default::default()
+    };
+
+    let updated = repository::update_task_for_owner(pool, task.id, owner_id, update_req)
+        .await
+        .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?;
+
+    let Some(updated_task) = updated else {
+        return Err("Task not found when assigning to daemon".to_string());
+    };
+
+    // Get repo URL from the task itself, falling back to the contract's
+    // repositories (primary first, otherwise the first one listed).
+    let repo_url = if let Some(url) = &updated_task.repository_url {
+        Some(url.clone())
+    } else if let Some(contract_id) = updated_task.contract_id {
+        match repository::list_contract_repositories(pool, contract_id).await {
+            Ok(repos) => repos
+                .iter()
+                .find(|r| r.is_primary)
+                .or_else(|| repos.first())
+                .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
+            Err(e) => {
+                // Still best-effort (the task is spawned without a repo URL), but
+                // leave a trace so a missing checkout can be diagnosed.
+                tracing::warn!(
+                    task_id = %task.id,
+                    contract_id = %contract_id,
+                    error = ?e,
+                    "Failed to list contract repositories — dispatching without repo URL"
+                );
+                None
+            }
+        }
+    } else {
+        None
+    };
+
+    let cmd = DaemonCommand::SpawnTask {
+        task_id: updated_task.id,
+        task_name: updated_task.name.clone(),
+        plan: updated_task.plan.clone(),
+        repo_url,
+        base_branch: updated_task.base_branch.clone(),
+        target_branch: updated_task.target_branch.clone(),
+        parent_task_id: updated_task.parent_task_id,
+        depth: updated_task.depth,
+        is_orchestrator: false,
+        target_repo_path: updated_task.target_repo_path.clone(),
+        completion_action: updated_task.completion_action.clone(),
+        continue_from_task_id: updated_task.continue_from_task_id,
+        // A `copy_files` value that does not deserialize is treated as absent.
+        copy_files: updated_task
+            .copy_files
+            .as_ref()
+            .and_then(|v| serde_json::from_value(v.clone()).ok()),
+        contract_id: updated_task.contract_id,
+        is_supervisor: updated_task.is_supervisor,
+        // Tasks that belong to a contract run with the autonomous loop enabled.
+        autonomous_loop: updated_task.contract_id.is_some(),
+        resume_session: false,
+        conversation_history: None,
+        patch_data: None,
+        patch_base_sha: None,
+        local_only: contract_local_only,
+        auto_merge_local: contract_auto_merge_local,
+        supervisor_worktree_task_id: None,
+    };
+
+    if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+        tracing::warn!(
+            task_id = %task.id,
+            daemon_id = %daemon.id,
+            error = %e,
+            "Failed to send spawn command — rolling back"
+        );
+        let rollback = UpdateTaskRequest {
+            status: Some("pending".to_string()),
+            clear_daemon_id: true,
+            ..Default::default()
+        };
+        // The rollback stays best-effort, but a failure here leaves the task in
+        // "starting" with a daemon that never received it, so it must be visible
+        // in the logs instead of being silently discarded.
+        match repository::update_task_for_owner(pool, task.id, owner_id, rollback).await {
+            Ok(Some(_)) => {}
+            Ok(None) => tracing::error!(
+                task_id = %task.id,
+                daemon_id = %daemon.id,
+                "Rollback found no task to reset after failed spawn command"
+            ),
+            Err(rollback_err) => tracing::error!(
+                task_id = %task.id,
+                daemon_id = %daemon.id,
+                error = ?rollback_err,
+                "Failed to roll back task assignment after failed spawn command"
+            ),
+        }
+        return Ok(()); // Non-fatal, retry loop will pick it up
+    }
+
+    tracing::info!(
+        task_id = %task.id,
+        daemon_id = %daemon.id,
+        "Dispatched directive task to daemon"
+    );
+
+    Ok(())
+}
+
+
/// Initialize a directive: create a planning contract and transition to "planning".
pub async fn init_directive(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
owner_id: Uuid,
directive_id: Uuid,
) -> Result<Directive, String> {
@@ -179,6 +294,13 @@ pub async fn init_directive(
"Directive started: planning contract created"
);
+ // 10. Dispatch planning task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(updated)
}
@@ -334,7 +456,7 @@ pub async fn on_contract_completed(
)
.await;
- dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?;
+ dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?;
}
}
}
@@ -421,7 +543,7 @@ async fn on_planning_completed(
/// Checks if a plan was submitted via the CLI; if not, retries or fails.
async fn handle_planning_completion(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
directive: &Directive,
owner_id: Uuid,
) -> Result<(), String> {
@@ -488,7 +610,7 @@ async fn handle_planning_completion(
.map_err(|e| format!("Failed to reset directive status: {}", e))?;
// Re-init planning
- init_directive(pool, _state, owner_id, directive.id).await?;
+ init_directive(pool, state, owner_id, directive.id).await?;
Ok(())
} else {
@@ -688,7 +810,7 @@ async fn on_step_completed(
"Step task done, dispatching monitoring evaluation"
);
- dispatch_monitoring(pool, &directive, &step, contract, owner_id).await
+ dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await
} else {
// Step task failed — mark step failed and advance
repository::update_step_status(pool, step.id, "failed")
@@ -711,7 +833,7 @@ async fn on_step_completed(
/// Check chain progress and dispatch ready steps or mark directive complete.
async fn advance_chain(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
directive: &Directive,
owner_id: Uuid,
) -> Result<(), String> {
@@ -755,7 +877,7 @@ async fn advance_chain(
.map_err(|e| format!("Failed to find ready steps: {}", e))?;
for step in ready_steps {
- if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await {
+ if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await {
tracing::error!(
step_id = %step.id,
step_name = %step.name,
@@ -771,6 +893,7 @@ async fn advance_chain(
/// Dispatch a single chain step as a new contract with supervisor.
async fn dispatch_step(
pool: &PgPool,
+ state: &SharedState,
directive: &Directive,
step: &crate::db::models::ChainStep,
owner_id: Uuid,
@@ -914,6 +1037,13 @@ async fn dispatch_step(
"Step dispatched"
);
+ // Dispatch step task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(())
}
@@ -1038,6 +1168,7 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String>
/// Dispatch a monitoring contract to evaluate a completed step.
async fn dispatch_monitoring(
pool: &PgPool,
+ state: &SharedState,
directive: &Directive,
step: &ChainStep,
step_contract: &crate::db::models::Contract,
@@ -1153,6 +1284,13 @@ async fn dispatch_monitoring(
"Monitoring evaluation dispatched"
);
+ // Dispatch monitoring task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(())
}
@@ -1181,40 +1319,45 @@ CONFIDENCE THRESHOLDS:
- Yellow (marginal): >= {threshold_yellow}
- Red (fail): < {threshold_yellow}
-Your job:
+INSTRUCTIONS:
1. Read the step contract's files to understand what was delivered:
makima contract files --contract-id {step_contract_id}
makima contract file <file_id> --contract-id {step_contract_id}
-2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan.
-
-3. Write your evaluation result as a JSON file named "evaluation-result" to this contract:
- makima contract create-file "evaluation-result" < evaluation.json
-
-The JSON format:
-{{
- "passed": true/false,
- "overallScore": 0.0-1.0,
- "confidenceLevel": "green" | "yellow" | "red",
- "criteriaResults": [
- {{
- "criterion": "Description of what was checked",
- "passed": true/false,
- "score": 0.0-1.0,
- "evidence": "Evidence supporting the assessment"
- }}
- ],
- "summaryFeedback": "Brief summary of the evaluation",
- "reworkInstructions": "If failed, specific instructions for rework (null if passed)"
-}}
+2. Evaluate whether the step's output meets the directive's requirements and the step's task plan.
+
+3. Write your evaluation as a JSON file to this monitoring contract. Create a file called
+ evaluation.json with the JSON content first, then upload it:
+
+ cat > /tmp/eval-result.json << 'EVALEOF'
+ {{
+ "passed": true,
+ "overallScore": 0.85,
+ "confidenceLevel": "green",
+ "criteriaResults": [
+ {{
+ "criterion": "Example criterion",
+ "passed": true,
+ "score": 0.9,
+ "evidence": "What was found"
+ }}
+ ],
+ "summaryFeedback": "Summary of evaluation",
+ "reworkInstructions": null
+ }}
+ EVALEOF
+ makima contract create-file evaluation-result < /tmp/eval-result.json
+
+ Replace the example values with your actual evaluation results.
Scoring guidelines:
-- Score >= {threshold_green}: confidenceLevel = "green", passed = true
-- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed
-- Score < {threshold_yellow}: confidenceLevel = "red", passed = false
+- overallScore >= {threshold_green}: confidenceLevel = "green", passed = true
+- overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment
+- overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false
- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions.
+- Set reworkInstructions to null if the step passed.
-After writing the evaluation file, you are done. Simply exit."#,
+You are done after writing the evaluation file."#,
title = directive.title,
goal = directive.goal,
requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
@@ -1488,7 +1631,7 @@ async fn process_monitoring_result(
/// Trigger a manual evaluation for a step. Public for use by handlers.
pub async fn trigger_manual_evaluation(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
owner_id: Uuid,
directive_id: Uuid,
step_id: Uuid,
@@ -1536,7 +1679,7 @@ pub async fn trigger_manual_evaluation(
)
.await;
- dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?;
+ dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?;
Ok(updated_step)
}