summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-07 21:53:59 +0000
committersoryu <soryu@soryu.co>2026-02-07 21:53:59 +0000
commit89ccd1375c2eaddd1f2b74a42b2d152c83b346e8 (patch)
tree2b33025aaddef7d2a872af22cfea815344918e9e
parent9762ee464419042b817ff58ea8ec4d9678dc0fb4 (diff)
downloadsoryu-89ccd1375c2eaddd1f2b74a42b2d152c83b346e8.tar.gz
soryu-89ccd1375c2eaddd1f2b74a42b2d152c83b346e8.zip
Check on completion for contracts
-rw-r--r--makima/src/orchestration/directive.rs125
-rw-r--r--makima/src/server/handlers/contracts.rs19
2 files changed, 143 insertions, 1 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index e779c18..4b75b4a 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -181,6 +181,98 @@ pub async fn init_directive(
}
/// Called when any task completes — checks if it's directive-related and advances.
+/// Called when a contract's status is updated to "completed" via the API.
+/// This is the primary entry point for directive orchestration because supervisor
+/// tasks do not send TaskComplete messages — they complete via contract status updates.
+pub async fn on_contract_completed(
+ pool: &PgPool,
+ state: &SharedState,
+ contract: &crate::db::models::Contract,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ if contract.status != "completed" {
+ return Ok(());
+ }
+
+ if contract.is_directive_orchestrator {
+ let directive =
+ repository::get_directive_by_orchestrator_contract(pool, contract.id)
+ .await
+ .map_err(|e| format!("Failed to get directive by orchestrator: {}", e))?;
+
+ if let Some(directive) = directive {
+ tracing::info!(
+ directive_id = %directive.id,
+ contract_id = %contract.id,
+ "Directive orchestrator contract completed, processing plan"
+ );
+ process_planning_result(pool, state, &directive, contract.id, owner_id).await?;
+ } else {
+ tracing::warn!(
+ contract_id = %contract.id,
+ "Directive orchestrator contract completed but no directive found"
+ );
+ }
+ } else if let Some(directive_id) = contract.directive_id {
+ // Check if this is a monitoring contract
+ let monitoring_step =
+ repository::get_step_by_monitoring_contract_id(pool, contract.id)
+ .await
+ .map_err(|e| format!("Failed to check monitoring contract: {}", e))?;
+
+ if let Some(step) = monitoring_step {
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ contract_id = %contract.id,
+ "Monitoring contract completed"
+ );
+ process_monitoring_result(pool, state, contract, &step, owner_id).await?;
+ } else {
+ // Step contract completed
+ let step = repository::get_step_by_contract_id(pool, contract.id)
+ .await
+ .map_err(|e| format!("Failed to get step by contract: {}", e))?;
+
+ if let Some(step) = step {
+ let directive = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_id = %step.id,
+ contract_id = %contract.id,
+ "Step contract completed, dispatching monitoring"
+ );
+
+ // Step contract completed successfully — dispatch monitoring
+ repository::update_step_status(pool, step.id, "evaluating")
+ .await
+ .map_err(|e| format!("Failed to update step status: {}", e))?;
+
+ let _ = repository::create_directive_event(
+ pool,
+ directive.id,
+ directive.current_chain_id,
+ Some(step.id),
+ "step_evaluating",
+ "info",
+ None,
+ "system",
+ None,
+ )
+ .await;
+
+ dispatch_monitoring(pool, &directive, &step, contract, owner_id).await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
pub async fn on_task_completed(
pool: &PgPool,
state: &SharedState,
@@ -257,6 +349,18 @@ async fn on_planning_completed(
return Ok(());
};
+ process_planning_result(pool, state, directive, contract_id, owner_id).await
+}
+
+/// Core logic: read plan from contract files, create chain and steps, advance.
+/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path).
+async fn process_planning_result(
+ pool: &PgPool,
+ state: &SharedState,
+ directive: &Directive,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<(), String> {
// Get contract files to find the chain plan
let files = repository::list_files_in_contract(pool, contract_id, owner_id)
.await
@@ -1031,6 +1135,27 @@ async fn on_monitoring_completed(
return Ok(());
}
+ process_monitoring_result(pool, state, contract, step, owner_id).await
+}
+
+/// Core monitoring logic: read evaluation from files, create record, handle pass/fail/rework.
+/// Called from both `on_monitoring_completed` (task path) and `on_contract_completed` (API path).
+async fn process_monitoring_result(
+ pool: &PgPool,
+ state: &SharedState,
+ contract: &crate::db::models::Contract,
+ step: &ChainStep,
+ owner_id: Uuid,
+) -> Result<(), String> {
+ let Some(directive_id) = contract.directive_id else {
+ return Ok(());
+ };
+
+ let directive = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
// Read evaluation result from monitoring contract files
let files = repository::list_files_in_contract(pool, contract.id, owner_id)
.await
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index a83c72d..ad0a1ff 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -575,7 +575,24 @@ pub async fn update_contract(
}),
).await;
- // TODO: Directive engine integration (removed for reimplementation)
+ // Directive engine integration — process planning/step/monitoring completion
+ if contract.is_directive_orchestrator || contract.directive_id.is_some() {
+ let pool_clone = pool.clone();
+ let state_clone = state.clone();
+ let contract_clone = contract.clone();
+ let owner = auth.owner_id;
+ tokio::spawn(async move {
+ if let Err(e) = crate::orchestration::directive::on_contract_completed(
+ &pool_clone, &state_clone, &contract_clone, owner,
+ ).await {
+ tracing::warn!(
+ contract_id = %contract_clone.id,
+ error = %e,
+ "Failed to process directive contract completion"
+ );
+ }
+ });
+ }
}
// Get summary with counts