diff options
| author | soryu <soryu@soryu.co> | 2026-05-08 12:12:51 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-08 12:12:51 +0100 |
| commit | 6690b714c64aaef5781bc0aac41b777ab72e9070 (patch) | |
| tree | 1ffe451c3dec2fbb91f1e71f55abed37083ec62a /makima/src | |
| parent | e00be74c8b575c725829677aadeb755ee81454d0 (diff) | |
| download | soryu-6690b714c64aaef5781bc0aac41b777ab72e9070.tar.gz soryu-6690b714c64aaef5781bc0aac41b777ab72e9070.zip | |
feat(contracts): lifecycle — Lock/Start/Pause/Complete/Unlock + queue scheduler (#129)
Adds the contract lifecycle layer on top of the unified-contracts
backbone (#128). State machine:
draft → queued → active → shipped → archived
At most one contract per directive sits in `active` at any time —
the queue is serialised because each directive owns a single shared
worktree. Repository helpers handle the transition checks AND
auto-promote the next-up `queued` contract whenever the active slot
frees (pause / complete / unlock-from-active / archive-from-active).
Endpoints (all under /api/v1/contracts/{id}):
POST /start draft → queued | active (depending on slot)
POST /pause active → queued; promotes next queued
POST /complete active → shipped; optional pr_url + pr_branch
POST /unlock queued | active → draft; promotes if was active
Frontend wiring:
* `DirectiveContractStatus` now includes `queued`.
* Migration adds `queued` to the CHECK constraint on
directive_documents.status.
* `ContractHeader` component renders breadcrumb + status pill +
status-driven action buttons + a merge-mode (shared / own_pr)
radio. Merge mode is editable only while draft / queued so a
running flow's branch target can't change mid-stream.
* RepositoryError gains a `Validation(String)` arm; the three
existing exhaustive matches (files, mesh, versions) get a
400 BAD_REQUEST response for it.
Drag-to-reorder UI deferred to a small follow-up — the backend
endpoint already exists from the backbone PR.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/db/repository.rs | 276 | ||||
| -rw-r--r-- | makima/src/server/handlers/directive_documents.rs | 222 | ||||
| -rw-r--r-- | makima/src/server/handlers/files.rs | 5 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 5 | ||||
| -rw-r--r-- | makima/src/server/handlers/versions.rs | 5 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 16 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 5 |
7 files changed, 530 insertions, 4 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 12d5e4d..e58f58c 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -35,6 +35,8 @@ pub enum RepositoryError { /// The actual current version in the database actual: i32, }, + /// Caller-facing precondition failure (wrong status, etc.). + Validation(String), } impl From<sqlx::Error> for RepositoryError { @@ -54,6 +56,7 @@ impl std::fmt::Display for RepositoryError { expected, actual ) } + RepositoryError::Validation(msg) => write!(f, "Validation error: {}", msg), } } } @@ -6038,11 +6041,16 @@ pub async fn mark_directive_document_shipped( /// Archive a directive document. Sets status = 'archived' and stamps /// archived_at = NOW(). Idempotent — archiving an already-archived doc /// re-stamps archived_at and bumps version. +/// +/// If the archived contract was `active`, the next-up `queued` contract +/// in the same directive auto-promotes to `active` (sequential queue). pub async fn archive_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result<Option<DirectiveDocument>, sqlx::Error> { - sqlx::query_as::<_, DirectiveDocument>( + let mut tx = pool.begin().await?; + + let archived = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'archived', @@ -6054,8 +6062,270 @@ pub async fn archive_directive_document( "#, ) .bind(document_id) - .fetch_optional(pool) - .await + .fetch_optional(&mut *tx) + .await?; + + if let Some(ref doc) = archived { + promote_next_queued_contract(&mut tx, doc.directive_id).await?; + } + + tx.commit().await?; + Ok(archived) +} + +// ============================================================================ +// Lifecycle transitions: start / pause / complete / unlock +// +// The lifecycle is `draft → queued → active → shipped → archived`. At most +// one contract per directive sits in `active` at a time — the queue is +// serialised because a directive owns a single shared worktree. Helpers +// below enforce that invariant in SQL transactions. +// ============================================================================ + +/// Lock a draft contract and either activate it (if no sibling is active) +/// or queue it. Returns the updated row, or `Ok(None)` if the contract +/// doesn't exist. Errors with `RepositoryError::Validation` if the +/// contract is in any state other than `draft`. +pub async fn start_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "draft" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'draft' contracts can be started", + current.status + ))); + } + + // If any sibling is already active, this one queues. Otherwise it + // claims the active slot directly. + let active_count: (i64,) = sqlx::query_as( + r#"SELECT COUNT(*)::BIGINT FROM directive_documents + WHERE directive_id = $1 AND status = 'active'"#, + ) + .bind(current.directive_id) + .fetch_one(&mut *tx) + .await?; + + let new_status = if active_count.0 > 0 { "queued" } else { "active" }; + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = $2, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .bind(new_status) + .fetch_optional(&mut *tx) + .await?; + + tx.commit().await?; + Ok(updated) +} + +/// Pause an active contract — moves it back to `queued` so the next +/// queued sibling can pick up the active slot. The orchestrator-daemon +/// stop is the caller's responsibility. +pub async fn pause_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "active" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'active' contracts can be paused", + current.status + ))); + } + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'queued', + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + // The slot is free — promote the next queued contract (lowest + // position, excluding the one we just paused). + promote_next_queued_contract(&mut tx, current.directive_id).await?; + + tx.commit().await?; + Ok(updated) +} + +/// Mark an active contract as `shipped` — the work is done. Optional +/// pr_url / pr_branch are recorded if supplied. Promotes the next +/// queued sibling to `active`. +pub async fn complete_contract( + pool: &PgPool, + contract_id: Uuid, + pr_url: Option<&str>, + pr_branch: Option<&str>, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "active" && current.status != "queued" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'active' or 'queued' contracts can be completed", + current.status + ))); + } + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'shipped', + pr_url = COALESCE($2, pr_url), + pr_branch = COALESCE($3, pr_branch), + shipped_at = NOW(), + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .bind(pr_url) + .bind(pr_branch) + .fetch_optional(&mut *tx) + .await?; + + promote_next_queued_contract(&mut tx, current.directive_id).await?; + + tx.commit().await?; + Ok(updated) +} + +/// Unlock a queued or active contract back to `draft` so the spec is +/// editable again. If the contract was active, the slot frees and the +/// next queued sibling auto-promotes. +pub async fn unlock_contract( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<DirectiveDocument>, RepositoryError> { + let mut tx = pool.begin().await?; + + let current = sqlx::query_as::<_, DirectiveDocument>( + r#"SELECT * FROM directive_documents WHERE id = $1"#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if current.status != "queued" && current.status != "active" { + return Err(RepositoryError::Validation(format!( + "contract is in status '{}'; only 'queued' or 'active' contracts can be unlocked", + current.status + ))); + } + + let was_active = current.status == "active"; + + let updated = sqlx::query_as::<_, DirectiveDocument>( + r#" + UPDATE directive_documents + SET status = 'draft', + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(contract_id) + .fetch_optional(&mut *tx) + .await?; + + if was_active { + promote_next_queued_contract(&mut tx, current.directive_id).await?; + } + + tx.commit().await?; + Ok(updated) +} + +/// Find the lowest-position `queued` contract under a directive and +/// flip it to `active`. No-op when no queued contract exists. +/// +/// Caller must hold the parent transaction so the count → promote +/// sequence stays atomic w.r.t. other lifecycle transitions. +async fn promote_next_queued_contract( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + directive_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directive_documents + SET status = 'active', + version = version + 1, + updated_at = NOW() + WHERE id = ( + SELECT id FROM directive_documents + WHERE directive_id = $1 AND status = 'queued' + ORDER BY position ASC, created_at ASC + LIMIT 1 + ) + "#, + ) + .bind(directive_id) + .execute(&mut **tx) + .await?; + Ok(()) } /// Count the number of currently-active documents under a directive. diff --git a/makima/src/server/handlers/directive_documents.rs b/makima/src/server/handlers/directive_documents.rs index ed38ee4..23081b5 100644 --- a/makima/src/server/handlers/directive_documents.rs +++ b/makima/src/server/handlers/directive_documents.rs @@ -21,7 +21,7 @@ use utoipa::ToSchema; use uuid::Uuid; use crate::db::models::{DirectiveStep, Task}; -use crate::db::repository; +use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; use crate::server::state::SharedState; @@ -60,6 +60,16 @@ pub struct ReorderDirectiveDocumentRequest { pub position: i32, } +/// Body for `POST /api/v1/contracts/{document_id}/complete`. Both fields +/// are optional — supplying them records PR provenance; leaving them off +/// just marks the contract done. +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CompleteContractRequest { + pub pr_url: Option<String>, + pub pr_branch: Option<String>, +} + /// Body for `POST /api/v1/contracts/{document_id}/ship`. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -656,3 +666,213 @@ pub async fn reorder_contract( } } } + +// ============================================================================= +// Contract lifecycle: start / pause / complete / unlock +// +// State machine: `draft → queued → active → shipped → archived`. The +// repository functions enforce transition validity and handle queue +// auto-promotion when the active slot frees. Handlers here own auth +// and mapping to HTTP status codes. +// ============================================================================= + +/// Common path: load + ownership-check, then dispatch a state-transition +/// closure. Cuts the boilerplate from start/pause/complete/unlock down +/// to a couple of lines each. +async fn run_contract_transition<F, Fut>( + pool: sqlx::PgPool, + owner_id: Uuid, + contract_id: Uuid, + f: F, +) -> impl IntoResponse +where + F: FnOnce(sqlx::PgPool, Uuid) -> Fut, + Fut: std::future::Future< + Output = Result<Option<crate::db::models::DirectiveDocument>, RepositoryError>, + >, +{ + match load_owned_document(&pool, owner_id, contract_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("GET_FAILED", &e.to_string())), + ) + .into_response(); + } + } + + match f(pool, contract_id).await { + Ok(Some(doc)) => Json(doc).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(RepositoryError::Validation(msg)) => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("VALIDATION", &msg)), + ) + .into_response(), + Err(RepositoryError::VersionConflict { expected, actual }) => ( + StatusCode::CONFLICT, + Json(ApiError::new( + "VERSION_CONFLICT", + &format!("expected version {}, actual {}", expected, actual), + )), + ) + .into_response(), + Err(RepositoryError::Database(e)) => { + tracing::error!("Contract transition failed: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("TRANSITION_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +/// Lock & start a draft contract. If a sibling is already `active`, +/// this contract goes to `queued`; otherwise it activates immediately. +#[utoipa::path( + post, + path = "/api/v1/contracts/{document_id}/start", + params(("document_id" = Uuid, Path, description = "Contract ID")), + responses( + (status = 200, description = "Contract started", body = crate::db::models::DirectiveDocument), + (status = 400, description = "Invalid state transition", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directive Documents" +)] +pub async fn start_contract( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(document_id): Path<Uuid>, +) -> impl IntoResponse { + let Some(pool) = state.db_pool.clone() else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + run_contract_transition(pool, auth.owner_id, document_id, |pool, id| async move { + repository::start_contract(&pool, id).await + }) + .await + .into_response() +} + +/// Pause an active contract — moves it back to `queued` and lets the +/// next queued sibling take the active slot. The orchestrator daemon +/// stop is the caller's responsibility. +#[utoipa::path( + post, + path = "/api/v1/contracts/{document_id}/pause", + params(("document_id" = Uuid, Path, description = "Contract ID")), + responses( + (status = 200, description = "Contract paused", body = crate::db::models::DirectiveDocument), + (status = 400, description = "Invalid state transition", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directive Documents" +)] +pub async fn pause_contract( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(document_id): Path<Uuid>, +) -> impl IntoResponse { + let Some(pool) = state.db_pool.clone() else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + run_contract_transition(pool, auth.owner_id, document_id, |pool, id| async move { + repository::pause_contract(&pool, id).await + }) + .await + .into_response() +} + +/// Mark an active contract as `shipped` (work done). PR url + branch +/// are optional — pass them to record provenance, leave them off to +/// just close out the contract. Auto-promotes the next queued sibling. +#[utoipa::path( + post, + path = "/api/v1/contracts/{document_id}/complete", + params(("document_id" = Uuid, Path, description = "Contract ID")), + request_body = CompleteContractRequest, + responses( + (status = 200, description = "Contract completed", body = crate::db::models::DirectiveDocument), + (status = 400, description = "Invalid state transition", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directive Documents" +)] +pub async fn complete_contract( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(document_id): Path<Uuid>, + Json(req): Json<CompleteContractRequest>, +) -> impl IntoResponse { + let Some(pool) = state.db_pool.clone() else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + run_contract_transition(pool, auth.owner_id, document_id, move |pool, id| async move { + repository::complete_contract(&pool, id, req.pr_url.as_deref(), req.pr_branch.as_deref()).await + }) + .await + .into_response() +} + +/// Unlock a queued or active contract back to `draft` so its spec is +/// editable again. If the contract was active, the slot frees and the +/// next queued sibling auto-promotes. +#[utoipa::path( + post, + path = "/api/v1/contracts/{document_id}/unlock", + params(("document_id" = Uuid, Path, description = "Contract ID")), + responses( + (status = 200, description = "Contract unlocked", body = crate::db::models::DirectiveDocument), + (status = 400, description = "Invalid state transition", body = ApiError), + (status = 404, description = "Not found", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directive Documents" +)] +pub async fn unlock_contract( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(document_id): Path<Uuid>, +) -> impl IntoResponse { + let Some(pool) = state.db_pool.clone() else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + run_contract_transition(pool, auth.owner_id, document_id, |pool, id| async move { + repository::unlock_contract(&pool, id).await + }) + .await + .into_response() +} diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs index 05e871c..711be41 100644 --- a/makima/src/server/handlers/files.rs +++ b/makima/src/server/handlers/files.rs @@ -277,6 +277,11 @@ pub async fn update_file( ) .into_response() } + Err(RepositoryError::Validation(msg)) => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("VALIDATION", &msg)), + ) + .into_response(), } } diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 63b1827..be5387e 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -467,6 +467,11 @@ pub async fn update_task( ) .into_response() } + Err(RepositoryError::Validation(msg)) => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("VALIDATION", &msg)), + ) + .into_response(), } } diff --git a/makima/src/server/handlers/versions.rs b/makima/src/server/handlers/versions.rs index 15118d6..bb1b00c 100644 --- a/makima/src/server/handlers/versions.rs +++ b/makima/src/server/handlers/versions.rs @@ -203,5 +203,10 @@ pub async fn restore_version( ) .into_response() } + Err(RepositoryError::Validation(msg)) => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("VALIDATION", &msg)), + ) + .into_response(), } } diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 68d3dea..a3a1886 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -242,6 +242,22 @@ pub fn make_router(state: SharedState) -> Router { post(directive_documents::reorder_contract), ) .route( + "/contracts/{document_id}/start", + post(directive_documents::start_contract), + ) + .route( + "/contracts/{document_id}/pause", + post(directive_documents::pause_contract), + ) + .route( + "/contracts/{document_id}/complete", + post(directive_documents::complete_contract), + ) + .route( + "/contracts/{document_id}/unlock", + post(directive_documents::unlock_contract), + ) + .route( "/contracts/{document_id}/tasks", get(directive_documents::list_document_tasks), ) diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 7ddaf1b..184d12a 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -124,6 +124,10 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage directive_documents::ship_document, directive_documents::archive_document, directive_documents::reorder_contract, + directive_documents::start_contract, + directive_documents::pause_contract, + directive_documents::complete_contract, + directive_documents::unlock_contract, directive_documents::list_document_tasks, // Order endpoints orders::list_orders, @@ -233,6 +237,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage directive_documents::UpdateDirectiveDocumentRequest, directive_documents::ShipDirectiveDocumentRequest, directive_documents::ReorderDirectiveDocumentRequest, + directive_documents::CompleteContractRequest, directive_documents::DocumentTasksResponse, // Order schemas Order, |
