diff options
| author | soryu <soryu@soryu.co> | 2026-02-09 14:39:36 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-09 14:39:36 +0000 |
| commit | bfa7bd8d7609397f570f1cd9b83d2269abc0ed63 (patch) | |
| tree | 71e4e3decb5b07550427472079dfddffcc5c3753 /makima/src/orchestration | |
| parent | a2646a828febbdac798a206655a15eae7e463bca (diff) | |
| download | soryu-bfa7bd8d7609397f570f1cd9b83d2269abc0ed63.tar.gz soryu-bfa7bd8d7609397f570f1cd9b83d2269abc0ed63.zip | |
Add directive task progression
Diffstat (limited to 'makima/src/orchestration')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 41 |
1 files changed, 34 insertions, 7 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 22003af..a95e63f 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -67,7 +67,9 @@ impl DirectiveOrchestrator { } /// Phase 2: Ready steps with no task → create execution task and dispatch. + /// Also retries pending directive tasks that weren't dispatched previously. async fn phase_execution(&self) -> Result<(), anyhow::Error> { + // Create tasks for ready steps let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?; for step in steps { @@ -118,6 +120,21 @@ impl DirectiveOrchestrator { } } } + + // Retry pending directive tasks that weren't dispatched + let pending = repository::get_pending_directive_tasks(&self.pool).await?; + for task in pending { + if self + .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version) + .await + { + // Task dispatched — mark its step as running if it has one + if let Some(step_id) = task.directive_step_id { + let _ = repository::set_step_running(&self.pool, step_id).await; + } + } + } + Ok(()) } @@ -288,7 +305,8 @@ impl DirectiveOrchestrator { Ok(()) } - /// Spawn an execution task for a step and assign it. + /// Spawn an execution task for a step. + /// Links the task to the step but only marks the step as 'running' once dispatched. async fn spawn_step_task( &self, step_id: Uuid, @@ -325,15 +343,21 @@ impl DirectiveOrchestrator { let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?; - repository::assign_task_to_step(&self.pool, step_id, task.id).await?; + // Link the task to the step (sets task_id) but keep step as 'ready' for now + repository::link_task_to_step(&self.pool, step_id, task.id).await?; - // Try to dispatch to a daemon - self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await; + // Only mark step as 'running' if we can actually dispatch the task + if self + .try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version) + .await + { + repository::set_step_running(&self.pool, step_id).await?; + } Ok(()) } - /// Try to dispatch a task to an available daemon. If none available, leave pending. + /// Try to dispatch a task to an available daemon. Returns true if dispatched. async fn try_dispatch_task( &self, task_id: Uuid, @@ -341,13 +365,13 @@ impl DirectiveOrchestrator { task_name: &str, plan: &str, version: i32, - ) { + ) -> bool { let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else { tracing::info!( task_id = %task_id, "No daemon available for directive task — leaving pending for retry" ); - return; + return false; }; // Update task status to starting and assign daemon @@ -393,12 +417,14 @@ impl DirectiveOrchestrator { error = %e, "Failed to send SpawnTask to daemon for directive task" ); + return false; } else { tracing::info!( task_id = %task_id, daemon_id = %daemon_id, "Dispatched directive task to daemon" ); + return true; } } Ok(None) => { @@ -408,6 +434,7 @@ impl DirectiveOrchestrator { tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch"); } } + false } } |
