diff options
| author | soryu <soryu@soryu.co> | 2026-02-07 16:36:19 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-07 16:36:19 +0000 |
| commit | 1b72449496ce3a057a43d002c8042d5e7a1d6576 (patch) | |
| tree | f9151df7cc5128499ee91aafde3ff3c3b3281c1e /makima/src/db/repository.rs | |
| parent | 9e9f18884c78c21f5785908fb7ccd00e2fa5436b (diff) | |
| download | soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.tar.gz soryu-1b72449496ce3a057a43d002c8042d5e7a1d6576.zip | |
Add directive init mechanism
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 5949079..e072eb8 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5183,3 +5183,299 @@ pub async fn list_steps_for_chain( .fetch_all(pool) .await } + +// ── Directive orchestration functions ─────────────────────────────────────── + +/// Update directive status with automatic timestamp management. +pub async fn update_directive_status( + pool: &PgPool, + id: Uuid, + new_status: &str, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET status = $2, + started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(new_status) + .fetch_optional(pool) + .await +} + +/// Set the orchestrator contract ID on a directive. +pub async fn set_directive_orchestrator_contract( + pool: &PgPool, + directive_id: Uuid, + contract_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET orchestrator_contract_id = $2, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Set the current chain ID on a directive and increment chain_generation_count. +pub async fn set_directive_current_chain( + pool: &PgPool, + directive_id: Uuid, + chain_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET current_chain_id = $2, + chain_generation_count = chain_generation_count + 1, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(chain_id) + .fetch_optional(pool) + .await +} + +/// Create a new directive chain. +pub async fn create_directive_chain( + pool: &PgPool, + directive_id: Uuid, + name: &str, + description: Option<&str>, + rationale: Option<&str>, + total_steps: i32, +) -> Result<DirectiveChain, sqlx::Error> { + // Get next generation number + let next_gen: (i64,) = sqlx::query_as( + "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1", + ) + .bind(directive_id) + .fetch_one(pool) + .await?; + + sqlx::query_as::<_, DirectiveChain>( + r#" + INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status) + VALUES ($1, $2, $3, $4, $5, $6, 'active') + RETURNING * + "#, + ) + .bind(directive_id) + .bind(next_gen.0 as i32) + .bind(name) + .bind(description) + .bind(rationale) + .bind(total_steps) + .fetch_one(pool) + .await +} + +/// Create a chain step. +pub async fn create_chain_step( + pool: &PgPool, + chain_id: Uuid, + name: &str, + description: Option<&str>, + step_type: &str, + contract_type: &str, + initial_phase: Option<&str>, + task_plan: Option<&str>, + depends_on: Option<Vec<Uuid>>, + order_index: i32, +) -> Result<ChainStep, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending') + RETURNING * + "#, + ) + .bind(chain_id) + .bind(name) + .bind(description) + .bind(step_type) + .bind(contract_type) + .bind(initial_phase) + .bind(task_plan) + .bind(depends_on.as_deref()) + .bind(order_index) + .fetch_one(pool) + .await +} + +/// Update a chain step's status with automatic timestamp management. +pub async fn update_step_status( + pool: &PgPool, + step_id: Uuid, + new_status: &str, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET status = $2, + started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(new_status) + .fetch_optional(pool) + .await +} + +/// Link a chain step to a contract and supervisor task. +pub async fn update_step_contract( + pool: &PgPool, + step_id: Uuid, + contract_id: Uuid, + supervisor_task_id: Uuid, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + UPDATE chain_steps + SET contract_id = $2, + supervisor_task_id = $3 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(contract_id) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await +} + +/// Find steps that are ready to execute (pending, with all dependencies passed). +pub async fn find_ready_steps( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#" + SELECT * FROM chain_steps + WHERE chain_id = $1 + AND status = 'pending' + AND ( + depends_on IS NULL + OR array_length(depends_on, 1) IS NULL + OR NOT EXISTS ( + SELECT 1 FROM unnest(depends_on) AS dep_id + WHERE dep_id NOT IN ( + SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed' + ) + ) + ) + ORDER BY order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Get a chain step by its linked contract ID. +pub async fn get_step_by_contract_id( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<ChainStep>, sqlx::Error> { + sqlx::query_as::<_, ChainStep>( + r#"SELECT * FROM chain_steps WHERE contract_id = $1"#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Get a directive by its orchestrator contract ID. +pub async fn get_directive_by_orchestrator_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator). +pub async fn set_contract_directive_fields( + pool: &PgPool, + contract_id: Uuid, + directive_id: Option<Uuid>, + is_orchestrator: bool, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE contracts + SET directive_id = $2, + is_directive_orchestrator = $3 + WHERE id = $1 + "#, + ) + .bind(contract_id) + .bind(directive_id) + .bind(is_orchestrator) + .execute(pool) + .await?; + Ok(()) +} + +/// Get a directive by ID (no owner scoping, for internal use). +pub async fn get_directive( + pool: &PgPool, + id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1"#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Update chain status. +pub async fn update_chain_status( + pool: &PgPool, + chain_id: Uuid, + new_status: &str, +) -> Result<Option<DirectiveChain>, sqlx::Error> { + sqlx::query_as::<_, DirectiveChain>( + r#" + UPDATE directive_chains + SET status = $2, + completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .bind(new_status) + .fetch_optional(pool) + .await +} |
