diff options
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 50 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 17 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 12 |
3 files changed, 67 insertions, 12 deletions
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 01c4659..44bf4ac 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -20,7 +20,8 @@ use crate::db::models::{ use crate::db::repository; use crate::orchestration::directive::{ build_cleanup_prompt, build_order_pickup_prompt, classify_goal_change, - try_interrupt_planner_with_goal_edit, GoalChangeKind, GoalEditInterruptResult, + try_cancel_running_planner, try_interrupt_planner_with_goal_edit, + GoalChangeKind, GoalEditInterruptResult, }; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; @@ -895,6 +896,25 @@ pub async fn update_goal( // SendMessage and adjust in-flight. Otherwise, fall through to the normal // path which clears orchestrator_task_id and lets phase_replanning kick // in on the next tick. + // + // CRITICAL: when going down the "clear" path, we must also CANCEL the + // running planner. Otherwise the orphaned task keeps producing add-step + // calls based on the old goal, racing the freshly-spawned replanner. + if !interrupted { + if let Some(ref current) = current { + if let Some(orch_task_id) = current.orchestrator_task_id { + if let Err(e) = try_cancel_running_planner(pool, &state, id, orch_task_id).await { + tracing::warn!( + directive_id = %id, + task_id = %orch_task_id, + error = %e, + "Failed to cancel orphaned planner — proceeding with clear anyway" + ); + } + } + } + } + let update_result = if interrupted { repository::update_directive_goal_keep_orchestrator(pool, auth.owner_id, id, &req.goal) .await @@ -902,22 +922,32 @@ pub async fn update_goal( repository::update_directive_goal(pool, auth.owner_id, id, &req.goal).await }; - match update_result { + let response = match update_result { Ok(Some(directive)) => Json(directive).into_response(), - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Directive not found")), - ) - .into_response(), + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } Err(e) => { tracing::error!("Failed to update goal: {}", e); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("UPDATE_FAILED", &e.to_string())), ) - .into_response() + .into_response(); } - } + }; + + // Nudge the directive reconciler so the user does not wait up to 15s for + // the next interval tick before the new planner is spawned (clear path) or + // the small-edit interrupt is consumed (keep path). Best-effort: if the + // channel is full or closed we just rely on the normal interval. + state.kick_directive_reconciler(); + + response } // ============================================================================= diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index beee0e9..1c1ed6e 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -530,21 +530,34 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { // Spawn directive orchestrator - automates directive lifecycle let directive_pool = pool.clone(); let directive_state = state.clone(); + let directive_kick = state.directive_kick.clone(); tokio::spawn(async move { let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new( directive_pool, directive_state, ); let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); + // Skip the immediate-fire that `interval` does on creation so the + // first tick still waits one period after startup. + interval.tick().await; loop { - interval.tick().await; + tokio::select! { + _ = interval.tick() => {} + _ = directive_kick.notified() => { + // A handler nudged us — run a tick right away, but + // keep the interval running so we still poll on the + // regular cadence in addition to the kick. + } + } if let Err(e) = orch.tick().await { tracing::warn!(error = %e, "Directive orchestrator tick failed"); } } }); - tracing::info!("Directive orchestrator started (interval: 15s)"); + tracing::info!( + "Directive orchestrator started (interval: 15s, also kicked on goal updates)" + ); } let app = make_router(state); diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 1f7b264..6bd9e2b 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -683,6 +683,11 @@ pub struct AppState { pub tts_engine: OnceCell<Box<dyn TtsEngine>>, /// Daemon reauth status storage (keyed by (daemon_id, request_id)) pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>, + /// Signal used to nudge the directive reconciler to run a tick immediately + /// (e.g. after a goal update) rather than waiting up to 15s for the next + /// interval. The reconciler loop in `server::mod` awaits `notified()` in + /// parallel with its interval; handlers call `kick_directive_reconciler()`. + pub directive_kick: std::sync::Arc<tokio::sync::Notify>, } impl AppState { @@ -765,9 +770,16 @@ impl AppState { pending_worktree_commit: DashMap::new(), tts_engine: OnceCell::new(), daemon_reauth_status: DashMap::new(), + directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()), } } + /// Wake the directive reconciler so it ticks now instead of waiting for + /// the next 15-second interval. Cheap and safe to call from any handler. + pub fn kick_directive_reconciler(&self) { + self.directive_kick.notify_one(); + } + /// Get or initialize the TTS engine (lazy loading). /// /// The TTS engine is loaded on first Speak connection using the Chatterbox backend. |
