diff options
| author | soryu <soryu@soryu.co> | 2026-02-07 16:36:19 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-07 16:36:19 +0000 |
| commit | 1b72449496ce3a057a43d002c8042d5e7a1d6576 (patch) | |
| tree | f9151df7cc5128499ee91aafde3ff3c3b3281c1e /makima | |
| parent | 9e9f18884c78c21f5785908fb7ccd00e2fa5436b (diff) | |
| download | soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.tar.gz soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.zip | |
Add directive init mechanism
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/frontend/src/components/directives/DirectiveDetail.tsx | 28 | ||||
| -rw-r--r-- | makima/frontend/src/hooks/useDirectives.ts | 19 | ||||
| -rw-r--r-- | makima/frontend/src/lib/api.ts | 12 | ||||
| -rw-r--r-- | makima/frontend/src/routes/directives.tsx | 16 | ||||
| -rw-r--r-- | makima/src/bin/makima.rs | 5 | ||||
| -rw-r--r-- | makima/src/daemon/api/directive.rs | 6 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 3 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 296 | ||||
| -rw-r--r-- | makima/src/lib.rs | 1 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 736 | ||||
| -rw-r--r-- | makima/src/orchestration/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 58 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 11 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 1 |
15 files changed, 1185 insertions, 9 deletions
diff --git a/makima/frontend/src/components/directives/DirectiveDetail.tsx b/makima/frontend/src/components/directives/DirectiveDetail.tsx index 3634a79..e519b92 100644 --- a/makima/frontend/src/components/directives/DirectiveDetail.tsx +++ b/makima/frontend/src/components/directives/DirectiveDetail.tsx @@ -8,6 +8,7 @@ interface DirectiveDetailProps { directive: DirectiveWithChains; onBack: () => void; onDelete?: (id: string) => void; + onStart?: (id: string) => void; } const statusColors: Record<DirectiveStatus, string> = { @@ -79,6 +80,7 @@ export function DirectiveDetail({ directive, onBack, onDelete, + onStart, }: DirectiveDetailProps) { return ( <div className="panel h-full flex flex-col"> @@ -101,14 +103,24 @@ export function DirectiveDetail({ <span className="font-mono text-[10px] text-[#7788aa]"> v{directive.version} </span> - {onDelete && ( - <button - onClick={() => onDelete(directive.id)} - className="ml-auto font-mono text-[10px] text-red-400 hover:text-red-300 transition-colors uppercase" - > - Delete - </button> - )} + <div className="ml-auto flex gap-2"> + {onStart && directive.status === "draft" && ( + <button + onClick={() => onStart(directive.id)} + className="font-mono text-[10px] text-green-400 hover:text-green-300 transition-colors uppercase" + > + Start + </button> + )} + {onDelete && ( + <button + onClick={() => onDelete(directive.id)} + className="font-mono text-[10px] text-red-400 hover:text-red-300 transition-colors uppercase" + > + Delete + </button> + )} + </div> </div> <h2 className="font-mono text-sm text-[#dbe7ff]"> {directive.title} diff --git a/makima/frontend/src/hooks/useDirectives.ts b/makima/frontend/src/hooks/useDirectives.ts index 001cf89..af1c8c6 100644 --- a/makima/frontend/src/hooks/useDirectives.ts +++ b/makima/frontend/src/hooks/useDirectives.ts @@ -5,6 +5,7 @@ import { createDirective, updateDirective, deleteDirective, + startDirective as startDirectiveApi, type DirectiveSummary, type DirectiveWithChains, type CreateDirectiveRequest, @@ -90,6 +91,23 @@ export function useDirectives() { [fetchDirectives] ); + const startDirective = useCallback( + async (id: string): Promise<boolean> => { + setError(null); + try { + await startDirectiveApi(id); + await fetchDirectives(); + return true; + } catch (e) { + setError( + e instanceof Error ? e.message : "Failed to start directive" + ); + return false; + } + }, + [fetchDirectives] + ); + // Initial fetch useEffect(() => { fetchDirectives(); @@ -104,5 +122,6 @@ export function useDirectives() { saveDirective, editDirective, removeDirective, + startDirective, }; } diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index 6c450eb..2187c34 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -3161,3 +3161,15 @@ export async function deleteDirective(id: string): Promise<void> { } } +export async function startDirective(id: string): Promise<Directive> { + const res = await authFetch(`${API_BASE}/api/v1/directives/${id}/start`, { + method: "POST", + }); + if (!res.ok) { + const body = await res.json().catch(() => null); + const msg = body?.message || res.statusText; + throw new Error(`Failed to start directive: ${msg}`); + } + return res.json(); +} + diff --git a/makima/frontend/src/routes/directives.tsx b/makima/frontend/src/routes/directives.tsx index 89535e2..8b82f99 100644 --- a/makima/frontend/src/routes/directives.tsx +++ b/makima/frontend/src/routes/directives.tsx @@ -50,6 +50,7 @@ function DirectivesContent() { fetchDirective, saveDirective, removeDirective, + startDirective, } = useDirectives(); const [selectedDirective, setSelectedDirective] = @@ -114,6 +115,20 @@ function DirectivesContent() { [removeDirective, id, navigate] ); + const handleStart = useCallback( + async (directiveId: string) => { + const ok = await startDirective(directiveId); + if (ok) { + // Refresh the detail view + const updated = await fetchDirective(directiveId); + if (updated) { + setSelectedDirective(updated); + } + } + }, + [startDirective, fetchDirective] + ); + // Detail view if (id) { if (detailLoading) { @@ -136,6 +151,7 @@ function DirectivesContent() { directive={selectedDirective} onBack={handleBack} onDelete={handleDelete} + onStart={handleStart} /> </main> ); diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 308d689..9d7f847 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -768,6 +768,11 @@ async fn run_directive( .await?; println!("{}", serde_json::to_string(&result.0)?); } + DirectiveCommand::Start(args) => { + let client = ApiClient::new(args.api_url, args.api_key)?; + let result = client.directive_start(args.directive_id).await?; + println!("{}", serde_json::to_string(&result.0)?); + } } Ok(()) diff --git a/makima/src/daemon/api/directive.rs b/makima/src/daemon/api/directive.rs index 0c8115a..42f6f45 100644 --- a/makima/src/daemon/api/directive.rs +++ b/makima/src/daemon/api/directive.rs @@ -51,4 +51,10 @@ impl ApiClient { self.put(&format!("/api/v1/directives/{}", directive_id), &req) .await } + + /// Start a directive (transition from draft to planning). + pub async fn directive_start(&self, directive_id: Uuid) -> Result<JsonValue, ApiError> { + self.post_empty(&format!("/api/v1/directives/{}/start", directive_id)) + .await + } } diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index 9fba216..b07ab5a 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -222,6 +222,9 @@ pub enum DirectiveCommand { /// Update directive status UpdateStatus(directive::UpdateStatusArgs), + + /// Start a directive (create planning contract and begin orchestration) + Start(DirectiveArgs), } impl Cli { diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 5949079..e072eb8 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5183,3 +5183,299 @@ pub async fn list_steps_for_chain( .fetch_all(pool) .await } + +// ── Directive orchestration functions ─────────────────────────────────────── + +/// Update directive status with automatic timestamp management. +pub async fn update_directive_status( + pool: &PgPool, + id: Uuid, + new_status: &str, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET status = $2, + started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(new_status) + .fetch_optional(pool) + .await +} + +/// Set the orchestrator contract ID on a directive. +pub async fn set_directive_orchestrator_contract( + pool: &PgPool, + directive_id: Uuid, + contract_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET orchestrator_contract_id = $2, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Set the current chain ID on a directive and increment chain_generation_count. +pub async fn set_directive_current_chain( + pool: &PgPool, + directive_id: Uuid, + chain_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET current_chain_id = $2, + chain_generation_count = chain_generation_count + 1, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(chain_id) + .fetch_optional(pool) + .await +} + +/// Create a new directive chain. +pub async fn create_directive_chain( + pool: &PgPool, + directive_id: Uuid, + name: &str, + description: Option<&str>, + rationale: Option<&str>, + total_steps: i32, +) -> Result<DirectiveChain, sqlx::Error> { + // Get next generation number + let next_gen: (i64,) = sqlx::query_as( + "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1", + ) + .bind(directive_id) + .fetch_one(pool) + .await?; + + sqlx::query_as::<_, DirectiveChain>( + r#" + INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status) + VALUES ($1, $2, $3, $4, $5, $6, 'active') + RETURNING * + "#, + ) + .bind(directive_id) + .bind(next_gen.0 as i32) + .bind(name) + .bind(description) + .bind(rationale) + .bind(total_steps) + .fetch_one(pool) + .await +} + +/// Create a chain step. +pub async fn create_chain_step( + pool: &PgPool, + chain_id: Uuid, + name: &str, + description: Option<&str>, + step_type: &str, + contract_type: &str, + initial_phase: Option<&str>, + task_plan: Option<&str>, + depends_on: Option<Vec<Uuid>>, + order_index: i32, +) -> Result<ChainStep, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending') + RETURNING * + "#, + ) + .bind(chain_id) + .bind(name) + .bind(description) + .bind(step_type) + .bind(contract_type) + .bind(initial_phase) + .bind(task_plan) + .bind(depends_on.as_deref()) + .bind(order_index) + .fetch_one(pool) + .await +} + +/// Update a chain step's status with automatic timestamp management. +pub async fn update_step_status( + pool: &PgPool, + step_id: Uuid, + new_status: &str, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET status = $2, + started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(new_status) + .fetch_optional(pool) + .await +} + +/// Link a chain step to a contract and supervisor task. +pub async fn update_step_contract( + pool: &PgPool, + step_id: Uuid, + contract_id: Uuid, + supervisor_task_id: Uuid, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET contract_id = $2, + supervisor_task_id = $3 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(contract_id) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await +} + +/// Find steps that are ready to execute (pending, with all dependencies passed). +pub async fn find_ready_steps( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + SELECT * FROM chain_steps + WHERE chain_id = $1 + AND status = 'pending' + AND ( + depends_on IS NULL + OR array_length(depends_on, 1) IS NULL + OR NOT EXISTS ( + SELECT 1 FROM unnest(depends_on) AS dep_id + WHERE dep_id NOT IN ( + SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed' + ) + ) + ) + ORDER BY order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Get a chain step by its linked contract ID. +pub async fn get_step_by_contract_id( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#"SELECT * FROM chain_steps WHERE contract_id = $1"#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Get a directive by its orchestrator contract ID. +pub async fn get_directive_by_orchestrator_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator). +pub async fn set_contract_directive_fields( + pool: &PgPool, + contract_id: Uuid, + directive_id: Option<Uuid>, + is_orchestrator: bool, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE contracts + SET directive_id = $2, + is_directive_orchestrator = $3 + WHERE id = $1 + "#, + ) + .bind(contract_id) + .bind(directive_id) + .bind(is_orchestrator) + .execute(pool) + .await?; + Ok(()) +} + +/// Get a directive by ID (no owner scoping, for internal use). +pub async fn get_directive( + pool: &PgPool, + id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1"#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Update chain status. +pub async fn update_chain_status( + pool: &PgPool, + chain_id: Uuid, + new_status: &str, +) -> Result<Option<DirectiveChain>, sqlx::Error> { + sqlx::query_as::<_, DirectiveChain>( + r#" + UPDATE directive_chains + SET status = $2, + completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .bind(new_status) + .fetch_optional(pool) + .await +} diff --git a/makima/src/lib.rs b/makima/src/lib.rs index 8d3db58..3bc460b 100644 --- a/makima/src/lib.rs +++ b/makima/src/lib.rs @@ -3,5 +3,6 @@ pub mod daemon; pub mod db; pub mod listen; pub mod llm; +pub mod orchestration; pub mod server; pub mod tts; diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs new file mode 100644 index 0000000..d17deeb --- /dev/null +++ b/makima/src/orchestration/directive.rs @@ -0,0 +1,736 @@ +//! Directive orchestration — init, planning completion, chain advancement. + +use serde::Deserialize; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::db::models::{ + CreateContractRequest, CreateTaskRequest, Directive, Task, UpdateContractRequest, +}; +use crate::db::repository; +use crate::server::state::SharedState; + +/// A single step in the chain plan produced by the planning supervisor. +#[derive(Debug, Deserialize)] +struct ChainPlanStep { + name: String, + description: String, + task_plan: String, + #[serde(default)] + depends_on: Vec<String>, // names of steps this depends on +} + +/// Wrapper for the plan JSON written by the planning supervisor. +#[derive(Debug, Deserialize)] +struct ChainPlan { + steps: Vec<ChainPlanStep>, +} + +/// Initialize a directive: create a planning contract and transition to "planning". +pub async fn init_directive( + pool: &PgPool, + _state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, +) -> Result<Directive, String> { + // 1. Get directive, verify status + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + if directive.status != "draft" { + return Err(format!( + "Directive must be in 'draft' status to start, current status: '{}'", + directive.status + )); + } + + // 2. Create planning contract + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: format!("{} - Planning", directive.title), + description: Some(format!( + "Planning contract for directive: {}", + directive.title + )), + contract_type: Some("simple".to_string()), + template_id: None, + initial_phase: Some("plan".to_string()), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create planning contract: {}", e))?; + + // 3. Mark contract as directive orchestrator + repository::set_contract_directive_fields(pool, contract.id, Some(directive_id), true) + .await + .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; + + // 4. Build planning prompt + let planning_prompt = build_planning_prompt(&directive); + + // 5. Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} - Planner", directive.title), + description: Some("Decompose directive goal into executable chain steps".to_string()), + plan: planning_prompt, + parent_task_id: None, + is_supervisor: true, + priority: 10, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create supervisor task: {}", e))?; + + // 6. Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to contract: {}", e) + } + other => format!("Failed to link supervisor to contract: {:?}", other), + })?; + + // 7. Set orchestrator_contract_id on directive + repository::set_directive_orchestrator_contract(pool, directive_id, contract.id) + .await + .map_err(|e| format!("Failed to set orchestrator contract: {}", e))?; + + // 8. Transition directive to "planning" + let updated = repository::update_directive_status(pool, directive_id, "planning") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))? + .ok_or("Directive not found after status update")?; + + // 9. Copy repo config to contract if repository_url is set + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive_id, + contract_id = %contract.id, + task_id = %supervisor_task.id, + "Directive started: planning contract created" + ); + + Ok(updated) +} + +/// Called when any task completes — checks if it's directive-related and advances. +pub async fn on_task_completed( + pool: &PgPool, + state: &SharedState, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + let Some(contract_id) = task.contract_id else { + return Ok(()); + }; + + let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to get contract: {}", e))?; + + let Some(contract) = contract else { + return Ok(()); + }; + + if contract.is_directive_orchestrator { + // This is a planning contract completion + let directive = + repository::get_directive_by_orchestrator_contract(pool, contract_id) + .await + .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; + + if let Some(directive) = directive { + on_planning_completed(pool, state, &directive, task, owner_id).await?; + } + } else if contract.directive_id.is_some() { + // This is a step contract completion + on_step_completed(pool, state, &contract, task, owner_id).await?; + } + + Ok(()) +} + +/// Handle planning task completion: parse chain plan, create steps, advance. +async fn on_planning_completed( + pool: &PgPool, + state: &SharedState, + directive: &Directive, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // If task failed, fail the directive + if task.status == "failed" { + tracing::warn!( + directive_id = %directive.id, + task_id = %task.id, + "Planning task failed, marking directive as failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + } + + // Only process when the supervisor task itself is done + if task.status != "done" || !task.is_supervisor { + return Ok(()); + } + + let Some(contract_id) = task.contract_id else { + return Ok(()); + }; + + // Get contract files to find the chain plan + let files = repository::list_files_in_contract(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to list contract files: {}", e))?; + + // Find the chain plan file + let plan_file = files.iter().find(|f| { + let name_lower = f.name.to_lowercase(); + name_lower.contains("chain") || name_lower.contains("plan") + }); + + let plan_file = plan_file.or_else(|| files.first()); + + let Some(plan_file) = plan_file else { + tracing::warn!( + directive_id = %directive.id, + "No plan file found in planning contract, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + }; + + // Read the full file to get the body content + let full_file = repository::get_file(pool, plan_file.id) + .await + .map_err(|e| format!("Failed to get plan file: {}", e))? + .ok_or("Plan file not found")?; + + // Extract JSON from the file body elements + let plan_json = extract_plan_json(&full_file.body); + + let Some(plan_json) = plan_json else { + tracing::warn!( + directive_id = %directive.id, + "Could not extract chain plan JSON from file body, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + }; + + let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| { + format!("Failed to parse chain plan JSON: {}", e) + })?; + + if chain_plan.steps.is_empty() { + tracing::warn!( + directive_id = %directive.id, + "Chain plan has no steps, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + } + + // Create chain + let chain = repository::create_directive_chain( + pool, + directive.id, + &format!("{} - Chain", directive.title), + Some("Auto-generated from planning"), + None, + chain_plan.steps.len() as i32, + ) + .await + .map_err(|e| format!("Failed to create directive chain: {}", e))?; + + // Create steps (two passes: first create all, then resolve dependencies) + let mut step_ids: Vec<(String, Uuid)> = Vec::new(); + + for (i, plan_step) in chain_plan.steps.iter().enumerate() { + let step = repository::create_chain_step( + pool, + chain.id, + &plan_step.name, + Some(&plan_step.description), + "task", + "simple", + Some("plan"), + Some(&plan_step.task_plan), + None, // dependencies set in second pass + i as i32, + ) + .await + .map_err(|e| format!("Failed to create chain step: {}", e))?; + + step_ids.push((plan_step.name.clone(), step.id)); + } + + // Second pass: resolve name-based dependencies to UUIDs and update + for (i, plan_step) in chain_plan.steps.iter().enumerate() { + if plan_step.depends_on.is_empty() { + continue; + } + + let dep_uuids: Vec<Uuid> = plan_step + .depends_on + .iter() + .filter_map(|dep_name| { + step_ids + .iter() + .find(|(name, _)| name == dep_name) + .map(|(_, id)| *id) + }) + .collect(); + + if !dep_uuids.is_empty() { + let step_id = step_ids[i].1; + sqlx::query( + "UPDATE chain_steps SET depends_on = $2 WHERE id = $1", + ) + .bind(step_id) + .bind(&dep_uuids) + .execute(pool) + .await + .map_err(|e| format!("Failed to update step dependencies: {}", e))?; + } + } + + // Set current chain on directive + repository::set_directive_current_chain(pool, directive.id, chain.id) + .await + .map_err(|e| format!("Failed to set current chain: {}", e))?; + + // Transition directive to active + let updated_directive = repository::update_directive_status(pool, directive.id, "active") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))? + .ok_or("Directive not found after status update")?; + + tracing::info!( + directive_id = %directive.id, + chain_id = %chain.id, + step_count = chain_plan.steps.len(), + "Chain plan created, advancing chain" + ); + + // Advance chain to dispatch ready steps + advance_chain(pool, state, &updated_directive, owner_id).await +} + +/// Handle a step contract task completion. +async fn on_step_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + task: &Task, + owner_id: Uuid, +) -> Result<(), String> { + // Only process supervisor task completions + if !task.is_supervisor { + return Ok(()); + } + + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + // Find the step linked to this contract + let step = repository::get_step_by_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to get step by contract: {}", e))?; + + let Some(step) = step else { + return Ok(()); + }; + + // Update step status based on task outcome + let new_status = if task.status == "done" { + "passed" + } else { + "failed" + }; + + repository::update_step_status(pool, step.id, new_status) + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + step_name = %step.name, + new_status = new_status, + "Step completed" + ); + + // Get the directive and advance + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + advance_chain(pool, state, &directive, owner_id).await +} + +/// Check chain progress and dispatch ready steps or mark directive complete. +async fn advance_chain( + pool: &PgPool, + _state: &SharedState, + directive: &Directive, + owner_id: Uuid, +) -> Result<(), String> { + let Some(chain_id) = directive.current_chain_id else { + return Ok(()); + }; + + let steps = repository::list_steps_for_chain(pool, chain_id) + .await + .map_err(|e| format!("Failed to list steps: {}", e))?; + + // Check if all steps passed + let all_passed = steps.iter().all(|s| s.status == "passed"); + if all_passed && !steps.is_empty() { + repository::update_chain_status(pool, chain_id, "completed") + .await + .map_err(|e| format!("Failed to update chain status: {}", e))?; + repository::update_directive_status(pool, directive.id, "completed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + tracing::info!(directive_id = %directive.id, "Directive completed: all steps passed"); + return Ok(()); + } + + // Check if any step failed + let any_failed = steps.iter().any(|s| s.status == "failed"); + if any_failed { + repository::update_chain_status(pool, chain_id, "failed") + .await + .map_err(|e| format!("Failed to update chain status: {}", e))?; + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + tracing::info!(directive_id = %directive.id, "Directive failed: step failure detected"); + return Ok(()); + } + + // Find and dispatch ready steps + let ready_steps = repository::find_ready_steps(pool, chain_id) + .await + .map_err(|e| format!("Failed to find ready steps: {}", e))?; + + for step in ready_steps { + if let Err(e) = dispatch_step(pool, directive, &step, owner_id).await { + tracing::error!( + step_id = %step.id, + step_name = %step.name, + error = %e, + "Failed to dispatch step" + ); + } + } + + Ok(()) +} + +/// Dispatch a single chain step as a new contract with supervisor. +async fn dispatch_step( + pool: &PgPool, + directive: &Directive, + step: &crate::db::models::ChainStep, + owner_id: Uuid, +) -> Result<(), String> { + // Mark step as running + repository::update_step_status(pool, step.id, "running") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + // Create contract for this step + let contract = repository::create_contract_for_owner( + pool, + owner_id, + CreateContractRequest { + name: step.name.clone(), + description: step.description.clone(), + contract_type: Some(step.contract_type.clone()), + template_id: None, + initial_phase: step.initial_phase.clone(), + autonomous_loop: Some(true), + phase_guard: None, + local_only: Some(true), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| format!("Failed to create step contract: {}", e))?; + + // Set directive_id on contract + repository::set_contract_directive_fields(pool, contract.id, Some(directive.id), false) + .await + .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; + + // Build the task plan + let task_plan = step + .task_plan + .clone() + .unwrap_or_else(|| format!("Execute step: {}", step.name)); + + // Create supervisor task + let supervisor_task = repository::create_task_for_owner( + pool, + owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} Supervisor", step.name), + description: step.description.clone(), + plan: task_plan, + parent_task_id: None, + is_supervisor: true, + priority: 5, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: directive.local_path.clone(), + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| format!("Failed to create step supervisor task: {}", e))?; + + // Link supervisor to contract + repository::update_contract_for_owner( + pool, + contract.id, + owner_id, + UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + ..Default::default() + }, + ) + .await + .map_err(|e| match e { + crate::db::repository::RepositoryError::Database(e) => { + format!("Failed to link supervisor to step contract: {}", e) + } + other => format!("Failed to link supervisor to step contract: {:?}", other), + })?; + + // Link step to contract/task + repository::update_step_contract(pool, step.id, contract.id, supervisor_task.id) + .await + .map_err(|e| format!("Failed to update step contract link: {}", e))?; + + // Copy repo config from directive to step contract + if let Some(ref repo_url) = directive.repository_url { + let _ = repository::add_remote_repository( + pool, + contract.id, + "directive-repo", + repo_url, + true, + ) + .await; + } else if let Some(ref local_path) = directive.local_path { + let _ = repository::add_local_repository( + pool, + contract.id, + "directive-repo", + local_path, + true, + ) + .await; + } + + tracing::info!( + directive_id = %directive.id, + step_id = %step.id, + step_name = %step.name, + contract_id = %contract.id, + task_id = %supervisor_task.id, + "Step dispatched" + ); + + Ok(()) +} + +/// Build the planning supervisor prompt from a directive. +fn build_planning_prompt(directive: &Directive) -> String { + format!( + r#"You are planning the execution of a directive. + +DIRECTIVE: {} +GOAL: {} +REQUIREMENTS: {} +ACCEPTANCE CRITERIA: {} +CONSTRAINTS: {} + +Your job is to decompose this goal into a sequence of executable steps. +Each step will become a separate contract with its own supervisor. + +Write a JSON plan to a contract file named "chain-plan" using: + makima contract create-file "chain-plan" < plan.json + +The JSON format: +{{ + "steps": [ + {{ + "name": "Step name", + "description": "What this step accomplishes", + "task_plan": "Detailed instructions for the step's supervisor", + "depends_on": [] + }} + ] +}} + +Rules: +- Steps with no dependencies (empty depends_on array) will run in parallel. +- Steps that depend on other steps will wait until those complete. +- The depends_on array contains names of steps this step depends on. +- Each step should be a self-contained unit of work. +- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. +- Keep the number of steps reasonable (3-10 typically). + +After writing the plan file, mark the contract as complete using: + makima supervisor complete"#, + directive.title, + directive.goal, + serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + ) +} + +/// Extract JSON from file body elements. +fn extract_plan_json(body: &[crate::db::models::BodyElement]) -> Option<String> { + use crate::db::models::BodyElement; + + for element in body { + match element { + BodyElement::Code { content, .. } => { + // Try to parse as JSON + let trimmed = content.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + } + BodyElement::Paragraph { text } => { + let trimmed = text.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + } + BodyElement::Markdown { content } => { + // Try to find JSON in markdown content + let trimmed = content.trim(); + if trimmed.starts_with('{') || trimmed.starts_with('[') { + if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() { + return Some(trimmed.to_string()); + } + } + // Try to find JSON in code blocks within markdown + if let Some(json_start) = trimmed.find("```json") { + let after = &trimmed[json_start + 7..]; + if let Some(json_end) = after.find("```") { + let json_str = after[..json_end].trim(); + if serde_json::from_str::<serde_json::Value>(json_str).is_ok() { + return Some(json_str.to_string()); + } + } + } + } + _ => {} + } + } + + // Fallback: concatenate all text content and try to find JSON + let all_text: String = body + .iter() + .map(|el| match el { + BodyElement::Code { content, .. } => content.clone(), + BodyElement::Paragraph { text } => text.clone(), + BodyElement::Markdown { content } => content.clone(), + _ => String::new(), + }) + .collect::<Vec<_>>() + .join("\n"); + + let trimmed = all_text.trim(); + if let Some(start) = trimmed.find('{') { + // Find matching closing brace + let substr = &trimmed[start..]; + if serde_json::from_str::<serde_json::Value>(substr).is_ok() { + return Some(substr.to_string()); + } + } + + None +} diff --git a/makima/src/orchestration/mod.rs b/makima/src/orchestration/mod.rs new file mode 100644 index 0000000..e7ffb70 --- /dev/null +++ b/makima/src/orchestration/mod.rs @@ -0,0 +1 @@ +pub mod directive; diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index a74f8ff..560151b 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -13,6 +13,7 @@ use crate::db::models::{ DirectiveListResponse, DirectiveWithChains, UpdateDirectiveRequest, }; use crate::db::repository::{self, RepositoryError}; +use crate::orchestration; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; use crate::server::state::SharedState; @@ -438,3 +439,60 @@ pub async fn get_chain( Json(ChainWithSteps { chain, steps }).into_response() } + +/// Start a directive: create a planning contract and begin orchestration. +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/start", + params( + ("id" = Uuid, Path, description = "Directive ID") + ), + responses( + (status = 200, description = "Directive started", body = Directive), + (status = 400, description = "Directive not in draft status", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Directive not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Directives" +)] +pub async fn start_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match orchestration::directive::init_directive(pool, &state, auth.owner_id, id).await { + Ok(directive) => Json(directive).into_response(), + Err(e) if e.contains("not found") => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", e)), + ) + .into_response(), + Err(e) if e.contains("must be in 'draft'") => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_STATUS", e)), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to start directive {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("START_FAILED", e)), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index beb4c15..767d059 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1303,7 +1303,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }), ).await; - // TODO: Directive engine integration (removed for reimplementation) + // Directive engine integration + if let Err(e) = crate::orchestration::directive::on_task_completed( + &pool, &state, &updated_task, owner_id, + ).await { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to process directive task completion" + ); + } } Ok(None) => { tracing::warn!( diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index a429612..1a59e12 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -181,6 +181,7 @@ pub fn make_router(state: SharedState) -> Router { .put(directives::update_directive) .delete(directives::delete_directive), ) + .route("/directives/{id}/start", post(directives::start_directive)) .route("/directives/{id}/chains", get(directives::list_chains)) .route("/directives/{id}/chains/{chain_id}", get(directives::get_chain)) // Contract supervisor resume endpoints diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 0e6912a..96c19e0 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -111,6 +111,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage directives::create_directive, directives::update_directive, directives::delete_directive, + directives::start_directive, directives::list_chains, directives::get_chain, ), |
