summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/directive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/directive.rs')
-rw-r--r--makima/src/orchestration/directive.rs478
1 files changed, 478 insertions, 0 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
new file mode 100644
index 0000000..22003af
--- /dev/null
+++ b/makima/src/orchestration/directive.rs
@@ -0,0 +1,478 @@
+//! Directive orchestrator — automates the full directive lifecycle.
+//!
+//! Runs as a background loop, polling every 15s to:
+//! 1. Plan: active directives with no steps → spawn planning task
+//! 2. Execute: ready steps → spawn execution tasks
+//! 3. Monitor: detect task completion → update step status, advance DAG
+//! 4. Re-plan: goal updated → spawn new planning task
+
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::db::models::{CreateTaskRequest, UpdateTaskRequest};
+use crate::db::repository;
+use crate::server::state::{DaemonCommand, SharedState};
+
+pub struct DirectiveOrchestrator {
+ pool: PgPool,
+ state: SharedState,
+}
+
+impl DirectiveOrchestrator {
+ pub fn new(pool: PgPool, state: SharedState) -> Self {
+ Self { pool, state }
+ }
+
+ /// Run one orchestration tick — called every 15s.
+ pub async fn tick(&mut self) -> Result<(), anyhow::Error> {
+ self.phase_planning().await?;
+ self.phase_execution().await?;
+ self.phase_monitoring().await?;
+ self.phase_replanning().await?;
+ Ok(())
+ }
+
+ /// Phase 1: Active directives with no steps and no orchestrator task → spawn planning task.
+ async fn phase_planning(&self) -> Result<(), anyhow::Error> {
+ let directives = repository::get_directives_needing_planning(&self.pool).await?;
+
+ for directive in directives {
+ tracing::info!(
+ directive_id = %directive.id,
+ title = %directive.title,
+ "Directive needs planning — spawning planning task"
+ );
+
+ let plan = build_planning_prompt(&directive, &[], 1);
+
+ if let Err(e) = self
+ .spawn_orchestrator_task(
+ directive.id,
+ directive.owner_id,
+ format!("Plan: {}", directive.title),
+ plan,
+ directive.repository_url.as_deref(),
+ directive.base_branch.as_deref(),
+ )
+ .await
+ {
+ tracing::warn!(
+ directive_id = %directive.id,
+ error = %e,
+ "Failed to spawn planning task"
+ );
+ }
+ }
+ Ok(())
+ }
+
+ /// Phase 2: Ready steps with no task → create execution task and dispatch.
+ async fn phase_execution(&self) -> Result<(), anyhow::Error> {
+ let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;
+
+ for step in steps {
+ tracing::info!(
+ step_id = %step.step_id,
+ directive_id = %step.directive_id,
+ step_name = %step.step_name,
+ "Dispatching execution task for ready step"
+ );
+
+ let task_plan = step
+ .task_plan
+ .as_deref()
+ .unwrap_or("Execute the step described below.");
+
+ let plan = format!(
+ "You are executing a step in directive \"{directive_title}\".\n\n\
+ STEP: {step_name}\n\
+ DESCRIPTION: {description}\n\n\
+ INSTRUCTIONS:\n{task_plan}\n\n\
+ When done, the system will automatically mark this step as completed.\n\
+ If you cannot complete the task, report the failure clearly.",
+ directive_title = step.directive_title,
+ step_name = step.step_name,
+ description = step.step_description.as_deref().unwrap_or("(none)"),
+ task_plan = task_plan,
+ );
+
+ match self
+ .spawn_step_task(
+ step.step_id,
+ step.directive_id,
+ step.owner_id,
+ format!("{}: {}", step.directive_title, step.step_name),
+ plan,
+ step.repository_url.as_deref(),
+ step.base_branch.as_deref(),
+ )
+ .await
+ {
+ Ok(()) => {}
+ Err(e) => {
+ tracing::warn!(
+ step_id = %step.step_id,
+ error = %e,
+ "Failed to spawn execution task for step"
+ );
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Phase 3: Monitor running steps and orchestrator tasks.
+ async fn phase_monitoring(&self) -> Result<(), anyhow::Error> {
+ // Check running steps
+ let running = repository::get_running_steps_with_tasks(&self.pool).await?;
+
+ for step in running {
+ match step.task_status.as_str() {
+ "completed" | "merged" | "done" => {
+ tracing::info!(
+ step_id = %step.step_id,
+ directive_id = %step.directive_id,
+ task_id = %step.task_id,
+ "Step task completed — updating step to completed"
+ );
+ let update = crate::db::models::UpdateDirectiveStepRequest {
+ status: Some("completed".to_string()),
+ ..Default::default()
+ };
+ repository::update_directive_step(&self.pool, step.step_id, update).await?;
+ repository::advance_directive_ready_steps(&self.pool, step.directive_id)
+ .await?;
+ repository::check_directive_idle(&self.pool, step.directive_id).await?;
+ }
+ "failed" | "interrupted" => {
+ tracing::warn!(
+ step_id = %step.step_id,
+ directive_id = %step.directive_id,
+ task_id = %step.task_id,
+ task_status = %step.task_status,
+ "Step task failed — updating step to failed"
+ );
+ let update = crate::db::models::UpdateDirectiveStepRequest {
+ status: Some("failed".to_string()),
+ ..Default::default()
+ };
+ repository::update_directive_step(&self.pool, step.step_id, update).await?;
+ repository::advance_directive_ready_steps(&self.pool, step.directive_id)
+ .await?;
+ repository::check_directive_idle(&self.pool, step.directive_id).await?;
+ }
+ _ => {
+ // Still running — do nothing
+ }
+ }
+ }
+
+ // Check orchestrator (planning) tasks
+ let orch_tasks = repository::get_orchestrator_tasks_to_check(&self.pool).await?;
+
+ for orch in orch_tasks {
+ match orch.task_status.as_str() {
+ "completed" | "merged" | "done" => {
+ tracing::info!(
+ directive_id = %orch.directive_id,
+ task_id = %orch.orchestrator_task_id,
+ "Planning task completed — clearing orchestrator task"
+ );
+ repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
+ // Advance DAG — planning task should have created steps
+ repository::advance_directive_ready_steps(&self.pool, orch.directive_id)
+ .await?;
+ }
+ "failed" | "interrupted" => {
+ tracing::warn!(
+ directive_id = %orch.directive_id,
+ task_id = %orch.orchestrator_task_id,
+ "Planning task failed — pausing directive"
+ );
+ repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
+ repository::set_directive_status(
+ &self.pool,
+ orch.owner_id,
+ orch.directive_id,
+ "paused",
+ )
+ .await?;
+ }
+ _ => {}
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Phase 4: Re-planning — goal updated after latest step creation.
+ async fn phase_replanning(&self) -> Result<(), anyhow::Error> {
+ let directives = repository::get_directives_needing_replanning(&self.pool).await?;
+
+ for directive in directives {
+ tracing::info!(
+ directive_id = %directive.id,
+ title = %directive.title,
+ "Directive goal updated — spawning re-planning task"
+ );
+
+ let existing_steps =
+ repository::list_directive_steps(&self.pool, directive.id).await?;
+ let generation =
+ repository::get_directive_max_generation(&self.pool, directive.id).await? + 1;
+
+ let plan = build_planning_prompt(&directive, &existing_steps, generation);
+
+ if let Err(e) = self
+ .spawn_orchestrator_task(
+ directive.id,
+ directive.owner_id,
+ format!("Re-plan: {}", directive.title),
+ plan,
+ directive.repository_url.as_deref(),
+ directive.base_branch.as_deref(),
+ )
+ .await
+ {
+ tracing::warn!(
+ directive_id = %directive.id,
+ error = %e,
+ "Failed to spawn re-planning task"
+ );
+ }
+ }
+ Ok(())
+ }
+
+ /// Spawn a planning/re-planning task and assign it as the directive's orchestrator task.
+ async fn spawn_orchestrator_task(
+ &self,
+ directive_id: Uuid,
+ owner_id: Uuid,
+ name: String,
+ plan: String,
+ repo_url: Option<&str>,
+ base_branch: Option<&str>,
+ ) -> Result<(), anyhow::Error> {
+ let req = CreateTaskRequest {
+ contract_id: None,
+ name,
+ description: Some("Directive planning task".to_string()),
+ plan,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: 0,
+ repository_url: repo_url.map(|s| s.to_string()),
+ base_branch: base_branch.map(|s| s.to_string()),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: None,
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ directive_id: Some(directive_id),
+ directive_step_id: None,
+ };
+
+ let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
+
+ repository::assign_orchestrator_task(&self.pool, directive_id, task.id).await?;
+
+ // Try to dispatch to a daemon
+ self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await;
+
+ Ok(())
+ }
+
+ /// Spawn an execution task for a step and assign it.
+ async fn spawn_step_task(
+ &self,
+ step_id: Uuid,
+ directive_id: Uuid,
+ owner_id: Uuid,
+ name: String,
+ plan: String,
+ repo_url: Option<&str>,
+ base_branch: Option<&str>,
+ ) -> Result<(), anyhow::Error> {
+ let req = CreateTaskRequest {
+ contract_id: None,
+ name,
+ description: Some("Directive step execution task".to_string()),
+ plan,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: 0,
+ repository_url: repo_url.map(|s| s.to_string()),
+ base_branch: base_branch.map(|s| s.to_string()),
+ target_branch: None,
+ merge_mode: None,
+ target_repo_path: None,
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ directive_id: Some(directive_id),
+ directive_step_id: Some(step_id),
+ };
+
+ let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
+
+ repository::assign_task_to_step(&self.pool, step_id, task.id).await?;
+
+ // Try to dispatch to a daemon
+ self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await;
+
+ Ok(())
+ }
+
+ /// Try to dispatch a task to an available daemon. If none available, leave pending.
+ async fn try_dispatch_task(
+ &self,
+ task_id: Uuid,
+ owner_id: Uuid,
+ task_name: &str,
+ plan: &str,
+ version: i32,
+ ) {
+ let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
+ tracing::info!(
+ task_id = %task_id,
+ "No daemon available for directive task — leaving pending for retry"
+ );
+ return;
+ };
+
+ // Update task status to starting and assign daemon
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon_id),
+ version: Some(version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(&self.pool, task_id, owner_id, update_req).await {
+ Ok(Some(updated_task)) => {
+ let command = DaemonCommand::SpawnTask {
+ task_id,
+ task_name: task_name.to_string(),
+ plan: plan.to_string(),
+ repo_url: updated_task.repository_url.clone(),
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: None,
+ depth: 0,
+ is_orchestrator: false,
+ target_repo_path: None,
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ contract_id: None,
+ is_supervisor: false,
+ autonomous_loop: false,
+ resume_session: false,
+ conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
+ local_only: false,
+ auto_merge_local: false,
+ supervisor_worktree_task_id: None,
+ };
+
+ if let Err(e) = self.state.send_daemon_command(daemon_id, command).await {
+ tracing::warn!(
+ task_id = %task_id,
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to send SpawnTask to daemon for directive task"
+ );
+ } else {
+ tracing::info!(
+ task_id = %task_id,
+ daemon_id = %daemon_id,
+ "Dispatched directive task to daemon"
+ );
+ }
+ }
+ Ok(None) => {
+ tracing::warn!(task_id = %task_id, "Task not found when trying to dispatch");
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
+ }
+ }
+ }
+}
+
+/// Build the planning prompt for a directive.
+fn build_planning_prompt(
+ directive: &crate::db::models::Directive,
+ existing_steps: &[crate::db::models::DirectiveStep],
+ generation: i32,
+) -> String {
+ let mut prompt = String::new();
+
+ if !existing_steps.is_empty() {
+ prompt.push_str(&format!(
+ "EXISTING STEPS (generation {}):\n",
+ generation - 1
+ ));
+ for step in existing_steps {
+ prompt.push_str(&format!(
+ "- {} [{}]: {}\n",
+ step.name,
+ step.status,
+ step.description.as_deref().unwrap_or("(no description)")
+ ));
+ }
+ prompt.push_str(&format!(
+ "\nAdd new steps that build on or complement existing work. Use generation {}.\n\n",
+ generation
+ ));
+ }
+
+ prompt.push_str(&format!(
+ r#"You are planning the implementation of a directive.
+
+DIRECTIVE: "{title}"
+GOAL: {goal}
+{repo_section}
+Your job:
+1. Explore the repository to understand the codebase
+2. Decompose the goal into concrete, ordered steps
+3. Each step = one task for a Claude Code instance to execute
+4. Submit ALL steps using the batch command or individual add-step commands
+
+For each step, define:
+- name: Short imperative title (e.g., "Add user authentication middleware")
+- description: What to do and acceptance criteria
+- taskPlan: Full instructions for the Claude instance (include file paths, patterns to follow)
+- dependsOn: UUIDs of steps this depends on (use IDs from previous add-step responses)
+- orderIndex: Execution order hint
+
+Submit steps:
+ makima directive add-step "Step Name" --description "..." --task-plan "..."
+ (Use --depends-on "uuid1,uuid2" for dependencies, referencing IDs from earlier add-step outputs)
+
+Or batch:
+ makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]'
+
+IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context.
+"#,
+ title = directive.title,
+ goal = directive.goal,
+ repo_section = match &directive.repository_url {
+ Some(url) => format!("REPOSITORY: {}\n", url),
+ None => String::new(),
+ },
+ ));
+
+ prompt
+}