diff options
| author | soryu <soryu@soryu.co> | 2026-02-07 21:53:59 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-07 21:53:59 +0000 |
| commit | 89ccd1375c2eaddd1f2b74a42b2d152c83b346e8 (patch) | |
| tree | 2b33025aaddef7d2a872af22cfea815344918e9e /makima/src/orchestration/directive.rs | |
| parent | 9762ee464419042b817ff58ea8ec4d9678dc0fb4 (diff) | |
| download | soryu-89ccd1375c2eaddd1f2b74a42b2d152c83b346e8.tar.gz soryu-89ccd1375c2eaddd1f2b74a42b2d152c83b346e8.zip | |
Check on completion for contracts
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index e779c18..4b75b4a 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -181,6 +181,98 @@ pub async fn init_directive( } /// Called when any task completes — checks if it's directive-related and advances. +/// Called when a contract's status is updated to "completed" via the API. +/// This is the primary entry point for directive orchestration because supervisor +/// tasks do not send TaskComplete messages — they complete via contract status updates. +pub async fn on_contract_completed( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + owner_id: Uuid, +) -> Result<(), String> { + if contract.status != "completed" { + return Ok(()); + } + + if contract.is_directive_orchestrator { + let directive = + repository::get_directive_by_orchestrator_contract(pool, contract.id) + .await + .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?; + + if let Some(directive) = directive { + tracing::info!( + directive_id = %directive.id, + contract_id = %contract.id, + "Directive orchestrator contract completed, processing plan" + ); + process_planning_result(pool, state, &directive, contract.id, owner_id).await?; + } else { + tracing::warn!( + contract_id = %contract.id, + "Directive orchestrator contract completed but no directive found" + ); + } + } else if let Some(directive_id) = contract.directive_id { + // Check if this is a monitoring contract + let monitoring_step = + repository::get_step_by_monitoring_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to check monitoring contract: {}", e))?; + + if let Some(step) = monitoring_step { + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + contract_id = %contract.id, + "Monitoring contract completed" + ); + process_monitoring_result(pool, state, contract, &step, owner_id).await?; + } else { + // Step contract completed + let step = repository::get_step_by_contract_id(pool, contract.id) + .await + .map_err(|e| format!("Failed to get step by contract: {}", e))?; + + if let Some(step) = step { + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + tracing::info!( + directive_id = %directive_id, + step_id = %step.id, + contract_id = %contract.id, + "Step contract completed, dispatching monitoring" + ); + + // Step contract completed successfully — dispatch monitoring + repository::update_step_status(pool, step.id, "evaluating") + .await + .map_err(|e| format!("Failed to update step status: {}", e))?; + + let _ = repository::create_directive_event( + pool, + directive.id, + directive.current_chain_id, + Some(step.id), + "step_evaluating", + "info", + None, + "system", + None, + ) + .await; + + dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?; + } + } + } + + Ok(()) +} + pub async fn on_task_completed( pool: &PgPool, state: &SharedState, @@ -257,6 +349,18 @@ async fn on_planning_completed( return Ok(()); }; + process_planning_result(pool, state, directive, contract_id, owner_id).await +} + +/// Core logic: read plan from contract files, create chain and steps, advance. +/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path). +async fn process_planning_result( + pool: &PgPool, + state: &SharedState, + directive: &Directive, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<(), String> { // Get contract files to find the chain plan let files = repository::list_files_in_contract(pool, contract_id, owner_id) .await @@ -1031,6 +1135,27 @@ async fn on_monitoring_completed( return Ok(()); } + process_monitoring_result(pool, state, contract, step, owner_id).await +} + +/// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework. +/// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path). +async fn process_monitoring_result( + pool: &PgPool, + state: &SharedState, + contract: &crate::db::models::Contract, + step: &ChainStep, + owner_id: Uuid, +) -> Result<(), String> { + let Some(directive_id) = contract.directive_id else { + return Ok(()); + }; + + let directive = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + // Read evaluation result from monitoring contract files let files = repository::list_files_in_contract(pool, contract.id, owner_id) .await |
