summaryrefslogblamecommitdiff
path: root/makima/src/orchestration/directive.rs
blob: 744977e5fc698906d4919ff0ee37a8284647f807 (plain) (tree)




































































                                                                                                  
                                                                                
                                                                  
                                       

















































                                                                                         














                                                                                                









































































































































































                                                                                               

                                                                                        



































                                                                                       

                                                                                    
 






                                                                                       



              
                                                                                  






                               
               




                                                                                      
                         



































                                                                                                  
                                                            








                                                                                          
                                 





                                                             
                                








                                                                                                     
             



































































                                                                                                                                 
//! Directive orchestrator — automates the full directive lifecycle.
//!
//! Runs as a background loop, polling every 15s to:
//! 1. Plan: active directives with no steps → spawn planning task
//! 2. Execute: ready steps → spawn execution tasks
//! 3. Monitor: detect task completion → update step status, advance DAG
//! 4. Re-plan: goal updated → spawn new planning task

use sqlx::PgPool;
use uuid::Uuid;

use crate::db::models::{CreateTaskRequest, UpdateTaskRequest};
use crate::db::repository;
use crate::server::state::{DaemonCommand, SharedState};

/// Background driver for the directive lifecycle.
///
/// Holds the database pool handed to every `repository::*` query and the shared
/// server state used to pick a daemon and send it commands.
pub struct DirectiveOrchestrator {
    /// Postgres connection pool passed to all repository calls.
    pool: PgPool,
    /// Shared server state; used to find a daemon and send `DaemonCommand`s to it.
    state: SharedState,
}

impl DirectiveOrchestrator {
    pub fn new(pool: PgPool, state: SharedState) -> Self {
        Self { pool, state }
    }

    /// Run one orchestration tick — called every 15s.
    pub async fn tick(&mut self) -> Result<(), anyhow::Error> {
        self.phase_planning().await?;
        self.phase_execution().await?;
        self.phase_monitoring().await?;
        self.phase_replanning().await?;
        Ok(())
    }

    /// Phase 1: spawn a planning task for every directive the repository reports
    /// as needing planning.
    ///
    /// A failure to spawn a task for one directive is logged and does not stop
    /// the remaining directives from being processed.
    ///
    /// # Errors
    /// Returns an error only when the initial directive query fails.
    async fn phase_planning(&self) -> Result<(), anyhow::Error> {
        let needing_plan = repository::get_directives_needing_planning(&self.pool).await?;

        for directive in needing_plan {
            tracing::info!(
                directive_id = %directive.id,
                title = %directive.title,
                "Directive needs planning — spawning planning task"
            );

            // Initial plan: no existing steps are passed and the generation is 1.
            let prompt = build_planning_prompt(&directive, &[], 1);
            let task_name = format!("Plan: {}", directive.title);

            let spawned = self
                .spawn_orchestrator_task(
                    directive.id,
                    directive.owner_id,
                    task_name,
                    prompt,
                    directive.repository_url.as_deref(),
                    directive.base_branch.as_deref(),
                )
                .await;

            match spawned {
                Ok(()) => {}
                Err(e) => {
                    tracing::warn!(
                        directive_id = %directive.id,
                        error = %e,
                        "Failed to spawn planning task"
                    );
                }
            }
        }

        Ok(())
    }

