summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/directive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/directive.rs')
-rw-r--r--makima/src/orchestration/directive.rs41
1 files changed, 34 insertions, 7 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 22003af..a95e63f 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -67,7 +67,9 @@ impl DirectiveOrchestrator {
}
/// Phase 2: Ready steps with no task → create execution task and dispatch.
+ /// Also retries pending directive tasks that weren't dispatched previously.
async fn phase_execution(&self) -> Result<(), anyhow::Error> {
+ // Create tasks for ready steps
let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;
for step in steps {
@@ -118,6 +120,21 @@ impl DirectiveOrchestrator {
}
}
}
+
+ // Retry pending directive tasks that weren't dispatched
+ let pending = repository::get_pending_directive_tasks(&self.pool).await?;
+ for task in pending {
+ if self
+ .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version)
+ .await
+ {
+ // Task dispatched — mark its step as running if it has one
+ if let Some(step_id) = task.directive_step_id {
+ let _ = repository::set_step_running(&self.pool, step_id).await;
+ }
+ }
+ }
+
Ok(())
}
@@ -288,7 +305,8 @@ impl DirectiveOrchestrator {
Ok(())
}
- /// Spawn an execution task for a step and assign it.
+ /// Spawn an execution task for a step.
+ /// Links the task to the step but only marks the step as 'running' once dispatched.
async fn spawn_step_task(
&self,
step_id: Uuid,
@@ -325,15 +343,21 @@ impl DirectiveOrchestrator {
let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
- repository::assign_task_to_step(&self.pool, step_id, task.id).await?;
+ // Link the task to the step (sets task_id) but keep step as 'ready' for now
+ repository::link_task_to_step(&self.pool, step_id, task.id).await?;
- // Try to dispatch to a daemon
- self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await;
+ // Only mark step as 'running' if we can actually dispatch the task
+ if self
+ .try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version)
+ .await
+ {
+ repository::set_step_running(&self.pool, step_id).await?;
+ }
Ok(())
}
- /// Try to dispatch a task to an available daemon. If none available, leave pending.
+ /// Try to dispatch a task to an available daemon. Returns true if dispatched.
async fn try_dispatch_task(
&self,
task_id: Uuid,
@@ -341,13 +365,13 @@ impl DirectiveOrchestrator {
task_name: &str,
plan: &str,
version: i32,
- ) {
+ ) -> bool {
let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
tracing::info!(
task_id = %task_id,
"No daemon available for directive task — leaving pending for retry"
);
- return;
+ return false;
};
// Update task status to starting and assign daemon
@@ -393,12 +417,14 @@ impl DirectiveOrchestrator {
error = %e,
"Failed to send SpawnTask to daemon for directive task"
);
+ return false;
} else {
tracing::info!(
task_id = %task_id,
daemon_id = %daemon_id,
"Dispatched directive task to daemon"
);
+ return true;
}
}
Ok(None) => {
@@ -408,6 +434,7 @@ impl DirectiveOrchestrator {
tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
}
}
+ false
}
}