diff options
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 258 |
1 files changed, 154 insertions, 104 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 0dbdbf3..80e2a8b 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -182,6 +182,62 @@ pub async fn init_directive( Ok(updated) } +/// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction). +pub async fn submit_plan( + pool: &PgPool, + state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, + plan_json: &str, +) -> Result<Directive, String> { + // 1. Get directive, verify status + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + if directive.status != "planning" { + return Err(format!( + "Directive must be in 'planning' status to submit a plan, current status: '{}'", + directive.status + )); + } + + // 2. Idempotency: if current_chain_id already set, return existing directive + if directive.current_chain_id.is_some() { + tracing::info!( + directive_id = %directive_id, + "Plan already submitted (current_chain_id set), returning existing directive" + ); + return Ok(directive); + } + + // 3. Parse the plan JSON + let chain_plan: ChainPlan = serde_json::from_str(plan_json) + .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?; + + if chain_plan.steps.is_empty() { + return Err("Chain plan has no steps".to_string()); + } + + // 4. Create chain and steps, transition to active + create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?; + + // 5. Re-fetch and return the updated directive + let updated = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to re-fetch directive: {}", e))? + .ok_or("Directive not found after plan submission")?; + + tracing::info!( + directive_id = %directive_id, + step_count = chain_plan.steps.len(), + "Plan submitted via API, directive now active" + ); + + Ok(updated) +} + /// Called when any task completes — checks if it's directive-related and advances. /// Called when a contract's status is updated to "completed" via the API. /// This is the primary entry point for directive orchestration because supervisor @@ -206,9 +262,9 @@ pub async fn on_contract_completed( tracing::info!( directive_id = %directive.id, contract_id = %contract.id, - "Directive orchestrator contract completed, processing plan" + "Directive orchestrator contract completed, handling planning completion" ); - process_planning_result(pool, state, &directive, contract.id, owner_id).await?; + handle_planning_completion(pool, state, &directive, owner_id).await?; } else { tracing::warn!( contract_id = %contract.id, @@ -347,120 +403,113 @@ async fn on_planning_completed( return Ok(()); } - let Some(contract_id) = task.contract_id else { - return Ok(()); - }; - - process_planning_result(pool, state, directive, contract_id, owner_id).await + handle_planning_completion(pool, state, directive, owner_id).await } -/// Core logic: read plan from contract files, create chain and steps, advance. -/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path). -async fn process_planning_result( +/// Handle planning contract/task completion. +/// Checks if a plan was submitted via the CLI; if not, retries or fails. +async fn handle_planning_completion( pool: &PgPool, - state: &SharedState, + _state: &SharedState, directive: &Directive, - contract_id: Uuid, owner_id: Uuid, ) -> Result<(), String> { - // Idempotency guard: only process if directive is still in "planning" status. - // Both the contract-completion and task-completion paths can fire concurrently. + // Re-fetch directive to check latest state let current = repository::get_directive(pool, directive.id) .await - .map_err(|e| format!("Failed to re-fetch directive: {}", e))?; - if let Some(ref d) = current { - if d.status != "planning" { - tracing::info!( - directive_id = %directive.id, - status = %d.status, - "Skipping process_planning_result: directive no longer in planning status" - ); - return Ok(()); - } - } - - // Get contract files to find the chain plan - let files = repository::list_files_in_contract(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to list contract files: {}", e))?; - - // Find the chain plan file - let plan_file = files.iter().find(|f| { - let name_lower = f.name.to_lowercase(); - name_lower.contains("chain") || name_lower.contains("plan") - }); - - let plan_file = plan_file.or_else(|| files.first()); + .map_err(|e| format!("Failed to re-fetch directive: {}", e))? + .ok_or("Directive not found")?; - let Some(plan_file) = plan_file else { - tracing::warn!( + // Idempotency: only process if still in "planning" status + if current.status != "planning" { + tracing::info!( directive_id = %directive.id, - "No plan file found in planning contract, marking directive failed" + status = %current.status, + "Skipping handle_planning_completion: directive no longer in planning status" ); - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); - }; - - // Read the full file to get the body content - let full_file = repository::get_file(pool, plan_file.id) - .await - .map_err(|e| format!("Failed to get plan file: {}", e))? - .ok_or("Plan file not found")?; + } - // Extract JSON from the file body elements - let plan_json = extract_plan_json(&full_file.body); + // If plan was already submitted via CLI (current_chain_id is set), nothing to do + if current.current_chain_id.is_some() { + tracing::info!( + directive_id = %directive.id, + "Plan already submitted via CLI, skipping handle_planning_completion" + ); + return Ok(()); + } - let Some(plan_json) = plan_json else { + // No plan was submitted — check retry budget + let max_regenerations = current.max_chain_regenerations.unwrap_or(2); + if current.chain_generation_count < max_regenerations { tracing::warn!( directive_id = %directive.id, - "Could not extract chain plan JSON from file body, marking directive failed" + attempt = current.chain_generation_count + 1, + max = max_regenerations, + "Planning completed without plan submission, retrying" ); - repository::update_directive_status(pool, directive.id, "failed") + + let _ = repository::create_directive_event( + pool, + directive.id, + None, + None, + "planning_retry", + "warn", + Some(&serde_json::json!({ + "attempt": current.chain_generation_count + 1, + "maxRegenerations": max_regenerations, + "reason": "Planning contract completed without submitting a plan" + })), + "system", + None, + ) + .await; + + // Increment generation count + repository::increment_chain_generation_count(pool, directive.id) .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - }; + .map_err(|e| format!("Failed to increment chain generation count: {}", e))?; - let chain_plan: ChainPlan = match serde_json::from_str(&plan_json) { - Ok(plan) => plan, - Err(e) => { - tracing::warn!( - directive_id = %directive.id, - error = %e, - "Failed to parse chain plan JSON, marking directive failed" - ); - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - } - }; + // Reset to draft so init_directive can be called again + repository::update_directive_status(pool, directive.id, "draft") + .await + .map_err(|e| format!("Failed to reset directive status: {}", e))?; - if chain_plan.steps.is_empty() { - tracing::warn!( + // Re-init planning + init_directive(pool, _state, owner_id, directive.id).await?; + + Ok(()) + } else { + tracing::error!( directive_id = %directive.id, - "Chain plan has no steps, marking directive failed" + attempts = current.chain_generation_count, + max = max_regenerations, + "Planning failed: max regeneration attempts exhausted without plan submission" ); + + let _ = repository::create_directive_event( + pool, + directive.id, + None, + None, + "planning_failed", + "error", + Some(&serde_json::json!({ + "attempts": current.chain_generation_count, + "maxRegenerations": max_regenerations, + "reason": "Max chain regeneration attempts exhausted without plan submission" + })), + "system", + None, + ) + .await; + repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - } - // Create chain and steps — if anything fails, mark directive as failed - match create_chain_and_steps(pool, state, directive, &chain_plan, owner_id).await { - Ok(()) => Ok(()), - Err(e) => { - tracing::error!( - directive_id = %directive.id, - error = %e, - "Failed to create chain/steps, marking directive failed" - ); - let _ = repository::update_directive_status(pool, directive.id, "failed").await; - Err(e) - } + Ok(()) } } @@ -848,18 +897,15 @@ fn build_planning_prompt(directive: &Directive) -> String { format!( r#"You are planning the execution of a directive. -DIRECTIVE: {} -GOAL: {} -REQUIREMENTS: {} -ACCEPTANCE CRITERIA: {} -CONSTRAINTS: {} +DIRECTIVE: {title} +GOAL: {goal} +REQUIREMENTS: {requirements} +ACCEPTANCE CRITERIA: {acceptance_criteria} +CONSTRAINTS: {constraints} Your job is to decompose this goal into a sequence of executable steps. Each step will become a separate contract with its own supervisor. -Write a JSON plan to a contract file named "chain-plan" using: - makima contract create-file "chain-plan" < plan.json - The JSON format: {{ "steps": [ @@ -880,13 +926,17 @@ Rules: - Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. - Keep the number of steps reasonable (3-10 typically). -After writing the plan file, mark the contract as complete using: +Submit your plan by piping the JSON to stdin: + echo '<your_json_plan>' | makima directive submit-plan --directive-id {directive_id} + +After submitting the plan, mark the contract as complete: makima supervisor complete"#, - directive.title, - directive.goal, - serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), - serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), - serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + title = directive.title, + goal = directive.goal, + requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + directive_id = directive.id, ) } |
