summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
committersoryu <soryu@soryu.co>2026-02-07 16:36:19 +0000
commit1b72449496ce3a057a43d002c8042d5e7a1d6576 (patch)
treef9151df7cc5128499ee91aafde3ff3c3b3281c1e /makima/src/db
parent9e9f18884c78c21f5785908fb7ccd00e2fa5436b (diff)
downloadsoryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.tar.gz
soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.zip
Add directive init mechanism
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/repository.rs296
1 files changed, 296 insertions, 0 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 5949079..e072eb8 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5183,3 +5183,299 @@ pub async fn list_steps_for_chain(
.fetch_all(pool)
.await
}
+
+// ── Directive orchestration functions ───────────────────────────────────────
+
+/// Update directive status with automatic timestamp management.
+pub async fn update_directive_status(
+ pool: &PgPool,
+ id: Uuid,
+ new_status: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET status = $2,
+ started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set the orchestrator contract ID on a directive.
+pub async fn set_directive_orchestrator_contract(
+ pool: &PgPool,
+ directive_id: Uuid,
+ contract_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET orchestrator_contract_id = $2,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set the current chain ID on a directive and increment chain_generation_count.
+pub async fn set_directive_current_chain(
+ pool: &PgPool,
+ directive_id: Uuid,
+ chain_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET current_chain_id = $2,
+ chain_generation_count = chain_generation_count + 1,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(chain_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Create a new directive chain.
+pub async fn create_directive_chain(
+ pool: &PgPool,
+ directive_id: Uuid,
+ name: &str,
+ description: Option<&str>,
+ rationale: Option<&str>,
+ total_steps: i32,
+) -> Result<DirectiveChain, sqlx::Error> {
+ // Get next generation number
+ let next_gen: (i64,) = sqlx::query_as(
+ "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1",
+ )
+ .bind(directive_id)
+ .fetch_one(pool)
+ .await?;
+
+ sqlx::query_as::<_, DirectiveChain>(
+ r#"
+ INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status)
+ VALUES ($1, $2, $3, $4, $5, $6, 'active')
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(next_gen.0 as i32)
+ .bind(name)
+ .bind(description)
+ .bind(rationale)
+ .bind(total_steps)
+ .fetch_one(pool)
+ .await
+}
+
+/// Create a chain step.
+pub async fn create_chain_step(
+ pool: &PgPool,
+ chain_id: Uuid,
+ name: &str,
+ description: Option<&str>,
+ step_type: &str,
+ contract_type: &str,
+ initial_phase: Option<&str>,
+ task_plan: Option<&str>,
+ depends_on: Option<Vec<Uuid>>,
+ order_index: i32,
+) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending')
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(name)
+ .bind(description)
+ .bind(step_type)
+ .bind(contract_type)
+ .bind(initial_phase)
+ .bind(task_plan)
+ .bind(depends_on.as_deref())
+ .bind(order_index)
+ .fetch_one(pool)
+ .await
+}
+
+/// Update a chain step's status with automatic timestamp management.
+pub async fn update_step_status(
+ pool: &PgPool,
+ step_id: Uuid,
+ new_status: &str,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps
+ SET status = $2,
+ started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Link a chain step to a contract and supervisor task.
+pub async fn update_step_contract(
+ pool: &PgPool,
+ step_id: Uuid,
+ contract_id: Uuid,
+ supervisor_task_id: Uuid,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps
+ SET contract_id = $2,
+ supervisor_task_id = $3
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(contract_id)
+ .bind(supervisor_task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Find steps that are ready to execute (pending, with all dependencies passed).
+pub async fn find_ready_steps(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Vec<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ SELECT * FROM chain_steps
+ WHERE chain_id = $1
+ AND status = 'pending'
+ AND (
+ depends_on IS NULL
+ OR array_length(depends_on, 1) IS NULL
+ OR NOT EXISTS (
+ SELECT 1 FROM unnest(depends_on) AS dep_id
+ WHERE dep_id NOT IN (
+ SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed'
+ )
+ )
+ )
+ ORDER BY order_index ASC
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get a chain step by its linked contract ID.
+pub async fn get_step_by_contract_id(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"SELECT * FROM chain_steps WHERE contract_id = $1"#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a directive by its orchestrator contract ID.
+pub async fn get_directive_by_orchestrator_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator).
+pub async fn set_contract_directive_fields(
+ pool: &PgPool,
+ contract_id: Uuid,
+ directive_id: Option<Uuid>,
+ is_orchestrator: bool,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE contracts
+ SET directive_id = $2,
+ is_directive_orchestrator = $3
+ WHERE id = $1
+ "#,
+ )
+ .bind(contract_id)
+ .bind(directive_id)
+ .bind(is_orchestrator)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Get a directive by ID (no owner scoping, for internal use).
+pub async fn get_directive(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1"#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update chain status.
+pub async fn update_chain_status(
+ pool: &PgPool,
+ chain_id: Uuid,
+ new_status: &str,
+) -> Result<Option<DirectiveChain>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveChain>(
+ r#"
+ UPDATE directive_chains
+ SET status = $2,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await
+}