diff options
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 22279e8..7dbfe65 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -2547,6 +2547,88 @@ pub enum GoalEditInterruptResult { Skipped, } +/// Best-effort cancellation of a directive's currently-running orchestrator +/// (planner) task. Used by the goal-update path: when we are about to clear +/// `orchestrator_task_id` from the directive, the still-running task would +/// otherwise keep emitting `add-step` calls based on the OLD goal, racing the +/// freshly-spawned replanner. We send an `InterruptTask` daemon command and +/// mark the task `interrupted` in the DB so monitoring sees a clean state. +/// +/// All errors are logged and translated into `Ok(false)` — the caller's path +/// (clearing orchestrator_task_id, kicking the reconciler) is still safe to +/// proceed. +pub async fn try_cancel_running_planner( + pool: &PgPool, + state: &SharedState, + directive_id: Uuid, + orchestrator_task_id: Uuid, +) -> Result<bool, anyhow::Error> { + let task = match repository::get_task(pool, orchestrator_task_id).await? { + Some(t) => t, + None => return Ok(false), + }; + + let cancellable = matches!( + task.status.as_str(), + "queued" | "pending" | "starting" | "running" + ); + if !cancellable { + return Ok(false); + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { + task_id: orchestrator_task_id, + graceful: true, + }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + error = %e, + "Failed to dispatch InterruptTask to orphaned planner; will mark interrupted in DB" + ); + } else { + tracing::info!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + "Sent InterruptTask to orphaned planner before clearing orchestrator" + ); + } + } else { + tracing::debug!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + "Orphaned planner has no daemon assignment; skipping daemon command" + ); + } + + // Mark the task interrupted regardless of whether the daemon was reachable + // — this is the source of truth the monitor consults next tick. + let upd = crate::db::models::UpdateTaskRequest { + status: Some("interrupted".to_string()), + version: Some(task.version), + error_message: Some( + "Cancelled because the directive's goal was updated".to_string(), + ), + ..Default::default() + }; + if let Err(e) = + repository::update_task_for_owner(pool, orchestrator_task_id, task.owner_id, upd).await + { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + error = %e, + "Failed to mark orphaned planner interrupted" + ); + } + + Ok(true) +} + /// Attempt to interrupt a directive's currently-running planner with a goal /// edit summary instead of replanning from scratch. /// |
