diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 333 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 888 |
2 files changed, 7 insertions, 1214 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 6045c7d..d0a0bd6 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1446,16 +1446,6 @@ pub struct Contract { /// Use `get_phase_config()` to get the parsed PhaseConfig. #[serde(skip_serializing_if = "Option::is_none")] pub phase_config: Option<serde_json::Value>, - /// Directive ID if this contract is part of a directive's chain - #[serde(skip_serializing_if = "Option::is_none")] - pub directive_id: Option<Uuid>, - /// Whether this contract is a directive orchestrator - #[serde(default)] - #[sqlx(default)] - pub is_directive_orchestrator: bool, - /// Reference to directive spawned by this orchestrator contract - #[serde(skip_serializing_if = "Option::is_none")] - pub spawned_directive_id: Option<Uuid>, pub version: i32, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, @@ -2692,326 +2682,3 @@ mod tests { } // ============================================================================= -// Directive Types -// ============================================================================= - -/// Default autonomy level for directives -fn default_autonomy_level() -> String { - "guardrails".to_string() -} - -/// Default empty JSON array -fn default_json_array() -> serde_json::Value { - serde_json::json!([]) -} - -/// Default empty JSON object -fn default_json_object() -> serde_json::Value { - serde_json::json!({}) -} - -/// Default confidence threshold (green) -fn default_confidence_green() -> f64 { - 0.85 -} - -/// Default confidence threshold (yellow) -fn default_confidence_yellow() -> f64 { - 0.60 -} - -/// Default max rework cycles -fn default_max_rework_cycles() -> Option<i32> { - Some(3) -} - -/// Default max chain regenerations -fn default_max_chain_regenerations() -> Option<i32> { - Some(2) -} - -/// Full directive row from the database. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct Directive { - pub id: Uuid, - pub owner_id: Uuid, - pub title: String, - pub goal: String, - #[sqlx(json)] - pub requirements: serde_json::Value, - #[sqlx(json)] - pub acceptance_criteria: serde_json::Value, - #[sqlx(json)] - pub constraints: serde_json::Value, - #[sqlx(json)] - pub external_dependencies: serde_json::Value, - pub status: String, - pub autonomy_level: String, - pub confidence_threshold_green: f64, - pub confidence_threshold_yellow: f64, - pub max_total_cost_usd: Option<f64>, - pub max_wall_time_minutes: Option<i32>, - pub max_rework_cycles: Option<i32>, - pub max_chain_regenerations: Option<i32>, - pub repository_url: Option<String>, - pub local_path: Option<String>, - pub base_branch: Option<String>, - pub orchestrator_contract_id: Option<Uuid>, - pub current_chain_id: Option<Uuid>, - pub chain_generation_count: i32, - pub total_cost_usd: f64, - pub started_at: Option<DateTime<Utc>>, - pub completed_at: Option<DateTime<Utc>>, - pub version: i32, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Summary of a directive for list views. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveSummary { - pub id: Uuid, - pub title: String, - pub goal: String, - pub status: String, - pub autonomy_level: String, - pub chain_count: i64, - pub step_count: i64, - pub total_cost_usd: f64, - pub version: i32, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Response for directive list endpoint. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveListResponse { - pub directives: Vec<DirectiveSummary>, - pub total: i64, -} - -/// Request to create a new directive. -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateDirectiveRequest { - pub title: String, - pub goal: String, - #[serde(default = "default_json_array")] - pub requirements: serde_json::Value, - #[serde(default = "default_json_array")] - pub acceptance_criteria: serde_json::Value, - #[serde(default = "default_json_array")] - pub constraints: serde_json::Value, - #[serde(default = "default_json_array")] - pub external_dependencies: serde_json::Value, - #[serde(default = "default_autonomy_level")] - pub autonomy_level: String, - #[serde(default = "default_confidence_green")] - pub confidence_threshold_green: f64, - #[serde(default = "default_confidence_yellow")] - pub confidence_threshold_yellow: f64, - pub max_total_cost_usd: Option<f64>, - pub max_wall_time_minutes: Option<i32>, - #[serde(default = "default_max_rework_cycles")] - pub max_rework_cycles: Option<i32>, - #[serde(default = "default_max_chain_regenerations")] - pub max_chain_regenerations: Option<i32>, - pub repository_url: Option<String>, - pub local_path: Option<String>, - pub base_branch: Option<String>, -} - -/// Request to submit a chain plan for a directive. -#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct SubmitPlanRequest { - pub plan: String, -} - -/// Request to update an existing directive. -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct UpdateDirectiveRequest { - pub title: Option<String>, - pub goal: Option<String>, - pub requirements: Option<serde_json::Value>, - pub acceptance_criteria: Option<serde_json::Value>, - pub constraints: Option<serde_json::Value>, - pub external_dependencies: Option<serde_json::Value>, - pub status: Option<String>, - pub autonomy_level: Option<String>, - pub confidence_threshold_green: Option<f64>, - pub confidence_threshold_yellow: Option<f64>, - pub max_total_cost_usd: Option<f64>, - pub max_wall_time_minutes: Option<i32>, - pub max_rework_cycles: Option<i32>, - pub max_chain_regenerations: Option<i32>, - pub repository_url: Option<String>, - pub local_path: Option<String>, - pub base_branch: Option<String>, - /// Version for optimistic locking - pub version: Option<i32>, -} - -/// Lightweight contract summary attached to a chain step. -#[derive(Debug, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct StepContractSummary { - pub id: Uuid, - pub name: String, - pub contract_type: String, - pub phase: String, - pub status: String, - pub task_count: i64, - pub tasks_done: i64, - pub tasks_running: i64, - pub tasks_failed: i64, -} - -/// Chain step enriched with optional contract summary. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChainStepWithContract { - #[serde(flatten)] - pub step: ChainStep, - pub contract_summary: Option<StepContractSummary>, -} - -/// Directive with its chains and steps for detail view. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveWithChains { - #[serde(flatten)] - pub directive: Directive, - pub orchestrator_contract_summary: Option<StepContractSummary>, - pub chains: Vec<ChainWithSteps>, -} - -/// Full row from directive_chains table. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveChain { - pub id: Uuid, - pub directive_id: Uuid, - pub generation: i32, - pub name: String, - pub description: Option<String>, - pub rationale: Option<String>, - pub planning_model: Option<String>, - pub status: String, - pub total_steps: i32, - pub completed_steps: i32, - pub failed_steps: i32, - pub current_confidence: Option<f64>, - pub started_at: Option<DateTime<Utc>>, - pub completed_at: Option<DateTime<Utc>>, - pub version: i32, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Full row from chain_steps table. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChainStep { - pub id: Uuid, - pub chain_id: Uuid, - pub name: String, - pub description: Option<String>, - pub step_type: String, - pub contract_type: String, - pub initial_phase: Option<String>, - pub task_plan: Option<String>, - pub phases: Option<Vec<String>>, - pub depends_on: Option<Vec<Uuid>>, - pub parallel_group: Option<String>, - pub requirement_ids: Option<Vec<String>>, - pub acceptance_criteria_ids: Option<Vec<String>>, - #[sqlx(json)] - pub verifier_config: serde_json::Value, - pub status: String, - pub contract_id: Option<Uuid>, - pub supervisor_task_id: Option<Uuid>, - pub monitoring_contract_id: Option<Uuid>, - pub monitoring_task_id: Option<Uuid>, - pub confidence_score: Option<f64>, - pub confidence_level: Option<String>, - pub evaluation_count: i32, - pub rework_count: i32, - pub last_evaluation_id: Option<Uuid>, - pub editor_x: Option<f64>, - pub editor_y: Option<f64>, - pub order_index: i32, - pub started_at: Option<DateTime<Utc>>, - pub completed_at: Option<DateTime<Utc>>, - pub created_at: DateTime<Utc>, -} - -/// Chain with its steps for detail view. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChainWithSteps { - #[serde(flatten)] - pub chain: DirectiveChain, - pub steps: Vec<ChainStepWithContract>, -} - -/// Full row from directive_evaluations table. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveEvaluation { - pub id: Uuid, - pub directive_id: Uuid, - pub chain_id: Option<Uuid>, - pub step_id: Option<Uuid>, - pub contract_id: Option<Uuid>, - pub evaluation_type: String, - pub evaluation_number: i32, - pub evaluator: Option<String>, - pub passed: bool, - pub overall_score: Option<f64>, - pub confidence_level: Option<String>, - #[sqlx(json)] - pub programmatic_results: serde_json::Value, - #[sqlx(json)] - pub llm_results: serde_json::Value, - #[sqlx(json)] - pub criteria_results: serde_json::Value, - pub summary_feedback: String, - pub rework_instructions: Option<String>, - #[sqlx(json)] - pub directive_snapshot: Option<serde_json::Value>, - #[sqlx(json)] - pub deliverables_snapshot: Option<serde_json::Value>, - pub started_at: DateTime<Utc>, - pub completed_at: Option<DateTime<Utc>>, - pub created_at: DateTime<Utc>, -} - -/// Full row from directive_events table. -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DirectiveEvent { - pub id: Uuid, - pub directive_id: Uuid, - pub chain_id: Option<Uuid>, - pub step_id: Option<Uuid>, - pub event_type: String, - pub severity: String, - #[sqlx(json)] - pub event_data: Option<serde_json::Value>, - pub actor_type: String, - pub actor_id: Option<Uuid>, - pub created_at: DateTime<Utc>, -} - -/// Response for evaluation list endpoint. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct EvaluationListResponse { - pub evaluations: Vec<DirectiveEvaluation>, - pub total: i64, -} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 4298fa5..4ed2298 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,18 +6,17 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - ChainStep, CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, + CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, - CreateContractRequest, CreateDirectiveRequest, CreateFileRequest, CreateTaskRequest, + CreateContractRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, Directive, DirectiveChain, DirectiveEvaluation, DirectiveEvent, - DirectiveSummary, + DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, - PhaseDefinition, StepContractSummary, SupervisorHeartbeatRecord, SupervisorState, + PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, - UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, + UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. @@ -816,10 +815,7 @@ pub async fn get_pending_tasks_for_contract( WHERE t.contract_id = $1 AND t.owner_id = $2 AND t.status = 'pending' AND t.retry_count < t.max_retries - AND (t.is_supervisor = false - OR EXISTS (SELECT 1 FROM contracts c - WHERE c.id = t.contract_id - AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true))) + AND t.is_supervisor = false ORDER BY t.interrupted_at DESC NULLS LAST, t.priority DESC, @@ -844,10 +840,7 @@ pub async fn get_all_pending_task_contracts( WHERE t.contract_id IS NOT NULL AND t.status = 'pending' AND t.retry_count < t.max_retries - AND (t.is_supervisor = false - OR EXISTS (SELECT 1 FROM contracts c - WHERE c.id = t.contract_id - AND (c.directive_id IS NOT NULL OR c.is_directive_orchestrator = true))) + AND t.is_supervisor = false ORDER BY t.owner_id, t.contract_id "#, ) @@ -4919,870 +4912,3 @@ fn truncate_string(s: &str, max_len: usize) -> String { } } -// ============================================================================= -// Directive CRUD -// ============================================================================= - -/// Create a new directive, scoped to owner. -pub async fn create_directive_for_owner( - pool: &PgPool, - owner_id: Uuid, - req: CreateDirectiveRequest, -) -> Result<Directive, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - INSERT INTO directives ( - owner_id, title, goal, - requirements, acceptance_criteria, constraints, external_dependencies, - autonomy_level, confidence_threshold_green, confidence_threshold_yellow, - max_total_cost_usd, max_wall_time_minutes, max_rework_cycles, max_chain_regenerations, - repository_url, local_path, base_branch - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) - RETURNING * - "#, - ) - .bind(owner_id) - .bind(&req.title) - .bind(&req.goal) - .bind(&req.requirements) - .bind(&req.acceptance_criteria) - .bind(&req.constraints) - .bind(&req.external_dependencies) - .bind(&req.autonomy_level) - .bind(req.confidence_threshold_green) - .bind(req.confidence_threshold_yellow) - .bind(req.max_total_cost_usd) - .bind(req.max_wall_time_minutes) - .bind(req.max_rework_cycles) - .bind(req.max_chain_regenerations) - .bind(&req.repository_url) - .bind(&req.local_path) - .bind(&req.base_branch) - .fetch_one(pool) - .await -} - -/// Get a directive by ID, scoped to owner. -pub async fn get_directive_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - SELECT * - FROM directives - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// List all directives for an owner, ordered by created_at DESC. -pub async fn list_directives_for_owner( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<DirectiveSummary>, sqlx::Error> { - sqlx::query_as::<_, DirectiveSummary>( - r#" - SELECT - d.id, d.title, d.goal, d.status, d.autonomy_level, - (SELECT COUNT(*) FROM directive_chains WHERE directive_id = d.id) as chain_count, - (SELECT COUNT(*) FROM chain_steps cs JOIN directive_chains dc ON cs.chain_id = dc.id WHERE dc.directive_id = d.id) as step_count, - d.total_cost_usd, d.version, d.created_at, d.updated_at - FROM directives d - WHERE d.owner_id = $1 - ORDER BY d.created_at DESC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Update a directive by ID with optimistic locking, scoped to owner. -pub async fn update_directive_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - req: UpdateDirectiveRequest, -) -> Result<Option<Directive>, RepositoryError> { - let existing = get_directive_for_owner(pool, id, owner_id).await?; - let Some(existing) = existing else { - return Ok(None); - }; - - // Check version if provided (optimistic locking) - if let Some(expected_version) = req.version { - if existing.version != expected_version { - return Err(RepositoryError::VersionConflict { - expected: expected_version, - actual: existing.version, - }); - } - } - - // Apply updates - let title = req.title.unwrap_or(existing.title); - let goal = req.goal.unwrap_or(existing.goal); - let requirements = req.requirements.unwrap_or(existing.requirements); - let acceptance_criteria = req.acceptance_criteria.unwrap_or(existing.acceptance_criteria); - let constraints = req.constraints.unwrap_or(existing.constraints); - let external_dependencies = req.external_dependencies.unwrap_or(existing.external_dependencies); - let status = req.status.unwrap_or(existing.status); - let autonomy_level = req.autonomy_level.unwrap_or(existing.autonomy_level); - let confidence_threshold_green = req.confidence_threshold_green.unwrap_or(existing.confidence_threshold_green); - let confidence_threshold_yellow = req.confidence_threshold_yellow.unwrap_or(existing.confidence_threshold_yellow); - let max_total_cost_usd = req.max_total_cost_usd.or(existing.max_total_cost_usd); - let max_wall_time_minutes = req.max_wall_time_minutes.or(existing.max_wall_time_minutes); - let max_rework_cycles = req.max_rework_cycles.or(existing.max_rework_cycles); - let max_chain_regenerations = req.max_chain_regenerations.or(existing.max_chain_regenerations); - let repository_url = req.repository_url.or(existing.repository_url); - let local_path = req.local_path.or(existing.local_path); - let base_branch = req.base_branch.or(existing.base_branch); - - let result = if req.version.is_some() { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET title = $3, goal = $4, - requirements = $5, acceptance_criteria = $6, constraints = $7, external_dependencies = $8, - status = $9, autonomy_level = $10, - confidence_threshold_green = $11, confidence_threshold_yellow = $12, - max_total_cost_usd = $13, max_wall_time_minutes = $14, - max_rework_cycles = $15, max_chain_regenerations = $16, - repository_url = $17, local_path = $18, base_branch = $19, - version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $20 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&title) - .bind(&goal) - .bind(&requirements) - .bind(&acceptance_criteria) - .bind(&constraints) - .bind(&external_dependencies) - .bind(&status) - .bind(&autonomy_level) - .bind(confidence_threshold_green) - .bind(confidence_threshold_yellow) - .bind(max_total_cost_usd) - .bind(max_wall_time_minutes) - .bind(max_rework_cycles) - .bind(max_chain_regenerations) - .bind(&repository_url) - .bind(&local_path) - .bind(&base_branch) - .bind(req.version.unwrap()) - .fetch_optional(pool) - .await? - } else { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET title = $3, goal = $4, - requirements = $5, acceptance_criteria = $6, constraints = $7, external_dependencies = $8, - status = $9, autonomy_level = $10, - confidence_threshold_green = $11, confidence_threshold_yellow = $12, - max_total_cost_usd = $13, max_wall_time_minutes = $14, - max_rework_cycles = $15, max_chain_regenerations = $16, - repository_url = $17, local_path = $18, base_branch = $19, - version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&title) - .bind(&goal) - .bind(&requirements) - .bind(&acceptance_criteria) - .bind(&constraints) - .bind(&external_dependencies) - .bind(&status) - .bind(&autonomy_level) - .bind(confidence_threshold_green) - .bind(confidence_threshold_yellow) - .bind(max_total_cost_usd) - .bind(max_wall_time_minutes) - .bind(max_rework_cycles) - .bind(max_chain_regenerations) - .bind(&repository_url) - .bind(&local_path) - .bind(&base_branch) - .fetch_optional(pool) - .await? - }; - - // If versioned update returned None, there was a race condition - if result.is_none() && req.version.is_some() { - if let Some(current) = get_directive_for_owner(pool, id, owner_id).await? { - return Err(RepositoryError::VersionConflict { - expected: req.version.unwrap(), - actual: current.version, - }); - } - } - - Ok(result) -} - -/// Delete a directive by ID, scoped to owner. -/// Also deletes all contracts (and their cascaded tasks/files) associated with this directive. -pub async fn delete_directive_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - // First verify the directive exists and belongs to the owner - let directive = get_directive_for_owner(pool, id, owner_id).await?; - let Some(_directive) = directive else { - return Ok(false); - }; - - // Delete all contracts linked to this directive (tasks/files cascade from contracts). - // This covers step contracts (directive_id FK) and the orchestrator contract. - sqlx::query( - r#" - DELETE FROM contracts - WHERE directive_id = $1 - OR id = (SELECT orchestrator_contract_id FROM directives WHERE id = $1) - "#, - ) - .bind(id) - .execute(pool) - .await?; - - // Now delete the directive itself (chains, steps, events, evaluations cascade via FK) - let result = sqlx::query( - r#" - DELETE FROM directives - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// List chains for a directive (read-only). -pub async fn list_chains_for_directive( - pool: &PgPool, - directive_id: Uuid, -) -> Result<Vec<DirectiveChain>, sqlx::Error> { - sqlx::query_as::<_, DirectiveChain>( - r#" - SELECT * - FROM directive_chains - WHERE directive_id = $1 - ORDER BY generation DESC, created_at DESC - "#, - ) - .bind(directive_id) - .fetch_all(pool) - .await -} - -/// List steps for a chain (read-only). -pub async fn list_steps_for_chain( - pool: &PgPool, - chain_id: Uuid, -) -> Result<Vec<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - SELECT * - FROM chain_steps - WHERE chain_id = $1 - ORDER BY order_index ASC, created_at ASC - "#, - ) - .bind(chain_id) - .fetch_all(pool) - .await -} - -/// Batch-fetch lightweight contract summaries for a set of contract IDs. -pub async fn get_contract_summaries_batch( - pool: &PgPool, - contract_ids: &[Uuid], -) -> Result<Vec<StepContractSummary>, sqlx::Error> { - sqlx::query_as::<_, StepContractSummary>( - r#" - SELECT c.id, c.name, c.contract_type, c.phase, c.status, - COUNT(t.id) as task_count, - COUNT(t.id) FILTER (WHERE t.status IN ('done','merged')) as tasks_done, - COUNT(t.id) FILTER (WHERE t.status IN ('running','initializing','starting')) as tasks_running, - COUNT(t.id) FILTER (WHERE t.status = 'failed') as tasks_failed - FROM contracts c - LEFT JOIN tasks t ON t.contract_id = c.id - WHERE c.id = ANY($1) - GROUP BY c.id, c.name, c.contract_type, c.phase, c.status - "#, - ) - .bind(contract_ids) - .fetch_all(pool) - .await -} - -// ── Directive orchestration functions ─────────────────────────────────────── - -/// Update directive status with automatic timestamp management. -pub async fn update_directive_status( - pool: &PgPool, - id: Uuid, - new_status: &str, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET status = $2, - started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END, - completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, - version = version + 1, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(id) - .bind(new_status) - .fetch_optional(pool) - .await -} - -/// Set the orchestrator contract ID on a directive. -pub async fn set_directive_orchestrator_contract( - pool: &PgPool, - directive_id: Uuid, - contract_id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET orchestrator_contract_id = $2, - version = version + 1, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(directive_id) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Set the current chain ID on a directive and increment chain_generation_count. -pub async fn set_directive_current_chain( - pool: &PgPool, - directive_id: Uuid, - chain_id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET current_chain_id = $2, - chain_generation_count = chain_generation_count + 1, - version = version + 1, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(directive_id) - .bind(chain_id) - .fetch_optional(pool) - .await -} - -/// Increment the chain_generation_count on a directive (without setting current_chain_id). -pub async fn increment_chain_generation_count( - pool: &PgPool, - directive_id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#" - UPDATE directives - SET chain_generation_count = chain_generation_count + 1, - version = version + 1, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(directive_id) - .fetch_optional(pool) - .await -} - -/// Create a new directive chain. -pub async fn create_directive_chain( - pool: &PgPool, - directive_id: Uuid, - name: &str, - description: Option<&str>, - rationale: Option<&str>, - total_steps: i32, -) -> Result<DirectiveChain, sqlx::Error> { - // Get next generation number - let next_gen: (i32,) = sqlx::query_as( - "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1", - ) - .bind(directive_id) - .fetch_one(pool) - .await?; - - sqlx::query_as::<_, DirectiveChain>( - r#" - INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status) - VALUES ($1, $2, $3, $4, $5, $6, 'running') - RETURNING * - "#, - ) - .bind(directive_id) - .bind(next_gen.0) - .bind(name) - .bind(description) - .bind(rationale) - .bind(total_steps) - .fetch_one(pool) - .await -} - -/// Create a chain step. -pub async fn create_chain_step( - pool: &PgPool, - chain_id: Uuid, - name: &str, - description: Option<&str>, - step_type: &str, - contract_type: &str, - initial_phase: Option<&str>, - task_plan: Option<&str>, - depends_on: Option<Vec<Uuid>>, - order_index: i32, -) -> Result<ChainStep, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - INSERT INTO chain_steps (chain_id, name, description, step_type, contract_type, initial_phase, task_plan, depends_on, order_index, status) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending') - RETURNING * - "#, - ) - .bind(chain_id) - .bind(name) - .bind(description) - .bind(step_type) - .bind(contract_type) - .bind(initial_phase) - .bind(task_plan) - .bind(depends_on.as_deref()) - .bind(order_index) - .fetch_one(pool) - .await -} - -/// Get a single chain step by ID. -pub async fn get_chain_step( - pool: &PgPool, - step_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - "SELECT * FROM chain_steps WHERE id = $1", - ) - .bind(step_id) - .fetch_optional(pool) - .await -} - -/// Increment completed_steps counter on a directive chain. -pub async fn increment_chain_completed_steps( - pool: &PgPool, - chain_id: Uuid, -) -> Result<(), sqlx::Error> { - sqlx::query( - "UPDATE directive_chains SET completed_steps = completed_steps + 1, updated_at = NOW() WHERE id = $1", - ) - .bind(chain_id) - .execute(pool) - .await?; - Ok(()) -} - -/// Increment failed_steps counter on a directive chain. -pub async fn increment_chain_failed_steps( - pool: &PgPool, - chain_id: Uuid, -) -> Result<(), sqlx::Error> { - sqlx::query( - "UPDATE directive_chains SET failed_steps = failed_steps + 1, updated_at = NOW() WHERE id = $1", - ) - .bind(chain_id) - .execute(pool) - .await?; - Ok(()) -} - -/// Update a chain step's status with automatic timestamp management. -pub async fn update_step_status( - pool: &PgPool, - step_id: Uuid, - new_status: &str, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - UPDATE chain_steps - SET status = $2, - started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, - completed_at = CASE WHEN $2 IN ('passed', 'failed') THEN NOW() ELSE completed_at END - WHERE id = $1 - RETURNING * - "#, - ) - .bind(step_id) - .bind(new_status) - .fetch_optional(pool) - .await -} - -/// Link a chain step to a contract and supervisor task. -pub async fn update_step_contract( - pool: &PgPool, - step_id: Uuid, - contract_id: Uuid, - supervisor_task_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - UPDATE chain_steps - SET contract_id = $2, - supervisor_task_id = $3 - WHERE id = $1 - RETURNING * - "#, - ) - .bind(step_id) - .bind(contract_id) - .bind(supervisor_task_id) - .fetch_optional(pool) - .await -} - -/// Find steps that are ready to execute (pending, with all dependencies passed). -pub async fn find_ready_steps( - pool: &PgPool, - chain_id: Uuid, -) -> Result<Vec<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - SELECT * FROM chain_steps - WHERE chain_id = $1 - AND status = 'pending' - AND ( - depends_on IS NULL - OR array_length(depends_on, 1) IS NULL - OR NOT EXISTS ( - SELECT 1 FROM unnest(depends_on) AS dep_id - WHERE dep_id NOT IN ( - SELECT id FROM chain_steps WHERE chain_id = $1 AND status = 'passed' - ) - ) - ) - ORDER BY order_index ASC - "#, - ) - .bind(chain_id) - .fetch_all(pool) - .await -} - -/// Get a chain step by its linked contract ID. -pub async fn get_step_by_contract_id( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#"SELECT * FROM chain_steps WHERE contract_id = $1"#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Get a directive by its orchestrator contract ID. -pub async fn get_directive_by_orchestrator_contract( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#"SELECT * FROM directives WHERE orchestrator_contract_id = $1"#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Set directive-related fields on a contract (directive_id, is_directive_orchestrator). -pub async fn set_contract_directive_fields( - pool: &PgPool, - contract_id: Uuid, - directive_id: Option<Uuid>, - is_orchestrator: bool, -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - UPDATE contracts - SET directive_id = $2, - is_directive_orchestrator = $3 - WHERE id = $1 - "#, - ) - .bind(contract_id) - .bind(directive_id) - .bind(is_orchestrator) - .execute(pool) - .await?; - Ok(()) -} - -/// Get a directive by ID (no owner scoping, for internal use). -pub async fn get_directive( - pool: &PgPool, - id: Uuid, -) -> Result<Option<Directive>, sqlx::Error> { - sqlx::query_as::<_, Directive>( - r#"SELECT * FROM directives WHERE id = $1"#, - ) - .bind(id) - .fetch_optional(pool) - .await -} - -/// Update chain status. -pub async fn update_chain_status( - pool: &PgPool, - chain_id: Uuid, - new_status: &str, -) -> Result<Option<DirectiveChain>, sqlx::Error> { - sqlx::query_as::<_, DirectiveChain>( - r#" - UPDATE directive_chains - SET status = $2, - completed_at = CASE WHEN $2 IN ('completed', 'failed') THEN NOW() ELSE completed_at END, - version = version + 1, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(chain_id) - .bind(new_status) - .fetch_optional(pool) - .await -} - -// ── Directive monitoring / evaluation functions ───────────────────────────── - -/// Create a directive evaluation record. evaluation_number is auto-incremented per step. -pub async fn create_directive_evaluation( - pool: &PgPool, - directive_id: Uuid, - chain_id: Uuid, - step_id: Uuid, - contract_id: Uuid, - evaluation_type: &str, - evaluator: Option<&str>, - passed: bool, - overall_score: Option<f64>, - confidence_level: Option<&str>, - criteria_results: &serde_json::Value, - summary_feedback: &str, - rework_instructions: Option<&str>, -) -> Result<DirectiveEvaluation, sqlx::Error> { - sqlx::query_as::<_, DirectiveEvaluation>( - r#" - INSERT INTO directive_evaluations ( - directive_id, chain_id, step_id, contract_id, - evaluation_type, evaluation_number, evaluator, - passed, overall_score, confidence_level, - criteria_results, summary_feedback, rework_instructions, - completed_at - ) - VALUES ( - $1, $2, $3, $4, - $5, COALESCE((SELECT MAX(evaluation_number) FROM directive_evaluations WHERE step_id = $3), 0) + 1, $6, - $7, $8, $9, - $10, $11, $12, - NOW() - ) - RETURNING * - "#, - ) - .bind(directive_id) - .bind(chain_id) - .bind(step_id) - .bind(contract_id) - .bind(evaluation_type) - .bind(evaluator) - .bind(passed) - .bind(overall_score) - .bind(confidence_level) - .bind(criteria_results) - .bind(summary_feedback) - .bind(rework_instructions) - .fetch_one(pool) - .await -} - -/// List evaluations for a step, ordered by evaluation_number. -pub async fn list_evaluations_for_step( - pool: &PgPool, - step_id: Uuid, -) -> Result<Vec<DirectiveEvaluation>, sqlx::Error> { - sqlx::query_as::<_, DirectiveEvaluation>( - r#" - SELECT * FROM directive_evaluations - WHERE step_id = $1 - ORDER BY evaluation_number ASC - "#, - ) - .bind(step_id) - .fetch_all(pool) - .await -} - -/// Get a single directive evaluation by ID. -pub async fn get_directive_evaluation( - pool: &PgPool, - evaluation_id: Uuid, -) -> Result<Option<DirectiveEvaluation>, sqlx::Error> { - sqlx::query_as::<_, DirectiveEvaluation>( - "SELECT * FROM directive_evaluations WHERE id = $1", - ) - .bind(evaluation_id) - .fetch_optional(pool) - .await -} - -/// Create a directive event. -pub async fn create_directive_event( - pool: &PgPool, - directive_id: Uuid, - chain_id: Option<Uuid>, - step_id: Option<Uuid>, - event_type: &str, - severity: &str, - event_data: Option<&serde_json::Value>, - actor_type: &str, - actor_id: Option<Uuid>, -) -> Result<DirectiveEvent, sqlx::Error> { - sqlx::query_as::<_, DirectiveEvent>( - r#" - INSERT INTO directive_events (directive_id, chain_id, step_id, event_type, severity, event_data, actor_type, actor_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - RETURNING * - "#, - ) - .bind(directive_id) - .bind(chain_id) - .bind(step_id) - .bind(event_type) - .bind(severity) - .bind(event_data) - .bind(actor_type) - .bind(actor_id) - .fetch_one(pool) - .await -} - -/// Update step evaluation fields after an evaluation completes. -pub async fn update_step_evaluation_fields( - pool: &PgPool, - step_id: Uuid, - confidence_score: Option<f64>, - confidence_level: Option<&str>, - last_evaluation_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - UPDATE chain_steps - SET confidence_score = $2, - confidence_level = $3, - evaluation_count = evaluation_count + 1, - last_evaluation_id = $4 - WHERE id = $1 - RETURNING * - "#, - ) - .bind(step_id) - .bind(confidence_score) - .bind(confidence_level) - .bind(last_evaluation_id) - .fetch_optional(pool) - .await -} - -/// Update step monitoring contract/task references. -pub async fn update_step_monitoring_contract( - pool: &PgPool, - step_id: Uuid, - monitoring_contract_id: Uuid, - monitoring_task_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - UPDATE chain_steps - SET monitoring_contract_id = $2, - monitoring_task_id = $3 - WHERE id = $1 - RETURNING * - "#, - ) - .bind(step_id) - .bind(monitoring_contract_id) - .bind(monitoring_task_id) - .fetch_optional(pool) - .await -} - -/// Increment step rework_count. -pub async fn increment_step_rework_count( - pool: &PgPool, - step_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#" - UPDATE chain_steps - SET rework_count = rework_count + 1 - WHERE id = $1 - RETURNING * - "#, - ) - .bind(step_id) - .fetch_optional(pool) - .await -} - -/// Get a chain step by its monitoring contract ID. -pub async fn get_step_by_monitoring_contract_id( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<ChainStep>, sqlx::Error> { - sqlx::query_as::<_, ChainStep>( - r#"SELECT * FROM chain_steps WHERE monitoring_contract_id = $1"#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} |