    /// Phase 2: create and dispatch an execution task for every step that is
    /// ready for dispatch, then retry pending directive tasks that were not
    /// dispatched previously.
    ///
    /// Both loops are best-effort per item: a failure for one step or task is
    /// logged and the loop moves on to the next one.
    ///
    /// # Errors
    /// Returns an error only when one of the two listing queries fails.
    async fn phase_execution(&self) -> Result<(), anyhow::Error> {
        // Create tasks for ready steps
        let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;

        for step in steps {
            tracing::info!(
                step_id = %step.step_id,
                directive_id = %step.directive_id,
                step_name = %step.step_name,
                "Dispatching execution task for ready step"
            );

            // Fall back to a generic instruction when the step carries no task plan.
            let task_plan = step
                .task_plan
                .as_deref()
                .unwrap_or("Execute the step described below.");

            let plan = format!(
                "You are executing a step in directive \"{directive_title}\".\n\n\
                 STEP: {step_name}\n\
                 DESCRIPTION: {description}\n\n\
                 INSTRUCTIONS:\n{task_plan}\n\n\
                 When done, the system will automatically mark this step as completed.\n\
                 If you cannot complete the task, report the failure clearly.",
                directive_title = step.directive_title,
                step_name = step.step_name,
                description = step.step_description.as_deref().unwrap_or("(none)"),
                task_plan = task_plan,
            );

            if let Err(e) = self
                .spawn_step_task(
                    step.step_id,
                    step.directive_id,
                    step.owner_id,
                    format!("{}: {}", step.directive_title, step.step_name),
                    plan,
                    step.repository_url.as_deref(),
                    step.base_branch.as_deref(),
                )
                .await
            {
                tracing::warn!(
                    step_id = %step.step_id,
                    error = %e,
                    "Failed to spawn execution task for step"
                );
            }
        }

        // Retry pending directive tasks that weren't dispatched
        let pending = repository::get_pending_directive_tasks(&self.pool).await?;
        for task in pending {
            let dispatched = self
                .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version)
                .await;
            if !dispatched {
                continue;
            }

            // Task dispatched — mark its step as running if it has one.
            let Some(step_id) = task.directive_step_id else {
                continue;
            };

            // The task is already on its way to a daemon, so a failure here must not
            // abort the retry loop; it is logged instead of being silently discarded.
            if let Err(e) = repository::set_step_running(&self.pool, step_id).await {
                tracing::warn!(
                    task_id = %task.id,
                    step_id = %step_id,
                    error = %e,
                    "Failed to mark step as running after dispatching its task"
                );
            }
        }

