diff options
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/frontend/src/hooks/useDirectives.ts | 11 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 55 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 41 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 9 |
4 files changed, 109 insertions, 7 deletions
diff --git a/makima/frontend/src/hooks/useDirectives.ts b/makima/frontend/src/hooks/useDirectives.ts index b69275a..f5f2b36 100644 --- a/makima/frontend/src/hooks/useDirectives.ts +++ b/makima/frontend/src/hooks/useDirectives.ts @@ -80,6 +80,17 @@ export function useDirective(id: string | undefined) { refresh(); }, [refresh]); + // Auto-poll while directive is active or has an orchestrator task + useEffect(() => { + if (!directive) return; + const needsPolling = + directive.status === "active" || directive.orchestratorTaskId != null; + if (!needsPolling) return; + + const interval = setInterval(refresh, 5000); + return () => clearInterval(interval); + }, [directive?.status, directive?.orchestratorTaskId, refresh]); + const update = useCallback(async (req: UpdateDirectiveRequest) => { if (!id) return; await updateDirective(id, req); diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 9ec5275..9dedc14 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5535,6 +5535,61 @@ pub async fn clear_orchestrator_task( Ok(()) } +/// Link a task to a step without changing step status. +pub async fn link_task_to_step( + pool: &PgPool, + step_id: Uuid, + task_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directive_steps + SET task_id = $2 + WHERE id = $1 + "#, + ) + .bind(step_id) + .bind(task_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Set a step to 'running' status (after its task has been dispatched). +pub async fn set_step_running( + pool: &PgPool, + step_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directive_steps + SET status = 'running', started_at = COALESCE(started_at, NOW()) + WHERE id = $1 + "#, + ) + .bind(step_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Get pending directive tasks (tasks with directive_id that are still pending). +pub async fn get_pending_directive_tasks( + pool: &PgPool, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * FROM tasks + WHERE directive_id IS NOT NULL + AND status = 'pending' + AND daemon_id IS NULL + ORDER BY created_at + "#, + ) + .fetch_all(pool) + .await +} + /// Get the max generation number for steps in a directive. pub async fn get_directive_max_generation( pool: &PgPool, diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 22003af..a95e63f 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -67,7 +67,9 @@ impl DirectiveOrchestrator { } /// Phase 2: Ready steps with no task → create execution task and dispatch. + /// Also retries pending directive tasks that weren't dispatched previously. async fn phase_execution(&self) -> Result<(), anyhow::Error> { + // Create tasks for ready steps let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?; for step in steps { @@ -118,6 +120,21 @@ impl DirectiveOrchestrator { } } } + + // Retry pending directive tasks that weren't dispatched + let pending = repository::get_pending_directive_tasks(&self.pool).await?; + for task in pending { + if self + .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version) + .await + { + // Task dispatched — mark its step as running if it has one + if let Some(step_id) = task.directive_step_id { + let _ = repository::set_step_running(&self.pool, step_id).await; + } + } + } + Ok(()) } @@ -288,7 +305,8 @@ impl DirectiveOrchestrator { Ok(()) } - /// Spawn an execution task for a step and assign it. + /// Spawn an execution task for a step. + /// Links the task to the step but only marks the step as 'running' once dispatched. async fn spawn_step_task( &self, step_id: Uuid, @@ -325,15 +343,21 @@ impl DirectiveOrchestrator { let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?; - repository::assign_task_to_step(&self.pool, step_id, task.id).await?; + // Link the task to the step (sets task_id) but keep step as 'ready' for now + repository::link_task_to_step(&self.pool, step_id, task.id).await?; - // Try to dispatch to a daemon - self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await; + // Only mark step as 'running' if we can actually dispatch the task + if self + .try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version) + .await + { + repository::set_step_running(&self.pool, step_id).await?; + } Ok(()) } - /// Try to dispatch a task to an available daemon. If none available, leave pending. + /// Try to dispatch a task to an available daemon. Returns true if dispatched. async fn try_dispatch_task( &self, task_id: Uuid, @@ -341,13 +365,13 @@ impl DirectiveOrchestrator { task_name: &str, plan: &str, version: i32, - ) { + ) -> bool { let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else { tracing::info!( task_id = %task_id, "No daemon available for directive task — leaving pending for retry" ); - return; + return false; }; // Update task status to starting and assign daemon @@ -393,12 +417,14 @@ impl DirectiveOrchestrator { error = %e, "Failed to send SpawnTask to daemon for directive task" ); + return false; } else { tracing::info!( task_id = %task_id, daemon_id = %daemon_id, "Dispatched directive task to daemon" ); + return true; } } Ok(None) => { @@ -408,6 +434,7 @@ impl DirectiveOrchestrator { tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch"); } } + false } } diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 2ea7805..60de2e6 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1318,6 +1318,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // Check if all steps are done → set directive to idle let _ = repository::check_directive_idle(&pool, directive_id).await; } + } else if let Some(directive_id) = updated_task.directive_id { + // Planning/orchestrator task completed — clear orchestrator_task_id + let _ = repository::clear_orchestrator_task(&pool, directive_id).await; + // Advance DAG — planning task should have created steps + let _ = repository::advance_directive_ready_steps(&pool, directive_id).await; + if updated_task.status != "done" { + // Planning failed — pause directive + let _ = repository::set_directive_status(&pool, updated_task.owner_id, directive_id, "paused").await; + } } } |
