summaryrefslogtreecommitdiff
path: root/makima/src/orchestration
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
committersoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
commit1b72449496ce3a057a43d002c8042d5e7a1d6576 (patch)
treef9151df7cc5128499ee91aafde3ff3c3b3281c1e /makima/src/orchestration
parent9e9f18884c78c21f5785908fb7ccd00e2fa5436b (diff)
downloadsoryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.tar.gz
soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.zip
Add directive init mechanism
Diffstat (limited to 'makima/src/orchestration')
-rw-r--r--makima/src/orchestration/directive.rs736
-rw-r--r--makima/src/orchestration/mod.rs1
2 files changed, 737 insertions, 0 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
new file mode 100644
index 0000000..d17deeb
--- /dev/null
+++ b/makima/src/orchestration/directive.rs
@@ -0,0 +1,736 @@
+//! Directive orchestration — init, planning completion, chain advancement.
+
+use serde::Deserialize;
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::db::models::{
+ CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
+};
+use crate::db::repository;
+use crate::server::state::SharedState;
+
+/// A single step in the chain plan produced by the planning supervisor.
+#[derive(Debug, Deserialize)]
+struct ChainPlanStep {
+ name: String,
+ description: String,
+ task_plan: String,
+ #[serde(default)]
+ depends_on: Vec<String>, // names of steps this depends on
+}
+
+/// Wrapper for the plan JSON written by the planning supervisor.
+#[derive(Debug, Deserialize)]
+struct ChainPlan {
+ steps: Vec<ChainPlanStep>,
+}
+
+/// Initialize a directive: create a planning contract and transition to "planning".
+pub async fn init_directive(
+ pool: &PgPool,
+ _state: &SharedState,
+ owner_id: Uuid,
+ directive_id: Uuid,
+) -> Result<Directive, String> {
+ // 1. Get directive, verify status
+ let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ if directive.status != "draft" {
+ return Err(format!(
+ "Directive must be in 'draft' status to start, current status: '{}'",
+ directive.status
+ ));
+ }
+
+ // 2. Create planning contract
+ let contract = repository::create_contract_for_owner(
+ pool,
+ owner_id,
+ CreateContractRequest {
+ name: format!("{} - Planning", directive.title),
+ description: Some(format!(
+ "Planning contract for directive: {}",
+ directive.title
+ )),
+ contract_type: Some("simple".to_string()),
+ template_id: None,
+ initial_phase: Some("plan".to_string()),
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: Some(true),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create planning contract: {}", e))?;
+
+ // 3. Mark contract as directive orchestrator
+ repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true)
+ .await
+ .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
+
+ // 4. Build planning prompt
+ let planning_prompt = build_planning_prompt(&directive);
+
+ // 5. Create supervisor task
+ let supervisor_task = repository::create_task_for_owner(
+ pool,
+ owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: format!("{} - Planner", directive.title),
+ description: Some("Decompose directive goal into executable chain steps".to_string()),
+ plan: planning_prompt,
+ parent_task_id: None,
+ is_supervisor: true,
+ priority: 10,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: directive.local_path.clone(),
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create supervisor task: {}", e))?;
+
+ // 6. Link supervisor to contract
+ repository::update_contract_for_owner(
+ pool,
+ contract.id,
+ owner_id,
+ UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_err(|e| match e {
+ crate::db::repository::RepositoryError::Database(e) => {
+ format!("Failed to link supervisor to contract: {}", e)
+ }
+ other => format!("Failed to link supervisor to contract: {:?}", other),
+ })?;
+
+ // 7. Set orchestrator_contract_id on directive
+ repository::set_directive_orchestrator_contract(pool, directive_id, contract.id)
+ .await
+ .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?;
+
+ // 8. Transition directive to "planning"
+ let updated = repository::update_directive_status(pool, directive_id, "planning")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?
+ .ok_or("Directive not found after status update")?;
+
+ // 9. Copy repo config to contract if repository_url is set
+ if let Some(ref repo_url) = directive.repository_url {
+ let _ = repository::add_remote_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ repo_url,
+ true,
+ )
+ .await;
+ } else if let Some(ref local_path) = directive.local_path {
+ let _ = repository::add_local_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ local_path,
+ true,
+ )
+ .await;
+ }
+
+ tracing::info!(
+ directive_id = %directive_id,
+ contract_id = %contract.id,
+ task_id = %supervisor_task.id,
+ "Directive started: planning contract created"
+ );
+
+ Ok(updated)
+}
+
+/// Called when any task completes — checks if it's directive-related and advances.
+pub async fn on_task_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ let Some(contract_id) = task.contract_id else {
+ return Ok(());
+ };
+
+ let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get contract: {}", e))?;
+
+ let Some(contract) = contract else {
+ return Ok(());
+ };
+
+ if contract.is_directive_orchestrator {
+ // This is a planning contract completion
+ let directive =
+ repository::get_directive_by_orchestrator_contract(pool, contract_id)
+ .await
+ .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
+
+ if let Some(directive) = directive {
+ on_planning_completed(pool, state, &directive, task, owner_id).await?;
+ }
+ } else if contract.directive_id.is_some() {
+ // This is a step contract completion
+ on_step_completed(pool, state, &contract, task, owner_id).await?;
+ }
+
+ Ok(())
+}
+
+/// Handle planning task completion: parse chain plan, create steps, advance.
+async fn on_planning_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ directive: &Directive,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // If task failed, fail the directive
+ if task.status == "failed" {
+ tracing::warn!(
+ directive_id = %directive.id,
+ task_id = %task.id,
+ "Planning task failed, marking directive as failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ }
+
+ // Only process when the supervisor task itself is done
+ if task.status != "done" || !task.is_supervisor {
+ return Ok(());
+ }
+
+ let Some(contract_id) = task.contract_id else {
+ return Ok(());
+ };
+
+ // Get contract files to find the chain plan
+ let files = repository::list_files_in_contract(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to list contract files: {}", e))?;
+
+ // Find the chain plan file
+ let plan_file = files.iter().find(|f| {
+ let name_lower = f.name.to_lowercase();
+ name_lower.contains("chain") || name_lower.contains("plan")
+ });
+
+ let plan_file = plan_file.or_else(|| files.first());
+
+ let Some(plan_file) = plan_file else {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "No plan file found in planning contract, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ };
+
+ // Read the full file to get the body content
+ let full_file = repository::get_file(pool, plan_file.id)
+ .await
+ .map_err(|e| format!("Failed to get plan file: {}", e))?
+ .ok_or("Plan file not found")?;
+
+ // Extract JSON from the file body elements
+ let plan_json = extract_plan_json(&full_file.body);
+
+ let Some(plan_json) = plan_json else {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "Could not extract chain plan JSON from file body, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ };
+
+ let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| {
+ format!("Failed to parse chain plan JSON: {}", e)
+ })?;
+
+ if chain_plan.steps.is_empty() {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "Chain plan has no steps, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ }
+
+ // Create chain
+ let chain = repository::create_directive_chain(
+ pool,
+ directive.id,
+ &format!("{} - Chain", directive.title),
+ Some("Auto-generated from planning"),
+ None,
+ chain_plan.steps.len() as i32,
+ )
+ .await
+ .map_err(|e| format!("Failed to create directive chain: {}", e))?;
+
+ // Create steps (two passes: first create all, then resolve dependencies)
+ let mut step_ids: Vec<(String, Uuid)> = Vec::new();
+
+ for (i, plan_step) in chain_plan.steps.iter().enumerate() {
+ let step = repository::create_chain_step(
+ pool,
+ chain.id,
+ &plan_step.name,
+ Some(&plan_step.description),
+ "task",
+ "simple",
+ Some("plan"),
+ Some(&plan_step.task_plan),
+ None, // dependencies set in second pass
+ i as i32,
+ )
+ .await
+ .map_err(|e| format!("Failed to create chain step: {}", e))?;
+
+ step_ids.push((plan_step.name.clone(), step.id));
+ }
+
+ // Second pass: resolve name-based dependencies to UUIDs and update
+ for (i, plan_step) in chain_plan.steps.iter().enumerate() {
+ if plan_step.depends_on.is_empty() {
+ continue;
+ }
+
+ let dep_uuids: Vec<Uuid> = plan_step
+ .depends_on
+ .iter()
+ .filter_map(|dep_name| {
+ step_ids
+ .iter()
+ .find(|(name, _)| name == dep_name)
+ .map(|(_, id)| *id)
+ })
+ .collect();
+
+ if !dep_uuids.is_empty() {
+ let step_id = step_ids[i].1;
+ sqlx::query(
+ "UPDATE chain_steps SET depends_on = $2 WHERE id = $1",
+ )
+ .bind(step_id)
+ .bind(&dep_uuids)
+ .execute(pool)
+ .await
+ .map_err(|e| format!("Failed to update step dependencies: {}", e))?;
+ }
+ }
+
+ // Set current chain on directive
+ repository::set_directive_current_chain(pool, directive.id, chain.id)
+ .await
+ .map_err(|e| format!("Failed to set current chain: {}", e))?;
+
+ // Transition directive to active
+ let updated_directive = repository::update_directive_status(pool, directive.id, "active")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?
+ .ok_or("Directive not found after status update")?;
+
+ tracing::info!(
+ directive_id = %directive.id,
+ chain_id = %chain.id,
+ step_count = chain_plan.steps.len(),
+ "Chain plan created, advancing chain"
+ );
+
+ // Advance chain to dispatch ready steps
+ advance_chain(pool, state, &updated_directive, owner_id).await
+}
+
+/// Handle a step contract task completion.
+async fn on_step_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ contract: &crate::db::models::Contract,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Only process supervisor task completions
+ if !task.is_supervisor {
+ return Ok(());
+ }
+
+ let Some(directive_id) = contract.directive_id else {
+ return Ok(());
+ };
+
+ // Find the step linked to this contract
+ let step = repository::get_step_by_contract_id(pool, contract.id)
+ .await
+ .map_err(|e| format!("Failed to get step by contract: {}", e))?;
+
+ let Some(step) = step else {
+ return Ok(());
+ };
+
+ // Update step status based on task outcome
+ let new_status = if task.status == "done" {
+ "passed"
+ } else {
+ "failed"
+ };
+
+ repository::update_step_status(pool, step.id, new_status)
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ new_status = new_status,
+ "Step completed"
+ );
+
+ // Get the directive and advance
+ let directive = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ advance_chain(pool, state, &directive, owner_id).await
+}
+
+/// Check chain progress and dispatch ready steps or mark directive complete.
+async fn advance_chain(
+ pool: &PgPool,
+ _state: &SharedState,
+ directive: &Directive,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ let Some(chain_id) = directive.current_chain_id else {
+ return Ok(());
+ };
+
+ let steps = repository::list_steps_for_chain(pool, chain_id)
+ .await
+ .map_err(|e| format!("Failed to list steps: {}", e))?;
+
+ // Check if all steps passed
+ let all_passed = steps.iter().all(|s| s.status == "passed");
+ if all_passed && !steps.is_empty() {
+ repository::update_chain_status(pool, chain_id, "completed")
+ .await
+ .map_err(|e| format!("Failed to update chain status: {}", e))?;
+ repository::update_directive_status(pool, directive.id, "completed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed");
+ return Ok(());
+ }
+
+ // Check if any step failed
+ let any_failed = steps.iter().any(|s| s.status == "failed");
+ if any_failed {
+ repository::update_chain_status(pool, chain_id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update chain status: {}", e))?;
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected");
+ return Ok(());
+ }
+
+ // Find and dispatch ready steps
+ let ready_steps = repository::find_ready_steps(pool, chain_id)
+ .await
+ .map_err(|e| format!("Failed to find ready steps: {}", e))?;
+
+ for step in ready_steps {
+ if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await {
+ tracing::error!(
+ step_id = %step.id,
+ step_name = %step.name,
+ error = %e,
+ "Failed to dispatch step"
+ );
+ }
+ }
+
+ Ok(())
+}
+
+/// Dispatch a single chain step as a new contract with supervisor.
+async fn dispatch_step(
+ pool: &PgPool,
+ directive: &Directive,
+ step: &crate::db::models::ChainStep,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Mark step as running
+ repository::update_step_status(pool, step.id, "running")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ // Create contract for this step
+ let contract = repository::create_contract_for_owner(
+ pool,
+ owner_id,
+ CreateContractRequest {
+ name: step.name.clone(),
+ description: step.description.clone(),
+ contract_type: Some(step.contract_type.clone()),
+ template_id: None,
+ initial_phase: step.initial_phase.clone(),
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: Some(true),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create step contract: {}", e))?;
+
+ // Set directive_id on contract
+ repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
+ .await
+ .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
+
+ // Build the task plan
+ let task_plan = step
+ .task_plan
+ .clone()
+ .unwrap_or_else(|| format!("Execute step: {}", step.name));
+
+ // Create supervisor task
+ let supervisor_task = repository::create_task_for_owner(
+ pool,
+ owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: format!("{} Supervisor", step.name),
+ description: step.description.clone(),
+ plan: task_plan,
+ parent_task_id: None,
+ is_supervisor: true,
+ priority: 5,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: directive.local_path.clone(),
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create step supervisor task: {}", e))?;
+
+ // Link supervisor to contract
+ repository::update_contract_for_owner(
+ pool,
+ contract.id,
+ owner_id,
+ UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_err(|e| match e {
+ crate::db::repository::RepositoryError::Database(e) => {
+ format!("Failed to link supervisor to step contract: {}", e)
+ }
+ other => format!("Failed to link supervisor to step contract: {:?}", other),
+ })?;
+
+ // Link step to contract/task
+ repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id)
+ .await
+ .map_err(|e| format!("Failed to update step contract link: {}", e))?;
+
+ // Copy repo config from directive to step contract
+ if let Some(ref repo_url) = directive.repository_url {
+ let _ = repository::add_remote_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ repo_url,
+ true,
+ )
+ .await;
+ } else if let Some(ref local_path) = directive.local_path {
+ let _ = repository::add_local_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ local_path,
+ true,
+ )
+ .await;
+ }
+
+ tracing::info!(
+ directive_id = %directive.id,
+ step_id = %step.id,
+ step_name = %step.name,
+ contract_id = %contract.id,
+ task_id = %supervisor_task.id,
+ "Step dispatched"
+ );
+
+ Ok(())
+}
+
+/// Build the planning supervisor prompt from a directive.
+fn build_planning_prompt(directive: &Directive) -> String {
+ format!(
+ r#"You are planning the execution of a directive.
+
+DIRECTIVE: {}
+GOAL: {}
+REQUIREMENTS: {}
+ACCEPTANCE CRITERIA: {}
+CONSTRAINTS: {}
+
+Your job is to decompose this goal into a sequence of executable steps.
+Each step will become a separate contract with its own supervisor.
+
+Write a JSON plan to a contract file named "chain-plan" using:
+ makima contract create-file "chain-plan" < plan.json
+
+The JSON format:
+{{
+ "steps": [
+ {{
+ "name": "Step name",
+ "description": "What this step accomplishes",
+ "task_plan": "Detailed instructions for the step's supervisor",
+ "depends_on": []
+ }}
+ ]
+}}
+
+Rules:
+- Steps with no dependencies (empty depends_on array) will run in parallel.
+- Steps that depend on other steps will wait until those complete.
+- The depends_on array contains names of steps this step depends on.
+- Each step should be a self-contained unit of work.
+- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible.
+- Keep the number of steps reasonable (3-10 typically).
+
+After writing the plan file, mark the contract as complete using:
+ makima supervisor complete"#,
+ directive.title,
+ directive.goal,
+ serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
+ serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
+ serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
+ )
+}
+
+/// Extract JSON from file body elements.
+fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> {
+ use crate::db::models::BodyElement;
+
+ for element in body {
+ match element {
+ BodyElement::Code { content, .. } => {
+ // Try to parse as JSON
+ let trimmed = content.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ }
+ BodyElement::Paragraph { text } => {
+ let trimmed = text.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ }
+ BodyElement::Markdown { content } => {
+ // Try to find JSON in markdown content
+ let trimmed = content.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ // Try to find JSON in code blocks within markdown
+ if let Some(json_start) = trimmed.find("```json") {
+ let after = &trimmed[json_start + 7..];
+ if let Some(json_end) = after.find("```") {
+ let json_str = after[..json_end].trim();
+ if serde_json::from_str::<serde_json::Value>(json_str).is_ok() {
+ return Some(json_str.to_string());
+ }
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
+ // Fallback: concatenate all text content and try to find JSON
+ let all_text: String = body
+ .iter()
+ .map(|el| match el {
+ BodyElement::Code { content, .. } => content.clone(),
+ BodyElement::Paragraph { text } => text.clone(),
+ BodyElement::Markdown { content } => content.clone(),
+ _ => String::new(),
+ })
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ let trimmed = all_text.trim();
+ if let Some(start) = trimmed.find('{') {
+ // Find matching closing brace
+ let substr = &trimmed[start..];
+ if serde_json::from_str::<serde_json::Value>(substr).is_ok() {
+ return Some(substr.to_string());
+ }
+ }
+
+ None
+}
diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs
new file mode 100644
index 0000000..e7ffb70
--- /dev/null
+++ b/makima/src/orchestration/mod.rs
@@ -0,0 +1 @@
+pub mod directive;