summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-09 02:35:36 +0000
committersoryu <soryu@soryu.co>2026-02-09 02:35:36 +0000
commita2646a828febbdac798a206655a15eae7e463bca (patch)
tree7736396d87f6bf4dd50a2d3e91525534a36adf00 /makima/src/db
parent9c92d9235a0d1258fff9f7e625b0463c4952c45f (diff)
downloadsoryu-a2646a828febbdac798a206655a15eae7e463bca.tar.gz
soryu-a2646a828febbdac798a206655a15eae7e463bca.zip
Add directive init
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/repository.rs223
1 files changed, 223 insertions, 0 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index f347fc7..9ec5275 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5325,3 +5325,226 @@ pub async fn set_directive_status(
.await
}
+// =============================================================================
+// Directive Orchestrator Queries
+// =============================================================================
+
+/// Get active directives that need planning (no steps, no orchestrator task).
+pub async fn get_directives_needing_planning(
+ pool: &PgPool,
+) -> Result<Vec<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ SELECT d.* FROM directives d
+ WHERE d.status = 'active'
+ AND d.orchestrator_task_id IS NULL
+ AND NOT EXISTS (
+ SELECT 1 FROM directive_steps WHERE directive_id = d.id
+ )
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// A step joined with minimal directive info for dispatch.
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct StepForDispatch {
+ // Step fields
+ pub step_id: Uuid,
+ pub directive_id: Uuid,
+ pub step_name: String,
+ pub step_description: Option<String>,
+ pub task_plan: Option<String>,
+ pub order_index: i32,
+ pub generation: i32,
+ // Directive fields
+ pub owner_id: Uuid,
+ pub directive_title: String,
+ pub repository_url: Option<String>,
+ pub base_branch: Option<String>,
+}
+
+/// Get ready steps that need task dispatch.
+pub async fn get_ready_steps_for_dispatch(
+ pool: &PgPool,
+) -> Result<Vec<StepForDispatch>, sqlx::Error> {
+ sqlx::query_as::<_, StepForDispatch>(
+ r#"
+ SELECT
+ ds.id AS step_id,
+ ds.directive_id,
+ ds.name AS step_name,
+ ds.description AS step_description,
+ ds.task_plan,
+ ds.order_index,
+ ds.generation,
+ d.owner_id,
+ d.title AS directive_title,
+ d.repository_url,
+ d.base_branch
+ FROM directive_steps ds
+ JOIN directives d ON d.id = ds.directive_id
+ WHERE ds.status = 'ready'
+ AND ds.task_id IS NULL
+ AND d.status = 'active'
+ ORDER BY ds.order_index
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// A running step joined with its task's current status.
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct RunningStepWithTask {
+ pub step_id: Uuid,
+ pub directive_id: Uuid,
+ pub task_id: Uuid,
+ pub task_status: String,
+}
+
+/// Get running steps with their task status for monitoring.
+pub async fn get_running_steps_with_tasks(
+ pool: &PgPool,
+) -> Result<Vec<RunningStepWithTask>, sqlx::Error> {
+ sqlx::query_as::<_, RunningStepWithTask>(
+ r#"
+ SELECT
+ ds.id AS step_id,
+ ds.directive_id,
+ ds.task_id AS "task_id!",
+ t.status AS task_status
+ FROM directive_steps ds
+ JOIN tasks t ON t.id = ds.task_id
+ WHERE ds.status = 'running'
+ AND ds.task_id IS NOT NULL
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// An orchestrator task to check (directive with pending planning task).
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct OrchestratorTaskCheck {
+ pub directive_id: Uuid,
+ pub orchestrator_task_id: Uuid,
+ pub task_status: String,
+ pub owner_id: Uuid,
+}
+
+/// Get directives with orchestrator tasks to check completion.
+pub async fn get_orchestrator_tasks_to_check(
+ pool: &PgPool,
+) -> Result<Vec<OrchestratorTaskCheck>, sqlx::Error> {
+ sqlx::query_as::<_, OrchestratorTaskCheck>(
+ r#"
+ SELECT
+ d.id AS directive_id,
+ d.orchestrator_task_id AS "orchestrator_task_id!",
+ t.status AS task_status,
+ d.owner_id
+ FROM directives d
+ JOIN tasks t ON t.id = d.orchestrator_task_id
+ WHERE d.orchestrator_task_id IS NOT NULL
+ AND d.status = 'active'
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// Get active directives that need re-planning (goal updated after latest step).
+pub async fn get_directives_needing_replanning(
+ pool: &PgPool,
+) -> Result<Vec<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ SELECT d.* FROM directives d
+ WHERE d.status = 'active'
+ AND d.orchestrator_task_id IS NULL
+ AND EXISTS (
+ SELECT 1 FROM directive_steps WHERE directive_id = d.id
+ )
+ AND d.goal_updated_at > (
+ SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz)
+ FROM directive_steps ds WHERE ds.directive_id = d.id
+ )
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// Assign a task to a step and set status to running.
+pub async fn assign_task_to_step(
+ pool: &PgPool,
+ step_id: Uuid,
+ task_id: Uuid,
+) -> Result<Option<DirectiveStep>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ UPDATE directive_steps
+ SET task_id = $2, status = 'running', started_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set the orchestrator_task_id on a directive.
+pub async fn assign_orchestrator_task(
+ pool: &PgPool,
+ directive_id: Uuid,
+ task_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE directives
+ SET orchestrator_task_id = $2, updated_at = NOW()
+ WHERE id = $1
+ "#,
+ )
+ .bind(directive_id)
+ .bind(task_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Clear the orchestrator_task_id on a directive.
+pub async fn clear_orchestrator_task(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE directives
+ SET orchestrator_task_id = NULL, updated_at = NOW()
+ WHERE id = $1
+ "#,
+ )
+ .bind(directive_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Get the max generation number for steps in a directive.
+pub async fn get_directive_max_generation(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<i32, sqlx::Error> {
+ let row: (Option<i32>,) = sqlx::query_as(
+ r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#,
+ )
+ .bind(directive_id)
+ .fetch_one(pool)
+ .await?;
+ Ok(row.0.unwrap_or(0))
+}