From 89ccd1375c2eaddd1f2b74a42b2d152c83b346e8 Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 7 Feb 2026 21:53:59 +0000 Subject: Check on completion for contracts --- makima/src/orchestration/directive.rs | 125 ++++++++++++++++++++++++++++++++ makima/src/server/handlers/contracts.rs | 19 ++++- 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index e779c18..4b75b4a 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -181,6 +181,98 @@ pub async fn init_directive( } /// Called when any task completes — checks if it's directive-related and advances. +/// Called when a contract's status is updated to "completed" via the API. +/// This is the primary entry point for directive orchestration because supervisor +/// tasks do not send TaskComplete messages — they complete via contract status updates. +pub async fn on_contract_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + owner_id: Uuid, +) -> Result<(), String> { + if contract.status != "completed" { + return Ok(()); + } + + if contract.is_directive_orchestrator { + let directive = + repository::get_directive_by_orchestrator_contract(pool, contract.id) + .await + .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; + + if let Some(directive) = directive { + tracing::info!( + directive_id = %directive.id, + contract_id = %contract.id, + "Directive orchestrator contract completed, processing plan" + ); + process_planning_result(pool, state, &directive, contract.id, owner_id).await?; + } else { + tracing::warn!( + contract_id = %contract.id, + "Directive orchestrator contract completed but no directive found" + ); + } + } else if let Some(directive_id) = contract.directive_id { + // Check if this is a monitoring contract + let monitoring_step = + repository::get_step_by_monitoring_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; + + if let Some(step) = monitoring_step { + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + contract_id = %contract.id, + "Monitoring contract completed" + ); + process_monitoring_result(pool, state, contract, &step, owner_id).await?; + } else { + // Step contract completed + let step = repository::get_step_by_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to get step by contract: {}", e))?; + + if let Some(step) = step { + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + contract_id = %contract.id, + "Step contract completed, dispatching monitoring" + ); + + // Step contract completed successfully — dispatch monitoring + repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "step_evaluating", + "info", + None, + "system", + None, + ) + .await; + + dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?; + } + } + } + + Ok(()) +} + pub async fn on_task_completed( pool: &PgPool, state: &SharedState, @@ -257,6 +349,18 @@ async fn on_planning_completed( return Ok(()); }; + process_planning_result(pool, state, directive, contract_id, owner_id).await +} + +/// Core logic: read plan from contract files, create chain and steps, advance. +/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path). +async fn process_planning_result( + pool: &PgPool, + state: &SharedState, + directive: &Directive, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<(), String> { // Get contract files to find the chain plan let files = repository::list_files_in_contract(pool, contract_id, owner_id) .await @@ -1031,6 +1135,27 @@ async fn on_monitoring_completed( return Ok(()); } + process_monitoring_result(pool, state, contract, step, owner_id).await +} + +/// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework. +/// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path). +async fn process_monitoring_result( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + step: &ChainStep, + owner_id: Uuid, +) -> Result<(), String> { + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + // Read evaluation result from monitoring contract files let files = repository::list_files_in_contract(pool, contract.id, owner_id) .await diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index a83c72d..ad0a1ff 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -575,7 +575,24 @@ pub async fn update_contract( }), ).await; - // TODO: Directive engine integration (removed for reimplementation) + // Directive engine integration — process planning/step/monitoring completion + if contract.is_directive_orchestrator || contract.directive_id.is_some() { + let pool_clone = pool.clone(); + let state_clone = state.clone(); + let contract_clone = contract.clone(); + let owner = auth.owner_id; + tokio::spawn(async move { + if let Err(e) = crate::orchestration::directive::on_contract_completed( + &pool_clone, &state_clone, &contract_clone, owner, + ).await { + tracing::warn!( + contract_id = %contract_clone.id, + error = %e, + "Failed to process directive contract completion" + ); + } + }); + } } // Get summary with counts -- cgit v1.2.3