        Ok(())
    }

    /// Phase 3: reconcile running steps and orchestrator (planning) tasks with
    /// the status of their underlying tasks.
    ///
    /// Steps: a task status of `completed`/`merged`/`done` marks the step
    /// `completed`; `failed`/`interrupted` marks it `failed`. In both cases the
    /// directive's ready steps are advanced and the idle check is run. Any other
    /// status leaves the step untouched.
    ///
    /// Orchestrator tasks: a finished task is cleared from its directive and the
    /// ready steps are advanced; a failed or interrupted task is cleared and the
    /// directive is set to `paused`.
    ///
    /// # Errors
    /// Propagates the first repository error, which ends the phase early.
    async fn phase_monitoring(&self) -> Result<(), anyhow::Error> {
        // Check running steps
        let running_steps = repository::get_running_steps_with_tasks(&self.pool).await?;

        for step in running_steps {
            // Decide the step's new status; tasks that are still in flight are skipped.
            let new_step_status = match step.task_status.as_str() {
                "completed" | "merged" | "done" => {
                    tracing::info!(
                        step_id = %step.step_id,
                        directive_id = %step.directive_id,
                        task_id = %step.task_id,
                        "Step task completed — updating step to completed"
                    );
                    "completed"
                }
                "failed" | "interrupted" => {
                    tracing::warn!(
                        step_id = %step.step_id,
                        directive_id = %step.directive_id,
                        task_id = %step.task_id,
                        task_status = %step.task_status,
                        "Step task failed — updating step to failed"
                    );
                    "failed"
                }
                _ => continue,
            };

            let update = crate::db::models::UpdateDirectiveStepRequest {
                status: Some(new_step_status.to_string()),
                ..Default::default()
            };
            repository::update_directive_step(&self.pool, step.step_id, update).await?;
            repository::advance_directive_ready_steps(&self.pool, step.directive_id).await?;
            repository::check_directive_idle(&self.pool, step.directive_id).await?;
        }

        // Check orchestrator (planning) tasks
        let planning_tasks = repository::get_orchestrator_tasks_to_check(&self.pool).await?;

        for orch in planning_tasks {
            let status = orch.task_status.as_str();

            if matches!(status, "completed" | "merged" | "done") {
                tracing::info!(
                    directive_id = %orch.directive_id,
                    task_id = %orch.orchestrator_task_id,
                    "Planning task completed — clearing orchestrator task"
                );
                repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
                // Advance DAG — planning task should have created steps
                repository::advance_directive_ready_steps(&self.pool, orch.directive_id).await?;
            } else if matches!(status, "failed" | "interrupted") {
                tracing::warn!(
                    directive_id = %orch.directive_id,
                    task_id = %orch.orchestrator_task_id,
                    "Planning task failed — pausing directive"
                );
                repository::clear_orchestrator_task(&self.pool, orch.directive_id).await?;
                repository::set_directive_status(
                    &self.pool,
                    orch.owner_id,
                    orch.directive_id,
                    "paused",
                )
                .await?;
            }
        }

        Ok(())
    }

    /// Phase 4: spawn a re-planning task for every directive the repository
    /// reports as needing re-planning.
    ///
    /// The prompt includes the directive's current steps and asks for steps in
    /// the next generation (current maximum generation + 1). A failure to spawn
    /// the task is logged and the loop continues.
    ///
    /// # Errors
    /// Propagates errors from the directive query, the step listing, and the
    /// generation lookup.
    async fn phase_replanning(&self) -> Result<(), anyhow::Error> {
        let stale = repository::get_directives_needing_replanning(&self.pool).await?;

        for directive in stale {
            tracing::info!(
                directive_id = %directive.id,
                title = %directive.title,
                "Directive goal updated — spawning re-planning task"
            );

            let prior_steps = repository::list_directive_steps(&self.pool, directive.id).await?;
            let max_generation =
                repository::get_directive_max_generation(&self.pool, directive.id).await?;
            let next_generation = max_generation + 1;

            let prompt = build_planning_prompt(&directive, &prior_steps, next_generation);
            let task_name = format!("Re-plan: {}", directive.title);

            let spawned = self
                .spawn_orchestrator_task(
                    directive.id,
                    directive.owner_id,
                    task_name,
                    prompt,
                    directive.repository_url.as_deref(),
                    directive.base_branch.as_deref(),
                )
                .await;

            match spawned {
                Ok(()) => {}
                Err(e) => {
                    tracing::warn!(
                        directive_id = %directive.id,
                        error = %e,
                        "Failed to spawn re-planning task"
                    );
                }
            }
        }

        Ok(())
    }

    /// Create a planning/re-planning task, record it as the directive's
    /// orchestrator task, and make one dispatch attempt.
    ///
    /// The outcome of the dispatch attempt is deliberately ignored here: the
    /// function returns `Ok(())` whether or not a daemon accepted the task.
    ///
    /// # Errors
    /// Returns an error when the task cannot be created or cannot be assigned
    /// to the directive.
    async fn spawn_orchestrator_task(
        &self,
        directive_id: Uuid,
        owner_id: Uuid,
        name: String,
        plan: String,
        repo_url: Option<&str>,
        base_branch: Option<&str>,
    ) -> Result<(), anyhow::Error> {
        let request = CreateTaskRequest {
            // What the task is and where it runs.
            name,
            description: Some(String::from("Directive planning task")),
            plan,
            priority: 0,
            repository_url: repo_url.map(String::from),
            base_branch: base_branch.map(String::from),
            // Link to the directive only; a planning task belongs to no single step.
            directive_id: Some(directive_id),
            directive_step_id: None,
            // Everything else is left unset.
            contract_id: None,
            parent_task_id: None,
            is_supervisor: false,
            target_branch: None,
            merge_mode: None,
            target_repo_path: None,
            completion_action: None,
            continue_from_task_id: None,
            copy_files: None,
            checkpoint_sha: None,
            branched_from_task_id: None,
            conversation_history: None,
            supervisor_worktree_task_id: None,
        };

        let created = repository::create_task_for_owner(&self.pool, owner_id, request).await?;
        repository::assign_orchestrator_task(&self.pool, directive_id, created.id).await?;

        // Try to dispatch to a daemon; the result does not affect the return value.
        let _dispatched = self
            .try_dispatch_task(
                created.id,
                owner_id,
                &created.name,
                &created.plan,
                created.version,
            )
            .await;

        Ok(())
    }

    /// Create an execution task for a step, link it to the step, and make one
    /// dispatch attempt.
    ///
    /// The step is marked as running only when the dispatch attempt succeeds;
    /// otherwise it is left as it was, with the task linked.
    ///
    /// # Errors
    /// Returns an error when the task cannot be created, cannot be linked to the
    /// step, or when marking the step as running fails after a successful dispatch.
    async fn spawn_step_task(
        &self,
        step_id: Uuid,
        directive_id: Uuid,
        owner_id: Uuid,
        name: String,
        plan: String,
        repo_url: Option<&str>,
        base_branch: Option<&str>,
    ) -> Result<(), anyhow::Error> {
        let request = CreateTaskRequest {
            // What the task is and where it runs.
            name,
            description: Some(String::from("Directive step execution task")),
            plan,
            priority: 0,
            repository_url: repo_url.map(String::from),
            base_branch: base_branch.map(String::from),
            // Link to both the directive and the specific step being executed.
            directive_id: Some(directive_id),
            directive_step_id: Some(step_id),
            // Everything else is left unset.
            contract_id: None,
            parent_task_id: None,
            is_supervisor: false,
            target_branch: None,
            merge_mode: None,
            target_repo_path: None,
            completion_action: None,
            continue_from_task_id: None,
            copy_files: None,
            checkpoint_sha: None,
            branched_from_task_id: None,
            conversation_history: None,
            supervisor_worktree_task_id: None,
        };

        let created = repository::create_task_for_owner(&self.pool, owner_id, request).await?;

        // Link the task to the step first; the step's status is not changed by this call site.
        repository::link_task_to_step(&self.pool, step_id, created.id).await?;

        // Only mark the step as 'running' if the task was actually dispatched.
        let dispatched = self
            .try_dispatch_task(
                created.id,
                owner_id,
                &created.name,
                &created.plan,
                created.version,
            )
            .await;
        if dispatched {
            repository::set_step_running(&self.pool, step_id).await?;
        }

        Ok(())
    }

    /// Try to dispatch a task to an available daemon. Returns true if dispatched.
    ///
    /// Steps:
    /// 1. Pick a daemon for `owner_id`; if there is none, return `false`.
    /// 2. Update the task to status `"starting"` with the chosen daemon, passing
    ///    `version` along in the update request.
    /// 3. Send `DaemonCommand::SpawnTask` to the daemon.
    ///
    /// If the send in step 3 fails, the task status is reset to `"pending"` so the
    /// task is not left in `"starting"` with no daemon working on it. Every failure
    /// is logged and reported as `false`; this function never returns an error.
    async fn try_dispatch_task(
        &self,
        task_id: Uuid,
        owner_id: Uuid,
        task_name: &str,
        plan: &str,
        version: i32,
    ) -> bool {
        let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
            tracing::info!(
                task_id = %task_id,
                "No daemon available for directive task — leaving pending for retry"
            );
            return false;
        };

        // Update task status to starting and assign daemon
        let update_req = UpdateTaskRequest {
            status: Some("starting".to_string()),
            daemon_id: Some(daemon_id),
            version: Some(version),
            ..Default::default()
        };

        let updated_task =
            match repository::update_task_for_owner(&self.pool, task_id, owner_id, update_req)
                .await
            {
                Ok(Some(task)) => task,
                Ok(None) => {
                    tracing::warn!(task_id = %task_id, "Task not found when trying to dispatch");
                    return false;
                }
                Err(e) => {
                    tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
                    return false;
                }
            };

        let command = DaemonCommand::SpawnTask {
            task_id,
            task_name: task_name.to_string(),
            plan: plan.to_string(),
            repo_url: updated_task.repository_url.clone(),
            base_branch: updated_task.base_branch.clone(),
            target_branch: updated_task.target_branch.clone(),
            parent_task_id: None,
            depth: 0,
            is_orchestrator: false,
            target_repo_path: None,
            completion_action: None,
            continue_from_task_id: None,
            copy_files: None,
            contract_id: None,
            is_supervisor: false,
            autonomous_loop: false,
            resume_session: false,
            conversation_history: None,
            patch_data: None,
            patch_base_sha: None,
            local_only: false,
            auto_merge_local: false,
            supervisor_worktree_task_id: None,
            directive_id: updated_task.directive_id,
        };

        match self.state.send_daemon_command(daemon_id, command).await {
            Ok(_) => {
                tracing::info!(
                    task_id = %task_id,
                    daemon_id = %daemon_id,
                    "Dispatched directive task to daemon"
                );
                true
            }
            Err(e) => {
                tracing::warn!(
                    task_id = %task_id,
                    daemon_id = %daemon_id,
                    error = %e,
                    "Failed to send SpawnTask to daemon for directive task"
                );

                // The task was already moved to "starting" above but no daemon received
                // it. Put it back to "pending" so a later tick can retry the dispatch.
                // NOTE(review): assumes "pending" is the status the pending-task retry
                // query selects on — confirm against the repository layer.
                // NOTE(review): the daemon assignment is not cleared here; the next
                // successful dispatch overwrites it. Confirm whether it should be reset.
                let rollback_req = UpdateTaskRequest {
                    status: Some("pending".to_string()),
                    ..Default::default()
                };
                match repository::update_task_for_owner(
                    &self.pool,
                    task_id,
                    owner_id,
                    rollback_req,
                )
                .await
                {
                    Ok(Some(_)) => {}
                    Ok(None) => {
                        tracing::warn!(
                            task_id = %task_id,
                            "Task not found when rolling back failed dispatch"
                        );
                    }
                    Err(rollback_err) => {
                        tracing::warn!(
                            task_id = %task_id,
                            error = %rollback_err,
                            "Failed to roll back task status after failed dispatch"
                        );
                    }
                }

                false
            }
        }
    }
}

