summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--makima/frontend/src/components/directives/DirectiveContractsTab.tsx17
-rw-r--r--makima/frontend/src/components/directives/StepDiagram.tsx58
-rw-r--r--makima/frontend/src/lib/api.ts7
-rw-r--r--makima/src/db/repository.rs36
-rw-r--r--makima/src/orchestration/directive.rs217
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs4
6 files changed, 282 insertions, 57 deletions
diff --git a/makima/frontend/src/components/directives/DirectiveContractsTab.tsx b/makima/frontend/src/components/directives/DirectiveContractsTab.tsx
index 59ebfc8..28da117 100644
--- a/makima/frontend/src/components/directives/DirectiveContractsTab.tsx
+++ b/makima/frontend/src/components/directives/DirectiveContractsTab.tsx
@@ -100,6 +100,23 @@ export function DirectiveContractsTab({
label: step.name,
});
}
+ // Show monitoring/evaluation contracts
+ if (step.monitoringContractId) {
+ contracts.push({
+ summary: {
+ id: step.monitoringContractId,
+ name: `${step.name} - Evaluation`,
+ contractType: "monitoring",
+ status: step.status === "evaluating" ? "active" : "completed",
+ phase: "plan",
+ taskCount: 1,
+ tasksDone: step.status === "evaluating" ? 0 : 1,
+ tasksRunning: step.status === "evaluating" ? 1 : 0,
+ tasksFailed: 0,
+ },
+ label: `${step.name} eval`,
+ });
+ }
}
}
diff --git a/makima/frontend/src/components/directives/StepDiagram.tsx b/makima/frontend/src/components/directives/StepDiagram.tsx
index 91a3438..33892e0 100644
--- a/makima/frontend/src/components/directives/StepDiagram.tsx
+++ b/makima/frontend/src/components/directives/StepDiagram.tsx
@@ -41,10 +41,20 @@ const statusColors: Record<string, { border: string; dot: string; bg: string; gl
const statusLabels: Record<string, string> = {
pending: "Pending",
+ ready: "Ready",
running: "Running",
evaluating: "Evaluating",
passed: "Passed",
failed: "Failed",
+ rework: "Rework",
+ skipped: "Skipped",
+ blocked: "Blocked",
+};
+
+const confidenceColors: Record<string, string> = {
+ green: "text-green-400",
+ yellow: "text-yellow-400",
+ red: "text-red-400",
};
/**
@@ -122,6 +132,44 @@ function StepCard({ step }: { step: ChainStep }) {
</p>
)}
+ {/* Evaluation info */}
+ {(step.confidenceScore != null || step.evaluationCount > 0 || step.reworkCount > 0) && (
+ <div className="border-t border-[rgba(117,170,252,0.1)] pt-2 mt-1">
+ <div className="flex items-center gap-2 font-mono text-[9px]">
+ {step.confidenceScore != null && (
+ <span className={confidenceColors[step.confidenceLevel || ""] || "text-[#7788aa]"}>
+ {Math.round(step.confidenceScore * 100)}% confidence
+ </span>
+ )}
+ {step.evaluationCount > 0 && (
+ <span className="text-[#7788aa]">
+ eval #{step.evaluationCount}
+ </span>
+ )}
+ {step.reworkCount > 0 && (
+ <span className="text-orange-400">
+ rework x{step.reworkCount}
+ </span>
+ )}
+ </div>
+ </div>
+ )}
+
+ {/* Monitoring link (when evaluating) */}
+ {step.status === "evaluating" && step.monitoringContractId && (
+ <div className="border-t border-[rgba(117,170,252,0.1)] pt-1.5 mt-1">
+ <span
+ className="font-mono text-[9px] text-blue-400 cursor-pointer hover:text-blue-300"
+ onClick={(e) => {
+ e.stopPropagation();
+ navigate(`/contracts/${step.monitoringContractId}`);
+ }}
+ >
+ evaluation contract &rarr;
+ </span>
+ </div>
+ )}
+
{/* Contract progress */}
{summary && (
<div className="border-t border-[rgba(117,170,252,0.1)] pt-2 mt-1">
@@ -149,7 +197,7 @@ function StepCard({ step }: { step: ChainStep }) {
)}
{/* Contract link arrow */}
- {hasContract && !summary && (
+ {hasContract && !summary && step.status !== "evaluating" && (
<div className="border-t border-[rgba(117,170,252,0.1)] pt-1.5 mt-1">
<span className="font-mono text-[9px] text-[#75aafc]">
view contract &rarr;
@@ -200,7 +248,8 @@ export function StepDiagram({ steps }: StepDiagramProps) {
// Compute overall progress
const passedCount = steps.filter(s => s.status === "passed").length;
const failedCount = steps.filter(s => s.status === "failed").length;
- const runningCount = steps.filter(s => s.status === "running" || s.status === "evaluating").length;
+ const runningCount = steps.filter(s => s.status === "running").length;
+ const evaluatingCount = steps.filter(s => s.status === "evaluating").length;
return (
<div>
@@ -213,7 +262,10 @@ export function StepDiagram({ steps }: StepDiagramProps) {
<span className="text-green-400">{passedCount} passed</span>
)}
{runningCount > 0 && (
- <span className="text-yellow-400">{runningCount} active</span>
+ <span className="text-yellow-400">{runningCount} running</span>
+ )}
+ {evaluatingCount > 0 && (
+ <span className="text-blue-400">{evaluatingCount} evaluating</span>
)}
{failedCount > 0 && (
<span className="text-red-400">{failedCount} failed</span>
diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts
index 9782a07..5080ee1 100644
--- a/makima/frontend/src/lib/api.ts
+++ b/makima/frontend/src/lib/api.ts
@@ -3100,6 +3100,13 @@ export interface ChainStep {
status: string;
contractId: string | null;
supervisorTaskId: string | null;
+ monitoringContractId: string | null;
+ monitoringTaskId: string | null;
+ confidenceScore: number | null;
+ confidenceLevel: string | null;
+ evaluationCount: number;
+ reworkCount: number;
+ lastEvaluationId: string | null;
orderIndex: number;
startedAt: string | null;
completedAt: string | null;
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 6b3f15f..4298fa5 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -812,15 +812,18 @@ pub async fn get_pending_tasks_for_contract(
) -> Result<Vec<Task>, sqlx::Error> {
sqlx::query_as::<_, Task>(
r#"
- SELECT * FROM tasks
- WHERE contract_id = $1 AND owner_id = $2
- AND status = 'pending'
- AND is_supervisor = false
- AND retry_count < max_retries
+ SELECT t.* FROM tasks t
+ WHERE t.contract_id = $1 AND t.owner_id = $2
+ AND t.status = 'pending'
+ AND t.retry_count < t.max_retries
+ AND (t.is_supervisor = false
+ OR EXISTS (SELECT 1 FROM contracts c
+ WHERE c.id = t.contract_id
+ AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true)))
ORDER BY
- interrupted_at DESC NULLS LAST,
- priority DESC,
- created_at ASC
+ t.interrupted_at DESC NULLS LAST,
+ t.priority DESC,
+ t.created_at ASC
"#,
)
.bind(contract_id)
@@ -836,13 +839,16 @@ pub async fn get_all_pending_task_contracts(
) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
sqlx::query_as::<_, (Uuid, Uuid)>(
r#"
- SELECT DISTINCT contract_id, owner_id
- FROM tasks
- WHERE contract_id IS NOT NULL
- AND status = 'pending'
- AND is_supervisor = false
- AND retry_count < max_retries
- ORDER BY owner_id, contract_id
+ SELECT DISTINCT t.contract_id, t.owner_id
+ FROM tasks t
+ WHERE t.contract_id IS NOT NULL
+ AND t.status = 'pending'
+ AND t.retry_count < t.max_retries
+ AND (t.is_supervisor = false
+ OR EXISTS (SELECT 1 FROM contracts c
+ WHERE c.id = t.contract_id
+ AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true)))
+ ORDER BY t.owner_id, t.contract_id
"#,
)
.fetch_all(pool)
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 9a65035..46d9425 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -6,10 +6,11 @@ use uuid::Uuid;
use serde::Serialize;
use crate::db::models::{
- ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
+ ChainStep, CreateContractRequest, CreateTaskRequest, Directive, Task,
+ UpdateContractRequest, UpdateTaskRequest,
};
use crate::db::repository;
-use crate::server::state::SharedState;
+use crate::server::state::{DaemonCommand, SharedState};
/// A single step in the chain plan produced by the planning supervisor.
#[derive(Debug, Deserialize)]
@@ -43,10 +44,124 @@ struct MonitoringResult {
rework_instructions: Option<String>,
}
+/// Dispatch a task to an available daemon. Finds a connected daemon with capacity,
+/// assigns the task, and sends a SpawnTask command.
+async fn dispatch_task_to_daemon(
+ pool: &PgPool,
+ state: &SharedState,
+ task: &Task,
+ contract_local_only: bool,
+ contract_auto_merge_local: bool,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Find available daemons
+ let daemons = repository::get_available_daemons_excluding(pool, owner_id, &[])
+ .await
+ .map_err(|e| format!("Failed to get available daemons: {}", e))?;
+
+ let available_daemon = daemons.iter().find(|d| {
+ d.current_task_count < d.max_concurrent_tasks
+ && state.daemon_connections.contains_key(&d.connection_id)
+ });
+
+ let daemon = match available_daemon {
+ Some(d) => d,
+ None => {
+ tracing::warn!(
+ task_id = %task.id,
+ "No daemon available to dispatch task — will be picked up by retry loop"
+ );
+ return Ok(());
+ }
+ };
+
+ // Assign task to daemon
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ let updated = repository::update_task_for_owner(pool, task.id, owner_id, update_req)
+ .await
+ .map_err(|e| format!("Failed to assign task to daemon: {:?}", e))?;
+
+ let Some(updated_task) = updated else {
+ return Err("Task not found when assigning to daemon".to_string());
+ };
+
+ // Get repo URL from task or contract repositories
+ let repo_url = if let Some(url) = &updated_task.repository_url {
+ Some(url.clone())
+ } else if let Some(contract_id) = updated_task.contract_id {
+ match repository::list_contract_repositories(pool, contract_id).await {
+ Ok(repos) => repos
+ .iter()
+ .find(|r| r.is_primary)
+ .or(repos.first())
+ .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
+ Err(_) => None,
+ }
+ } else {
+ None
+ };
+
+ let cmd = DaemonCommand::SpawnTask {
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
+ is_supervisor: updated_task.is_supervisor,
+ autonomous_loop: updated_task.contract_id.is_some(),
+ resume_session: false,
+ conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
+ local_only: contract_local_only,
+ auto_merge_local: contract_auto_merge_local,
+ supervisor_worktree_task_id: None,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+ tracing::warn!(
+ task_id = %task.id,
+ daemon_id = %daemon.id,
+ error = %e,
+ "Failed to send spawn command — rolling back"
+ );
+ let rollback = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback).await;
+ return Ok(()); // Non-fatal, retry loop will pick it up
+ }
+
+ tracing::info!(
+ task_id = %task.id,
+ daemon_id = %daemon.id,
+ "Dispatched directive task to daemon"
+ );
+
+ Ok(())
+}
+
/// Initialize a directive: create a planning contract and transition to "planning".
pub async fn init_directive(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
owner_id: Uuid,
directive_id: Uuid,
) -> Result<Directive, String> {
@@ -179,6 +294,13 @@ pub async fn init_directive(
"Directive started: planning contract created"
);
+ // 10. Dispatch planning task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(updated)
}
@@ -334,7 +456,7 @@ pub async fn on_contract_completed(
)
.await;
- dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?;
+ dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await?;
}
}
}
@@ -421,7 +543,7 @@ async fn on_planning_completed(
/// Checks if a plan was submitted via the CLI; if not, retries or fails.
async fn handle_planning_completion(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
directive: &Directive,
owner_id: Uuid,
) -> Result<(), String> {
@@ -488,7 +610,7 @@ async fn handle_planning_completion(
.map_err(|e| format!("Failed to reset directive status: {}", e))?;
// Re-init planning
- init_directive(pool, _state, owner_id, directive.id).await?;
+ init_directive(pool, state, owner_id, directive.id).await?;
Ok(())
} else {
@@ -688,7 +810,7 @@ async fn on_step_completed(
"Step task done, dispatching monitoring evaluation"
);
- dispatch_monitoring(pool, &directive, &step, contract, owner_id).await
+ dispatch_monitoring(pool, state, &directive, &step, contract, owner_id).await
} else {
// Step task failed — mark step failed and advance
repository::update_step_status(pool, step.id, "failed")
@@ -711,7 +833,7 @@ async fn on_step_completed(
/// Check chain progress and dispatch ready steps or mark directive complete.
async fn advance_chain(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
directive: &Directive,
owner_id: Uuid,
) -> Result<(), String> {
@@ -755,7 +877,7 @@ async fn advance_chain(
.map_err(|e| format!("Failed to find ready steps: {}", e))?;
for step in ready_steps {
- if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await {
+ if let Err(e) = dispatch_step(pool, state, directive, &step, owner_id).await {
tracing::error!(
step_id = %step.id,
step_name = %step.name,
@@ -771,6 +893,7 @@ async fn advance_chain(
/// Dispatch a single chain step as a new contract with supervisor.
async fn dispatch_step(
pool: &PgPool,
+ state: &SharedState,
directive: &Directive,
step: &crate::db::models::ChainStep,
owner_id: Uuid,
@@ -914,6 +1037,13 @@ async fn dispatch_step(
"Step dispatched"
);
+ // Dispatch step task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(())
}
@@ -1038,6 +1168,7 @@ fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String>
/// Dispatch a monitoring contract to evaluate a completed step.
async fn dispatch_monitoring(
pool: &PgPool,
+ state: &SharedState,
directive: &Directive,
step: &ChainStep,
step_contract: &crate::db::models::Contract,
@@ -1153,6 +1284,13 @@ async fn dispatch_monitoring(
"Monitoring evaluation dispatched"
);
+ // Dispatch monitoring task to an available daemon immediately
+ dispatch_task_to_daemon(
+ pool, state, &supervisor_task,
+ contract.local_only, contract.auto_merge_local,
+ owner_id,
+ ).await?;
+
Ok(())
}
@@ -1181,40 +1319,45 @@ CONFIDENCE THRESHOLDS:
- Yellow (marginal): >= {threshold_yellow}
- Red (fail): < {threshold_yellow}
-Your job:
+INSTRUCTIONS:
1. Read the step contract's files to understand what was delivered:
makima contract files --contract-id {step_contract_id}
makima contract file <file_id> --contract-id {step_contract_id}
-2. Evaluate whether the step's output meets the directive's requirements and the step's specific task plan.
-
-3. Write your evaluation result as a JSON file named "evaluation-result" to this contract:
- makima contract create-file "evaluation-result" < evaluation.json
-
-The JSON format:
-{{
- "passed": true/false,
- "overallScore": 0.0-1.0,
- "confidenceLevel": "green" | "yellow" | "red",
- "criteriaResults": [
- {{
- "criterion": "Description of what was checked",
- "passed": true/false,
- "score": 0.0-1.0,
- "evidence": "Evidence supporting the assessment"
- }}
- ],
- "summaryFeedback": "Brief summary of the evaluation",
- "reworkInstructions": "If failed, specific instructions for rework (null if passed)"
-}}
+2. Evaluate whether the step's output meets the directive's requirements and the step's task plan.
+
+3. Write your evaluation as a JSON file to this monitoring contract. Create a file called
+ evaluation.json with the JSON content first, then upload it:
+
+ cat > /tmp/eval-result.json << 'EVALEOF'
+ {{
+ "passed": true,
+ "overallScore": 0.85,
+ "confidenceLevel": "green",
+ "criteriaResults": [
+ {{
+ "criterion": "Example criterion",
+ "passed": true,
+ "score": 0.9,
+ "evidence": "What was found"
+ }}
+ ],
+ "summaryFeedback": "Summary of evaluation",
+ "reworkInstructions": null
+ }}
+ EVALEOF
+ makima contract create-file evaluation-result < /tmp/eval-result.json
+
+ Replace the example values with your actual evaluation results.
Scoring guidelines:
-- Score >= {threshold_green}: confidenceLevel = "green", passed = true
-- Score >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment on passed
-- Score < {threshold_yellow}: confidenceLevel = "red", passed = false
+- overallScore >= {threshold_green}: confidenceLevel = "green", passed = true
+- overallScore >= {threshold_yellow} and < {threshold_green}: confidenceLevel = "yellow", use judgment
+- overallScore < {threshold_yellow}: confidenceLevel = "red", passed = false
- Be specific in reworkInstructions if the step fails — the step will be re-executed with these instructions.
+- Set reworkInstructions to null if the step passed.
-After writing the evaluation file, you are done. Simply exit."#,
+You are done after writing the evaluation file."#,
title = directive.title,
goal = directive.goal,
requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
@@ -1488,7 +1631,7 @@ async fn process_monitoring_result(
/// Trigger a manual evaluation for a step. Public for use by handlers.
pub async fn trigger_manual_evaluation(
pool: &PgPool,
- _state: &SharedState,
+ state: &SharedState,
owner_id: Uuid,
directive_id: Uuid,
step_id: Uuid,
@@ -1536,7 +1679,7 @@ pub async fn trigger_manual_evaluation(
)
.await;
- dispatch_monitoring(pool, &directive, &step, &contract, owner_id).await?;
+ dispatch_monitoring(pool, state, &directive, &step, &contract, owner_id).await?;
Ok(updated_step)
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 43388a8..09758bb 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -400,8 +400,8 @@ pub async fn try_start_pending_task(
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
- is_supervisor: false,
- autonomous_loop: false,
+ is_supervisor: updated_task.is_supervisor,
+ autonomous_loop: updated_task.is_supervisor,
resume_session: task.retry_count > 0, // Use --continue for retried tasks
conversation_history: None,
patch_data,