diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/repository.rs | 276 |
1 files changed, 273 insertions, 3 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 12d5e4d..e58f58c 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -35,6 +35,8 @@ pub enum RepositoryError { /// The actual current version in the database actual: i32, }, + /// Caller-facing precondition failure (wrong status, etc.). + Validation(String), } impl From<sqlx::Error> for RepositoryError { @@ -54,6 +56,7 @@ impl std::fmt::Display for RepositoryError { expected, actual ) } + RepositoryError::Validation(msg) => write!(f, "Validation error: {}", msg), } } } @@ -6038,11 +6041,16 @@ pub async fn mark_directive_document_shipped( /// Archive a directive document. Sets status = 'archived' and stamps /// archived_at = NOW(). Idempotent — archiving an already-archived doc /// re-stamps archived_at and bumps version. +/// +/// If the archived contract was `active`, the next-up `queued` contract +/// in the same directive auto-promotes to `active` (sequential queue). pub async fn archive_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result<Option<DirectiveDocument>, sqlx::Error> { - sqlx::query_as::<_, DirectiveDocument>( + let mut tx = pool.begin().await?; + + let archived = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'archived', @@ -6054,8 +6062,270 @@ pub async fn archive_directive_document( "#, ) .bind(document_id) - .fetch_optional(pool) - .await + .fetch_optional(&mut *tx) + .await?; + + if let Some(ref doc) = archived { + promote_next_queued_contract(&mut tx, doc.directive_id).await?; + } + + tx.commit().await?; + Ok(archived) +} + +// ============================================================================ +// Lifecycle transitions: start / pause / complete / unlock +// +// The lifecycle is `draft → queued → active → shipped → archived`. At most +// one contract per directive sits in `active` at a time — the queue is +// serialised because a directive owns a single shared worktree. Helpers +// below enforce that invariant in SQL transactions. +// ============================================================================ + +/// Lock a draft contract and either activate it (if no sibling is active) +/// or queue it. Returns the updated row, or `Ok(None)` if the contract +/// doesn't exist. Errors with `RepositoryError::Validation` if the +/// contract is in any state other than `draft`. +pub async fn start_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "draft" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'draft' contracts can be started", + current.status + ))); + } + + // If any sibling is already active, this one queues. Otherwise it + // claims the active slot directly. + let active_count: (i64,) = sqlx::query_as( + r#"SELECT COUNT(*)::BIGINT FROM directive_documents + WHERE directive_id = $1 AND status = 'active'"#, + ) + .bind(current.directive_id) + .fetch_one(&mut *tx) + .await?; + + let new_status = if active_count.0 > 0 { "queued" } else { "active" }; + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = $2, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .bind(new_status) + .fetch_optional(&mut *tx) + .await?; + + tx.commit().await?; + Ok(updated) +} + +/// Pause an active contract — moves it back to `queued` so the next +/// queued sibling can pick up the active slot. The orchestrator-daemon +/// stop is the caller's responsibility. +pub async fn pause_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "active" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'active' contracts can be paused", + current.status + ))); + } + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'queued', + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + // The slot is free — promote the next queued contract (lowest + // position, excluding the one we just paused). + promote_next_queued_contract(&mut tx, current.directive_id).await?; + + tx.commit().await?; + Ok(updated) +} + +/// Mark an active contract as `shipped` — the work is done. Optional +/// pr_url / pr_branch are recorded if supplied. Promotes the next +/// queued sibling to `active`. +pub async fn complete_contract( + pool: &PgPool, + contract_id: Uuid, + pr_url: Option<&str>, + pr_branch: Option<&str>, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "active" && current.status != "queued" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'active' or 'queued' contracts can be completed", + current.status + ))); + } + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'shipped', + pr_url = COALESCE($2, pr_url), + pr_branch = COALESCE($3, pr_branch), + shipped_at = NOW(), + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .bind(pr_url) + .bind(pr_branch) + .fetch_optional(&mut *tx) + .await?; + + promote_next_queued_contract(&mut tx, current.directive_id).await?; + + tx.commit().await?; + Ok(updated) +} + +/// Unlock a queued or active contract back to `draft` so the spec is +/// editable again. If the contract was active, the slot frees and the +/// next queued sibling auto-promotes. +pub async fn unlock_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "queued" && current.status != "active" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'queued' or 'active' contracts can be unlocked", + current.status + ))); + } + + let was_active = current.status == "active"; + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'draft', + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + if was_active { + promote_next_queued_contract(&mut tx, current.directive_id).await?; + } + + tx.commit().await?; + Ok(updated) +} + +/// Find the lowest-position `queued` contract under a directive and +/// flip it to `active`. No-op when no queued contract exists. +/// +/// Caller must hold the parent transaction so the count → promote +/// sequence stays atomic w.r.t. other lifecycle transitions. +async fn promote_next_queued_contract( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + directive_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directive_documents + SET status = 'active', + version = version + 1, + updated_at = NOW() + WHERE id = ( + SELECT id FROM directive_documents + WHERE directive_id = $1 AND status = 'queued' + ORDER BY position ASC, created_at ASC + LIMIT 1 + ) + "#, + ) + .bind(directive_id) + .execute(&mut **tx) + .await?; + Ok(()) } /// Count the number of currently-active documents under a directive. |