/// Build the planning prompt for a directive.
///
/// When `existing_steps` is non-empty the prompt starts with a listing of those
/// steps (labelled with `generation - 1`) followed by an instruction to add new
/// steps using `generation`. The fixed planning instructions always follow,
/// filled in with the directive's title, goal and — when present — repository URL.
fn build_planning_prompt(
    directive: &crate::db::models::Directive,
    existing_steps: &[crate::db::models::DirectiveStep],
    generation: i32,
) -> String {
    // Optional preamble describing the work that already exists.
    let preamble = if existing_steps.is_empty() {
        String::new()
    } else {
        let listing: String = existing_steps
            .iter()
            .map(|step| {
                format!(
                    "- {} [{}]: {}\n",
                    step.name,
                    step.status,
                    step.description.as_deref().unwrap_or("(no description)")
                )
            })
            .collect();

        format!(
            "EXISTING STEPS (generation {}):\n{}\
             \nAdd new steps that build on or complement existing work. Use generation {}.\n\n",
            generation - 1,
            listing,
            generation
        )
    };

    // The repository line is emitted only when the directive has a URL.
    let repo_section = directive
        .repository_url
        .as_ref()
        .map(|url| format!("REPOSITORY: {}\n", url))
        .unwrap_or_default();

    let instructions = format!(
        r#"You are planning the implementation of a directive.

DIRECTIVE: "{title}"
GOAL: {goal}
{repo_section}
Your job:
1. Explore the repository to understand the codebase
2. Decompose the goal into concrete, ordered steps
3. Each step = one task for a Claude Code instance to execute
4. Submit ALL steps using the batch command or individual add-step commands

For each step, define:
- name: Short imperative title (e.g., "Add user authentication middleware")
- description: What to do and acceptance criteria
- taskPlan: Full instructions for the Claude instance (include file paths, patterns to follow)
- dependsOn: UUIDs of steps this depends on (use IDs from previous add-step responses)
- orderIndex: Execution order hint

Submit steps:
  makima directive add-step "Step Name" --description "..." --task-plan "..."
  (Use --depends-on "uuid1,uuid2" for dependencies, referencing IDs from earlier add-step outputs)

Or batch:
  makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]'

IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context.
"#,
        title = directive.title,
        goal = directive.goal,
        repo_section = repo_section,
    );

    let mut prompt = preamble;
    prompt.push_str(&instructions);
    prompt
}