summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs888
1 files changed, 7 insertions, 881 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 4298fa5..4ed2298 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,18 +6,17 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- ChainStep, CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
+ CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
- CreateContractRequest, CreateDirectiveRequest, CreateFileRequest, CreateTaskRequest,
+ CreateContractRequest, CreateFileRequest, CreateTaskRequest,
CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition, Directive, DirectiveChain, DirectiveEvaluation, DirectiveEvent,
- DirectiveSummary,
+ DeliverableDefinition,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
- PhaseDefinition, StepContractSummary, SupervisorHeartbeatRecord, SupervisorState,
+ PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
- UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
+ UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
};
/// Repository error types.
@@ -816,10 +815,7 @@ pub async fn get_pending_tasks_for_contract(
WHERE t.contract_id = $1 AND t.owner_id = $2
AND t.status = 'pending'
AND t.retry_count < t.max_retries
- AND (t.is_supervisor = false
- OR EXISTS (SELECT 1 FROM contracts c
- WHERE c.id = t.contract_id
- AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true)))
+ AND t.is_supervisor = false
ORDER BY
t.interrupted_at DESC NULLS LAST,
t.priority DESC,
@@ -844,10 +840,7 @@ pub async fn get_all_pending_task_contracts(
WHERE t.contract_id IS NOT NULL
AND t.status = 'pending'
AND t.retry_count < t.max_retries
- AND (t.is_supervisor = false
- OR EXISTS (SELECT 1 FROM contracts c
- WHERE c.id = t.contract_id
- AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true)))
+ AND t.is_supervisor = false
ORDER BY t.owner_id, t.contract_id
"#,
)
@@ -4919,870 +4912,3 @@ fn truncate_string(s: &str, max_len: usize) -> String {
}
}
-// =============================================================================
-// Directive CRUD
-// =============================================================================
-
-/// Create a new directive, scoped to owner.
-pub async fn create_directive_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateDirectiveRequest,
-) -> Result<Directive, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- INSERT INTO directives (
- owner_id, title, goal,
- requirements, acceptance_criteria, constraints, external_dependencies,
- autonomy_level, confidence_threshold_green, confidence_threshold_yellow,
- max_total_cost_usd, max_wall_time_minutes, max_rework_cycles, max_chain_regenerations,
- repository_url, local_path, base_branch
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.title)
- .bind(&req.goal)
- .bind(&req.requirements)
- .bind(&req.acceptance_criteria)
- .bind(&req.constraints)
- .bind(&req.external_dependencies)
- .bind(&req.autonomy_level)
- .bind(req.confidence_threshold_green)
- .bind(req.confidence_threshold_yellow)
- .bind(req.max_total_cost_usd)
- .bind(req.max_wall_time_minutes)
- .bind(req.max_rework_cycles)
- .bind(req.max_chain_regenerations)
- .bind(&req.repository_url)
- .bind(&req.local_path)
- .bind(&req.base_branch)
- .fetch_one(pool)
- .await
-}
-
-/// Get a directive by ID, scoped to owner.
-pub async fn get_directive_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- SELECT *
- FROM directives
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all directives for an owner, ordered by created_at DESC.
-pub async fn list_directives_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
- sqlx::query_as::<_, DirectiveSummary>(
- r#"
- SELECT
- d.id, d.title, d.goal, d.status, d.autonomy_level,
- (SELECT COUNT(*) FROM directive_chains WHERE directive_id = d.id) as chain_count,
- (SELECT COUNT(*) FROM chain_steps cs JOIN directive_chains dc ON cs.chain_id = dc.id WHERE dc.directive_id = d.id) as step_count,
- d.total_cost_usd, d.version, d.created_at, d.updated_at
- FROM directives d
- WHERE d.owner_id = $1
- ORDER BY d.created_at DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Update a directive by ID with optimistic locking, scoped to owner.
-pub async fn update_directive_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateDirectiveRequest,
-) -> Result<Option<Directive>, RepositoryError> {
- let existing = get_directive_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected_version) = req.version {
- if existing.version != expected_version {
- return Err(RepositoryError::VersionConflict {
- expected: expected_version,
- actual: existing.version,
- });
- }
- }
-
- // Apply updates
- let title = req.title.unwrap_or(existing.title);
- let goal = req.goal.unwrap_or(existing.goal);
- let requirements = req.requirements.unwrap_or(existing.requirements);
- let acceptance_criteria = req.acceptance_criteria.unwrap_or(existing.acceptance_criteria);
- let constraints = req.constraints.unwrap_or(existing.constraints);
- let external_dependencies = req.external_dependencies.unwrap_or(existing.external_dependencies);
- let status = req.status.unwrap_or(existing.status);
- let autonomy_level = req.autonomy_level.unwrap_or(existing.autonomy_level);
- let confidence_threshold_green = req.confidence_threshold_green.unwrap_or(existing.confidence_threshold_green);
- let confidence_threshold_yellow = req.confidence_threshold_yellow.unwrap_or(existing.confidence_threshold_yellow);
- let max_total_cost_usd = req.max_total_cost_usd.or(existing.max_total_cost_usd);
- let max_wall_time_minutes = req.max_wall_time_minutes.or(existing.max_wall_time_minutes);
- let max_rework_cycles = req.max_rework_cycles.or(existing.max_rework_cycles);
- let max_chain_regenerations = req.max_chain_regenerations.or(existing.max_chain_regenerations);
- let repository_url = req.repository_url.or(existing.repository_url);
- let local_path = req.local_path.or(existing.local_path);
- let base_branch = req.base_branch.or(existing.base_branch);
-
- let result = if req.version.is_some() {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET title = $3, goal = $4,
- requirements = $5, acceptance_criteria = $6, constraints = $7, external_dependencies = $8,
- status = $9, autonomy_level = $10,
- confidence_threshold_green = $11, confidence_threshold_yellow = $12,
- max_total_cost_usd = $13, max_wall_time_minutes = $14,
- max_rework_cycles = $15, max_chain_regenerations = $16,
- repository_url = $17, local_path = $18, base_branch = $19,
- version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $20
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&title)
- .bind(&goal)
- .bind(&requirements)
- .bind(&acceptance_criteria)
- .bind(&constraints)
- .bind(&external_dependencies)
- .bind(&status)
- .bind(&autonomy_level)
- .bind(confidence_threshold_green)
- .bind(confidence_threshold_yellow)
- .bind(max_total_cost_usd)
- .bind(max_wall_time_minutes)
- .bind(max_rework_cycles)
- .bind(max_chain_regenerations)
- .bind(&repository_url)
- .bind(&local_path)
- .bind(&base_branch)
- .bind(req.version.unwrap())
- .fetch_optional(pool)
- .await?
- } else {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET title = $3, goal = $4,
- requirements = $5, acceptance_criteria = $6, constraints = $7, external_dependencies = $8,
- status = $9, autonomy_level = $10,
- confidence_threshold_green = $11, confidence_threshold_yellow = $12,
- max_total_cost_usd = $13, max_wall_time_minutes = $14,
- max_rework_cycles = $15, max_chain_regenerations = $16,
- repository_url = $17, local_path = $18, base_branch = $19,
- version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&title)
- .bind(&goal)
- .bind(&requirements)
- .bind(&acceptance_criteria)
- .bind(&constraints)
- .bind(&external_dependencies)
- .bind(&status)
- .bind(&autonomy_level)
- .bind(confidence_threshold_green)
- .bind(confidence_threshold_yellow)
- .bind(max_total_cost_usd)
- .bind(max_wall_time_minutes)
- .bind(max_rework_cycles)
- .bind(max_chain_regenerations)
- .bind(&repository_url)
- .bind(&local_path)
- .bind(&base_branch)
- .fetch_optional(pool)
- .await?
- };
-
- // If versioned update returned None, there was a race condition
- if result.is_none() && req.version.is_some() {
- if let Some(current) = get_directive_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
-
- Ok(result)
-}
-
-/// Delete a directive by ID, scoped to owner.
-/// Also deletes all contracts (and their cascaded tasks/files) associated with this directive.
-pub async fn delete_directive_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- // First verify the directive exists and belongs to the owner
- let directive = get_directive_for_owner(pool, id, owner_id).await?;
- let Some(_directive) = directive else {
- return Ok(false);
- };
-
- // Delete all contracts linked to this directive (tasks/files cascade from contracts).
- // This covers step contracts (directive_id FK) and the orchestrator contract.
- sqlx::query(
- r#"
- DELETE FROM contracts
- WHERE directive_id = $1
- OR id = (SELECT orchestrator_contract_id FROM directives WHERE id = $1)
- "#,
- )
- .bind(id)
- .execute(pool)
- .await?;
-
- // Now delete the directive itself (chains, steps, events, evaluations cascade via FK)
- let result = sqlx::query(
- r#"
- DELETE FROM directives
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// List chains for a directive (read-only).
-pub async fn list_chains_for_directive(
- pool: &PgPool,
- directive_id: Uuid,
-) -> Result<Vec<DirectiveChain>, sqlx::Error> {
- sqlx::query_as::<_, DirectiveChain>(
- r#"
- SELECT *
- FROM directive_chains
- WHERE directive_id = $1
- ORDER BY generation DESC, created_at DESC
- "#,
- )
- .bind(directive_id)
- .fetch_all(pool)
- .await
-}
-
-/// List steps for a chain (read-only).
-pub async fn list_steps_for_chain(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- SELECT *
- FROM chain_steps
- WHERE chain_id = $1
- ORDER BY order_index ASC, created_at ASC
- "#,
- )
- .bind(chain_id)
- .fetch_all(pool)
- .await
-}
-
-/// Batch-fetch lightweight contract summaries for a set of contract IDs.
-pub async fn get_contract_summaries_batch(
- pool: &PgPool,
- contract_ids: &[Uuid],
-) -> Result<Vec<StepContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, StepContractSummary>(
- r#"
- SELECT c.id, c.name, c.contract_type, c.phase, c.status,
- COUNT(t.id) as task_count,
- COUNT(t.id) FILTER (WHERE t.status IN ('done','merged')) as tasks_done,
- COUNT(t.id) FILTER (WHERE t.status IN ('running','initializing','starting')) as tasks_running,
- COUNT(t.id) FILTER (WHERE t.status = 'failed') as tasks_failed
- FROM contracts c
- LEFT JOIN tasks t ON t.contract_id = c.id
- WHERE c.id = ANY($1)
- GROUP BY c.id, c.name, c.contract_type, c.phase, c.status
- "#,
- )
- .bind(contract_ids)
- .fetch_all(pool)
- .await
-}
-
-// ── Directive orchestration functions ───────────────────────────────────────
-
-/// Update directive status with automatic timestamp management.
-pub async fn update_directive_status(
- pool: &PgPool,
- id: Uuid,
- new_status: &str,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET status = $2,
- started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END,
- completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
- version = version + 1,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(new_status)
- .fetch_optional(pool)
- .await
-}
-
-/// Set the orchestrator contract ID on a directive.
-pub async fn set_directive_orchestrator_contract(
- pool: &PgPool,
- directive_id: Uuid,
- contract_id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET orchestrator_contract_id = $2,
- version = version + 1,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Set the current chain ID on a directive and increment chain_generation_count.
-pub async fn set_directive_current_chain(
- pool: &PgPool,
- directive_id: Uuid,
- chain_id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET current_chain_id = $2,
- chain_generation_count = chain_generation_count + 1,
- version = version + 1,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .bind(chain_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Increment the chain_generation_count on a directive (without setting current_chain_id).
-pub async fn increment_chain_generation_count(
- pool: &PgPool,
- directive_id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"
- UPDATE directives
- SET chain_generation_count = chain_generation_count + 1,
- version = version + 1,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Create a new directive chain.
-pub async fn create_directive_chain(
- pool: &PgPool,
- directive_id: Uuid,
- name: &str,
- description: Option<&str>,
- rationale: Option<&str>,
- total_steps: i32,
-) -> Result<DirectiveChain, sqlx::Error> {
- // Get next generation number
- let next_gen: (i32,) = sqlx::query_as(
- "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1",
- )
- .bind(directive_id)
- .fetch_one(pool)
- .await?;
-
- sqlx::query_as::<_, DirectiveChain>(
- r#"
- INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status)
- VALUES ($1, $2, $3, $4, $5, $6, 'running')
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .bind(next_gen.0)
- .bind(name)
- .bind(description)
- .bind(rationale)
- .bind(total_steps)
- .fetch_one(pool)
- .await
-}
-
-/// Create a chain step.
-pub async fn create_chain_step(
- pool: &PgPool,
- chain_id: Uuid,
- name: &str,
- description: Option<&str>,
- step_type: &str,
- contract_type: &str,
- initial_phase: Option<&str>,
- task_plan: Option<&str>,
- depends_on: Option<Vec<Uuid>>,
- order_index: i32,
-) -> Result<ChainStep, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending')
- RETURNING *
- "#,
- )
- .bind(chain_id)
- .bind(name)
- .bind(description)
- .bind(step_type)
- .bind(contract_type)
- .bind(initial_phase)
- .bind(task_plan)
- .bind(depends_on.as_deref())
- .bind(order_index)
- .fetch_one(pool)
- .await
-}
-
-/// Get a single chain step by ID.
-pub async fn get_chain_step(
- pool: &PgPool,
- step_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- "SELECT * FROM chain_steps WHERE id = $1",
- )
- .bind(step_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Increment completed_steps counter on a directive chain.
-pub async fn increment_chain_completed_steps(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- "UPDATE directive_chains SET completed_steps = completed_steps + 1, updated_at = NOW() WHERE id = $1",
- )
- .bind(chain_id)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-/// Increment failed_steps counter on a directive chain.
-pub async fn increment_chain_failed_steps(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- "UPDATE directive_chains SET failed_steps = failed_steps + 1, updated_at = NOW() WHERE id = $1",
- )
- .bind(chain_id)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-/// Update a chain step's status with automatic timestamp management.
-pub async fn update_step_status(
- pool: &PgPool,
- step_id: Uuid,
- new_status: &str,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- UPDATE chain_steps
- SET status = $2,
- started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
- completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(step_id)
- .bind(new_status)
- .fetch_optional(pool)
- .await
-}
-
-/// Link a chain step to a contract and supervisor task.
-pub async fn update_step_contract(
- pool: &PgPool,
- step_id: Uuid,
- contract_id: Uuid,
- supervisor_task_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- UPDATE chain_steps
- SET contract_id = $2,
- supervisor_task_id = $3
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(step_id)
- .bind(contract_id)
- .bind(supervisor_task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Find steps that are ready to execute (pending, with all dependencies passed).
-pub async fn find_ready_steps(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- SELECT * FROM chain_steps
- WHERE chain_id = $1
- AND status = 'pending'
- AND (
- depends_on IS NULL
- OR array_length(depends_on, 1) IS NULL
- OR NOT EXISTS (
- SELECT 1 FROM unnest(depends_on) AS dep_id
- WHERE dep_id NOT IN (
- SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed'
- )
- )
- )
- ORDER BY order_index ASC
- "#,
- )
- .bind(chain_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get a chain step by its linked contract ID.
-pub async fn get_step_by_contract_id(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"SELECT * FROM chain_steps WHERE contract_id = $1"#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get a directive by its orchestrator contract ID.
-pub async fn get_directive_by_orchestrator_contract(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator).
-pub async fn set_contract_directive_fields(
- pool: &PgPool,
- contract_id: Uuid,
- directive_id: Option<Uuid>,
- is_orchestrator: bool,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE contracts
- SET directive_id = $2,
- is_directive_orchestrator = $3
- WHERE id = $1
- "#,
- )
- .bind(contract_id)
- .bind(directive_id)
- .bind(is_orchestrator)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-/// Get a directive by ID (no owner scoping, for internal use).
-pub async fn get_directive(
- pool: &PgPool,
- id: Uuid,
-) -> Result<Option<Directive>, sqlx::Error> {
- sqlx::query_as::<_, Directive>(
- r#"SELECT * FROM directives WHERE id = $1"#,
- )
- .bind(id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update chain status.
-pub async fn update_chain_status(
- pool: &PgPool,
- chain_id: Uuid,
- new_status: &str,
-) -> Result<Option<DirectiveChain>, sqlx::Error> {
- sqlx::query_as::<_, DirectiveChain>(
- r#"
- UPDATE directive_chains
- SET status = $2,
- completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END,
- version = version + 1,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(chain_id)
- .bind(new_status)
- .fetch_optional(pool)
- .await
-}
-
-// ── Directive monitoring / evaluation functions ─────────────────────────────
-
-/// Create a directive evaluation record. evaluation_number is auto-incremented per step.
-pub async fn create_directive_evaluation(
- pool: &PgPool,
- directive_id: Uuid,
- chain_id: Uuid,
- step_id: Uuid,
- contract_id: Uuid,
- evaluation_type: &str,
- evaluator: Option<&str>,
- passed: bool,
- overall_score: Option<f64>,
- confidence_level: Option<&str>,
- criteria_results: &serde_json::Value,
- summary_feedback: &str,
- rework_instructions: Option<&str>,
-) -> Result<DirectiveEvaluation, sqlx::Error> {
- sqlx::query_as::<_, DirectiveEvaluation>(
- r#"
- INSERT INTO directive_evaluations (
- directive_id, chain_id, step_id, contract_id,
- evaluation_type, evaluation_number, evaluator,
- passed, overall_score, confidence_level,
- criteria_results, summary_feedback, rework_instructions,
- completed_at
- )
- VALUES (
- $1, $2, $3, $4,
- $5, COALESCE((SELECT MAX(evaluation_number) FROM directive_evaluations WHERE step_id = $3), 0) + 1, $6,
- $7, $8, $9,
- $10, $11, $12,
- NOW()
- )
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .bind(chain_id)
- .bind(step_id)
- .bind(contract_id)
- .bind(evaluation_type)
- .bind(evaluator)
- .bind(passed)
- .bind(overall_score)
- .bind(confidence_level)
- .bind(criteria_results)
- .bind(summary_feedback)
- .bind(rework_instructions)
- .fetch_one(pool)
- .await
-}
-
-/// List evaluations for a step, ordered by evaluation_number.
-pub async fn list_evaluations_for_step(
- pool: &PgPool,
- step_id: Uuid,
-) -> Result<Vec<DirectiveEvaluation>, sqlx::Error> {
- sqlx::query_as::<_, DirectiveEvaluation>(
- r#"
- SELECT * FROM directive_evaluations
- WHERE step_id = $1
- ORDER BY evaluation_number ASC
- "#,
- )
- .bind(step_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get a single directive evaluation by ID.
-pub async fn get_directive_evaluation(
- pool: &PgPool,
- evaluation_id: Uuid,
-) -> Result<Option<DirectiveEvaluation>, sqlx::Error> {
- sqlx::query_as::<_, DirectiveEvaluation>(
- "SELECT * FROM directive_evaluations WHERE id = $1",
- )
- .bind(evaluation_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Create a directive event.
-pub async fn create_directive_event(
- pool: &PgPool,
- directive_id: Uuid,
- chain_id: Option<Uuid>,
- step_id: Option<Uuid>,
- event_type: &str,
- severity: &str,
- event_data: Option<&serde_json::Value>,
- actor_type: &str,
- actor_id: Option<Uuid>,
-) -> Result<DirectiveEvent, sqlx::Error> {
- sqlx::query_as::<_, DirectiveEvent>(
- r#"
- INSERT INTO directive_events (directive_id, chain_id, step_id, event_type, severity, event_data, actor_type, actor_id)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
- RETURNING *
- "#,
- )
- .bind(directive_id)
- .bind(chain_id)
- .bind(step_id)
- .bind(event_type)
- .bind(severity)
- .bind(event_data)
- .bind(actor_type)
- .bind(actor_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update step evaluation fields after an evaluation completes.
-pub async fn update_step_evaluation_fields(
- pool: &PgPool,
- step_id: Uuid,
- confidence_score: Option<f64>,
- confidence_level: Option<&str>,
- last_evaluation_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- UPDATE chain_steps
- SET confidence_score = $2,
- confidence_level = $3,
- evaluation_count = evaluation_count + 1,
- last_evaluation_id = $4
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(step_id)
- .bind(confidence_score)
- .bind(confidence_level)
- .bind(last_evaluation_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update step monitoring contract/task references.
-pub async fn update_step_monitoring_contract(
- pool: &PgPool,
- step_id: Uuid,
- monitoring_contract_id: Uuid,
- monitoring_task_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- UPDATE chain_steps
- SET monitoring_contract_id = $2,
- monitoring_task_id = $3
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(step_id)
- .bind(monitoring_contract_id)
- .bind(monitoring_task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Increment step rework_count.
-pub async fn increment_step_rework_count(
- pool: &PgPool,
- step_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"
- UPDATE chain_steps
- SET rework_count = rework_count + 1
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(step_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get a chain step by its monitoring contract ID.
-pub async fn get_step_by_monitoring_contract_id(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<ChainStep>, sqlx::Error> {
- sqlx::query_as::<_, ChainStep>(
- r#"SELECT * FROM chain_steps WHERE monitoring_contract_id = $1"#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}