From c3e97bbcc32bd18d9344dd44cc54dfcdce32100b Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 30 Apr 2026 10:43:31 +0100 Subject: fix(directive): cancel orphaned planner and kick reconciler on goal update (#104) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the user-visible bug where editing a directive's goal mid-flight shows "saved" but does not actually replan: the running planner kept emitting add-step calls based on the OLD goal while a fresh planner was supposed to take over, and the user had to wait up to 15s for the next reconciler tick before any replanning even started. ## What was happening PUT /api/v1/directives/{id}/goal already had two paths: - Small change + planner running → SendMessage interrupt + KEEP orchestrator. - Everything else → clear orchestrator_task_id and let phase_replanning spawn a new planner on the next 15s tick. The "everything else" path cleared the directive's pointer to the planner task but never cancelled the task itself. The task kept executing and could race the new planner by adding more steps from the stale plan. Worse, those new steps could push MAX(steps.created_at) past the just-bumped goal_updated_at, suppressing phase_replanning entirely. ## Fix 1. New helper `try_cancel_running_planner()` (orchestration/directive.rs): sends `InterruptTask { graceful: true }` to the daemon owning the orchestrator task and marks the task `interrupted` in the DB. All errors are logged and swallowed so the goal update still completes. 2. update_goal handler calls the helper whenever it is about to take the "clear orchestrator_task_id" branch, so the orphaned planner stops producing stale-plan steps before its DB linkage is cut. 3. New `AppState::directive_kick` (tokio::sync::Notify) lets the handler signal the reconciler to run a tick immediately. The reconciler loop in server/mod.rs now selects between its 15s interval and the notify, so the user no longer waits up to 15s after editing a goal before replanning actually starts. update_goal calls `kick_directive_reconciler()` after the goal is persisted (both paths). ## Why not also loosen `get_directives_needing_replanning` The query already covers the common cases once the orphan-cancel lands — without a still-running orphan adding fresh steps, goal_updated_at reliably exceeds MAX(steps.created_at) after a goal edit. Loosening the predicate risked spurious replans for directives that legitimately have no steps yet (those are handled by `phase_planning`). Co-authored-by: Claude Opus 4.7 (1M context) --- makima/src/orchestration/directive.rs | 82 ++++++++++++++++++++++++++++++++ makima/src/server/handlers/directives.rs | 50 +++++++++++++++---- makima/src/server/mod.rs | 17 ++++++- makima/src/server/state.rs | 12 +++++ 4 files changed, 149 insertions(+), 12 deletions(-) diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 22279e8..7dbfe65 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -2547,6 +2547,88 @@ pub enum GoalEditInterruptResult { Skipped, } +/// Best-effort cancellation of a directive's currently-running orchestrator +/// (planner) task. Used by the goal-update path: when we are about to clear +/// `orchestrator_task_id` from the directive, the still-running task would +/// otherwise keep emitting `add-step` calls based on the OLD goal, racing the +/// freshly-spawned replanner. We send an `InterruptTask` daemon command and +/// mark the task `interrupted` in the DB so monitoring sees a clean state. +/// +/// All errors are logged and translated into `Ok(false)` — the caller's path +/// (clearing orchestrator_task_id, kicking the reconciler) is still safe to +/// proceed. +pub async fn try_cancel_running_planner( + pool: &PgPool, + state: &SharedState, + directive_id: Uuid, + orchestrator_task_id: Uuid, +) -> Result { + let task = match repository::get_task(pool, orchestrator_task_id).await? { + Some(t) => t, + None => return Ok(false), + }; + + let cancellable = matches!( + task.status.as_str(), + "queued" | "pending" | "starting" | "running" + ); + if !cancellable { + return Ok(false); + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { + task_id: orchestrator_task_id, + graceful: true, + }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + error = %e, + "Failed to dispatch InterruptTask to orphaned planner; will mark interrupted in DB" + ); + } else { + tracing::info!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + "Sent InterruptTask to orphaned planner before clearing orchestrator" + ); + } + } else { + tracing::debug!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + "Orphaned planner has no daemon assignment; skipping daemon command" + ); + } + + // Mark the task interrupted regardless of whether the daemon was reachable + // — this is the source of truth the monitor consults next tick. + let upd = crate::db::models::UpdateTaskRequest { + status: Some("interrupted".to_string()), + version: Some(task.version), + error_message: Some( + "Cancelled because the directive's goal was updated".to_string(), + ), + ..Default::default() + }; + if let Err(e) = + repository::update_task_for_owner(pool, orchestrator_task_id, task.owner_id, upd).await + { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + error = %e, + "Failed to mark orphaned planner interrupted" + ); + } + + Ok(true) +} + /// Attempt to interrupt a directive's currently-running planner with a goal /// edit summary instead of replanning from scratch. /// diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 01c4659..44bf4ac 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -20,7 +20,8 @@ use crate::db::models::{ use crate::db::repository; use crate::orchestration::directive::{ build_cleanup_prompt, build_order_pickup_prompt, classify_goal_change, - try_interrupt_planner_with_goal_edit, GoalChangeKind, GoalEditInterruptResult, + try_cancel_running_planner, try_interrupt_planner_with_goal_edit, + GoalChangeKind, GoalEditInterruptResult, }; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; @@ -895,6 +896,25 @@ pub async fn update_goal( // SendMessage and adjust in-flight. Otherwise, fall through to the normal // path which clears orchestrator_task_id and lets phase_replanning kick // in on the next tick. + // + // CRITICAL: when going down the "clear" path, we must also CANCEL the + // running planner. Otherwise the orphaned task keeps producing add-step + // calls based on the old goal, racing the freshly-spawned replanner. + if !interrupted { + if let Some(ref current) = current { + if let Some(orch_task_id) = current.orchestrator_task_id { + if let Err(e) = try_cancel_running_planner(pool, &state, id, orch_task_id).await { + tracing::warn!( + directive_id = %id, + task_id = %orch_task_id, + error = %e, + "Failed to cancel orphaned planner — proceeding with clear anyway" + ); + } + } + } + } + let update_result = if interrupted { repository::update_directive_goal_keep_orchestrator(pool, auth.owner_id, id, &req.goal) .await @@ -902,22 +922,32 @@ pub async fn update_goal( repository::update_directive_goal(pool, auth.owner_id, id, &req.goal).await }; - match update_result { + let response = match update_result { Ok(Some(directive)) => Json(directive).into_response(), - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Directive not found")), - ) - .into_response(), + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } Err(e) => { tracing::error!("Failed to update goal: {}", e); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("UPDATE_FAILED", &e.to_string())), ) - .into_response() + .into_response(); } - } + }; + + // Nudge the directive reconciler so the user does not wait up to 15s for + // the next interval tick before the new planner is spawned (clear path) or + // the small-edit interrupt is consumed (keep path). Best-effort: if the + // channel is full or closed we just rely on the normal interval. + state.kick_directive_reconciler(); + + response } // ============================================================================= diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index beee0e9..1c1ed6e 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -530,21 +530,34 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { // Spawn directive orchestrator - automates directive lifecycle let directive_pool = pool.clone(); let directive_state = state.clone(); + let directive_kick = state.directive_kick.clone(); tokio::spawn(async move { let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new( directive_pool, directive_state, ); let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); + // Skip the immediate-fire that `interval` does on creation so the + // first tick still waits one period after startup. + interval.tick().await; loop { - interval.tick().await; + tokio::select! { + _ = interval.tick() => {} + _ = directive_kick.notified() => { + // A handler nudged us — run a tick right away, but + // keep the interval running so we still poll on the + // regular cadence in addition to the kick. + } + } if let Err(e) = orch.tick().await { tracing::warn!(error = %e, "Directive orchestrator tick failed"); } } }); - tracing::info!("Directive orchestrator started (interval: 15s)"); + tracing::info!( + "Directive orchestrator started (interval: 15s, also kicked on goal updates)" + ); } let app = make_router(state); diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 1f7b264..6bd9e2b 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -683,6 +683,11 @@ pub struct AppState { pub tts_engine: OnceCell>, /// Daemon reauth status storage (keyed by (daemon_id, request_id)) pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>, + /// Signal used to nudge the directive reconciler to run a tick immediately + /// (e.g. after a goal update) rather than waiting up to 15s for the next + /// interval. The reconciler loop in `server::mod` awaits `notified()` in + /// parallel with its interval; handlers call `kick_directive_reconciler()`. + pub directive_kick: std::sync::Arc, } impl AppState { @@ -765,9 +770,16 @@ impl AppState { pending_worktree_commit: DashMap::new(), tts_engine: OnceCell::new(), daemon_reauth_status: DashMap::new(), + directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()), } } + /// Wake the directive reconciler so it ticks now instead of waiting for + /// the next 15-second interval. Cheap and safe to call from any handler. + pub fn kick_directive_reconciler(&self) { + self.directive_kick.notify_one(); + } + /// Get or initialize the TTS engine (lazy loading). /// /// The TTS engine is loaded on first Speak connection using the Chatterbox backend. -- cgit v1.2.3