diff options
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 82 | ||||
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 50 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 17 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 12 |
4 files changed, 149 insertions, 12 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 22279e8..7dbfe65 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -2547,6 +2547,88 @@ pub enum GoalEditInterruptResult { Skipped, } +/// Best-effort cancellation of a directive's currently-running orchestrator +/// (planner) task. Used by the goal-update path: when we are about to clear +/// `orchestrator_task_id` from the directive, the still-running task would +/// otherwise keep emitting `add-step` calls based on the OLD goal, racing the +/// freshly-spawned replanner. We send an `InterruptTask` daemon command and +/// mark the task `interrupted` in the DB so monitoring sees a clean state. +/// +/// Dispatch and DB-update failures are only logged (the result is still `Ok(true)`); +/// a failed task lookup is propagated as `Err`. Either way the caller's path (clearing +/// orchestrator_task_id, kicking the reconciler) is still safe to proceed. +pub async fn try_cancel_running_planner( + pool: &PgPool, + state: &SharedState, + directive_id: Uuid, + orchestrator_task_id: Uuid, +) -> Result<bool, anyhow::Error> { + let task = match repository::get_task(pool, orchestrator_task_id).await? 
{ + Some(t) => t, + None => return Ok(false), + }; + + let cancellable = matches!( + task.status.as_str(), + "queued" | "pending" | "starting" | "running" + ); + if !cancellable { + return Ok(false); + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { + task_id: orchestrator_task_id, + graceful: true, + }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + error = %e, + "Failed to dispatch InterruptTask to orphaned planner; will mark interrupted in DB" + ); + } else { + tracing::info!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + daemon_id = %daemon_id, + "Sent InterruptTask to orphaned planner before clearing orchestrator" + ); + } + } else { + tracing::debug!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + "Orphaned planner has no daemon assignment; skipping daemon command" + ); + } + + // Mark the task interrupted regardless of whether the daemon was reachable + // — this is the source of truth the monitor consults next tick. + let upd = crate::db::models::UpdateTaskRequest { + status: Some("interrupted".to_string()), + version: Some(task.version), + error_message: Some( + "Cancelled because the directive's goal was updated".to_string(), + ), + ..Default::default() + }; + if let Err(e) = + repository::update_task_for_owner(pool, orchestrator_task_id, task.owner_id, upd).await + { + tracing::warn!( + directive_id = %directive_id, + task_id = %orchestrator_task_id, + error = %e, + "Failed to mark orphaned planner interrupted" + ); + } + + Ok(true) +} + /// Attempt to interrupt a directive's currently-running planner with a goal /// edit summary instead of replanning from scratch. 
/// diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 01c4659..44bf4ac 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -20,7 +20,8 @@ use crate::db::models::{ use crate::db::repository; use crate::orchestration::directive::{ build_cleanup_prompt, build_order_pickup_prompt, classify_goal_change, - try_interrupt_planner_with_goal_edit, GoalChangeKind, GoalEditInterruptResult, + try_cancel_running_planner, try_interrupt_planner_with_goal_edit, + GoalChangeKind, GoalEditInterruptResult, }; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; @@ -895,6 +896,25 @@ pub async fn update_goal( // SendMessage and adjust in-flight. Otherwise, fall through to the normal // path which clears orchestrator_task_id and lets phase_replanning kick // in on the next tick. + // + // CRITICAL: when going down the "clear" path, we must also CANCEL the + // running planner. Otherwise the orphaned task keeps producing add-step + // calls based on the old goal, racing the freshly-spawned replanner. 
+ if !interrupted { + if let Some(ref current) = current { + if let Some(orch_task_id) = current.orchestrator_task_id { + if let Err(e) = try_cancel_running_planner(pool, &state, id, orch_task_id).await { + tracing::warn!( + directive_id = %id, + task_id = %orch_task_id, + error = %e, + "Failed to cancel orphaned planner — proceeding with clear anyway" + ); + } + } + } + } + let update_result = if interrupted { repository::update_directive_goal_keep_orchestrator(pool, auth.owner_id, id, &req.goal) .await @@ -902,22 +922,32 @@ pub async fn update_goal( repository::update_directive_goal(pool, auth.owner_id, id, &req.goal).await }; - match update_result { + let response = match update_result { Ok(Some(directive)) => Json(directive).into_response(), - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Directive not found")), - ) - .into_response(), + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } Err(e) => { tracing::error!("Failed to update goal: {}", e); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("UPDATE_FAILED", &e.to_string())), ) - .into_response() + .into_response(); } - } + }; + + // Nudge the directive reconciler so the user does not wait up to 15s for + // the next interval tick before the new planner is spawned (clear path) or + // the small-edit interrupt is consumed (keep path). Best-effort: the kick is a + // `Notify` permit, so repeated kicks coalesce and the normal interval still runs. 
+ state.kick_directive_reconciler(); + + response } // ============================================================================= diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index beee0e9..1c1ed6e 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -530,21 +530,34 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { // Spawn directive orchestrator - automates directive lifecycle let directive_pool = pool.clone(); let directive_state = state.clone(); + let directive_kick = state.directive_kick.clone(); tokio::spawn(async move { let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new( directive_pool, directive_state, ); let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); + // Skip the immediate-fire that `interval` does on creation so the + // first tick still waits one period after startup. + interval.tick().await; loop { - interval.tick().await; + tokio::select! { + _ = interval.tick() => {} + _ = directive_kick.notified() => { + // A handler nudged us — run a tick right away, but + // keep the interval running so we still poll on the + // regular cadence in addition to the kick. 
+ } + } if let Err(e) = orch.tick().await { tracing::warn!(error = %e, "Directive orchestrator tick failed"); } } }); - tracing::info!("Directive orchestrator started (interval: 15s)"); + tracing::info!( + "Directive orchestrator started (interval: 15s, also kicked on goal updates)" + ); } let app = make_router(state); diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 1f7b264..6bd9e2b 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -683,6 +683,11 @@ pub struct AppState { pub tts_engine: OnceCell<Box<dyn TtsEngine>>, /// Daemon reauth status storage (keyed by (daemon_id, request_id)) pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>, + /// Signal used to nudge the directive reconciler to run a tick immediately + /// (e.g. after a goal update) rather than waiting up to 15s for the next + /// interval. The reconciler loop in `server::mod` awaits `notified()` in + /// parallel with its interval; handlers call `kick_directive_reconciler()`. + pub directive_kick: std::sync::Arc<tokio::sync::Notify>, } impl AppState { @@ -765,9 +770,16 @@ impl AppState { pending_worktree_commit: DashMap::new(), tts_engine: OnceCell::new(), daemon_reauth_status: DashMap::new(), + directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()), } } + /// Wake the directive reconciler so it ticks now instead of waiting for + /// the next 15-second interval. Cheap and safe to call from any handler. + pub fn kick_directive_reconciler(&self) { + self.directive_kick.notify_one(); + } + /// Get or initialize the TTS engine (lazy loading). /// /// The TTS engine is loaded on first Speak connection using the Chatterbox backend. |
