diff options
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 243 |
1 files changed, 241 insertions, 2 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 98690bb..155cfad 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use base64::Engine; -use crate::db::models::{CreateTaskRequest, UpdateTaskRequest}; +use crate::db::models::{CreateContractRequest, CreateTaskRequest, UpdateContractRequest, UpdateTaskRequest}; use crate::db::repository; use crate::server::state::{DaemonCommand, SharedState}; @@ -86,6 +86,42 @@ impl DirectiveOrchestrator { let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?; for step in steps { + // If the step has a contract_type, create a contract instead of a standalone task + if step.contract_type.is_some() { + tracing::info!( + step_id = %step.step_id, + directive_id = %step.directive_id, + step_name = %step.step_name, + contract_type = ?step.contract_type, + "Spawning contract for contract-backed step" + ); + + match self + .spawn_step_contract( + step.step_id, + step.directive_id, + step.owner_id, + &step.step_name, + step.step_description.as_deref(), + step.task_plan.as_deref(), + step.contract_type.as_deref().unwrap_or("simple"), + step.repository_url.as_deref(), + step.base_branch.as_deref(), + ) + .await + { + Ok(()) => {} + Err(e) => { + tracing::warn!( + step_id = %step.step_id, + error = %e, + "Failed to spawn contract for step" + ); + } + } + continue; + } + tracing::info!( step_id = %step.step_id, directive_id = %step.directive_id, @@ -218,7 +254,70 @@ impl DirectiveOrchestrator { /// Phase 3: Monitor running steps and orchestrator tasks. async fn phase_monitoring(&self) -> Result<(), anyhow::Error> { - // Check running steps + // Check contract-backed running steps first + let contract_steps = repository::get_running_steps_with_contracts(&self.pool).await?; + + for step in contract_steps { + if let Err(e) = async { + match step.contract_status.as_str() { + "completed" | "archived" => { + tracing::info!( + step_id = %step.step_id, + directive_id = %step.directive_id, + contract_id = %step.contract_id, + contract_status = %step.contract_status, + "Contract-backed step contract completed — updating step to completed" + ); + let update = crate::db::models::UpdateDirectiveStepRequest { + status: Some("completed".to_string()), + ..Default::default() + }; + repository::update_directive_step(&self.pool, step.step_id, update).await?; + + // Mark linked orders as done + if let Ok(linked_orders) = repository::get_orders_by_step_id(&self.pool, step.step_id).await { + for order in linked_orders { + if order.status != "done" && order.status != "archived" { + let order_update = crate::db::models::UpdateOrderRequest { + status: Some("done".to_string()), + ..Default::default() + }; + let _ = repository::update_order(&self.pool, order.owner_id, order.id, order_update).await; + } + } + } + + repository::advance_directive_ready_steps(&self.pool, step.directive_id) + .await?; + repository::check_directive_idle(&self.pool, step.directive_id).await?; + } + "active" => { + // Contract still active — check if the supervisor has failed + // by looking at whether there are any failed tasks with no active tasks remaining + tracing::debug!( + step_id = %step.step_id, + contract_id = %step.contract_id, + contract_phase = %step.contract_phase, + "Contract-backed step still active — monitoring" + ); + } + _ => { + // Unknown status — log and skip + tracing::debug!( + step_id = %step.step_id, + contract_id = %step.contract_id, + contract_status = %step.contract_status, + "Contract-backed step in unexpected status" + ); + } + } + Ok::<(), anyhow::Error>(()) + }.await { + tracing::warn!(step_id = %step.step_id, error = %e, "Error processing contract-backed step — continuing"); + } + } + + // Check task-backed running steps (excludes contract-backed steps) let running = repository::get_running_steps_with_tasks(&self.pool).await?; for step in running { @@ -505,6 +604,142 @@ impl DirectiveOrchestrator { Ok(()) } + /// Spawn a contract for a contract-backed step. + /// Creates a contract, adds the directive's repository to it, links it to the step, + /// creates a supervisor task, and marks the step as running. + async fn spawn_step_contract( + &self, + step_id: Uuid, + directive_id: Uuid, + owner_id: Uuid, + step_name: &str, + step_description: Option<&str>, + task_plan: Option<&str>, + contract_type: &str, + repo_url: Option<&str>, + base_branch: Option<&str>, + ) -> Result<(), anyhow::Error> { + // Build contract description from step info + let description = match (step_description, task_plan) { + (Some(desc), Some(plan)) => Some(format!("{}\n\n{}", desc, plan)), + (Some(desc), None) => Some(desc.to_string()), + (None, Some(plan)) => Some(plan.to_string()), + (None, None) => None, + }; + + // Create the contract + let contract_req = CreateContractRequest { + name: step_name.to_string(), + description, + contract_type: Some(contract_type.to_string()), + template_id: None, + initial_phase: None, + autonomous_loop: Some(true), + phase_guard: None, + local_only: None, + auto_merge_local: None, + }; + + let contract = repository::create_contract_for_owner(&self.pool, owner_id, contract_req).await?; + + tracing::info!( + step_id = %step_id, + contract_id = %contract.id, + contract_type = %contract.contract_type, + "Created contract for directive step" + ); + + // Link the contract to the step + repository::link_contract_to_step(&self.pool, step_id, contract.id).await?; + + // Add the directive's repository to the contract (if available) + if let Some(url) = repo_url { + if let Err(e) = repository::add_remote_repository( + &self.pool, + contract.id, + step_name, + url, + true, // is_primary + ) + .await + { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to add repository to contract — continuing without it" + ); + } + } + + // Create supervisor task for the contract (following the pattern from contract handlers) + let supervisor_name = format!("{} Supervisor", step_name); + let supervisor_plan = format!( + "You are the supervisor for contract '{}'. Your goal is to drive this contract to completion.\n\n{}", + step_name, + contract.description.as_deref().unwrap_or("No description provided.") + ); + + let supervisor_req = CreateTaskRequest { + name: supervisor_name.clone(), + description: None, + plan: supervisor_plan.clone(), + repository_url: repo_url.map(|s| s.to_string()), + base_branch: base_branch.map(|s| s.to_string()), + target_branch: None, + parent_task_id: None, + contract_id: Some(contract.id), + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: true, + checkpoint_sha: None, + priority: 0, + merge_mode: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + directive_id: Some(directive_id), + directive_step_id: Some(step_id), + }; + + let supervisor_task = repository::create_task_for_owner(&self.pool, owner_id, supervisor_req).await?; + + tracing::info!( + contract_id = %contract.id, + supervisor_task_id = %supervisor_task.id, + "Created supervisor task for contract-backed step" + ); + + // Link supervisor task to contract + let update_req = UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + version: Some(contract.version), + ..Default::default() + }; + if let Err(e) = repository::update_contract_for_owner(&self.pool, contract.id, owner_id, update_req).await { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to link supervisor task to contract" + ); + } + + // Try to dispatch the supervisor task to a daemon + if self + .try_dispatch_task(supervisor_task.id, owner_id, &supervisor_task.name, &supervisor_task.plan, supervisor_task.version) + .await + { + repository::set_step_running(&self.pool, step_id).await?; + } else { + // Even if dispatch fails, mark step as running since contract is created. + // The supervisor task will be retried by the pending task retry logic. + repository::set_step_running(&self.pool, step_id).await?; + } + + Ok(()) + } + /// Try to dispatch a task to an available daemon. Returns true if dispatched. async fn try_dispatch_task( &self, @@ -1337,6 +1572,10 @@ For each step, define: - orderIndex: Execution phase number. Steps only start after ALL steps with a lower orderIndex complete. Steps with the same orderIndex run in parallel. Use ascending values (0, 1, 2, ...) to create sequential phases. Use dependsOn for fine-grained control within the same phase. +- contractType (OPTIONAL): For large, complex work items, set this to create a full contract instead of a + standalone task. Valid values: "simple" (Plan → Execute), "specification" (Research → Specify → Plan → Execute → Review), + "execute" (Execute only). Only use this for steps that truly need multi-phase orchestration. + Most steps should NOT use this — standalone tasks are the default and preferred for typical work. Submit steps: makima directive add-step "Step Name" --description "..." --task-plan "..." |
