summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/db/repository.rs55
-rw-r--r--makima/src/orchestration/directive.rs41
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs9
3 files changed, 98 insertions, 7 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 9ec5275..9dedc14 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5535,6 +5535,61 @@ pub async fn clear_orchestrator_task(
Ok(())
}
+/// Link a task to a step without changing step status.
+pub async fn link_task_to_step(
+ pool: &PgPool,
+ step_id: Uuid,
+ task_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE directive_steps
+ SET task_id = $2
+ WHERE id = $1
+ "#,
+ )
+ .bind(step_id)
+ .bind(task_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Set a step to 'running' status (after its task has been dispatched).
+pub async fn set_step_running(
+ pool: &PgPool,
+ step_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE directive_steps
+ SET status = 'running', started_at = COALESCE(started_at, NOW())
+ WHERE id = $1
+ "#,
+ )
+ .bind(step_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Get pending directive tasks (tasks with directive_id that are still pending).
+pub async fn get_pending_directive_tasks(
+ pool: &PgPool,
+) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT * FROM tasks
+ WHERE directive_id IS NOT NULL
+ AND status = 'pending'
+ AND daemon_id IS NULL
+ ORDER BY created_at
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
/// Get the max generation number for steps in a directive.
pub async fn get_directive_max_generation(
pool: &PgPool,
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 22003af..a95e63f 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -67,7 +67,9 @@ impl DirectiveOrchestrator {
}
/// Phase 2: Ready steps with no task → create execution task and dispatch.
+ /// Also retries pending directive tasks that weren't dispatched previously.
async fn phase_execution(&self) -> Result<(), anyhow::Error> {
+ // Create tasks for ready steps
let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?;
for step in steps {
@@ -118,6 +120,21 @@ impl DirectiveOrchestrator {
}
}
}
+
+ // Retry pending directive tasks that weren't dispatched
+ let pending = repository::get_pending_directive_tasks(&self.pool).await?;
+ for task in pending {
+ if self
+ .try_dispatch_task(task.id, task.owner_id, &task.name, &task.plan, task.version)
+ .await
+ {
+ // Task dispatched — mark its step as running if it has one
+ if let Some(step_id) = task.directive_step_id {
+ let _ = repository::set_step_running(&self.pool, step_id).await;
+ }
+ }
+ }
+
Ok(())
}
@@ -288,7 +305,8 @@ impl DirectiveOrchestrator {
Ok(())
}
- /// Spawn an execution task for a step and assign it.
+ /// Spawn an execution task for a step.
+ /// Links the task to the step but only marks the step as 'running' once dispatched.
async fn spawn_step_task(
&self,
step_id: Uuid,
@@ -325,15 +343,21 @@ impl DirectiveOrchestrator {
let task = repository::create_task_for_owner(&self.pool, owner_id, req).await?;
- repository::assign_task_to_step(&self.pool, step_id, task.id).await?;
+ // Link the task to the step (sets task_id) but keep step as 'ready' for now
+ repository::link_task_to_step(&self.pool, step_id, task.id).await?;
- // Try to dispatch to a daemon
- self.try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version).await;
+ // Only mark step as 'running' if we can actually dispatch the task
+ if self
+ .try_dispatch_task(task.id, owner_id, &task.name, &task.plan, task.version)
+ .await
+ {
+ repository::set_step_running(&self.pool, step_id).await?;
+ }
Ok(())
}
- /// Try to dispatch a task to an available daemon. If none available, leave pending.
+ /// Try to dispatch a task to an available daemon. Returns true if dispatched.
async fn try_dispatch_task(
&self,
task_id: Uuid,
@@ -341,13 +365,13 @@ impl DirectiveOrchestrator {
task_name: &str,
plan: &str,
version: i32,
- ) {
+ ) -> bool {
let Some(daemon_id) = self.state.find_alternative_daemon(owner_id, &[]) else {
tracing::info!(
task_id = %task_id,
"No daemon available for directive task — leaving pending for retry"
);
- return;
+ return false;
};
// Update task status to starting and assign daemon
@@ -393,12 +417,14 @@ impl DirectiveOrchestrator {
error = %e,
"Failed to send SpawnTask to daemon for directive task"
);
+ return false;
} else {
tracing::info!(
task_id = %task_id,
daemon_id = %daemon_id,
"Dispatched directive task to daemon"
);
+ return true;
}
}
Ok(None) => {
@@ -408,6 +434,7 @@ impl DirectiveOrchestrator {
tracing::warn!(task_id = %task_id, error = %e, "Failed to update task for dispatch");
}
}
+ false
}
}
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 2ea7805..60de2e6 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -1318,6 +1318,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
// Check if all steps are done → set directive to idle
let _ = repository::check_directive_idle(&pool, directive_id).await;
}
+ } else if let Some(directive_id) = updated_task.directive_id {
+ // Planning/orchestrator task completed — clear orchestrator_task_id
+ let _ = repository::clear_orchestrator_task(&pool, directive_id).await;
+ // Advance DAG — planning task should have created steps
+ let _ = repository::advance_directive_ready_steps(&pool, directive_id).await;
+ if updated_task.status != "done" {
+ // Planning failed — pause directive
+ let _ = repository::set_directive_status(&pool, updated_task.owner_id, directive_id, "paused").await;
+ }
}
}