summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
committersoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
commit1b72449496ce3a057a43d002c8042d5e7a1d6576 (patch)
treef9151df7cc5128499ee91aafde3ff3c3b3281c1e /makima/src
parent9e9f18884c78c21f5785908fb7ccd00e2fa5436b (diff)
downloadsoryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.tar.gz
soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.zip
Add directive init mechanism
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/bin/makima.rs5
-rw-r--r--makima/src/daemon/api/directive.rs6
-rw-r--r--makima/src/daemon/cli/mod.rs3
-rw-r--r--makima/src/db/repository.rs296
-rw-r--r--makima/src/lib.rs1
-rw-r--r--makima/src/orchestration/directive.rs736
-rw-r--r--makima/src/orchestration/mod.rs1
-rw-r--r--makima/src/server/handlers/directives.rs58
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs11
-rw-r--r--makima/src/server/mod.rs1
-rw-r--r--makima/src/server/openapi.rs1
11 files changed, 1118 insertions, 1 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 308d689..9d7f847 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -768,6 +768,11 @@ async fn run_directive(
.await?;
println!("{}", serde_json::to_string(&result.0)?);
}
+ DirectiveCommand::Start(args) => {
+ let client = ApiClient::new(args.api_url, args.api_key)?;
+ let result = client.directive_start(args.directive_id).await?;
+ println!("{}", serde_json::to_string(&result.0)?);
+ }
}
Ok(())
diff --git a/makima/src/daemon/api/directive.rs b/makima/src/daemon/api/directive.rs
index 0c8115a..42f6f45 100644
--- a/makima/src/daemon/api/directive.rs
+++ b/makima/src/daemon/api/directive.rs
@@ -51,4 +51,10 @@ impl ApiClient {
self.put(&format!("/api/v1/directives/{}", directive_id), &req)
.await
}
+
+ /// Start a directive (transition from draft to planning).
+ pub async fn directive_start(&self, directive_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.post_empty(&format!("/api/v1/directives/{}/start", directive_id))
+ .await
+ }
}
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
index 9fba216..b07ab5a 100644
--- a/makima/src/daemon/cli/mod.rs
+++ b/makima/src/daemon/cli/mod.rs
@@ -222,6 +222,9 @@ pub enum DirectiveCommand {
/// Update directive status
UpdateStatus(directive::UpdateStatusArgs),
+
+ /// Start a directive (create planning contract and begin orchestration)
+ Start(DirectiveArgs),
}
impl Cli {
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 5949079..e072eb8 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5183,3 +5183,299 @@ pub async fn list_steps_for_chain(
.fetch_all(pool)
.await
}
+
+// ── Directive orchestration functions ───────────────────────────────────────
+
+/// Update directive status with automatic timestamp management.
+pub async fn update_directive_status(
+ pool: &PgPool,
+ id: Uuid,
+ new_status: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET status = $2,
+ started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set the orchestrator contract ID on a directive.
+pub async fn set_directive_orchestrator_contract(
+ pool: &PgPool,
+ directive_id: Uuid,
+ contract_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET orchestrator_contract_id = $2,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set the current chain ID on a directive and increment chain_generation_count.
+pub async fn set_directive_current_chain(
+ pool: &PgPool,
+ directive_id: Uuid,
+ chain_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET current_chain_id = $2,
+ chain_generation_count = chain_generation_count + 1,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(chain_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Create a new directive chain.
+pub async fn create_directive_chain(
+ pool: &PgPool,
+ directive_id: Uuid,
+ name: &str,
+ description: Option<&str>,
+ rationale: Option<&str>,
+ total_steps: i32,
+) -> Result<DirectiveChain, sqlx::Error> {
+ // Get next generation number
+ let next_gen: (i64,) = sqlx::query_as(
+ "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1",
+ )
+ .bind(directive_id)
+ .fetch_one(pool)
+ .await?;
+
+ sqlx::query_as::<_, DirectiveChain>(
+ r#"
+ INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status)
+ VALUES ($1, $2, $3, $4, $5, $6, 'active')
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(next_gen.0 as i32)
+ .bind(name)
+ .bind(description)
+ .bind(rationale)
+ .bind(total_steps)
+ .fetch_one(pool)
+ .await
+}
+
+/// Create a chain step.
+pub async fn create_chain_step(
+ pool: &PgPool,
+ chain_id: Uuid,
+ name: &str,
+ description: Option<&str>,
+ step_type: &str,
+ contract_type: &str,
+ initial_phase: Option<&str>,
+ task_plan: Option<&str>,
+ depends_on: Option<Vec<Uuid>>,
+ order_index: i32,
+) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending')
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(name)
+ .bind(description)
+ .bind(step_type)
+ .bind(contract_type)
+ .bind(initial_phase)
+ .bind(task_plan)
+ .bind(depends_on.as_deref())
+ .bind(order_index)
+ .fetch_one(pool)
+ .await
+}
+
+/// Update a chain step's status with automatic timestamp management.
+pub async fn update_step_status(
+ pool: &PgPool,
+ step_id: Uuid,
+ new_status: &str,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps
+ SET status = $2,
+ started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Link a chain step to a contract and supervisor task.
+pub async fn update_step_contract(
+ pool: &PgPool,
+ step_id: Uuid,
+ contract_id: Uuid,
+ supervisor_task_id: Uuid,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps
+ SET contract_id = $2,
+ supervisor_task_id = $3
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(contract_id)
+ .bind(supervisor_task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Find steps that are ready to execute (pending, with all dependencies passed).
+pub async fn find_ready_steps(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Vec<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ SELECT * FROM chain_steps
+ WHERE chain_id = $1
+ AND status = 'pending'
+ AND (
+ depends_on IS NULL
+ OR array_length(depends_on, 1) IS NULL
+ OR NOT EXISTS (
+ SELECT 1 FROM unnest(depends_on) AS dep_id
+ WHERE dep_id NOT IN (
+ SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed'
+ )
+ )
+ )
+ ORDER BY order_index ASC
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get a chain step by its linked contract ID.
+pub async fn get_step_by_contract_id(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"SELECT * FROM chain_steps WHERE contract_id = $1"#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a directive by its orchestrator contract ID.
+pub async fn get_directive_by_orchestrator_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator).
+pub async fn set_contract_directive_fields(
+ pool: &PgPool,
+ contract_id: Uuid,
+ directive_id: Option<Uuid>,
+ is_orchestrator: bool,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE contracts
+ SET directive_id = $2,
+ is_directive_orchestrator = $3
+ WHERE id = $1
+ "#,
+ )
+ .bind(contract_id)
+ .bind(directive_id)
+ .bind(is_orchestrator)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Get a directive by ID (no owner scoping, for internal use).
+pub async fn get_directive(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1"#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update chain status.
+pub async fn update_chain_status(
+ pool: &PgPool,
+ chain_id: Uuid,
+ new_status: &str,
+) -> Result<Option<DirectiveChain>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveChain>(
+ r#"
+ UPDATE directive_chains
+ SET status = $2,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}
diff --git a/makima/src/lib.rs b/makima/src/lib.rs
index 8d3db58..3bc460b 100644
--- a/makima/src/lib.rs
+++ b/makima/src/lib.rs
@@ -3,5 +3,6 @@ pub mod daemon;
pub mod db;
pub mod listen;
pub mod llm;
+pub mod orchestration;
pub mod server;
pub mod tts;
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
new file mode 100644
index 0000000..d17deeb
--- /dev/null
+++ b/makima/src/orchestration/directive.rs
@@ -0,0 +1,736 @@
+//! Directive orchestration — init, planning completion, chain advancement.
+
+use serde::Deserialize;
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::db::models::{
+ CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest,
+};
+use crate::db::repository;
+use crate::server::state::SharedState;
+
+/// A single step in the chain plan produced by the planning supervisor.
+#[derive(Debug, Deserialize)]
+struct ChainPlanStep {
+ name: String,
+ description: String,
+ task_plan: String,
+ #[serde(default)]
+ depends_on: Vec<String>, // names of steps this depends on
+}
+
+/// Wrapper for the plan JSON written by the planning supervisor.
+#[derive(Debug, Deserialize)]
+struct ChainPlan {
+ steps: Vec<ChainPlanStep>,
+}
+
+/// Initialize a directive: create a planning contract and transition to "planning".
+pub async fn init_directive(
+ pool: &PgPool,
+ _state: &SharedState,
+ owner_id: Uuid,
+ directive_id: Uuid,
+) -> Result<Directive, String> {
+ // 1. Get directive, verify status
+ let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ if directive.status != "draft" {
+ return Err(format!(
+ "Directive must be in 'draft' status to start, current status: '{}'",
+ directive.status
+ ));
+ }
+
+ // 2. Create planning contract
+ let contract = repository::create_contract_for_owner(
+ pool,
+ owner_id,
+ CreateContractRequest {
+ name: format!("{} - Planning", directive.title),
+ description: Some(format!(
+ "Planning contract for directive: {}",
+ directive.title
+ )),
+ contract_type: Some("simple".to_string()),
+ template_id: None,
+ initial_phase: Some("plan".to_string()),
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: Some(true),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create planning contract: {}", e))?;
+
+ // 3. Mark contract as directive orchestrator
+ repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true)
+ .await
+ .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
+
+ // 4. Build planning prompt
+ let planning_prompt = build_planning_prompt(&directive);
+
+ // 5. Create supervisor task
+ let supervisor_task = repository::create_task_for_owner(
+ pool,
+ owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: format!("{} - Planner", directive.title),
+ description: Some("Decompose directive goal into executable chain steps".to_string()),
+ plan: planning_prompt,
+ parent_task_id: None,
+ is_supervisor: true,
+ priority: 10,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: directive.local_path.clone(),
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create supervisor task: {}", e))?;
+
+ // 6. Link supervisor to contract
+ repository::update_contract_for_owner(
+ pool,
+ contract.id,
+ owner_id,
+ UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_err(|e| match e {
+ crate::db::repository::RepositoryError::Database(e) => {
+ format!("Failed to link supervisor to contract: {}", e)
+ }
+ other => format!("Failed to link supervisor to contract: {:?}", other),
+ })?;
+
+ // 7. Set orchestrator_contract_id on directive
+ repository::set_directive_orchestrator_contract(pool, directive_id, contract.id)
+ .await
+ .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?;
+
+ // 8. Transition directive to "planning"
+ let updated = repository::update_directive_status(pool, directive_id, "planning")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?
+ .ok_or("Directive not found after status update")?;
+
+ // 9. Copy repo config to contract if repository_url is set
+ if let Some(ref repo_url) = directive.repository_url {
+ let _ = repository::add_remote_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ repo_url,
+ true,
+ )
+ .await;
+ } else if let Some(ref local_path) = directive.local_path {
+ let _ = repository::add_local_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ local_path,
+ true,
+ )
+ .await;
+ }
+
+ tracing::info!(
+ directive_id = %directive_id,
+ contract_id = %contract.id,
+ task_id = %supervisor_task.id,
+ "Directive started: planning contract created"
+ );
+
+ Ok(updated)
+}
+
+/// Called when any task completes — checks if it's directive-related and advances.
+pub async fn on_task_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ let Some(contract_id) = task.contract_id else {
+ return Ok(());
+ };
+
+ let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get contract: {}", e))?;
+
+ let Some(contract) = contract else {
+ return Ok(());
+ };
+
+ if contract.is_directive_orchestrator {
+ // This is a planning contract completion
+ let directive =
+ repository::get_directive_by_orchestrator_contract(pool, contract_id)
+ .await
+ .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
+
+ if let Some(directive) = directive {
+ on_planning_completed(pool, state, &directive, task, owner_id).await?;
+ }
+ } else if contract.directive_id.is_some() {
+ // This is a step contract completion
+ on_step_completed(pool, state, &contract, task, owner_id).await?;
+ }
+
+ Ok(())
+}
+
+/// Handle planning task completion: parse chain plan, create steps, advance.
+async fn on_planning_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ directive: &Directive,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // If task failed, fail the directive
+ if task.status == "failed" {
+ tracing::warn!(
+ directive_id = %directive.id,
+ task_id = %task.id,
+ "Planning task failed, marking directive as failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ }
+
+ // Only process when the supervisor task itself is done
+ if task.status != "done" || !task.is_supervisor {
+ return Ok(());
+ }
+
+ let Some(contract_id) = task.contract_id else {
+ return Ok(());
+ };
+
+ // Get contract files to find the chain plan
+ let files = repository::list_files_in_contract(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to list contract files: {}", e))?;
+
+ // Find the chain plan file
+ let plan_file = files.iter().find(|f| {
+ let name_lower = f.name.to_lowercase();
+ name_lower.contains("chain") || name_lower.contains("plan")
+ });
+
+ let plan_file = plan_file.or_else(|| files.first());
+
+ let Some(plan_file) = plan_file else {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "No plan file found in planning contract, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ };
+
+ // Read the full file to get the body content
+ let full_file = repository::get_file(pool, plan_file.id)
+ .await
+ .map_err(|e| format!("Failed to get plan file: {}", e))?
+ .ok_or("Plan file not found")?;
+
+ // Extract JSON from the file body elements
+ let plan_json = extract_plan_json(&full_file.body);
+
+ let Some(plan_json) = plan_json else {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "Could not extract chain plan JSON from file body, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ };
+
+ let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| {
+ format!("Failed to parse chain plan JSON: {}", e)
+ })?;
+
+ if chain_plan.steps.is_empty() {
+ tracing::warn!(
+ directive_id = %directive.id,
+ "Chain plan has no steps, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ }
+
+ // Create chain
+ let chain = repository::create_directive_chain(
+ pool,
+ directive.id,
+ &format!("{} - Chain", directive.title),
+ Some("Auto-generated from planning"),
+ None,
+ chain_plan.steps.len() as i32,
+ )
+ .await
+ .map_err(|e| format!("Failed to create directive chain: {}", e))?;
+
+ // Create steps (two passes: first create all, then resolve dependencies)
+ let mut step_ids: Vec<(String, Uuid)> = Vec::new();
+
+ for (i, plan_step) in chain_plan.steps.iter().enumerate() {
+ let step = repository::create_chain_step(
+ pool,
+ chain.id,
+ &plan_step.name,
+ Some(&plan_step.description),
+ "task",
+ "simple",
+ Some("plan"),
+ Some(&plan_step.task_plan),
+ None, // dependencies set in second pass
+ i as i32,
+ )
+ .await
+ .map_err(|e| format!("Failed to create chain step: {}", e))?;
+
+ step_ids.push((plan_step.name.clone(), step.id));
+ }
+
+ // Second pass: resolve name-based dependencies to UUIDs and update
+ for (i, plan_step) in chain_plan.steps.iter().enumerate() {
+ if plan_step.depends_on.is_empty() {
+ continue;
+ }
+
+ let dep_uuids: Vec<Uuid> = plan_step
+ .depends_on
+ .iter()
+ .filter_map(|dep_name| {
+ step_ids
+ .iter()
+ .find(|(name, _)| name == dep_name)
+ .map(|(_, id)| *id)
+ })
+ .collect();
+
+ if !dep_uuids.is_empty() {
+ let step_id = step_ids[i].1;
+ sqlx::query(
+ "UPDATE chain_steps SET depends_on = $2 WHERE id = $1",
+ )
+ .bind(step_id)
+ .bind(&dep_uuids)
+ .execute(pool)
+ .await
+ .map_err(|e| format!("Failed to update step dependencies: {}", e))?;
+ }
+ }
+
+ // Set current chain on directive
+ repository::set_directive_current_chain(pool, directive.id, chain.id)
+ .await
+ .map_err(|e| format!("Failed to set current chain: {}", e))?;
+
+ // Transition directive to active
+ let updated_directive = repository::update_directive_status(pool, directive.id, "active")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?
+ .ok_or("Directive not found after status update")?;
+
+ tracing::info!(
+ directive_id = %directive.id,
+ chain_id = %chain.id,
+ step_count = chain_plan.steps.len(),
+ "Chain plan created, advancing chain"
+ );
+
+ // Advance chain to dispatch ready steps
+ advance_chain(pool, state, &updated_directive, owner_id).await
+}
+
+/// Handle a step contract task completion.
+async fn on_step_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ contract: &crate::db::models::Contract,
+ task: &Task,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Only process supervisor task completions
+ if !task.is_supervisor {
+ return Ok(());
+ }
+
+ let Some(directive_id) = contract.directive_id else {
+ return Ok(());
+ };
+
+ // Find the step linked to this contract
+ let step = repository::get_step_by_contract_id(pool, contract.id)
+ .await
+ .map_err(|e| format!("Failed to get step by contract: {}", e))?;
+
+ let Some(step) = step else {
+ return Ok(());
+ };
+
+ // Update step status based on task outcome
+ let new_status = if task.status == "done" {
+ "passed"
+ } else {
+ "failed"
+ };
+
+ repository::update_step_status(pool, step.id, new_status)
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ step_name = %step.name,
+ new_status = new_status,
+ "Step completed"
+ );
+
+ // Get the directive and advance
+ let directive = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ advance_chain(pool, state, &directive, owner_id).await
+}
+
+/// Check chain progress and dispatch ready steps or mark directive complete.
+async fn advance_chain(
+ pool: &PgPool,
+ _state: &SharedState,
+ directive: &Directive,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ let Some(chain_id) = directive.current_chain_id else {
+ return Ok(());
+ };
+
+ let steps = repository::list_steps_for_chain(pool, chain_id)
+ .await
+ .map_err(|e| format!("Failed to list steps: {}", e))?;
+
+ // Check if all steps passed
+ let all_passed = steps.iter().all(|s| s.status == "passed");
+ if all_passed && !steps.is_empty() {
+ repository::update_chain_status(pool, chain_id, "completed")
+ .await
+ .map_err(|e| format!("Failed to update chain status: {}", e))?;
+ repository::update_directive_status(pool, directive.id, "completed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed");
+ return Ok(());
+ }
+
+ // Check if any step failed
+ let any_failed = steps.iter().any(|s| s.status == "failed");
+ if any_failed {
+ repository::update_chain_status(pool, chain_id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update chain status: {}", e))?;
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected");
+ return Ok(());
+ }
+
+ // Find and dispatch ready steps
+ let ready_steps = repository::find_ready_steps(pool, chain_id)
+ .await
+ .map_err(|e| format!("Failed to find ready steps: {}", e))?;
+
+ for step in ready_steps {
+ if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await {
+ tracing::error!(
+ step_id = %step.id,
+ step_name = %step.name,
+ error = %e,
+ "Failed to dispatch step"
+ );
+ }
+ }
+
+ Ok(())
+}
+
+/// Dispatch a single chain step as a new contract with supervisor.
+async fn dispatch_step(
+ pool: &PgPool,
+ directive: &Directive,
+ step: &crate::db::models::ChainStep,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ // Mark step as running
+ repository::update_step_status(pool, step.id, "running")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ // Create contract for this step
+ let contract = repository::create_contract_for_owner(
+ pool,
+ owner_id,
+ CreateContractRequest {
+ name: step.name.clone(),
+ description: step.description.clone(),
+ contract_type: Some(step.contract_type.clone()),
+ template_id: None,
+ initial_phase: step.initial_phase.clone(),
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: Some(true),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create step contract: {}", e))?;
+
+ // Set directive_id on contract
+ repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false)
+ .await
+ .map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
+
+ // Build the task plan
+ let task_plan = step
+ .task_plan
+ .clone()
+ .unwrap_or_else(|| format!("Execute step: {}", step.name));
+
+ // Create supervisor task
+ let supervisor_task = repository::create_task_for_owner(
+ pool,
+ owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: format!("{} Supervisor", step.name),
+ description: step.description.clone(),
+ plan: task_plan,
+ parent_task_id: None,
+ is_supervisor: true,
+ priority: 5,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: directive.local_path.clone(),
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| format!("Failed to create step supervisor task: {}", e))?;
+
+ // Link supervisor to contract
+ repository::update_contract_for_owner(
+ pool,
+ contract.id,
+ owner_id,
+ UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_err(|e| match e {
+ crate::db::repository::RepositoryError::Database(e) => {
+ format!("Failed to link supervisor to step contract: {}", e)
+ }
+ other => format!("Failed to link supervisor to step contract: {:?}", other),
+ })?;
+
+ // Link step to contract/task
+ repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id)
+ .await
+ .map_err(|e| format!("Failed to update step contract link: {}", e))?;
+
+ // Copy repo config from directive to step contract
+ if let Some(ref repo_url) = directive.repository_url {
+ let _ = repository::add_remote_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ repo_url,
+ true,
+ )
+ .await;
+ } else if let Some(ref local_path) = directive.local_path {
+ let _ = repository::add_local_repository(
+ pool,
+ contract.id,
+ "directive-repo",
+ local_path,
+ true,
+ )
+ .await;
+ }
+
+ tracing::info!(
+ directive_id = %directive.id,
+ step_id = %step.id,
+ step_name = %step.name,
+ contract_id = %contract.id,
+ task_id = %supervisor_task.id,
+ "Step dispatched"
+ );
+
+ Ok(())
+}
+
+/// Build the planning supervisor prompt from a directive.
+fn build_planning_prompt(directive: &Directive) -> String {
+ format!(
+ r#"You are planning the execution of a directive.
+
+DIRECTIVE: {}
+GOAL: {}
+REQUIREMENTS: {}
+ACCEPTANCE CRITERIA: {}
+CONSTRAINTS: {}
+
+Your job is to decompose this goal into a sequence of executable steps.
+Each step will become a separate contract with its own supervisor.
+
+Write a JSON plan to a contract file named "chain-plan" using:
+ makima contract create-file "chain-plan" < plan.json
+
+The JSON format:
+{{
+ "steps": [
+ {{
+ "name": "Step name",
+ "description": "What this step accomplishes",
+ "task_plan": "Detailed instructions for the step's supervisor",
+ "depends_on": []
+ }}
+ ]
+}}
+
+Rules:
+- Steps with no dependencies (empty depends_on array) will run in parallel.
+- Steps that depend on other steps will wait until those complete.
+- The depends_on array contains names of steps this step depends on.
+- Each step should be a self-contained unit of work.
+- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible.
+- Keep the number of steps reasonable (3-10 typically).
+
+After writing the plan file, mark the contract as complete using:
+ makima supervisor complete"#,
+ directive.title,
+ directive.goal,
+ serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
+ serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
+ serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
+ )
+}
+
+/// Extract JSON from file body elements.
+fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> {
+ use crate::db::models::BodyElement;
+
+ for element in body {
+ match element {
+ BodyElement::Code { content, .. } => {
+ // Try to parse as JSON
+ let trimmed = content.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ }
+ BodyElement::Paragraph { text } => {
+ let trimmed = text.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ }
+ BodyElement::Markdown { content } => {
+ // Try to find JSON in markdown content
+ let trimmed = content.trim();
+ if trimmed.starts_with('{') || trimmed.starts_with('[') {
+ if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
+ return Some(trimmed.to_string());
+ }
+ }
+ // Try to find JSON in code blocks within markdown
+ if let Some(json_start) = trimmed.find("```json") {
+ let after = &trimmed[json_start + 7..];
+ if let Some(json_end) = after.find("```") {
+ let json_str = after[..json_end].trim();
+ if serde_json::from_str::<serde_json::Value>(json_str).is_ok() {
+ return Some(json_str.to_string());
+ }
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
+ // Fallback: concatenate all text content and try to find JSON
+ let all_text: String = body
+ .iter()
+ .map(|el| match el {
+ BodyElement::Code { content, .. } => content.clone(),
+ BodyElement::Paragraph { text } => text.clone(),
+ BodyElement::Markdown { content } => content.clone(),
+ _ => String::new(),
+ })
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ let trimmed = all_text.trim();
+ if let Some(start) = trimmed.find('{') {
+ // Find matching closing brace
+ let substr = &trimmed[start..];
+ if serde_json::from_str::<serde_json::Value>(substr).is_ok() {
+ return Some(substr.to_string());
+ }
+ }
+
+ None
+}
diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs
new file mode 100644
index 0000000..e7ffb70
--- /dev/null
+++ b/makima/src/orchestration/mod.rs
@@ -0,0 +1 @@
+pub mod directive;
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs
index a74f8ff..560151b 100644
--- a/makima/src/server/handlers/directives.rs
+++ b/makima/src/server/handlers/directives.rs
@@ -13,6 +13,7 @@ use crate::db::models::{
DirectiveListResponse, DirectiveWithChains, UpdateDirectiveRequest,
};
use crate::db::repository::{self, RepositoryError};
+use crate::orchestration;
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
use crate::server::state::SharedState;
@@ -438,3 +439,60 @@ pub async fn get_chain(
Json(ChainWithSteps { chain, steps }).into_response()
}
+
+/// Start a directive: create a planning contract and begin orchestration.
+#[utoipa::path(
+ post,
+ path = "/api/v1/directives/{id}/start",
+ params(
+ ("id" = Uuid, Path, description = "Directive ID")
+ ),
+ responses(
+ (status = 200, description = "Directive started", body = Directive),
+ (status = 400, description = "Directive not in draft status", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Directive not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Directives"
+)]
+pub async fn start_directive(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match orchestration::directive::init_directive(pool, &state, auth.owner_id, id).await {
+ Ok(directive) => Json(directive).into_response(),
+ Err(e) if e.contains("not found") => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", e)),
+ )
+ .into_response(),
+ Err(e) if e.contains("must be in 'draft'") => (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_STATUS", e)),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to start directive {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("START_FAILED", e)),
+ )
+ .into_response()
+ }
+ }
+}
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index beb4c15..767d059 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -1303,7 +1303,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
}),
).await;
- // TODO: Directive engine integration (removed for reimplementation)
+ // Directive engine integration
+ if let Err(e) = crate::orchestration::directive::on_task_completed(
+ &pool, &state, &updated_task, owner_id,
+ ).await {
+ tracing::warn!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to process directive task completion"
+ );
+ }
}
Ok(None) => {
tracing::warn!(
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index a429612..1a59e12 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -181,6 +181,7 @@ pub fn make_router(state: SharedState) -> Router {
.put(directives::update_directive)
.delete(directives::delete_directive),
)
+ .route("/directives/{id}/start", post(directives::start_directive))
.route("/directives/{id}/chains", get(directives::list_chains))
.route("/directives/{id}/chains/{chain_id}", get(directives::get_chain))
// Contract supervisor resume endpoints
diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs
index 0e6912a..96c19e0 100644
--- a/makima/src/server/openapi.rs
+++ b/makima/src/server/openapi.rs
@@ -111,6 +111,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage
directives::create_directive,
directives::update_directive,
directives::delete_directive,
+ directives::start_directive,
directives::list_chains,
directives::get_chain,
),