summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--makima/src/orchestration/directive.rs82
-rw-r--r--makima/src/server/handlers/directives.rs50
-rw-r--r--makima/src/server/mod.rs17
-rw-r--r--makima/src/server/state.rs12
4 files changed, 149 insertions, 12 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 22279e8..7dbfe65 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -2547,6 +2547,88 @@ pub enum GoalEditInterruptResult {
Skipped,
}
+/// Best-effort cancellation of a directive's currently-running orchestrator
+/// (planner) task. Used by the goal-update path: when we are about to clear
+/// `orchestrator_task_id` from the directive, the still-running task would
+/// otherwise keep emitting `add-step` calls based on the OLD goal, racing the
+/// freshly-spawned replanner. We send an `InterruptTask` daemon command and
+/// mark the task `interrupted` in the DB so monitoring sees a clean state.
+///
+/// All errors are logged and translated into `Ok(false)` — the caller's path
+/// (clearing orchestrator_task_id, kicking the reconciler) is still safe to
+/// proceed.
+pub async fn try_cancel_running_planner(
+ pool: &PgPool,
+ state: &SharedState,
+ directive_id: Uuid,
+ orchestrator_task_id: Uuid,
+) -> Result<bool, anyhow::Error> {
+ let task = match repository::get_task(pool, orchestrator_task_id).await? {
+ Some(t) => t,
+ None => return Ok(false),
+ };
+
+ let cancellable = matches!(
+ task.status.as_str(),
+ "queued" | "pending" | "starting" | "running"
+ );
+ if !cancellable {
+ return Ok(false);
+ }
+
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::InterruptTask {
+ task_id: orchestrator_task_id,
+ graceful: true,
+ };
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ tracing::warn!(
+ directive_id = %directive_id,
+ task_id = %orchestrator_task_id,
+ daemon_id = %daemon_id,
+ error = %e,
+ "Failed to dispatch InterruptTask to orphaned planner; will mark interrupted in DB"
+ );
+ } else {
+ tracing::info!(
+ directive_id = %directive_id,
+ task_id = %orchestrator_task_id,
+ daemon_id = %daemon_id,
+ "Sent InterruptTask to orphaned planner before clearing orchestrator"
+ );
+ }
+ } else {
+ tracing::debug!(
+ directive_id = %directive_id,
+ task_id = %orchestrator_task_id,
+ "Orphaned planner has no daemon assignment; skipping daemon command"
+ );
+ }
+
+ // Mark the task interrupted regardless of whether the daemon was reachable
+ // — this is the source of truth the monitor consults next tick.
+ let upd = crate::db::models::UpdateTaskRequest {
+ status: Some("interrupted".to_string()),
+ version: Some(task.version),
+ error_message: Some(
+ "Cancelled because the directive's goal was updated".to_string(),
+ ),
+ ..Default::default()
+ };
+ if let Err(e) =
+ repository::update_task_for_owner(pool, orchestrator_task_id, task.owner_id, upd).await
+ {
+ tracing::warn!(
+ directive_id = %directive_id,
+ task_id = %orchestrator_task_id,
+ error = %e,
+ "Failed to mark orphaned planner interrupted"
+ );
+ }
+
+ Ok(true)
+}
+
/// Attempt to interrupt a directive's currently-running planner with a goal
/// edit summary instead of replanning from scratch.
///
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs
index 01c4659..44bf4ac 100644
--- a/makima/src/server/handlers/directives.rs
+++ b/makima/src/server/handlers/directives.rs
@@ -20,7 +20,8 @@ use crate::db::models::{
use crate::db::repository;
use crate::orchestration::directive::{
build_cleanup_prompt, build_order_pickup_prompt, classify_goal_change,
- try_interrupt_planner_with_goal_edit, GoalChangeKind, GoalEditInterruptResult,
+ try_cancel_running_planner, try_interrupt_planner_with_goal_edit,
+ GoalChangeKind, GoalEditInterruptResult,
};
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
@@ -895,6 +896,25 @@ pub async fn update_goal(
// SendMessage and adjust in-flight. Otherwise, fall through to the normal
// path which clears orchestrator_task_id and lets phase_replanning kick
// in on the next tick.
+ //
+ // CRITICAL: when going down the "clear" path, we must also CANCEL the
+ // running planner. Otherwise the orphaned task keeps producing add-step
+ // calls based on the old goal, racing the freshly-spawned replanner.
+ if !interrupted {
+ if let Some(ref current) = current {
+ if let Some(orch_task_id) = current.orchestrator_task_id {
+ if let Err(e) = try_cancel_running_planner(pool, &state, id, orch_task_id).await {
+ tracing::warn!(
+ directive_id = %id,
+ task_id = %orch_task_id,
+ error = %e,
+ "Failed to cancel orphaned planner — proceeding with clear anyway"
+ );
+ }
+ }
+ }
+ }
+
let update_result = if interrupted {
repository::update_directive_goal_keep_orchestrator(pool, auth.owner_id, id, &req.goal)
.await
@@ -902,22 +922,32 @@ pub async fn update_goal(
repository::update_directive_goal(pool, auth.owner_id, id, &req.goal).await
};
- match update_result {
+ let response = match update_result {
Ok(Some(directive)) => Json(directive).into_response(),
- Ok(None) => (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Directive not found")),
- )
- .into_response(),
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Directive not found")),
+ )
+ .into_response();
+ }
Err(e) => {
tracing::error!("Failed to update goal: {}", e);
- (
+ return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("UPDATE_FAILED", &e.to_string())),
)
- .into_response()
+ .into_response();
}
- }
+ };
+
+ // Nudge the directive reconciler so the user does not wait up to 15s for
+ // the next interval tick before the new planner is spawned (clear path) or
+ // the small-edit interrupt is consumed (keep path). Best-effort: if the
+ // channel is full or closed we just rely on the normal interval.
+ state.kick_directive_reconciler();
+
+ response
}
// =============================================================================
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index beee0e9..1c1ed6e 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -530,21 +530,34 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
// Spawn directive orchestrator - automates directive lifecycle
let directive_pool = pool.clone();
let directive_state = state.clone();
+ let directive_kick = state.directive_kick.clone();
tokio::spawn(async move {
let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new(
directive_pool,
directive_state,
);
let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
+ // Skip the immediate-fire that `interval` does on creation so the
+ // first tick still waits one period after startup.
+ interval.tick().await;
loop {
- interval.tick().await;
+ tokio::select! {
+ _ = interval.tick() => {}
+ _ = directive_kick.notified() => {
+ // A handler nudged us — run a tick right away, but
+ // keep the interval running so we still poll on the
+ // regular cadence in addition to the kick.
+ }
+ }
if let Err(e) = orch.tick().await {
tracing::warn!(error = %e, "Directive orchestrator tick failed");
}
}
});
- tracing::info!("Directive orchestrator started (interval: 15s)");
+ tracing::info!(
+ "Directive orchestrator started (interval: 15s, also kicked on goal updates)"
+ );
}
let app = make_router(state);
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 1f7b264..6bd9e2b 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -683,6 +683,11 @@ pub struct AppState {
pub tts_engine: OnceCell<Box<dyn TtsEngine>>,
/// Daemon reauth status storage (keyed by (daemon_id, request_id))
pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>,
+ /// Signal used to nudge the directive reconciler to run a tick immediately
+ /// (e.g. after a goal update) rather than waiting up to 15s for the next
+ /// interval. The reconciler loop in `server::mod` awaits `notified()` in
+ /// parallel with its interval; handlers call `kick_directive_reconciler()`.
+ pub directive_kick: std::sync::Arc<tokio::sync::Notify>,
}
impl AppState {
@@ -765,9 +770,16 @@ impl AppState {
pending_worktree_commit: DashMap::new(),
tts_engine: OnceCell::new(),
daemon_reauth_status: DashMap::new(),
+ directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()),
}
}
+ /// Wake the directive reconciler so it ticks now instead of waiting for
+ /// the next 15-second interval. Cheap and safe to call from any handler.
+ pub fn kick_directive_reconciler(&self) {
+ self.directive_kick.notify_one();
+ }
+
/// Get or initialize the TTS engine (lazy loading).
///
/// The TTS engine is loaded on first Speak connection using the Chatterbox backend.