From 1b72449496ce3a057a43d002c8042d5e7a1d6576 Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 7 Feb 2026 16:36:19 +0000 Subject: Add directive init mechanism --- makima/src/orchestration/directive.rs | 736 ++++++++++++++++++++++++++++++++++ makima/src/orchestration/mod.rs | 1 + 2 files changed, 737 insertions(+) create mode 100644 makima/src/orchestration/directive.rs create mode 100644 makima/src/orchestration/mod.rs (limited to 'makima/src/orchestration') diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs new file mode 100644 index 0000000..d17deeb --- /dev/null +++ b/makima/src/orchestration/directive.rs @@ -0,0 +1,736 @@ +//! Directive orchestration — init, planning completion, chain advancement. + +use serde::Deserialize; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::db::models::{ + CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, +}; +use crate::db::repository; +use crate::server::state::SharedState; + +/// A single step in the chain plan produced by the planning supervisor. +#[derive(Debug, Deserialize)] +struct ChainPlanStep { + name: String, + description: String, + task_plan: String, + #[serde(default)] + depends_on: Vec, // names of steps this depends on +} + +/// Wrapper for the plan JSON written by the planning supervisor. +#[derive(Debug, Deserialize)] +struct ChainPlan { + steps: Vec, +} + +/// Initialize a directive: create a planning contract and transition to "planning". +pub async fn init_directive( + pool: &PgPool, + _state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, +) -> Result { + // 1. Get directive, verify status + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + if directive.status != "draft" { + return Err(format!( + "Directive must be in 'draft' status to start, current status: '{}'", + directive.status + )); + } + + // 2. Create planning contract + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: format!("{} - Planning", directive.title), + description: Some(format!( + "Planning contract for directive: {}", + directive.title + )), + contract_type: Some("simple".to_string()), + template_id: None, + initial_phase: Some("plan".to_string()), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create planning contract: {}", e))?; + + // 3. Mark contract as directive orchestrator + repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) + .await + .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; + + // 4. Build planning prompt + let planning_prompt = build_planning_prompt(&directive); + + // 5. Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} - Planner", directive.title), + description: Some("Decompose directive goal into executable chain steps".to_string()), + plan: planning_prompt, + parent_task_id: None, + is_supervisor: true, + priority: 10, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create supervisor task: {}", e))?; + + // 6. Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to contract: {}", e) + } + other => format!("Failed to link supervisor to contract: {:?}", other), + })?; + + // 7. Set orchestrator_contract_id on directive + repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) + .await + .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; + + // 8. Transition directive to "planning" + let updated = repository::update_directive_status(pool, directive_id, "planning") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))? + .ok_or("Directive not found after status update")?; + + // 9. Copy repo config to contract if repository_url is set + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive_id, + contract_id = %contract.id, + task_id = %supervisor_task.id, + "Directive started: planning contract created" + ); + + Ok(updated) +} + +/// Called when any task completes — checks if it's directive-related and advances. +pub async fn on_task_completed( + pool: &PgPool, + state: &SharedState, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + let Some(contract_id) = task.contract_id else { + return Ok(()); + }; + + let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to get contract: {}", e))?; + + let Some(contract) = contract else { + return Ok(()); + }; + + if contract.is_directive_orchestrator { + // This is a planning contract completion + let directive = + repository::get_directive_by_orchestrator_contract(pool, contract_id) + .await + .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; + + if let Some(directive) = directive { + on_planning_completed(pool, state, &directive, task, owner_id).await?; + } + } else if contract.directive_id.is_some() { + // This is a step contract completion + on_step_completed(pool, state, &contract, task, owner_id).await?; + } + + Ok(()) +} + +/// Handle planning task completion: parse chain plan, create steps, advance. +async fn on_planning_completed( + pool: &PgPool, + state: &SharedState, + directive: &Directive, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // If task failed, fail the directive + if task.status == "failed" { + tracing::warn!( + directive_id = %directive.id, + task_id = %task.id, + "Planning task failed, marking directive as failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + } + + // Only process when the supervisor task itself is done + if task.status != "done" || !task.is_supervisor { + return Ok(()); + } + + let Some(contract_id) = task.contract_id else { + return Ok(()); + }; + + // Get contract files to find the chain plan + let files = repository::list_files_in_contract(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to list contract files: {}", e))?; + + // Find the chain plan file + let plan_file = files.iter().find(|f| { + let name_lower = f.name.to_lowercase(); + name_lower.contains("chain") || name_lower.contains("plan") + }); + + let plan_file = plan_file.or_else(|| files.first()); + + let Some(plan_file) = plan_file else { + tracing::warn!( + directive_id = %directive.id, + "No plan file found in planning contract, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + }; + + // Read the full file to get the body content + let full_file = repository::get_file(pool, plan_file.id) + .await + .map_err(|e| format!("Failed to get plan file: {}", e))? + .ok_or("Plan file not found")?; + + // Extract JSON from the file body elements + let plan_json = extract_plan_json(&full_file.body); + + let Some(plan_json) = plan_json else { + tracing::warn!( + directive_id = %directive.id, + "Could not extract chain plan JSON from file body, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + }; + + let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| { + format!("Failed to parse chain plan JSON: {}", e) + })?; + + if chain_plan.steps.is_empty() { + tracing::warn!( + directive_id = %directive.id, + "Chain plan has no steps, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + } + + // Create chain + let chain = repository::create_directive_chain( + pool, + directive.id, + &format!("{} - Chain", directive.title), + Some("Auto-generated from planning"), + None, + chain_plan.steps.len() as i32, + ) + .await + .map_err(|e| format!("Failed to create directive chain: {}", e))?; + + // Create steps (two passes: first create all, then resolve dependencies) + let mut step_ids: Vec<(String, Uuid)> = Vec::new(); + + for (i, plan_step) in chain_plan.steps.iter().enumerate() { + let step = repository::create_chain_step( + pool, + chain.id, + &plan_step.name, + Some(&plan_step.description), + "task", + "simple", + Some("plan"), + Some(&plan_step.task_plan), + None, // dependencies set in second pass + i as i32, + ) + .await + .map_err(|e| format!("Failed to create chain step: {}", e))?; + + step_ids.push((plan_step.name.clone(), step.id)); + } + + // Second pass: resolve name-based dependencies to UUIDs and update + for (i, plan_step) in chain_plan.steps.iter().enumerate() { + if plan_step.depends_on.is_empty() { + continue; + } + + let dep_uuids: Vec = plan_step + .depends_on + .iter() + .filter_map(|dep_name| { + step_ids + .iter() + .find(|(name, _)| name == dep_name) + .map(|(_, id)| *id) + }) + .collect(); + + if !dep_uuids.is_empty() { + let step_id = step_ids[i].1; + sqlx::query( + "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", + ) + .bind(step_id) + .bind(&dep_uuids) + .execute(pool) + .await + .map_err(|e| format!("Failed to update step dependencies: {}", e))?; + } + } + + // Set current chain on directive + repository::set_directive_current_chain(pool, directive.id, chain.id) + .await + .map_err(|e| format!("Failed to set current chain: {}", e))?; + + // Transition directive to active + let updated_directive = repository::update_directive_status(pool, directive.id, "active") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))? + .ok_or("Directive not found after status update")?; + + tracing::info!( + directive_id = %directive.id, + chain_id = %chain.id, + step_count = chain_plan.steps.len(), + "Chain plan created, advancing chain" + ); + + // Advance chain to dispatch ready steps + advance_chain(pool, state, &updated_directive, owner_id).await +} + +/// Handle a step contract task completion. +async fn on_step_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // Only process supervisor task completions + if !task.is_supervisor { + return Ok(()); + } + + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + // Find the step linked to this contract + let step = repository::get_step_by_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to get step by contract: {}", e))?; + + let Some(step) = step else { + return Ok(()); + }; + + // Update step status based on task outcome + let new_status = if task.status == "done" { + "passed" + } else { + "failed" + }; + + repository::update_step_status(pool, step.id, new_status) + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + new_status = new_status, + "Step completed" + ); + + // Get the directive and advance + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + advance_chain(pool, state, &directive, owner_id).await +} + +/// Check chain progress and dispatch ready steps or mark directive complete. +async fn advance_chain( + pool: &PgPool, + _state: &SharedState, + directive: &Directive, + owner_id: Uuid, +) -> Result<(), String> { + let Some(chain_id) = directive.current_chain_id else { + return Ok(()); + }; + + let steps = repository::list_steps_for_chain(pool, chain_id) + .await + .map_err(|e| format!("Failed to list steps: {}", e))?; + + // Check if all steps passed + let all_passed = steps.iter().all(|s| s.status == "passed"); + if all_passed && !steps.is_empty() { + repository::update_chain_status(pool, chain_id, "completed") + .await + .map_err(|e| format!("Failed to update chain status: {}", e))?; + repository::update_directive_status(pool, directive.id, "completed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); + return Ok(()); + } + + // Check if any step failed + let any_failed = steps.iter().any(|s| s.status == "failed"); + if any_failed { + repository::update_chain_status(pool, chain_id, "failed") + .await + .map_err(|e| format!("Failed to update chain status: {}", e))?; + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); + return Ok(()); + } + + // Find and dispatch ready steps + let ready_steps = repository::find_ready_steps(pool, chain_id) + .await + .map_err(|e| format!("Failed to find ready steps: {}", e))?; + + for step in ready_steps { + if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await { + tracing::error!( + step_id = %step.id, + step_name = %step.name, + error = %e, + "Failed to dispatch step" + ); + } + } + + Ok(()) +} + +/// Dispatch a single chain step as a new contract with supervisor. +async fn dispatch_step( + pool: &PgPool, + directive: &Directive, + step: &crate::db::models::ChainStep, + owner_id: Uuid, +) -> Result<(), String> { + // Mark step as running + repository::update_step_status(pool, step.id, "running") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + // Create contract for this step + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: step.name.clone(), + description: step.description.clone(), + contract_type: Some(step.contract_type.clone()), + template_id: None, + initial_phase: step.initial_phase.clone(), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create step contract: {}", e))?; + + // Set directive_id on contract + repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) + .await + .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; + + // Build the task plan + let task_plan = step + .task_plan + .clone() + .unwrap_or_else(|| format!("Execute step: {}", step.name)); + + // Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} Supervisor", step.name), + description: step.description.clone(), + plan: task_plan, + parent_task_id: None, + is_supervisor: true, + priority: 5, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; + + // Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to step contract: {}", e) + } + other => format!("Failed to link supervisor to step contract: {:?}", other), + })?; + + // Link step to contract/task + repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) + .await + .map_err(|e| format!("Failed to update step contract link: {}", e))?; + + // Copy repo config from directive to step contract + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive.id, + step_id = %step.id, + step_name = %step.name, + contract_id = %contract.id, + task_id = %supervisor_task.id, + "Step dispatched" + ); + + Ok(()) +} + +/// Build the planning supervisor prompt from a directive. +fn build_planning_prompt(directive: &Directive) -> String { + format!( + r#"You are planning the execution of a directive. + +DIRECTIVE: {} +GOAL: {} +REQUIREMENTS: {} +ACCEPTANCE CRITERIA: {} +CONSTRAINTS: {} + +Your job is to decompose this goal into a sequence of executable steps. +Each step will become a separate contract with its own supervisor. + +Write a JSON plan to a contract file named "chain-plan" using: + makima contract create-file "chain-plan" < plan.json + +The JSON format: +{{ + "steps": [ + {{ + "name": "Step name", + "description": "What this step accomplishes", + "task_plan": "Detailed instructions for the step's supervisor", + "depends_on": [] + }} + ] +}} + +Rules: +- Steps with no dependencies (empty depends_on array) will run in parallel. +- Steps that depend on other steps will wait until those complete. +- The depends_on array contains names of steps this step depends on. +- Each step should be a self-contained unit of work. +- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. +- Keep the number of steps reasonable (3-10 typically). + +After writing the plan file, mark the contract as complete using: + makima supervisor complete"#, + directive.title, + directive.goal, + serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + ) +} + +/// Extract JSON from file body elements. +fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option { + use crate::db::models::BodyElement; + + for element in body { + match element { + BodyElement::Code { content, .. } => { + // Try to parse as JSON + let trimmed = content.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + } + BodyElement::Paragraph { text } => { + let trimmed = text.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + } + BodyElement::Markdown { content } => { + // Try to find JSON in markdown content + let trimmed = content.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + // Try to find JSON in code blocks within markdown + if let Some(json_start) = trimmed.find("```json") { + let after = &trimmed[json_start + 7..]; + if let Some(json_end) = after.find("```") { + let json_str = after[..json_end].trim(); + if serde_json::from_str::(json_str).is_ok() { + return Some(json_str.to_string()); + } + } + } + } + _ => {} + } + } + + // Fallback: concatenate all text content and try to find JSON + let all_text: String = body + .iter() + .map(|el| match el { + BodyElement::Code { content, .. } => content.clone(), + BodyElement::Paragraph { text } => text.clone(), + BodyElement::Markdown { content } => content.clone(), + _ => String::new(), + }) + .collect::>() + .join("\n"); + + let trimmed = all_text.trim(); + if let Some(start) = trimmed.find('{') { + // Find matching closing brace + let substr = &trimmed[start..]; + if serde_json::from_str::(substr).is_ok() { + return Some(substr.to_string()); + } + } + + None +} diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs new file mode 100644 index 0000000..e7ffb70 --- /dev/null +++ b/makima/src/orchestration/mod.rs @@ -0,0 +1 @@ +pub mod directive; -- cgit v1.2.3