diff options
Diffstat (limited to 'makima/src/orchestration')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 109 |
1 files changed, 103 insertions, 6 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 4b75b4a..0dbdbf3 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -13,11 +13,13 @@ use crate::server::state::SharedState; /// A single step in the chain plan produced by the planning supervisor. #[derive(Debug, Deserialize)] +#[serde(rename_all = "snake_case")] struct ChainPlanStep { name: String, description: String, + #[serde(alias = "taskPlan")] task_plan: String, - #[serde(default)] + #[serde(default, alias = "dependsOn")] depends_on: Vec<String>, // names of steps this depends on } @@ -361,6 +363,22 @@ async fn process_planning_result( contract_id: Uuid, owner_id: Uuid, ) -> Result<(), String> { + // Idempotency guard: only process if directive is still in "planning" status. + // Both the contract-completion and task-completion paths can fire concurrently. + let current = repository::get_directive(pool, directive.id) + .await + .map_err(|e| format!("Failed to re-fetch directive: {}", e))?; + if let Some(ref d) = current { + if d.status != "planning" { + tracing::info!( + directive_id = %directive.id, + status = %d.status, + "Skipping process_planning_result: directive no longer in planning status" + ); + return Ok(()); + } + } + // Get contract files to find the chain plan let files = repository::list_files_in_contract(pool, contract_id, owner_id) .await @@ -405,9 +423,20 @@ async fn process_planning_result( return Ok(()); }; - let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| { - format!("Failed to parse chain plan JSON: {}", e) - })?; + let chain_plan: ChainPlan = match serde_json::from_str(&plan_json) { + Ok(plan) => plan, + Err(e) => { + tracing::warn!( + directive_id = %directive.id, + error = %e, + "Failed to parse chain plan JSON, marking directive failed" + ); + repository::update_directive_status(pool, directive.id, "failed") + .await + .map_err(|e| format!("Failed to update directive status: {}", e))?; + return Ok(()); + } + }; if chain_plan.steps.is_empty() { tracing::warn!( @@ -420,6 +449,30 @@ async fn process_planning_result( return Ok(()); } + // Create chain and steps — if anything fails, mark directive as failed + match create_chain_and_steps(pool, state, directive, &chain_plan, owner_id).await { + Ok(()) => Ok(()), + Err(e) => { + tracing::error!( + directive_id = %directive.id, + error = %e, + "Failed to create chain/steps, marking directive failed" + ); + let _ = repository::update_directive_status(pool, directive.id, "failed").await; + Err(e) + } + } +} + +/// Inner helper: create chain, steps, set current chain, transition to active, and advance. +/// Extracted so that `process_planning_result` can catch errors and mark the directive failed. +async fn create_chain_and_steps( + pool: &PgPool, + state: &SharedState, + directive: &Directive, + chain_plan: &ChainPlan, + owner_id: Uuid, +) -> Result<(), String> { // Create chain let chain = repository::create_directive_chain( pool, @@ -571,6 +624,8 @@ async fn on_step_completed( .await .map_err(|e| format!("Failed to update step status: {}", e))?; + let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; + tracing::info!( directive_id = %directive_id, step_id = %step.id, @@ -678,12 +733,31 @@ async fn dispatch_step( .await .map_err(|e| format!("Failed to set contract directive fields: {}", e))?; - // Build the task plan - let task_plan = step + // Build the task plan, prepending rework instructions if this is a rework cycle + let mut task_plan = step .task_plan .clone() .unwrap_or_else(|| format!("Execute step: {}", step.name)); + if let Some(eval_id) = step.last_evaluation_id { + if let Ok(Some(evaluation)) = repository::get_directive_evaluation(pool, eval_id).await { + if let Some(ref rework) = evaluation.rework_instructions { + task_plan = format!( + "IMPORTANT — REWORK REQUIRED (attempt #{}):\n\ + The previous attempt was evaluated and did NOT pass.\n\ + Feedback: {}\n\ + Rework instructions: {}\n\n\ + ---\n\n\ + Original task plan:\n{}", + step.rework_count + 1, + evaluation.summary_feedback, + rework, + task_plan, + ); + } + } + } + // Create supervisor task let supervisor_task = repository::create_task_for_owner( pool, @@ -1115,6 +1189,8 @@ async fn on_monitoring_completed( .await .map_err(|e| format!("Failed to update step status: {}", e))?; + let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; + let _ = repository::create_directive_event( pool, directive_id, @@ -1151,6 +1227,21 @@ async fn process_monitoring_result( return Ok(()); }; + // Idempotency guard: re-fetch step and only process if still "evaluating". + let current_step = repository::get_chain_step(pool, step.id) + .await + .map_err(|e| format!("Failed to re-fetch step: {}", e))?; + if let Some(ref s) = current_step { + if s.status != "evaluating" { + tracing::info!( + step_id = %step.id, + status = %s.status, + "Skipping process_monitoring_result: step no longer in evaluating status" + ); + return Ok(()); + } + } + let directive = repository::get_directive(pool, directive_id) .await .map_err(|e| format!("Failed to get directive: {}", e))? @@ -1195,6 +1286,8 @@ async fn process_monitoring_result( .await .map_err(|e| format!("Failed to update step status: {}", e))?; + let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; + let _ = repository::create_directive_event( pool, directive_id, @@ -1276,6 +1369,8 @@ async fn process_monitoring_result( .await .map_err(|e| format!("Failed to update step status: {}", e))?; + let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await; + advance_chain(pool, state, &directive, owner_id).await } else { // Evaluation failed — check rework budget @@ -1294,6 +1389,8 @@ async fn process_monitoring_result( .await .map_err(|e| format!("Failed to update step status: {}", e))?; + let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await; + advance_chain(pool, state, &directive, owner_id).await } else { tracing::info!( |
