diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index f347fc7..9ec5275 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5325,3 +5325,226 @@ pub async fn set_directive_status( .await } +// ============================================================================= +// Directive Orchestrator Queries +// ============================================================================= + +/// Get active directives that need planning (no steps, no orchestrator task). +pub async fn get_directives_needing_planning( + pool: &PgPool, +) -> Result<Vec<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + SELECT d.* FROM directives d + WHERE d.status = 'active' + AND d.orchestrator_task_id IS NULL + AND NOT EXISTS ( + SELECT 1 FROM directive_steps WHERE directive_id = d.id + ) + "#, + ) + .fetch_all(pool) + .await +} + +/// A step joined with minimal directive info for dispatch. +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StepForDispatch { + // Step fields + pub step_id: Uuid, + pub directive_id: Uuid, + pub step_name: String, + pub step_description: Option<String>, + pub task_plan: Option<String>, + pub order_index: i32, + pub generation: i32, + // Directive fields + pub owner_id: Uuid, + pub directive_title: String, + pub repository_url: Option<String>, + pub base_branch: Option<String>, +} + +/// Get ready steps that need task dispatch. +pub async fn get_ready_steps_for_dispatch( + pool: &PgPool, +) -> Result<Vec<StepForDispatch>, sqlx::Error> { + sqlx::query_as::<_, StepForDispatch>( + r#" + SELECT + ds.id AS step_id, + ds.directive_id, + ds.name AS step_name, + ds.description AS step_description, + ds.task_plan, + ds.order_index, + ds.generation, + d.owner_id, + d.title AS directive_title, + d.repository_url, + d.base_branch + FROM directive_steps ds + JOIN directives d ON d.id = ds.directive_id + WHERE ds.status = 'ready' + AND ds.task_id IS NULL + AND d.status = 'active' + ORDER BY ds.order_index + "#, + ) + .fetch_all(pool) + .await +} + +/// A running step joined with its task's current status. +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct RunningStepWithTask { + pub step_id: Uuid, + pub directive_id: Uuid, + pub task_id: Uuid, + pub task_status: String, +} + +/// Get running steps with their task status for monitoring. +pub async fn get_running_steps_with_tasks( + pool: &PgPool, +) -> Result<Vec<RunningStepWithTask>, sqlx::Error> { + sqlx::query_as::<_, RunningStepWithTask>( + r#" + SELECT + ds.id AS step_id, + ds.directive_id, + ds.task_id AS "task_id!", + t.status AS task_status + FROM directive_steps ds + JOIN tasks t ON t.id = ds.task_id + WHERE ds.status = 'running' + AND ds.task_id IS NOT NULL + "#, + ) + .fetch_all(pool) + .await +} + +/// An orchestrator task to check (directive with pending planning task). +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct OrchestratorTaskCheck { + pub directive_id: Uuid, + pub orchestrator_task_id: Uuid, + pub task_status: String, + pub owner_id: Uuid, +} + +/// Get directives with orchestrator tasks to check completion. +pub async fn get_orchestrator_tasks_to_check( + pool: &PgPool, +) -> Result<Vec<OrchestratorTaskCheck>, sqlx::Error> { + sqlx::query_as::<_, OrchestratorTaskCheck>( + r#" + SELECT + d.id AS directive_id, + d.orchestrator_task_id AS "orchestrator_task_id!", + t.status AS task_status, + d.owner_id + FROM directives d + JOIN tasks t ON t.id = d.orchestrator_task_id + WHERE d.orchestrator_task_id IS NOT NULL + AND d.status = 'active' + "#, + ) + .fetch_all(pool) + .await +} + +/// Get active directives that need re-planning (goal updated after latest step). +pub async fn get_directives_needing_replanning( + pool: &PgPool, +) -> Result<Vec<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + SELECT d.* FROM directives d + WHERE d.status = 'active' + AND d.orchestrator_task_id IS NULL + AND EXISTS ( + SELECT 1 FROM directive_steps WHERE directive_id = d.id + ) + AND d.goal_updated_at > ( + SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz) + FROM directive_steps ds WHERE ds.directive_id = d.id + ) + "#, + ) + .fetch_all(pool) + .await +} + +/// Assign a task to a step and set status to running. +pub async fn assign_task_to_step( + pool: &PgPool, + step_id: Uuid, + task_id: Uuid, +) -> Result<Option<DirectiveStep>, sqlx::Error> { + sqlx::query_as::<_, DirectiveStep>( + r#" + UPDATE directive_steps + SET task_id = $2, status = 'running', started_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Set the orchestrator_task_id on a directive. +pub async fn assign_orchestrator_task( + pool: &PgPool, + directive_id: Uuid, + task_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directives + SET orchestrator_task_id = $2, updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(directive_id) + .bind(task_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Clear the orchestrator_task_id on a directive. +pub async fn clear_orchestrator_task( + pool: &PgPool, + directive_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directives + SET orchestrator_task_id = NULL, updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(directive_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Get the max generation number for steps in a directive. +pub async fn get_directive_max_generation( + pool: &PgPool, + directive_id: Uuid, +) -> Result<i32, sqlx::Error> { + let row: (Option<i32>,) = sqlx::query_as( + r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#, + ) + .bind(directive_id) + .fetch_one(pool) + .await?; + Ok(row.0.unwrap_or(0)) +} |
