summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/directive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/directive.rs')
-rw-r--r--makima/src/orchestration/directive.rs243
1 files changed, 241 insertions, 2 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 98690bb..155cfad 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -11,7 +11,7 @@ use uuid::Uuid;
use base64::Engine;
-use crate::db::models::{CreateTaskRequest, UpdateTaskRequest};
+use crate::db::models::{CreateContractRequest, CreateTaskRequest, UpdateContractRequest, UpdateTaskRequest};
use crate::db::repository;
use crate::server::state::{DaemonCommand, SharedState};
@@ -86,6 +86,42 @@ impl DirectiveOrchestrator {
let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;
for step in steps {
+ // If the step has a contract_type, create a contract instead of a standalone task
+ if step.contract_type.is_some() {
+ tracing::info!(
+ step_id = %step.step_id,
+ directive_id = %step.directive_id,
+ step_name = %step.step_name,
+ contract_type = ?step.contract_type,
+ "Spawning contract for contract-backed step"
+ );
+
+ match self
+ .spawn_step_contract(
+ step.step_id,
+ step.directive_id,
+ step.owner_id,
+ &step.step_name,
+ step.step_description.as_deref(),
+ step.task_plan.as_deref(),
+ step.contract_type.as_deref().unwrap_or("simple"),
+ step.repository_url.as_deref(),
+ step.base_branch.as_deref(),
+ )
+ .await
+ {
+ Ok(()) => {}
+ Err(e) => {
+ tracing::warn!(
+ step_id = %step.step_id,
+ error = %e,
+ "Failed to spawn contract for step"
+ );
+ }
+ }
+ continue;
+ }
+
tracing::info!(
step_id = %step.step_id,
directive_id = %step.directive_id,
@@ -218,7 +254,70 @@ impl DirectiveOrchestrator {
/// Phase 3: Monitor running steps and orchestrator tasks.
async fn phase_monitoring(&self) -> Result<(), anyhow::Error> {
- // Check running steps
+ // Check contract-backed running steps first
+ let contract_steps = repository::get_running_steps_with_contracts(&self.pool).await?;
+
+ for step in contract_steps {
+ if let Err(e) = async {
+ match step.contract_status.as_str() {
+ "completed" | "archived" => {
+ tracing::info!(
+ step_id = %step.step_id,
+ directive_id = %step.directive_id,
+ contract_id = %step.contract_id,
+ contract_status = %step.contract_status,
+ "Contract-backed step contract completed — updating step to completed"
+ );
+ let update = crate::db::models::UpdateDirectiveStepRequest {
+ status: Some("completed".to_string()),
+ ..Default::default()
+ };
+ repository::update_directive_step(&self.pool, step.step_id, update).await?;
+
+ // Mark linked orders as done
+ if let Ok(linked_orders) = repository::get_orders_by_step_id(&self.pool, step.step_id).await {
+ for order in linked_orders {
+ if order.status != "done" && order.status != "archived" {
+ let order_update = crate::db::models::UpdateOrderRequest {
+ status: Some("done".to_string()),
+ ..Default::default()
+ };
+ let _ = repository::update_order(&self.pool, order.owner_id, order.id, order_update).await;
+ }
+ }
+ }
+
+ repository::advance_directive_ready_steps(&self.pool, step.directive_id)
+ .await?;
+ repository::check_directive_idle(&self.pool, step.directive_id).await?;
+ }
+ "active" => {
+ // Contract still active — check if the supervisor has failed
+ // by looking at whether there are any failed tasks with no active tasks remaining
+ tracing::debug!(
+ step_id = %step.step_id,
+ contract_id = %step.contract_id,
+ contract_phase = %step.contract_phase,
+ "Contract-backed step still active — monitoring"
+ );
+ }
+ _ => {
+ // Unknown status — log and skip
+ tracing::debug!(
+ step_id = %step.step_id,
+ contract_id = %step.contract_id,
+ contract_status = %step.contract_status,
+ "Contract-backed step in unexpected status"
+ );
+ }
+ }
+ Ok::<(), anyhow::Error>(())
+ }.await {
+ tracing::warn!(step_id = %step.step_id, error = %e, "Error processing contract-backed step — continuing");
+ }
+ }
+
+ // Check task-backed running steps (excludes contract-backed steps)
let running = repository::get_running_steps_with_tasks(&self.pool).await?;
for step in running {
@@ -505,6 +604,142 @@ impl DirectiveOrchestrator {
Ok(())
}
+ /// Spawn a contract for a contract-backed step.
+ /// Creates a contract, adds the directive's repository to it, links it to the step,
+ /// creates a supervisor task, and marks the step as running.
+ async fn spawn_step_contract(
+ &self,
+ step_id: Uuid,
+ directive_id: Uuid,
+ owner_id: Uuid,
+ step_name: &str,
+ step_description: Option<&str>,
+ task_plan: Option<&str>,
+ contract_type: &str,
+ repo_url: Option<&str>,
+ base_branch: Option<&str>,
+ ) -> Result<(), anyhow::Error> {
+ // Build contract description from step info
+ let description = match (step_description, task_plan) {
+ (Some(desc), Some(plan)) => Some(format!("{}\n\n{}", desc, plan)),
+ (Some(desc), None) => Some(desc.to_string()),
+ (None, Some(plan)) => Some(plan.to_string()),
+ (None, None) => None,
+ };
+
+ // Create the contract
+ let contract_req = CreateContractRequest {
+ name: step_name.to_string(),
+ description,
+ contract_type: Some(contract_type.to_string()),
+ template_id: None,
+ initial_phase: None,
+ autonomous_loop: Some(true),
+ phase_guard: None,
+ local_only: None,
+ auto_merge_local: None,
+ };
+
+ let contract = repository::create_contract_for_owner(&self.pool, owner_id, contract_req).await?;
+
+ tracing::info!(
+ step_id = %step_id,
+ contract_id = %contract.id,
+ contract_type = %contract.contract_type,
+ "Created contract for directive step"
+ );
+
+ // Link the contract to the step
+ repository::link_contract_to_step(&self.pool, step_id, contract.id).await?;
+
+ // Add the directive's repository to the contract (if available)
+ if let Some(url) = repo_url {
+ if let Err(e) = repository::add_remote_repository(
+ &self.pool,
+ contract.id,
+ step_name,
+ url,
+ true, // is_primary
+ )
+ .await
+ {
+ tracing::warn!(
+ contract_id = %contract.id,
+ error = %e,
+ "Failed to add repository to contract — continuing without it"
+ );
+ }
+ }
+
+ // Create supervisor task for the contract (following the pattern from contract handlers)
+ let supervisor_name = format!("{} Supervisor", step_name);
+ let supervisor_plan = format!(
+ "You are the supervisor for contract '{}'. Your goal is to drive this contract to completion.\n\n{}",
+ step_name,
+ contract.description.as_deref().unwrap_or("No description provided.")
+ );
+
+ let supervisor_req = CreateTaskRequest {
+ name: supervisor_name.clone(),
+ description: None,
+ plan: supervisor_plan.clone(),
+ repository_url: repo_url.map(|s| s.to_string()),
+ base_branch: base_branch.map(|s| s.to_string()),
+ target_branch: None,
+ parent_task_id: None,
+ contract_id: Some(contract.id),
+ target_repo_path: None,
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ is_supervisor: true,
+ checkpoint_sha: None,
+ priority: 0,
+ merge_mode: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ directive_id: Some(directive_id),
+ directive_step_id: Some(step_id),
+ };
+
+ let supervisor_task = repository::create_task_for_owner(&self.pool, owner_id, supervisor_req).await?;
+
+ tracing::info!(
+ contract_id = %contract.id,
+ supervisor_task_id = %supervisor_task.id,
+ "Created supervisor task for contract-backed step"
+ );
+
+ // Link supervisor task to contract
+ let update_req = UpdateContractRequest {
+ supervisor_task_id: Some(supervisor_task.id),
+ version: Some(contract.version),
+ ..Default::default()
+ };
+ if let Err(e) = repository::update_contract_for_owner(&self.pool, contract.id, owner_id, update_req).await {
+ tracing::warn!(
+ contract_id = %contract.id,
+ error = %e,
+ "Failed to link supervisor task to contract"
+ );
+ }
+
+ // Try to dispatch the supervisor task to a daemon
+ if self
+ .try_dispatch_task(supervisor_task.id, owner_id, &supervisor_task.name, &supervisor_task.plan, supervisor_task.version)
+ .await
+ {
+ repository::set_step_running(&self.pool, step_id).await?;
+ } else {
+ // Even if dispatch fails, mark step as running since contract is created.
+ // The supervisor task will be retried by the pending task retry logic.
+ repository::set_step_running(&self.pool, step_id).await?;
+ }
+
+ Ok(())
+ }
+
/// Try to dispatch a task to an available daemon. Returns true if dispatched.
async fn try_dispatch_task(
&self,
@@ -1337,6 +1572,10 @@ For each step, define:
- orderIndex: Execution phase number. Steps only start after ALL steps with a lower orderIndex complete.
Steps with the same orderIndex run in parallel. Use ascending values (0, 1, 2, ...) to create sequential phases.
Use dependsOn for fine-grained control within the same phase.
+- contractType (OPTIONAL): For large, complex work items, set this to create a full contract instead of a
+ standalone task. Valid values: "simple" (Plan → Execute), "specification" (Research → Specify → Plan → Execute → Review),
+ "execute" (Execute only). Only use this for steps that truly need multi-phase orchestration.
+ Most steps should NOT use this — standalone tasks are the default and preferred for typical work.
Submit steps:
makima directive add-step "Step Name" --description "..." --task-plan "..."