diff options
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/db/repository.rs | 34 | ||||
| -rw-r--r-- | makima/src/orchestration/engine.rs | 410 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 15 |
3 files changed, 348 insertions, 111 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index eeda4a5..cd806f0 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5191,6 +5191,40 @@ pub async fn update_directive_status( .await } +/// Set the orchestrator contract ID for a directive. +pub async fn set_directive_orchestrator_contract( + pool: &PgPool, + directive_id: Uuid, + contract_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directives SET orchestrator_contract_id = $2, updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(directive_id) + .bind(contract_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Find a directive by its orchestrator contract ID. +pub async fn get_directive_by_orchestrator_contract_id( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + SELECT * FROM directives WHERE orchestrator_contract_id = $1 + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + /// Archive a directive (soft delete). pub async fn archive_directive_for_owner( pool: &PgPool, diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs index 954b857..c794156 100644 --- a/makima/src/orchestration/engine.rs +++ b/makima/src/orchestration/engine.rs @@ -133,7 +133,7 @@ impl DirectiveEngine { // Directive Lifecycle // ======================================================================== - /// Start a directive: generate chain and begin execution. + /// Start a directive: spawn a planning contract+task to generate the chain. pub async fn start_directive(&self, directive_id: Uuid) -> Result<(), EngineError> { let directive = repository::get_directive(&self.pool, directive_id) .await? @@ -158,33 +158,101 @@ impl DirectiveEngine { ) .await?; - // Generate chain (placeholder - actual LLM call would go here) - let chain = self.generate_initial_chain(&directive).await?; - - // Create chain in database - let db_chain = repository::create_directive_chain( + // Create an empty chain for the planning task to populate + let chain_name = format!( + "{}-chain", + directive.title.to_lowercase().replace(' ', "-") + ); + let _db_chain = repository::create_directive_chain( &self.pool, directive_id, - &chain.name, - Some(&chain.description), + &chain_name, + Some(&format!("Execution plan for: {}", directive.goal)), None, // rationale None, // planning_model ) .await?; - // Create steps - self.create_steps_from_chain(&db_chain.id, &chain).await?; + // Create a planning contract (type "execute", no phase guard) + let contract = repository::create_contract_for_owner( + &self.pool, + directive.owner_id, + CreateContractRequest { + name: format!("{} - Planning", directive.title), + description: Some(format!( + "Planning contract for directive: {}", + directive.goal + )), + contract_type: Some("execute".to_string()), + template_id: None, + initial_phase: Some("execute".to_string()), + autonomous_loop: Some(true), + phase_guard: Some(false), + local_only: Some(false), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| { + EngineError::ContractCreation(format!("Failed to create planning contract: {}", e)) + })?; - // Update directive to active - repository::update_directive_status(&self.pool, directive_id, "active").await?; - self.emit_event(EngineEvent::DirectiveStatusChanged { + // Build instructions for the planning task + let plan = self.build_planning_task_instructions(&directive); + + // Create the planning task + let _task = repository::create_task_for_owner( + &self.pool, + directive.owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("{} - Planning", directive.title), + description: Some(format!( + "Plan the execution chain for directive: {}", + directive.goal + )), + plan, + parent_task_id: None, + is_supervisor: true, + priority: 5, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: None, + target_repo_path: None, + completion_action: Some("none".to_string()), + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| { + EngineError::ContractCreation(format!("Failed to create planning task: {}", e)) + })?; + + // Link the planning contract to the directive + repository::set_directive_orchestrator_contract( + &self.pool, directive_id, - old_status: "planning".to_string(), - new_status: "active".to_string(), - }); + contract.id, + ) + .await?; - // Start ready steps - self.advance_chain(directive_id).await?; + self.emit_directive_event( + directive_id, + "planning_started", + "info", + serde_json::json!({ + "contract_id": contract.id, + "message": "Planning task spawned, waiting for chain generation", + }), + "system", + ) + .await?; Ok(()) } @@ -265,67 +333,7 @@ impl DirectiveEngine { // Chain Management // ======================================================================== - /// Generate initial chain from directive using LLM. - async fn generate_initial_chain( - &self, - directive: &Directive, - ) -> Result<GeneratedChain, EngineError> { - // Build planning prompt - let prompt = self.planner.build_planning_prompt(directive); - - // Try LLM chain generation, fall back to default if unavailable - let chain = match self.generate_chain_via_llm(&prompt).await { - Ok(chain) => { - tracing::info!( - "LLM generated chain with {} steps for directive {}", - chain.steps.len(), - directive.id - ); - chain - } - Err(e) => { - tracing::warn!( - "LLM chain generation failed ({}), using default chain for directive {}", - e, - directive.id - ); - self.build_default_chain(directive) - } - }; - - // Validate the chain - self.planner.validate_chain(&chain)?; - - Ok(chain) - } - - /// Call LLM to generate a chain from the planning prompt. - async fn generate_chain_via_llm(&self, prompt: &str) -> Result<GeneratedChain, EngineError> { - use crate::llm::claude::{ClaudeClient, ClaudeModel, Message, MessageContent}; - - let client = ClaudeClient::from_env(ClaudeModel::Sonnet) - .map_err(|e| EngineError::LlmError(format!("Failed to create LLM client: {}", e)))?; - - let messages = vec![Message { - role: "user".to_string(), - content: MessageContent::Text(prompt.to_string()), - }]; - - let result = client - .chat_with_tools(messages, &[]) - .await - .map_err(|e| EngineError::LlmError(format!("LLM call failed: {}", e)))?; - - let response_text = result - .content - .ok_or_else(|| EngineError::LlmError("Empty LLM response".to_string()))?; - - self.planner - .parse_plan_response(&response_text) - .map_err(|e| EngineError::Planner(e)) - } - - /// Build a default chain when LLM is unavailable. + /// Build a default chain as a fallback. fn build_default_chain(&self, directive: &Directive) -> GeneratedChain { GeneratedChain { name: format!( @@ -458,38 +466,9 @@ impl DirectiveEngine { .await? .ok_or(EngineError::ChainNotFound(directive_id))?; - // Get completed and failed steps - let steps = repository::list_chain_steps(&self.pool, current_chain.id).await?; - let completed_steps: Vec<_> = steps.iter().filter(|s| s.status == "passed").collect(); - let failed_step = steps.iter().find(|s| s.status == "failed"); - - // Build replan prompt - let prompt = self.planner.build_replan_prompt( - &directive, - &completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(), - failed_step.map(|s| &*s), - reason, - ); - - // Try LLM regeneration, fall back to default - let new_chain = match self.generate_chain_via_llm(&prompt).await { - Ok(chain) => { - tracing::info!( - "LLM regenerated chain with {} steps for directive {}", - chain.steps.len(), - directive.id - ); - chain - } - Err(e) => { - tracing::warn!( - "LLM chain regeneration failed ({}), using default chain for directive {}", - e, - directive.id - ); - self.build_default_chain(&directive) - } - }; + // Use default chain for regeneration + // (planning contract handles initial generation; regeneration uses fallback) + let new_chain = self.build_default_chain(&directive); // Supersede old chain repository::supersede_chain(&self.pool, current_chain.id).await?; @@ -986,6 +965,215 @@ impl DirectiveEngine { } // ======================================================================== + // Planning + // ======================================================================== + + /// Build the task instructions for the planning task. + fn build_planning_task_instructions(&self, directive: &Directive) -> String { + let requirements: Vec<String> = directive + .requirements + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_object()) + .map(|obj| { + let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); + let desc = obj + .get("description") + .and_then(|v| v.as_str()) + .unwrap_or("?"); + format!("- {}: {}", id, desc) + }) + .collect() + }) + .unwrap_or_default(); + + let criteria: Vec<String> = directive + .acceptance_criteria + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_object()) + .map(|obj| { + let id = obj.get("id").and_then(|v| v.as_str()).unwrap_or("?"); + let criterion = obj + .get("criterion") + .and_then(|v| v.as_str()) + .unwrap_or("?"); + format!("- {}: {}", id, criterion) + }) + .collect() + }) + .unwrap_or_default(); + + let constraints: Vec<String> = directive + .constraints + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str()) + .map(|s| format!("- {}", s)) + .collect() + }) + .unwrap_or_default(); + + let repo_info = directive + .repository_url + .as_deref() + .unwrap_or("(not specified)"); + + format!( + r#"You are planning an execution chain for a directive. + +## Directive: {title} +## Goal +{goal} + +## Requirements +{requirements} + +## Acceptance Criteria +{criteria} + +## Constraints +{constraints} + +## Repository: {repo} + +## Your Task + +Analyze the repository and create a chain of execution steps. +For each step, add it via the API: + +```bash +curl -s -X POST "$MAKIMA_URL/api/v1/directives/{directive_id}/chain/steps" \ + -H "Authorization: Bearer $MAKIMA_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{{ + "name": "step-name", + "description": "What this step accomplishes", + "step_type": "implement", + "depends_on": [], + "contract_type": "execute", + "initial_phase": "execute", + "task_plan": "Detailed instructions for the step executor" + }}' +``` + +### Step types +Use these step types: research, design, implement, test, review, document + +### Dependencies +Each step can depend on other steps by name. Use the `depends_on` field with an array of step names. +Steps with no dependencies will run in parallel. + +### Guidelines +1. Break the work into logical, independently executable steps +2. Each step should be completable by a single Claude Code session +3. Use dependencies to enforce ordering where needed +4. Include a "test" or "verify" step at the end +5. Keep step names in kebab-case +6. The `task_plan` field should contain detailed instructions for the agent that will execute the step + +When you have added all steps, your task is complete."#, + title = directive.title, + goal = directive.goal, + requirements = if requirements.is_empty() { + "(none)".to_string() + } else { + requirements.join("\n") + }, + criteria = if criteria.is_empty() { + "(none)".to_string() + } else { + criteria.join("\n") + }, + constraints = if constraints.is_empty() { + "(none)".to_string() + } else { + constraints.join("\n") + }, + repo = repo_info, + directive_id = directive.id, + ) + } + + /// Handle planning task completion. + pub async fn on_planning_complete( + &self, + directive_id: Uuid, + success: bool, + ) -> Result<(), EngineError> { + let directive = repository::get_directive(&self.pool, directive_id) + .await? + .ok_or(EngineError::DirectiveNotFound(directive_id))?; + + // Only process if directive is still in planning state + if directive.status != "planning" { + tracing::warn!( + "Directive {} is in state '{}', not 'planning'. Skipping planning completion.", + directive_id, + directive.status + ); + return Ok(()); + } + + // Get current chain + let chain = repository::get_current_chain(&self.pool, directive_id) + .await? + .ok_or(EngineError::ChainNotFound(directive_id))?; + + // Check if chain has steps + let steps = repository::list_chain_steps(&self.pool, chain.id).await?; + + if success && !steps.is_empty() { + tracing::info!( + "Planning completed successfully for directive {} with {} steps", + directive_id, + steps.len() + ); + } else { + // Fall back to default chain + let reason = if !success { + "Planning task failed" + } else { + "Planning task produced no steps" + }; + tracing::warn!( + "{} for directive {}, using default chain", + reason, + directive_id + ); + + let default_chain = self.build_default_chain(&directive); + self.create_steps_from_chain(&chain.id, &default_chain) + .await?; + } + + // Activate the directive + repository::update_directive_status(&self.pool, directive_id, "active").await?; + self.emit_event(EngineEvent::DirectiveStatusChanged { + directive_id, + old_status: "planning".to_string(), + new_status: "active".to_string(), + }); + + self.emit_directive_event( + directive_id, + "planning_completed", + "info", + serde_json::json!({"success": success}), + "system", + ) + .await?; + + // Start ready steps + self.advance_chain(directive_id).await?; + + Ok(()) + } + + // ======================================================================== // Circuit Breakers // ======================================================================== diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index cb929ea..9938145 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1302,6 +1302,21 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "error": &updated_task.error_message, }), ).await; + + // Check if this task's contract is a directive orchestrator + if let Some(contract_id) = updated_task.contract_id { + if let Ok(Some(directive)) = repository::get_directive_by_orchestrator_contract_id( + &pool, contract_id + ).await { + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + if let Err(e) = engine.on_planning_complete(directive.id, success).await { + tracing::error!( + "Failed to handle planning completion for directive {}: {}", + directive.id, e + ); + } + } + } } Ok(None) => { tracing::warn!( |
