summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs2420
1 files changed, 839 insertions, 1581 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 7be7bc8..eeda4a5 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,23 +6,22 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- AddChainRepositoryRequest, AddContractDefinitionRequest, AddContractToChainRequest, Chain,
- ChainContract, ChainContractDefinition, ChainContractDetail, ChainDefinitionGraphNode,
- ChainDefinitionGraphResponse, ChainDirective, ChainEditorContract, ChainEditorData,
- ChainEditorDeliverable, ChainEditorEdge, ChainEditorNode, ChainEditorTask, ChainEvent,
- ChainGraphEdge, ChainGraphNode, ChainGraphResponse, ChainRepository, ChainSummary,
- ChainWithContracts, CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
- ContractChatMessageRecord, ContractEvaluation, ContractEvaluationSummary, ContractEvent,
- ContractRepository, ContractSummary, ContractTypeTemplateRecord, ConversationMessage,
- ConversationSnapshot, CreateChainDirectiveRequest, CreateChainRequest,
- CreateContractEvaluationRequest, CreateContractRequest, CreateFileRequest, CreateTaskRequest,
- CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition,
- DirectiveTraceabilityResponse, EvaluationCriterionResult, File, FileSummary, FileVersion,
- HistoryEvent, HistoryQueryFilters, InitChainRequest, InitChainResponse, MeshChatConversation,
- MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition,
- SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary,
- TraceabilityEntry, UpdateChainRequest, UpdateContractDefinitionRequest, UpdateContractRequest,
- UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
+ // Core types
+ CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
+ ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
+ ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
+ CreateContractRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest,
+ Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition,
+ File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
+ MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
+ PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint,
+ TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest,
+ UpdateTemplateRequest,
+ // Directive types
+ AddStepRequest, ChainStep, CreateDirectiveRequest, Directive, DirectiveApproval,
+ DirectiveChain, DirectiveChainGraphEdge, DirectiveChainGraphNode, DirectiveChainGraphResponse,
+ DirectiveEvaluation, DirectiveEvent, DirectiveSummary, DirectiveVerifier,
+ DirectiveWithProgress, UpdateDirectiveRequest, UpdateStepRequest,
};
/// Repository error types.
@@ -4906,46 +4905,140 @@ pub async fn sync_supervisor_state(
}
// =============================================================================
-// Chain Operations (DAG of contracts for multi-contract orchestration)
+// Directive Operations (top-level orchestration entity)
// =============================================================================
+// TODO: Implement directive CRUD functions
+// - create_directive_for_owner
+// - get_directive_for_owner
+// - list_directives_for_owner
+// - update_directive_for_owner
+// - archive_directive_for_owner
+// - update_directive_status
-/// Create a new chain for a specific owner.
-pub async fn create_chain_for_owner(
+// =============================================================================
+// Directive Chain Operations (generated execution plans)
+// =============================================================================
+// TODO: Implement chain CRUD functions
+// - create_directive_chain
+// - get_current_chain
+// - supersede_chain
+
+// =============================================================================
+// Chain Step Operations (nodes in the DAG)
+// =============================================================================
+// TODO: Implement step CRUD functions
+// - create_chain_step
+// - update_chain_step
+// - delete_chain_step
+// - find_ready_steps
+// - update_step_status
+// - update_step_contract
+// - update_step_confidence
+// - increment_step_rework_count
+
+// =============================================================================
+// Directive Evaluation Operations
+// =============================================================================
+// TODO: Implement evaluation functions
+// - create_directive_evaluation
+// - list_step_evaluations
+// - list_directive_evaluations
+
+// =============================================================================
+// Directive Event Operations (audit stream)
+// =============================================================================
+// TODO: Implement event functions
+// - emit_directive_event
+// - list_directive_events
+
+// =============================================================================
+// Directive Verifier Operations
+// =============================================================================
+// TODO: Implement verifier CRUD functions
+// - create_directive_verifier
+// - list_directive_verifiers
+// - update_directive_verifier
+
+// =============================================================================
+// Directive Approval Operations (human-in-the-loop)
+// =============================================================================
+// TODO: Implement approval functions
+// - create_approval_request
+// - resolve_approval
+// - list_pending_approvals
+
+// NOTE: Old chain functions removed. See git history for reference.
+// Old functions included: create_chain_for_owner, get_chain_for_owner,
+// list_chains_for_owner, update_chain_for_owner, delete_chain_for_owner,
+// add_contract_to_chain, remove_contract_from_chain, list_chain_contracts,
+// get_chain_with_contracts, list_chain_repositories, add_chain_repository,
+// delete_chain_repository, set_chain_repository_primary, get_chain_graph,
+// record_chain_event, list_chain_events, increment_chain_loop, complete_chain,
+// get_ready_chain_contracts, is_chain_complete, get_chain_editor_data,
+// create_chain_contract_definition, list_chain_contract_definitions,
+// update_chain_contract_definition, delete_chain_contract_definition,
+// get_chain_definition_graph, update_chain_status, progress_chain,
+// create_chain_directive, get_chain_directive, update_chain_directive,
+// delete_chain_directive, create_contract_evaluation, get_contract_evaluation,
+// list_chain_evaluations, update_chain_contract_evaluation_status,
+// mark_chain_contract_original_completion, get_chain_contract_by_contract_id,
+// init_chain_for_owner.
+
+// =============================================================================
+// Directive Operations
+// =============================================================================
+
+/// Create a new directive for an owner.
+pub async fn create_directive_for_owner(
pool: &PgPool,
owner_id: Uuid,
- req: CreateChainRequest,
-) -> Result<Chain, sqlx::Error> {
- let loop_enabled = req.loop_enabled.unwrap_or(false);
- let loop_max_iterations = req.loop_max_iterations.unwrap_or(10);
-
- sqlx::query_as::<_, Chain>(
- r#"
- INSERT INTO chains (owner_id, name, description, loop_enabled, loop_max_iterations, loop_progress_check)
- VALUES ($1, $2, $3, $4, $5, $6)
+ req: CreateDirectiveRequest,
+) -> Result<Directive, sqlx::Error> {
+ let title = req.title.unwrap_or_else(|| truncate_string(&req.goal, 100));
+ let autonomy_level = req.autonomy_level.unwrap_or_else(|| "guardrails".to_string());
+ let green_threshold = req.confidence_threshold_green.unwrap_or(0.85);
+ let yellow_threshold = req.confidence_threshold_yellow.unwrap_or(0.60);
+ let requirements = req.requirements.unwrap_or(serde_json::json!([]));
+ let acceptance_criteria = req.acceptance_criteria.unwrap_or(serde_json::json!([]));
+
+ sqlx::query_as::<_, Directive>(
+ r#"
+ INSERT INTO directives (
+ owner_id, title, goal, requirements, acceptance_criteria,
+ autonomy_level, confidence_threshold_green, confidence_threshold_yellow,
+ repository_url, local_path, base_branch,
+ max_total_cost_usd, max_wall_time_minutes
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING *
"#,
)
.bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(loop_enabled)
- .bind(loop_max_iterations)
- .bind(&req.loop_progress_check)
+ .bind(&title)
+ .bind(&req.goal)
+ .bind(&requirements)
+ .bind(&acceptance_criteria)
+ .bind(&autonomy_level)
+ .bind(green_threshold)
+ .bind(yellow_threshold)
+ .bind(&req.repository_url)
+ .bind(&req.local_path)
+ .bind(&req.base_branch)
+ .bind(req.max_total_cost_usd)
+ .bind(req.max_wall_time_minutes)
.fetch_one(pool)
.await
}
-/// Get a chain by ID, scoped to owner.
-pub async fn get_chain_for_owner(
+/// Get a directive by ID, scoped to owner.
+pub async fn get_directive_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
-) -> Result<Option<Chain>, sqlx::Error> {
- sqlx::query_as::<_, Chain>(
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
r#"
- SELECT *
- FROM chains
- WHERE id = $1 AND owner_id = $2
+ SELECT * FROM directives WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
@@ -4954,817 +5047,485 @@ pub async fn get_chain_for_owner(
.await
}
-/// Get a chain by ID (no owner check - for internal use).
-pub async fn get_chain(pool: &PgPool, id: Uuid) -> Result<Option<Chain>, sqlx::Error> {
- sqlx::query_as::<_, Chain>(
- r#"
- SELECT *
- FROM chains
- WHERE id = $1
- "#,
+/// Get a directive by ID (no owner check - for internal use).
+pub async fn get_directive(pool: &PgPool, id: Uuid) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1"#,
)
.bind(id)
.fetch_optional(pool)
.await
}
-/// List chains for a specific owner.
-pub async fn list_chains_for_owner(
+/// List directives for an owner.
+pub async fn list_directives_for_owner(
pool: &PgPool,
owner_id: Uuid,
-) -> Result<Vec<ChainSummary>, sqlx::Error> {
- sqlx::query_as::<_, ChainSummary>(
- r#"
- SELECT
- c.id,
- c.name,
- c.description,
- c.status,
- c.loop_enabled,
- c.loop_current_iteration,
- COUNT(DISTINCT cc.contract_id) as contract_count,
- COUNT(DISTINCT CASE WHEN con.status = 'completed' THEN cc.contract_id END) as completed_count,
- c.version,
- c.created_at
- FROM chains c
- LEFT JOIN chain_contracts cc ON cc.chain_id = c.id
- LEFT JOIN contracts con ON con.id = cc.contract_id
- WHERE c.owner_id = $1
- GROUP BY c.id
- ORDER BY c.created_at DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
+ status_filter: Option<&str>,
+) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
+ let query = if let Some(status) = status_filter {
+ sqlx::query_as::<_, DirectiveSummary>(
+ r#"
+ SELECT
+ d.id, d.title, d.goal, d.status, d.autonomy_level,
+ dc.current_confidence,
+ COALESCE(dc.completed_steps, 0) as completed_steps,
+ COALESCE(dc.total_steps, 0) as total_steps,
+ d.chain_generation_count, d.started_at, d.created_at
+ FROM directives d
+ LEFT JOIN directive_chains dc ON dc.id = d.current_chain_id
+ WHERE d.owner_id = $1 AND d.status = $2
+ ORDER BY d.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(status)
+ } else {
+ sqlx::query_as::<_, DirectiveSummary>(
+ r#"
+ SELECT
+ d.id, d.title, d.goal, d.status, d.autonomy_level,
+ dc.current_confidence,
+ COALESCE(dc.completed_steps, 0) as completed_steps,
+ COALESCE(dc.total_steps, 0) as total_steps,
+ d.chain_generation_count, d.started_at, d.created_at
+ FROM directives d
+ LEFT JOIN directive_chains dc ON dc.id = d.current_chain_id
+ WHERE d.owner_id = $1
+ ORDER BY d.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ };
+ query.fetch_all(pool).await
}
-/// Update a chain.
-pub async fn update_chain_for_owner(
+/// Update a directive with optimistic locking.
+pub async fn update_directive_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
- req: UpdateChainRequest,
-) -> Result<Chain, RepositoryError> {
- // First get current version if optimistic locking requested
- if let Some(expected_version) = req.version {
- let current: Option<(i32,)> = sqlx::query_as(
- "SELECT version FROM chains WHERE id = $1 AND owner_id = $2",
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
+ req: UpdateDirectiveRequest,
+) -> Result<Directive, RepositoryError> {
+ // First get current version
+ let current = sqlx::query_scalar::<_, i32>(
+ "SELECT version FROM directives WHERE id = $1 AND owner_id = $2"
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?
+ .ok_or_else(|| RepositoryError::Database(sqlx::Error::RowNotFound))?;
- if let Some((actual_version,)) = current {
- if actual_version != expected_version {
- return Err(RepositoryError::VersionConflict {
- expected: expected_version,
- actual: actual_version,
- });
- }
- }
+ if current != req.version {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version,
+ actual: current,
+ });
}
- let result = sqlx::query_as::<_, Chain>(
- r#"
- UPDATE chains
- SET
- name = COALESCE($3, name),
- description = COALESCE($4, description),
- status = COALESCE($5, status),
- loop_enabled = COALESCE($6, loop_enabled),
- loop_max_iterations = COALESCE($7, loop_max_iterations),
- loop_progress_check = COALESCE($8, loop_progress_check),
+ let directive = sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives SET
+ title = COALESCE($3, title),
+ goal = COALESCE($4, goal),
+ requirements = COALESCE($5, requirements),
+ acceptance_criteria = COALESCE($6, acceptance_criteria),
+ constraints = COALESCE($7, constraints),
+ external_dependencies = COALESCE($8, external_dependencies),
+ autonomy_level = COALESCE($9, autonomy_level),
+ confidence_threshold_green = COALESCE($10, confidence_threshold_green),
+ confidence_threshold_yellow = COALESCE($11, confidence_threshold_yellow),
+ max_total_cost_usd = COALESCE($12, max_total_cost_usd),
+ max_wall_time_minutes = COALESCE($13, max_wall_time_minutes),
+ max_rework_cycles = COALESCE($14, max_rework_cycles),
+ max_chain_regenerations = COALESCE($15, max_chain_regenerations),
version = version + 1,
updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
+ WHERE id = $1 AND owner_id = $2 AND version = $16
RETURNING *
"#,
)
.bind(id)
.bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(&req.status)
- .bind(req.loop_enabled)
- .bind(req.loop_max_iterations)
- .bind(&req.loop_progress_check)
+ .bind(&req.title)
+ .bind(&req.goal)
+ .bind(&req.requirements)
+ .bind(&req.acceptance_criteria)
+ .bind(&req.constraints)
+ .bind(&req.external_dependencies)
+ .bind(&req.autonomy_level)
+ .bind(req.confidence_threshold_green)
+ .bind(req.confidence_threshold_yellow)
+ .bind(req.max_total_cost_usd)
+ .bind(req.max_wall_time_minutes)
+ .bind(req.max_rework_cycles)
+ .bind(req.max_chain_regenerations)
+ .bind(req.version)
.fetch_one(pool)
.await?;
- Ok(result)
+ Ok(directive)
}
-/// Delete (archive) a chain.
-pub async fn delete_chain_for_owner(
+/// Update directive status.
+pub async fn update_directive_status(
pool: &PgPool,
id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE chains
- SET status = 'archived', updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Add a contract to a chain.
-pub async fn add_contract_to_chain(
- pool: &PgPool,
- chain_id: Uuid,
- contract_id: Uuid,
- depends_on: Vec<Uuid>,
- order_index: i32,
- editor_x: Option<f64>,
- editor_y: Option<f64>,
-) -> Result<ChainContract, sqlx::Error> {
- // Also update the contract's chain_id
- sqlx::query("UPDATE contracts SET chain_id = $1 WHERE id = $2")
- .bind(chain_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- sqlx::query_as::<_, ChainContract>(
+ status: &str,
+) -> Result<Directive, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
r#"
- INSERT INTO chain_contracts (chain_id, contract_id, depends_on, order_index, editor_x, editor_y)
- VALUES ($1, $2, $3, $4, $5, $6)
- ON CONFLICT (chain_id, contract_id) DO UPDATE SET
- depends_on = EXCLUDED.depends_on,
- order_index = EXCLUDED.order_index,
- editor_x = EXCLUDED.editor_x,
- editor_y = EXCLUDED.editor_y
+ UPDATE directives SET
+ status = $2,
+ started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed', 'archived') THEN NOW() ELSE completed_at END,
+ updated_at = NOW()
+ WHERE id = $1
RETURNING *
"#,
)
- .bind(chain_id)
- .bind(contract_id)
- .bind(&depends_on)
- .bind(order_index)
- .bind(editor_x)
- .bind(editor_y)
+ .bind(id)
+ .bind(status)
.fetch_one(pool)
.await
}
-/// Remove a contract from a chain.
-pub async fn remove_contract_from_chain(
+/// Archive a directive (soft delete).
+pub async fn archive_directive_for_owner(
pool: &PgPool,
- chain_id: Uuid,
- contract_id: Uuid,
+ id: Uuid,
+ owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
- // Clear the contract's chain_id
- sqlx::query("UPDATE contracts SET chain_id = NULL WHERE id = $1 AND chain_id = $2")
- .bind(contract_id)
- .bind(chain_id)
- .execute(pool)
- .await?;
-
let result = sqlx::query(
r#"
- DELETE FROM chain_contracts
- WHERE chain_id = $1 AND contract_id = $2
+ UPDATE directives SET status = 'archived', updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
"#,
)
- .bind(chain_id)
- .bind(contract_id)
+ .bind(id)
+ .bind(owner_id)
.execute(pool)
.await?;
-
Ok(result.rows_affected() > 0)
}
-/// List contracts in a chain with their details.
-pub async fn list_chain_contracts(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainContractDetail>, sqlx::Error> {
- sqlx::query_as::<_, ChainContractDetail>(
- r#"
- SELECT
- cc.id as chain_contract_id,
- cc.contract_id,
- c.name as contract_name,
- c.status as contract_status,
- c.phase as contract_phase,
- cc.depends_on,
- cc.order_index,
- cc.editor_x,
- cc.editor_y,
- cc.evaluation_status,
- cc.evaluation_retry_count,
- cc.max_evaluation_retries,
- cc.created_at
- FROM chain_contracts cc
- JOIN contracts c ON c.id = cc.contract_id
- WHERE cc.chain_id = $1
- ORDER BY cc.order_index ASC
- "#,
- )
- .bind(chain_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get chain with all contracts for detail view.
-pub async fn get_chain_with_contracts(
+/// Get directive with full progress info.
+pub async fn get_directive_with_progress(
pool: &PgPool,
- chain_id: Uuid,
+ id: Uuid,
owner_id: Uuid,
-) -> Result<Option<ChainWithContracts>, sqlx::Error> {
- let chain = get_chain_for_owner(pool, chain_id, owner_id).await?;
-
- match chain {
- Some(chain) => {
- let contracts = list_chain_contracts(pool, chain_id).await?;
- let repositories = list_chain_repositories(pool, chain_id).await?;
- Ok(Some(ChainWithContracts {
- chain,
- contracts,
- repositories,
- }))
- }
- None => Ok(None),
- }
-}
+) -> Result<Option<DirectiveWithProgress>, sqlx::Error> {
+ let directive = match get_directive_for_owner(pool, id, owner_id).await? {
+ Some(d) => d,
+ None => return Ok(None),
+ };
-// =============================================================================
-// Chain Repository Operations
-// =============================================================================
+ let chain = if let Some(chain_id) = directive.current_chain_id {
+ get_directive_chain(pool, chain_id).await?
+ } else {
+ None
+ };
-/// List all repositories for a chain.
-pub async fn list_chain_repositories(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainRepository>, sqlx::Error> {
- sqlx::query_as::<_, ChainRepository>(
- r#"
- SELECT *
- FROM chain_repositories
- WHERE chain_id = $1
- ORDER BY is_primary DESC, created_at ASC
- "#,
- )
- .bind(chain_id)
- .fetch_all(pool)
- .await
-}
+ let steps = if let Some(ref c) = chain {
+ list_chain_steps(pool, c.id).await?
+ } else {
+ vec![]
+ };
-/// Get a chain repository by ID.
-pub async fn get_chain_repository(
- pool: &PgPool,
- chain_id: Uuid,
- repository_id: Uuid,
-) -> Result<Option<ChainRepository>, sqlx::Error> {
- sqlx::query_as::<_, ChainRepository>(
- r#"
- SELECT *
- FROM chain_repositories
- WHERE id = $1 AND chain_id = $2
- "#,
- )
- .bind(repository_id)
- .bind(chain_id)
- .fetch_optional(pool)
- .await
+ let recent_events = list_directive_events(pool, id, Some(20)).await?;
+ let pending_approvals = list_pending_approvals(pool, id).await?;
+
+ Ok(Some(DirectiveWithProgress {
+ directive,
+ chain,
+ steps,
+ recent_events,
+ pending_approvals,
+ }))
}
-/// Add a repository to a chain.
-pub async fn add_chain_repository(
+// =============================================================================
+// Directive Chain Operations
+// =============================================================================
+
+/// Create a new chain generation for a directive.
+pub async fn create_directive_chain(
pool: &PgPool,
- chain_id: Uuid,
- req: &AddChainRepositoryRequest,
-) -> Result<ChainRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if req.is_primary {
- sqlx::query(
- r#"
- UPDATE chain_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE chain_id = $1 AND is_primary = true
- "#,
- )
- .bind(chain_id)
- .execute(pool)
- .await?;
- }
+ directive_id: Uuid,
+ name: &str,
+ description: Option<&str>,
+ rationale: Option<&str>,
+ planning_model: Option<&str>,
+) -> Result<DirectiveChain, sqlx::Error> {
+ // Get next generation number
+ let generation = sqlx::query_scalar::<_, i32>(
+ "SELECT COALESCE(MAX(generation), 0) + 1 FROM directive_chains WHERE directive_id = $1"
+ )
+ .bind(directive_id)
+ .fetch_one(pool)
+ .await?;
- sqlx::query_as::<_, ChainRepository>(
+ let chain = sqlx::query_as::<_, DirectiveChain>(
r#"
- INSERT INTO chain_repositories (chain_id, name, repository_url, local_path, source_type, status, is_primary)
- VALUES ($1, $2, $3, $4, $5, 'ready', $6)
+ INSERT INTO directive_chains (directive_id, generation, name, description, rationale, planning_model)
+ VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#,
)
- .bind(chain_id)
- .bind(&req.name)
- .bind(&req.repository_url)
- .bind(&req.local_path)
- .bind(&req.source_type)
- .bind(req.is_primary)
+ .bind(directive_id)
+ .bind(generation)
+ .bind(name)
+ .bind(description)
+ .bind(rationale)
+ .bind(planning_model)
.fetch_one(pool)
- .await
-}
-
-/// Delete a repository from a chain.
-pub async fn delete_chain_repository(
- pool: &PgPool,
- chain_id: Uuid,
- repository_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM chain_repositories
- WHERE id = $1 AND chain_id = $2
- "#,
- )
- .bind(repository_id)
- .bind(chain_id)
- .execute(pool)
.await?;
- Ok(result.rows_affected() > 0)
-}
-
-/// Set a repository as primary for a chain.
-pub async fn set_chain_repository_primary(
- pool: &PgPool,
- chain_id: Uuid,
- repository_id: Uuid,
-) -> Result<ChainRepository, sqlx::Error> {
- // Clear existing primary
+ // Update directive to point to new chain and increment generation count
sqlx::query(
r#"
- UPDATE chain_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE chain_id = $1 AND is_primary = true
+ UPDATE directives SET
+ current_chain_id = $2,
+ chain_generation_count = chain_generation_count + 1,
+ updated_at = NOW()
+ WHERE id = $1
"#,
)
- .bind(chain_id)
+ .bind(directive_id)
+ .bind(chain.id)
.execute(pool)
.await?;
- // Set new primary
- sqlx::query_as::<_, ChainRepository>(
- r#"
- UPDATE chain_repositories
- SET is_primary = true, updated_at = NOW()
- WHERE id = $1 AND chain_id = $2
- RETURNING *
- "#,
- )
- .bind(repository_id)
- .bind(chain_id)
- .fetch_one(pool)
- .await
+ Ok(chain)
}
-/// Get the primary repository for a chain.
-pub async fn get_chain_primary_repository(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Option<ChainRepository>, sqlx::Error> {
- sqlx::query_as::<_, ChainRepository>(
- r#"
- SELECT *
- FROM chain_repositories
- WHERE chain_id = $1 AND is_primary = true
- "#,
+/// Get a directive chain by ID.
+pub async fn get_directive_chain(pool: &PgPool, id: Uuid) -> Result<Option<DirectiveChain>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveChain>(
+ "SELECT * FROM directive_chains WHERE id = $1"
)
- .bind(chain_id)
+ .bind(id)
.fetch_optional(pool)
.await
}
-/// Get chain graph structure for visualization.
-pub async fn get_chain_graph(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Option<ChainGraphResponse>, sqlx::Error> {
- let chain = get_chain(pool, chain_id).await?;
-
- match chain {
- Some(chain) => {
- let contracts = list_chain_contracts(pool, chain_id).await?;
-
- let nodes: Vec<ChainGraphNode> = contracts
- .iter()
- .map(|c| ChainGraphNode {
- id: c.chain_contract_id,
- contract_id: c.contract_id,
- name: c.contract_name.clone(),
- status: c.contract_status.clone(),
- phase: c.contract_phase.clone(),
- x: c.editor_x.unwrap_or(0.0),
- y: c.editor_y.unwrap_or(0.0),
- })
- .collect();
-
- let mut edges: Vec<ChainGraphEdge> = Vec::new();
- for contract in &contracts {
- for dep_id in &contract.depends_on {
- // Find the chain_contract_id for this dependency
- if let Some(dep) = contracts.iter().find(|c| c.contract_id == *dep_id) {
- edges.push(ChainGraphEdge {
- from: dep.chain_contract_id,
- to: contract.chain_contract_id,
- });
- }
- }
- }
-
- Ok(Some(ChainGraphResponse {
- chain_id: chain.id,
- chain_name: chain.name,
- chain_status: chain.status,
- nodes,
- edges,
- }))
- }
- None => Ok(None),
- }
-}
-
-/// Record a chain event.
-pub async fn record_chain_event(
- pool: &PgPool,
- chain_id: Uuid,
- event_type: &str,
- contract_id: Option<Uuid>,
- event_data: Option<serde_json::Value>,
-) -> Result<ChainEvent, sqlx::Error> {
- sqlx::query_as::<_, ChainEvent>(
+/// Get the current chain for a directive.
+pub async fn get_current_chain(pool: &PgPool, directive_id: Uuid) -> Result<Option<DirectiveChain>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveChain>(
r#"
- INSERT INTO chain_events (chain_id, event_type, contract_id, event_data)
- VALUES ($1, $2, $3, $4)
- RETURNING *
+ SELECT dc.* FROM directive_chains dc
+ JOIN directives d ON d.current_chain_id = dc.id
+ WHERE d.id = $1
"#,
)
- .bind(chain_id)
- .bind(event_type)
- .bind(contract_id)
- .bind(event_data)
- .fetch_one(pool)
+ .bind(directive_id)
+ .fetch_optional(pool)
.await
}
-/// List chain events.
-pub async fn list_chain_events(
+/// Update chain status.
+pub async fn update_chain_status(
pool: &PgPool,
chain_id: Uuid,
-) -> Result<Vec<ChainEvent>, sqlx::Error> {
- sqlx::query_as::<_, ChainEvent>(
- r#"
- SELECT *
- FROM chain_events
- WHERE chain_id = $1
- ORDER BY created_at DESC
- "#,
- )
- .bind(chain_id)
- .fetch_all(pool)
- .await
-}
-
-/// Increment chain loop iteration.
-pub async fn increment_chain_loop(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> {
- sqlx::query_as::<_, Chain>(
+ status: &str,
+) -> Result<DirectiveChain, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveChain>(
r#"
- UPDATE chains
- SET loop_current_iteration = COALESCE(loop_current_iteration, 0) + 1,
+ UPDATE directive_chains SET
+ status = $2,
+ started_at = CASE WHEN $2 = 'active' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('completed', 'failed', 'superseded') THEN NOW() ELSE completed_at END,
updated_at = NOW()
WHERE id = $1
RETURNING *
"#,
)
.bind(chain_id)
+ .bind(status)
.fetch_one(pool)
.await
}
-/// Mark a chain as completed.
-pub async fn complete_chain(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> {
- sqlx::query_as::<_, Chain>(
+/// Supersede a chain (mark as superseded and update directive).
+pub async fn supersede_chain(pool: &PgPool, chain_id: Uuid) -> Result<(), sqlx::Error> {
+ sqlx::query(
r#"
- UPDATE chains
- SET status = 'completed',
- updated_at = NOW()
+ UPDATE directive_chains SET status = 'superseded', completed_at = NOW(), updated_at = NOW()
WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(chain_id)
- .fetch_one(pool)
- .await
-}
-
-/// Get contracts in a chain that have no pending dependencies (ready to start).
-/// Returns contracts where all depends_on contracts are completed.
-pub async fn get_ready_chain_contracts(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainContractDetail>, sqlx::Error> {
- sqlx::query_as::<_, ChainContractDetail>(
- r#"
- SELECT
- cc.id as chain_contract_id,
- cc.contract_id,
- c.name as contract_name,
- c.status as contract_status,
- c.phase as contract_phase,
- cc.depends_on,
- cc.order_index,
- cc.editor_x,
- cc.editor_y
- FROM chain_contracts cc
- JOIN contracts c ON c.id = cc.contract_id
- WHERE cc.chain_id = $1
- AND c.status = 'active'
- AND (
- -- No dependencies
- cc.depends_on IS NULL
- OR array_length(cc.depends_on, 1) IS NULL
- OR array_length(cc.depends_on, 1) = 0
- -- Or all dependencies completed
- OR NOT EXISTS (
- SELECT 1
- FROM unnest(cc.depends_on) AS dep_id
- JOIN contracts dep ON dep.id = dep_id
- WHERE dep.status != 'completed'
- )
- )
- ORDER BY cc.order_index ASC
"#,
)
.bind(chain_id)
- .fetch_all(pool)
- .await
-}
-
-/// Check if all contracts in a chain are completed.
-pub async fn is_chain_complete(pool: &PgPool, chain_id: Uuid) -> Result<bool, sqlx::Error> {
- let result: (i64,) = sqlx::query_as(
- r#"
- SELECT COUNT(*)
- FROM chain_contracts cc
- JOIN contracts c ON c.id = cc.contract_id
- WHERE cc.chain_id = $1
- AND c.status != 'completed'
- "#,
- )
- .bind(chain_id)
- .fetch_one(pool)
+ .execute(pool)
.await?;
-
- Ok(result.0 == 0)
-}
-
-/// Get chain editor data for the GUI editor.
-pub async fn get_chain_editor_data(
- pool: &PgPool,
- chain_id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ChainEditorData>, sqlx::Error> {
- let chain = get_chain_for_owner(pool, chain_id, owner_id).await?;
-
- match chain {
- Some(chain) => {
- let contracts = list_chain_contracts(pool, chain_id).await?;
- let repositories = list_chain_repositories(pool, chain_id).await?;
-
- // Build nodes
- let nodes: Vec<ChainEditorNode> = contracts
- .iter()
- .map(|c| ChainEditorNode {
- id: c.contract_id.to_string(),
- x: c.editor_x.unwrap_or(0.0),
- y: c.editor_y.unwrap_or(0.0),
- contract: ChainEditorContract {
- name: c.contract_name.clone(),
- description: None, // Would need to join with full contract data
- contract_type: "simple".to_string(),
- phases: vec!["plan".to_string(), "execute".to_string()],
- tasks: vec![],
- deliverables: vec![],
- },
- })
- .collect();
-
- // Build edges
- let edges: Vec<ChainEditorEdge> = contracts
- .iter()
- .flat_map(|c| {
- c.depends_on.iter().map(move |dep_id| ChainEditorEdge {
- from: dep_id.to_string(),
- to: c.contract_id.to_string(),
- })
- })
- .collect();
-
- Ok(Some(ChainEditorData {
- id: Some(chain.id),
- name: chain.name,
- description: chain.description,
- repositories,
- loop_enabled: chain.loop_enabled,
- loop_max_iterations: chain.loop_max_iterations,
- loop_progress_check: chain.loop_progress_check,
- nodes,
- edges,
- }))
- }
- None => Ok(None),
- }
+ Ok(())
}
// =============================================================================
-// Chain Contract Definition Operations
+// Chain Step Operations
// =============================================================================
-/// Create a new contract definition in a chain.
-pub async fn create_chain_contract_definition(
+/// Create a new step in a chain.
+pub async fn create_chain_step(
pool: &PgPool,
chain_id: Uuid,
- req: AddContractDefinitionRequest,
-) -> Result<ChainContractDefinition, sqlx::Error> {
- // Get the next order index
- let max_order: Option<i32> = sqlx::query_scalar(
- "SELECT MAX(order_index) FROM chain_contract_definitions WHERE chain_id = $1",
+ req: AddStepRequest,
+) -> Result<ChainStep, sqlx::Error> {
+ let step_type = req.step_type.unwrap_or_else(|| "execute".to_string());
+ let contract_type = req.contract_type.unwrap_or_else(|| "simple".to_string());
+ let phases = req.phases.unwrap_or_default();
+ let depends_on = req.depends_on.unwrap_or_default();
+ let requirement_ids = req.requirement_ids.unwrap_or_default();
+ let acceptance_criteria_ids = req.acceptance_criteria_ids.unwrap_or_default();
+ let verifier_config = req.verifier_config.unwrap_or(serde_json::json!({}));
+
+ // Get next order index
+ let order_index = sqlx::query_scalar::<_, i32>(
+ "SELECT COALESCE(MAX(order_index), 0) + 1 FROM chain_steps WHERE chain_id = $1"
)
.bind(chain_id)
.fetch_one(pool)
.await?;
- let order_index = max_order.unwrap_or(-1) + 1;
-
- // Convert tasks, deliverables, and validation to JSON
- let tasks_json = req.tasks.as_ref().map(|t| serde_json::to_value(t).unwrap());
- let deliverables_json = req
- .deliverables
- .as_ref()
- .map(|d| serde_json::to_value(d).unwrap());
- let validation_json = req
- .validation
- .as_ref()
- .map(|v| serde_json::to_value(v).unwrap());
- let depends_on_names: Vec<String> = req.depends_on.unwrap_or_default();
-
- sqlx::query_as::<_, ChainContractDefinition>(
- r#"
- INSERT INTO chain_contract_definitions
- (chain_id, name, description, contract_type, initial_phase, depends_on_names, tasks, deliverables, validation, editor_x, editor_y, order_index)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
+ let step = sqlx::query_as::<_, ChainStep>(
+ r#"
+ INSERT INTO chain_steps (
+ chain_id, name, description, step_type, contract_type,
+ initial_phase, task_plan, phases, depends_on, parallel_group,
+ requirement_ids, acceptance_criteria_ids, verifier_config,
+ editor_x, editor_y, order_index
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
.bind(chain_id)
.bind(&req.name)
.bind(&req.description)
- .bind(&req.contract_type)
+ .bind(&step_type)
+ .bind(&contract_type)
.bind(&req.initial_phase)
- .bind(&depends_on_names)
- .bind(&tasks_json)
- .bind(&deliverables_json)
- .bind(&validation_json)
- .bind(req.editor_x)
- .bind(req.editor_y)
+ .bind(&req.task_plan)
+ .bind(&phases)
+ .bind(&depends_on)
+ .bind(&req.parallel_group)
+ .bind(&requirement_ids)
+ .bind(&acceptance_criteria_ids)
+ .bind(&verifier_config)
+ .bind(req.editor_x.unwrap_or(0.0))
+ .bind(req.editor_y.unwrap_or(0.0))
.bind(order_index)
.fetch_one(pool)
- .await
-}
+ .await?;
-/// List all contract definitions in a chain.
-pub async fn list_chain_contract_definitions(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainContractDefinition>, sqlx::Error> {
- sqlx::query_as::<_, ChainContractDefinition>(
- r#"
- SELECT * FROM chain_contract_definitions
- WHERE chain_id = $1
- ORDER BY order_index ASC
- "#,
+ // Update chain total_steps count
+ sqlx::query(
+ "UPDATE directive_chains SET total_steps = total_steps + 1, updated_at = NOW() WHERE id = $1"
)
.bind(chain_id)
- .fetch_all(pool)
- .await
+ .execute(pool)
+ .await?;
+
+ Ok(step)
}
-/// Get a specific contract definition.
-pub async fn get_chain_contract_definition(
- pool: &PgPool,
- definition_id: Uuid,
-) -> Result<Option<ChainContractDefinition>, sqlx::Error> {
- sqlx::query_as::<_, ChainContractDefinition>(
- "SELECT * FROM chain_contract_definitions WHERE id = $1",
+/// Get a chain step by ID.
+pub async fn get_chain_step(pool: &PgPool, id: Uuid) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ "SELECT * FROM chain_steps WHERE id = $1"
)
- .bind(definition_id)
+ .bind(id)
.fetch_optional(pool)
.await
}
-/// Update a contract definition.
-pub async fn update_chain_contract_definition(
- pool: &PgPool,
- definition_id: Uuid,
- req: UpdateContractDefinitionRequest,
-) -> Result<ChainContractDefinition, sqlx::Error> {
- let tasks_json = req.tasks.as_ref().map(|t| serde_json::to_value(t).unwrap());
- let deliverables_json = req
- .deliverables
- .as_ref()
- .map(|d| serde_json::to_value(d).unwrap());
- let validation_json = req
- .validation
- .as_ref()
- .map(|v| serde_json::to_value(v).unwrap());
+/// List all steps in a chain.
+pub async fn list_chain_steps(pool: &PgPool, chain_id: Uuid) -> Result<Vec<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ "SELECT * FROM chain_steps WHERE chain_id = $1 ORDER BY order_index"
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
- sqlx::query_as::<_, ChainContractDefinition>(
+/// Update a chain step.
+pub async fn update_chain_step(
+ pool: &PgPool,
+ step_id: Uuid,
+ req: UpdateStepRequest,
+) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
r#"
- UPDATE chain_contract_definitions SET
+ UPDATE chain_steps SET
name = COALESCE($2, name),
description = COALESCE($3, description),
- contract_type = COALESCE($4, contract_type),
- initial_phase = COALESCE($5, initial_phase),
- depends_on_names = COALESCE($6, depends_on_names),
- tasks = COALESCE($7, tasks),
- deliverables = COALESCE($8, deliverables),
- validation = COALESCE($9, validation),
- editor_x = COALESCE($10, editor_x),
- editor_y = COALESCE($11, editor_y)
+ task_plan = COALESCE($4, task_plan),
+ depends_on = COALESCE($5, depends_on),
+ requirement_ids = COALESCE($6, requirement_ids),
+ acceptance_criteria_ids = COALESCE($7, acceptance_criteria_ids),
+ verifier_config = COALESCE($8, verifier_config),
+ editor_x = COALESCE($9, editor_x),
+ editor_y = COALESCE($10, editor_y)
WHERE id = $1
RETURNING *
"#,
)
- .bind(definition_id)
+ .bind(step_id)
.bind(&req.name)
.bind(&req.description)
- .bind(&req.contract_type)
- .bind(&req.initial_phase)
+ .bind(&req.task_plan)
.bind(&req.depends_on)
- .bind(&tasks_json)
- .bind(&deliverables_json)
- .bind(&validation_json)
+ .bind(&req.requirement_ids)
+ .bind(&req.acceptance_criteria_ids)
+ .bind(&req.verifier_config)
.bind(req.editor_x)
.bind(req.editor_y)
.fetch_one(pool)
.await
}
-/// Delete a contract definition.
-pub async fn delete_chain_contract_definition(
- pool: &PgPool,
- definition_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query("DELETE FROM chain_contract_definitions WHERE id = $1")
- .bind(definition_id)
+/// Delete a chain step.
+pub async fn delete_chain_step(pool: &PgPool, step_id: Uuid) -> Result<bool, sqlx::Error> {
+ // Get chain_id first for updating count
+ let chain_id = sqlx::query_scalar::<_, Uuid>(
+ "SELECT chain_id FROM chain_steps WHERE id = $1"
+ )
+ .bind(step_id)
+ .fetch_optional(pool)
+ .await?;
+
+ let result = sqlx::query("DELETE FROM chain_steps WHERE id = $1")
+ .bind(step_id)
.execute(pool)
.await?;
+
+ // Update chain total_steps count
+ if let Some(cid) = chain_id {
+ sqlx::query(
+ "UPDATE directive_chains SET total_steps = total_steps - 1, updated_at = NOW() WHERE id = $1"
+ )
+ .bind(cid)
+ .execute(pool)
+ .await?;
+ }
+
Ok(result.rows_affected() > 0)
}
-/// Get definitions that are ready to be instantiated (all dependencies are satisfied).
-/// A definition is ready if all definitions it depends on have been instantiated as contracts
-/// and those contracts have completed.
-pub async fn get_ready_definitions(
- pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ChainContractDefinition>, sqlx::Error> {
- sqlx::query_as::<_, ChainContractDefinition>(
- r#"
- SELECT d.*
- FROM chain_contract_definitions d
- WHERE d.chain_id = $1
- -- Not already instantiated
- AND NOT EXISTS (
- SELECT 1 FROM chain_contracts cc
- WHERE cc.definition_id = d.id
- )
- -- All dependencies satisfied (either no deps, or all deps have completed contracts)
- AND (
- cardinality(d.depends_on_names) = 0
- OR NOT EXISTS (
- SELECT 1 FROM unnest(d.depends_on_names) AS dep_name
- WHERE NOT EXISTS (
- SELECT 1 FROM chain_contract_definitions dep_def
- JOIN chain_contracts cc ON cc.definition_id = dep_def.id
- JOIN contracts c ON c.id = cc.contract_id
- WHERE dep_def.chain_id = d.chain_id
- AND dep_def.name = dep_name
- AND c.status = 'completed'
- )
- )
- )
- ORDER BY d.order_index ASC
+/// Find steps that are ready to execute (all dependencies met, status=pending).
+pub async fn find_ready_steps(pool: &PgPool, chain_id: Uuid) -> Result<Vec<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ SELECT s.* FROM chain_steps s
+ WHERE s.chain_id = $1
+ AND s.status = 'pending'
+ AND NOT EXISTS (
+ SELECT 1 FROM chain_steps dep
+ WHERE dep.id = ANY(s.depends_on)
+ AND dep.status NOT IN ('passed', 'skipped')
+ )
+ ORDER BY s.order_index
"#,
)
.bind(chain_id)
@@ -5772,909 +5533,500 @@ pub async fn get_ready_definitions(
.await
}
-/// Get the definition graph for visualization.
-pub async fn get_chain_definition_graph(
+/// Update step status.
+pub async fn update_step_status(
pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Option<ChainDefinitionGraphResponse>, sqlx::Error> {
- let chain = sqlx::query_as::<_, Chain>("SELECT * FROM chains WHERE id = $1")
- .bind(chain_id)
- .fetch_optional(pool)
- .await?;
-
- let Some(chain) = chain else {
- return Ok(None);
- };
-
- let definitions = list_chain_contract_definitions(pool, chain_id).await?;
-
- // Get instantiated contracts for each definition
- let chain_contracts = list_chain_contracts(pool, chain_id).await?;
- let instantiated: std::collections::HashMap<Uuid, &ChainContractDetail> = chain_contracts
- .iter()
- .filter_map(|cc| {
- // Find definition_id from cc - we need to query this
- // For now, match by name
- definitions
- .iter()
- .find(|d| d.name == cc.contract_name)
- .map(|d| (d.id, cc))
- })
- .collect();
-
- let nodes: Vec<ChainDefinitionGraphNode> = definitions
- .iter()
- .map(|d| {
- let cc = instantiated.get(&d.id);
- ChainDefinitionGraphNode {
- id: d.id,
- name: d.name.clone(),
- contract_type: d.contract_type.clone(),
- x: d.editor_x.unwrap_or(0.0),
- y: d.editor_y.unwrap_or(0.0),
- is_instantiated: cc.is_some(),
- contract_id: cc.map(|c| c.contract_id),
- contract_status: cc.map(|c| c.contract_status.clone()),
- }
- })
- .collect();
-
- // Build edges from depends_on_names
- let name_to_id: std::collections::HashMap<&str, Uuid> =
- definitions.iter().map(|d| (d.name.as_str(), d.id)).collect();
-
- let edges: Vec<ChainGraphEdge> = definitions
- .iter()
- .flat_map(|d| {
- let target_id = d.id;
- let name_to_id = &name_to_id;
- d.depends_on_names.iter().filter_map(move |dep_name| {
- name_to_id
- .get(dep_name.as_str())
- .map(|&from_id| ChainGraphEdge { from: from_id, to: target_id })
- })
- })
- .collect();
-
- Ok(Some(ChainDefinitionGraphResponse {
- chain_id: chain.id,
- chain_name: chain.name,
- chain_status: chain.status,
- nodes,
- edges,
- }))
-}
-
-/// Update chain status.
-pub async fn update_chain_status(
- pool: &PgPool,
- chain_id: Uuid,
+ step_id: Uuid,
status: &str,
-) -> Result<(), sqlx::Error> {
- sqlx::query("UPDATE chains SET status = $2, updated_at = NOW() WHERE id = $1")
- .bind(chain_id)
- .bind(status)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-// =============================================================================
-// Chain Progression
-// =============================================================================
-
-/// Result of chain progression check
-#[derive(Debug)]
-pub struct ChainProgressionResult {
- /// Contracts created from ready definitions
- pub contracts_created: Vec<Uuid>,
- /// Whether all definitions are instantiated and completed (chain is done)
- pub chain_completed: bool,
-}
-
-/// Progress a chain by creating contracts from ready definitions.
-///
-/// This is called when a contract in the chain completes. It:
-/// 1. Finds definitions whose dependencies are all satisfied (completed)
-/// 2. Creates contracts from those definitions
-/// 3. Links them to the chain
-/// 4. Checks if chain is complete (all definitions instantiated and completed)
-pub async fn progress_chain(
- pool: &PgPool,
- chain_id: Uuid,
- owner_id: Uuid,
-) -> Result<ChainProgressionResult, sqlx::Error> {
- let mut contracts_created = Vec::new();
-
- // Get all definitions for this chain
- let definitions = list_chain_contract_definitions(pool, chain_id).await?;
- if definitions.is_empty() {
- return Ok(ChainProgressionResult {
- contracts_created: vec![],
- chain_completed: true,
- });
- }
-
- // Get existing chain contracts to know what's already instantiated
- let chain_contracts = list_chain_contracts(pool, chain_id).await?;
-
- // Build a map of definition name -> instantiated contract status
- let instantiated: std::collections::HashMap<String, Option<String>> = chain_contracts
- .iter()
- .map(|cc| (cc.contract_name.clone(), Some(cc.contract_status.clone())))
- .collect();
-
- // Find definitions that are ready to be instantiated:
- // - Not yet instantiated
- // - All dependencies are instantiated AND completed
- for def in &definitions {
- // Skip if already instantiated
- if instantiated.contains_key(&def.name) {
- continue;
- }
-
- // Check if all dependencies are completed
- let deps_satisfied = def.depends_on_names.iter().all(|dep_name| {
- instantiated
- .get(dep_name)
- .map(|status| status.as_deref() == Some("completed"))
- .unwrap_or(false)
- });
-
- // Root definitions (no dependencies) are always ready
- let is_root = def.depends_on_names.is_empty();
-
- if is_root || deps_satisfied {
- // Create contract from definition
- match create_contract_from_definition(pool, chain_id, owner_id, def).await {
- Ok(contract_id) => {
- contracts_created.push(contract_id);
- tracing::info!(
- chain_id = %chain_id,
- definition_name = %def.name,
- contract_id = %contract_id,
- "Created contract from chain definition"
- );
- }
- Err(e) => {
- tracing::error!(
- chain_id = %chain_id,
- definition_name = %def.name,
- error = %e,
- "Failed to create contract from chain definition"
- );
- }
- }
- }
- }
-
- // Check if chain is complete (all definitions instantiated and completed)
- let updated_contracts = list_chain_contracts(pool, chain_id).await?;
- let all_instantiated = definitions.len() == updated_contracts.len();
- let all_completed = updated_contracts
- .iter()
- .all(|cc| cc.contract_status == "completed");
- let chain_completed = all_instantiated && all_completed;
-
- if chain_completed {
- update_chain_status(pool, chain_id, "completed").await?;
- tracing::info!(chain_id = %chain_id, "Chain completed - all contracts done");
- }
-
- Ok(ChainProgressionResult {
- contracts_created,
- chain_completed,
- })
-}
-
-/// Task definition parsed from JSON (matches chain YAML format)
-#[derive(Debug, Clone, serde::Deserialize)]
-struct ChainTaskDef {
- name: String,
- plan: String,
-}
-
-/// Validation config parsed from definition JSON
-#[derive(Debug, Clone, serde::Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct ValidationConfig {
- #[serde(default)]
- check_deliverables: bool,
- #[serde(default)]
- run_tests: bool,
- check_content: Option<String>,
- #[serde(default = "default_on_failure_str")]
- on_failure: String,
- #[serde(default = "default_max_retries_val")]
- max_retries: i32,
-}
-
-fn default_on_failure_str() -> String {
- "block".to_string()
-}
-
-fn default_max_retries_val() -> i32 {
- 3
-}
-
-/// Generate a validation plan for a checkpoint contract.
-fn generate_checkpoint_plan(
- def: &ChainContractDefinition,
- upstream_contracts: &[&ChainContractDetail],
- validation: &ValidationConfig,
-) -> String {
- let upstream_names: Vec<&str> = upstream_contracts.iter().map(|c| c.contract_name.as_str()).collect();
-
- let mut plan = format!(
- r#"# Checkpoint Validation: {}
-
-You are validating the outputs of upstream contracts before allowing downstream work to proceed.
-
-## Upstream Contracts to Validate
-{}
-
-"#,
- def.name,
- upstream_names.iter().map(|n| format!("- {}", n)).collect::<Vec<_>>().join("\n")
- );
-
- // Add deliverables check section
- if validation.check_deliverables {
- plan.push_str(r#"## Deliverables Check
-Verify that all required deliverables from upstream contracts exist and are properly completed.
-
-Use the makima CLI to check contract status:
-```bash
-makima contract status <contract_id>
-```
-
-For each upstream contract, verify:
-1. Contract status is "completed"
-2. All required deliverables are marked as complete
-3. Deliverable content exists and is not empty
-
-"#);
- }
-
- // Add tests check section
- if validation.run_tests {
- plan.push_str(r#"## Tests Check
-Run the test suite to verify the codebase is in a good state.
-
-```bash
-# Run tests appropriate for the project type
-npm test # for Node.js projects
-cargo test # for Rust projects
-pytest # for Python projects
-go test ./... # for Go projects
-```
-
-Verify:
-1. All tests pass
-2. No new test failures introduced
-3. Test coverage is acceptable
-
-"#);
- }
-
- // Add custom content check section
- if let Some(content_check) = &validation.check_content {
- plan.push_str(&format!(r#"## Custom Validation Criteria
-{}
-
-"#, content_check));
- }
-
- // Add validation result section
- plan.push_str(&format!(r#"## Reporting Results
-
-After completing all validation checks, you must report the result:
-
-**If ALL checks pass:**
-Mark this checkpoint contract as completed using:
-```bash
-makima supervisor complete
-```
-
-**If ANY check fails (on_failure: "{}"):**
-"#, validation.on_failure));
-
- match validation.on_failure.as_str() {
- "block" => plan.push_str(r#"
-- Document the failure reason clearly
-- Do NOT mark the contract as complete
-- The chain will be blocked until issues are resolved manually
-"#),
- "retry" => plan.push_str(&format!(r#"
-- Document the failure reason
-- Request retry of the failed upstream contract (max {} retries)
-- Use: `makima supervisor ask "Upstream validation failed. Retry?" --choices "Yes,No"`
-"#, validation.max_retries)),
- "warn" => plan.push_str(r#"
-- Document the warning/issue found
-- Mark the contract as complete anyway (downstream will proceed)
-- Log the warning for visibility
-"#),
- _ => plan.push_str(r#"
-- Document the failure reason
-- Do NOT mark the contract as complete
-"#),
- }
-
- plan.push_str(r#"
-## Begin Validation
-
-Start by checking the status of each upstream contract, then proceed with the validation criteria above.
-"#);
-
- plan
-}
-
-/// Create a contract from a chain definition.
-async fn create_contract_from_definition(
- pool: &PgPool,
- chain_id: Uuid,
- owner_id: Uuid,
- def: &ChainContractDefinition,
-) -> Result<Uuid, sqlx::Error> {
- // Get the existing contracts to find dependency info
- let existing_contracts = list_chain_contracts(pool, chain_id).await?;
- let name_to_contract: std::collections::HashMap<&str, &ChainContractDetail> = existing_contracts
- .iter()
- .map(|cc| (cc.contract_name.as_str(), cc))
- .collect();
-
- // Resolve dependency names to contract details
- let upstream_contracts: Vec<&ChainContractDetail> = def
- .depends_on_names
- .iter()
- .filter_map(|name| name_to_contract.get(name.as_str()).copied())
- .collect();
-
- // Create the contract request with basic fields
- let req = CreateContractRequest {
- name: def.name.clone(),
- description: def.description.clone(),
- contract_type: Some(def.contract_type.clone()),
- initial_phase: def.initial_phase.clone(),
- template_id: None,
- autonomous_loop: None,
- phase_guard: None,
- local_only: None,
- auto_merge_local: None,
- };
-
- // Create the contract
- let contract = create_contract_for_owner(pool, owner_id, req).await?;
-
- // For checkpoint contracts, generate a validation plan
- if def.contract_type == "checkpoint" {
- // Parse validation config
- let validation: ValidationConfig = def
- .validation
- .as_ref()
- .and_then(|v| serde_json::from_value(v.clone()).ok())
- .unwrap_or(ValidationConfig {
- check_deliverables: true,
- run_tests: false,
- check_content: None,
- on_failure: default_on_failure_str(),
- max_retries: default_max_retries_val(),
- });
-
- // Generate validation plan
- let validation_plan = generate_checkpoint_plan(def, &upstream_contracts, &validation);
-
- // Create a supervisor task with the validation plan
- let task_req = CreateTaskRequest {
- contract_id: Some(contract.id),
- name: format!("Validate: {}", def.name),
- description: Some("Checkpoint validation task".to_string()),
- plan: validation_plan,
- parent_task_id: None,
- is_supervisor: true, // Checkpoint uses supervisor task for validation
- priority: 0,
- repository_url: None,
- base_branch: None,
- target_branch: None,
- merge_mode: None,
- target_repo_path: None,
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- };
-
- if let Err(e) = create_task_for_owner(pool, owner_id, task_req).await {
- tracing::warn!(
- contract_id = %contract.id,
- error = %e,
- "Failed to create validation task for checkpoint contract"
- );
- }
+) -> Result<ChainStep, sqlx::Error> {
+ let step = sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps SET
+ status = $2,
+ started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('passed', 'failed', 'skipped') THEN NOW() ELSE completed_at END
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(status)
+ .fetch_one(pool)
+ .await?;
- // Set initial validation status
+ // Update chain completed_steps and failed_steps counts
+ if status == "passed" || status == "skipped" {
sqlx::query(
- "UPDATE chain_contracts SET validation_status = 'pending' WHERE chain_id = $1 AND contract_id = $2",
+ "UPDATE directive_chains SET completed_steps = completed_steps + 1, updated_at = NOW() WHERE id = $1"
)
- .bind(chain_id)
- .bind(contract.id)
+ .bind(step.chain_id)
.execute(pool)
.await?;
- } else {
- // Parse and create tasks from definition for regular contracts
- if let Some(tasks_json) = &def.tasks {
- if let Ok(tasks) = serde_json::from_value::<Vec<ChainTaskDef>>(tasks_json.clone()) {
- for task_def in tasks {
- let task_req = CreateTaskRequest {
- contract_id: Some(contract.id),
- name: task_def.name,
- description: None,
- plan: task_def.plan,
- parent_task_id: None,
- is_supervisor: false,
- priority: 0,
- repository_url: None,
- base_branch: None,
- target_branch: None,
- merge_mode: None,
- target_repo_path: None,
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None,
- };
- if let Err(e) = create_task_for_owner(pool, owner_id, task_req).await {
- tracing::warn!(
- contract_id = %contract.id,
- error = %e,
- "Failed to create task from chain definition"
- );
- }
- }
- }
- }
- }
-
- // Resolve dependency names to contract IDs
- let depends_on: Vec<Uuid> = upstream_contracts.iter().map(|c| c.contract_id).collect();
-
- // Link contract to chain
- add_contract_to_chain(
- pool,
- chain_id,
- contract.id,
- depends_on,
- def.order_index,
- def.editor_x,
- def.editor_y,
- )
- .await?;
-
- // Update chain_contracts with definition_id link
- sqlx::query(
- "UPDATE chain_contracts SET definition_id = $1 WHERE chain_id = $2 AND contract_id = $3",
- )
- .bind(def.id)
- .bind(chain_id)
- .bind(contract.id)
- .execute(pool)
- .await?;
-
- // Copy repositories from chain to contract
- let chain_repos = list_chain_repositories(pool, chain_id).await.unwrap_or_default();
- for repo in chain_repos {
- if let Some(url) = &repo.repository_url {
- // Remote repository
- if let Err(e) = add_remote_repository(pool, contract.id, &repo.name, url, repo.is_primary).await {
- tracing::warn!(
- contract_id = %contract.id,
- repo_name = %repo.name,
- error = %e,
- "Failed to copy repository from chain to contract"
- );
- }
- } else if let Some(path) = &repo.local_path {
- // Local repository
- if let Err(e) = add_local_repository(pool, contract.id, &repo.name, path, repo.is_primary).await {
- tracing::warn!(
- contract_id = %contract.id,
- repo_name = %repo.name,
- error = %e,
- "Failed to copy local repository from chain to contract"
- );
- }
- }
- }
-
- // Activate the contract so it can start
- sqlx::query("UPDATE contracts SET status = 'active' WHERE id = $1")
- .bind(contract.id)
+ } else if status == "failed" {
+ sqlx::query(
+ "UPDATE directive_chains SET failed_steps = failed_steps + 1, updated_at = NOW() WHERE id = $1"
+ )
+ .bind(step.chain_id)
.execute(pool)
.await?;
+ }
- tracing::info!(
- contract_id = %contract.id,
- contract_name = %def.name,
- chain_id = %chain_id,
- "Contract created and activated from chain definition"
- );
-
- Ok(contract.id)
+ Ok(step)
}
-// =============================================================================
-// Chain Directives
-// =============================================================================
-
-/// Create a directive for a chain.
-pub async fn create_chain_directive(
+/// Link a step to a contract.
+pub async fn update_step_contract(
pool: &PgPool,
- chain_id: Uuid,
- req: CreateChainDirectiveRequest,
-) -> Result<ChainDirective, sqlx::Error> {
- let requirements = serde_json::to_value(&req.requirements.unwrap_or_default())
- .unwrap_or(serde_json::json!([]));
- let acceptance_criteria = serde_json::to_value(&req.acceptance_criteria.unwrap_or_default())
- .unwrap_or(serde_json::json!([]));
- let constraints =
- serde_json::to_value(&req.constraints.unwrap_or_default()).unwrap_or(serde_json::json!([]));
- let external_dependencies =
- serde_json::to_value(&req.external_dependencies.unwrap_or_default())
- .unwrap_or(serde_json::json!([]));
- let source_type = req.source_type.unwrap_or_else(|| "llm_generated".to_string());
-
- sqlx::query_as::<_, ChainDirective>(
- r#"
- INSERT INTO chain_directives (chain_id, requirements, acceptance_criteria, constraints, external_dependencies, source_type)
- VALUES ($1, $2, $3, $4, $5, $6)
+ step_id: Uuid,
+ contract_id: Uuid,
+ supervisor_task_id: Option<Uuid>,
+) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps SET contract_id = $2, supervisor_task_id = $3
+ WHERE id = $1
RETURNING *
"#,
)
- .bind(chain_id)
- .bind(&requirements)
- .bind(&acceptance_criteria)
- .bind(&constraints)
- .bind(&external_dependencies)
- .bind(&source_type)
+ .bind(step_id)
+ .bind(contract_id)
+ .bind(supervisor_task_id)
.fetch_one(pool)
.await
}
-/// Get the directive for a chain.
-pub async fn get_chain_directive(
+/// Update step confidence score and level.
+pub async fn update_step_confidence(
pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Option<ChainDirective>, sqlx::Error> {
- sqlx::query_as::<_, ChainDirective>(
+ step_id: Uuid,
+ score: f64,
+ level: &str,
+ evaluation_id: Uuid,
+) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
r#"
- SELECT *
- FROM chain_directives
- WHERE chain_id = $1
+ UPDATE chain_steps SET
+ confidence_score = $2,
+ confidence_level = $3,
+ last_evaluation_id = $4,
+ evaluation_count = evaluation_count + 1
+ WHERE id = $1
+ RETURNING *
"#,
)
- .bind(chain_id)
- .fetch_optional(pool)
+ .bind(step_id)
+ .bind(score)
+ .bind(level)
+ .bind(evaluation_id)
+ .fetch_one(pool)
.await
}
-/// Update a chain directive.
-pub async fn update_chain_directive(
- pool: &PgPool,
- chain_id: Uuid,
- req: CreateChainDirectiveRequest,
-) -> Result<ChainDirective, sqlx::Error> {
- let requirements = req
- .requirements
- .map(|r| serde_json::to_value(&r).unwrap_or(serde_json::json!([])));
- let acceptance_criteria = req
- .acceptance_criteria
- .map(|ac| serde_json::to_value(&ac).unwrap_or(serde_json::json!([])));
- let constraints = req
- .constraints
- .map(|c| serde_json::to_value(&c).unwrap_or(serde_json::json!([])));
- let external_dependencies = req
- .external_dependencies
- .map(|ed| serde_json::to_value(&ed).unwrap_or(serde_json::json!([])));
-
- sqlx::query_as::<_, ChainDirective>(
- r#"
- UPDATE chain_directives SET
- requirements = COALESCE($2, requirements),
- acceptance_criteria = COALESCE($3, acceptance_criteria),
- constraints = COALESCE($4, constraints),
- external_dependencies = COALESCE($5, external_dependencies),
- source_type = COALESCE($6, source_type),
- version = version + 1,
- updated_at = NOW()
- WHERE chain_id = $1
+/// Increment step rework count.
+pub async fn increment_step_rework_count(pool: &PgPool, step_id: Uuid) -> Result<ChainStep, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ r#"
+ UPDATE chain_steps SET rework_count = rework_count + 1, status = 'rework'
+ WHERE id = $1
RETURNING *
"#,
)
- .bind(chain_id)
- .bind(&requirements)
- .bind(&acceptance_criteria)
- .bind(&constraints)
- .bind(&external_dependencies)
- .bind(&req.source_type)
+ .bind(step_id)
.fetch_one(pool)
.await
}
-/// Delete a chain directive.
-pub async fn delete_chain_directive(pool: &PgPool, chain_id: Uuid) -> Result<bool, sqlx::Error> {
- let result = sqlx::query("DELETE FROM chain_directives WHERE chain_id = $1")
- .bind(chain_id)
- .execute(pool)
- .await?;
- Ok(result.rows_affected() > 0)
-}
-
-/// Get directive traceability (requirement -> contract mapping).
-pub async fn get_directive_traceability(
+/// Get chain graph for visualization.
+pub async fn get_chain_graph(
pool: &PgPool,
chain_id: Uuid,
-) -> Result<DirectiveTraceabilityResponse, sqlx::Error> {
- // Get the directive
- let directive = get_chain_directive(pool, chain_id).await?;
-
- // Get all contract definitions with their requirement mappings
- let definitions = list_chain_contract_definitions(pool, chain_id).await?;
-
- // Parse requirements from directive
- let requirements: Vec<super::models::DirectiveRequirement> = directive
- .as_ref()
- .and_then(|d| serde_json::from_value(d.requirements.clone()).ok())
- .unwrap_or_default();
-
- // Build traceability entries
- let mut entries: Vec<TraceabilityEntry> = Vec::new();
- let mut covered_requirements: std::collections::HashSet<String> =
- std::collections::HashSet::new();
-
- for req in &requirements {
- let mut contract_def_ids: Vec<Uuid> = Vec::new();
- let mut contract_def_names: Vec<String> = Vec::new();
-
- for def in &definitions {
- if def.requirement_ids.contains(&req.id) {
- contract_def_ids.push(def.id);
- contract_def_names.push(def.name.clone());
- covered_requirements.insert(req.id.clone());
- }
+) -> Result<DirectiveChainGraphResponse, sqlx::Error> {
+ let chain = get_directive_chain(pool, chain_id).await?
+ .ok_or_else(|| sqlx::Error::RowNotFound)?;
+
+ let steps = list_chain_steps(pool, chain_id).await?;
+
+ let nodes: Vec<DirectiveChainGraphNode> = steps.iter().map(|s| {
+ DirectiveChainGraphNode {
+ id: s.id,
+ name: s.name.clone(),
+ step_type: s.step_type.clone(),
+ status: s.status.clone(),
+ confidence_score: s.confidence_score,
+ confidence_level: s.confidence_level.clone(),
+ contract_id: s.contract_id,
+ editor_x: s.editor_x,
+ editor_y: s.editor_y,
+ }
+ }).collect();
+
+ let mut edges = Vec::new();
+ for step in &steps {
+ for dep_id in &step.depends_on {
+ edges.push(DirectiveChainGraphEdge {
+ source: *dep_id,
+ target: step.id,
+ });
}
-
- // Get acceptance criteria for this requirement
- let acceptance_criteria: Vec<super::models::DirectiveAcceptanceCriterion> = directive
- .as_ref()
- .and_then(|d| serde_json::from_value(d.acceptance_criteria.clone()).ok())
- .unwrap_or_default();
-
- let ac_ids: Vec<String> = acceptance_criteria
- .iter()
- .filter(|ac| ac.requirement_ids.contains(&req.id))
- .map(|ac| ac.id.clone())
- .collect();
-
- entries.push(TraceabilityEntry {
- requirement_id: req.id.clone(),
- requirement_title: req.title.clone(),
- contract_definition_ids: contract_def_ids,
- contract_definition_names: contract_def_names,
- acceptance_criteria_ids: ac_ids,
- });
}
- // Find uncovered requirements
- let uncovered: Vec<String> = requirements
- .iter()
- .filter(|r| !covered_requirements.contains(&r.id))
- .map(|r| r.id.clone())
- .collect();
-
- Ok(DirectiveTraceabilityResponse {
+ Ok(DirectiveChainGraphResponse {
chain_id,
- entries,
- uncovered_requirements: uncovered,
+ directive_id: chain.directive_id,
+ nodes,
+ edges,
})
}
// =============================================================================
-// Contract Evaluations
+// Directive Evaluation Operations
// =============================================================================
-/// Create a contract evaluation record.
-pub async fn create_contract_evaluation(
+/// Create a directive evaluation.
+pub async fn create_directive_evaluation(
pool: &PgPool,
- req: CreateContractEvaluationRequest,
-) -> Result<ContractEvaluation, sqlx::Error> {
- let criteria_results = serde_json::to_value(&req.criteria_results).unwrap_or(serde_json::json!([]));
+ directive_id: Uuid,
+ chain_id: Option<Uuid>,
+ step_id: Option<Uuid>,
+ contract_id: Option<Uuid>,
+ evaluation_type: &str,
+ evaluator: Option<&str>,
+ passed: bool,
+ overall_score: Option<f64>,
+ confidence_level: Option<&str>,
+ programmatic_results: serde_json::Value,
+ llm_results: serde_json::Value,
+ criteria_results: serde_json::Value,
+ summary_feedback: &str,
+ rework_instructions: Option<&str>,
+) -> Result<DirectiveEvaluation, sqlx::Error> {
+ // Get next evaluation number for this step/directive
+ let evaluation_number = if let Some(sid) = step_id {
+ sqlx::query_scalar::<_, i32>(
+ "SELECT COALESCE(MAX(evaluation_number), 0) + 1 FROM directive_evaluations WHERE step_id = $1"
+ )
+ .bind(sid)
+ .fetch_one(pool)
+ .await?
+ } else {
+ sqlx::query_scalar::<_, i32>(
+ "SELECT COALESCE(MAX(evaluation_number), 0) + 1 FROM directive_evaluations WHERE directive_id = $1 AND step_id IS NULL"
+ )
+ .bind(directive_id)
+ .fetch_one(pool)
+ .await?
+ };
- sqlx::query_as::<_, ContractEvaluation>(
+ sqlx::query_as::<_, DirectiveEvaluation>(
r#"
- INSERT INTO contract_evaluations (
- contract_id, chain_id, chain_contract_id,
- evaluator_model, passed, overall_score,
- criteria_results, summary_feedback, rework_instructions,
+ INSERT INTO directive_evaluations (
+ directive_id, chain_id, step_id, contract_id,
+ evaluation_type, evaluation_number, evaluator,
+ passed, overall_score, confidence_level,
+ programmatic_results, llm_results, criteria_results,
+ summary_feedback, rework_instructions,
completed_at
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW())
RETURNING *
"#,
)
- .bind(req.contract_id)
- .bind(req.chain_id)
- .bind(req.chain_contract_id)
- .bind(&req.evaluator_model)
- .bind(req.passed)
- .bind(req.overall_score)
+ .bind(directive_id)
+ .bind(chain_id)
+ .bind(step_id)
+ .bind(contract_id)
+ .bind(evaluation_type)
+ .bind(evaluation_number)
+ .bind(evaluator)
+ .bind(passed)
+ .bind(overall_score)
+ .bind(confidence_level)
+ .bind(&programmatic_results)
+ .bind(&llm_results)
.bind(&criteria_results)
- .bind(&req.summary_feedback)
- .bind(&req.rework_instructions)
+ .bind(summary_feedback)
+ .bind(rework_instructions)
.fetch_one(pool)
.await
}
-/// Get a contract evaluation by ID.
-pub async fn get_contract_evaluation(
+/// List evaluations for a step.
+pub async fn list_step_evaluations(
pool: &PgPool,
- id: Uuid,
-) -> Result<Option<ContractEvaluation>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvaluation>(
- r#"
- SELECT *
- FROM contract_evaluations
- WHERE id = $1
- "#,
+ step_id: Uuid,
+) -> Result<Vec<DirectiveEvaluation>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveEvaluation>(
+ "SELECT * FROM directive_evaluations WHERE step_id = $1 ORDER BY evaluation_number DESC"
)
- .bind(id)
- .fetch_optional(pool)
+ .bind(step_id)
+ .fetch_all(pool)
.await
}
-/// List evaluations for a contract.
-pub async fn list_contract_evaluations(
+/// List evaluations for a directive.
+pub async fn list_directive_evaluations(
pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractEvaluationSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvaluationSummary>(
- r#"
- SELECT id, contract_id, evaluation_number, passed, overall_score, summary_feedback, created_at
- FROM contract_evaluations
- WHERE contract_id = $1
- ORDER BY evaluation_number DESC
- "#,
+ directive_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<DirectiveEvaluation>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, DirectiveEvaluation>(
+ "SELECT * FROM directive_evaluations WHERE directive_id = $1 ORDER BY created_at DESC LIMIT $2"
)
- .bind(contract_id)
+ .bind(directive_id)
+ .bind(limit)
.fetch_all(pool)
.await
}
-/// List evaluations for a chain.
-pub async fn list_chain_evaluations(
+// =============================================================================
+// Directive Event Operations
+// =============================================================================
+
+/// Emit a directive event.
+pub async fn emit_directive_event(
pool: &PgPool,
- chain_id: Uuid,
-) -> Result<Vec<ContractEvaluationSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvaluationSummary>(
+ directive_id: Uuid,
+ chain_id: Option<Uuid>,
+ step_id: Option<Uuid>,
+ event_type: &str,
+ severity: &str,
+ event_data: Option<serde_json::Value>,
+ actor_type: &str,
+ actor_id: Option<Uuid>,
+) -> Result<DirectiveEvent, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveEvent>(
r#"
- SELECT id, contract_id, evaluation_number, passed, overall_score, summary_feedback, created_at
- FROM contract_evaluations
- WHERE chain_id = $1
- ORDER BY created_at DESC
+ INSERT INTO directive_events (
+ directive_id, chain_id, step_id, event_type, severity, event_data, actor_type, actor_id
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ RETURNING *
"#,
)
+ .bind(directive_id)
.bind(chain_id)
+ .bind(step_id)
+ .bind(event_type)
+ .bind(severity)
+ .bind(event_data)
+ .bind(actor_type)
+ .bind(actor_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// List directive events.
+pub async fn list_directive_events(
+ pool: &PgPool,
+ directive_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<DirectiveEvent>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, DirectiveEvent>(
+ "SELECT * FROM directive_events WHERE directive_id = $1 ORDER BY created_at DESC LIMIT $2"
+ )
+ .bind(directive_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+// =============================================================================
+// Directive Verifier Operations
+// =============================================================================
+
+/// Create a directive verifier.
+pub async fn create_directive_verifier(
+ pool: &PgPool,
+ directive_id: Uuid,
+ name: &str,
+ verifier_type: &str,
+ command: Option<&str>,
+ working_directory: Option<&str>,
+ auto_detect: bool,
+ detect_files: Vec<String>,
+ weight: f64,
+ required: bool,
+) -> Result<DirectiveVerifier, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveVerifier>(
+ r#"
+ INSERT INTO directive_verifiers (
+ directive_id, name, verifier_type, command, working_directory,
+ auto_detect, detect_files, weight, required
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(name)
+ .bind(verifier_type)
+ .bind(command)
+ .bind(working_directory)
+ .bind(auto_detect)
+ .bind(&detect_files)
+ .bind(weight)
+ .bind(required)
+ .fetch_one(pool)
+ .await
+}
+
+/// List verifiers for a directive.
+pub async fn list_directive_verifiers(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveVerifier>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveVerifier>(
+ "SELECT * FROM directive_verifiers WHERE directive_id = $1 ORDER BY name"
+ )
+ .bind(directive_id)
.fetch_all(pool)
.await
}
-/// Get the latest evaluation for a chain contract.
-pub async fn get_latest_chain_contract_evaluation(
+/// Update a directive verifier.
+pub async fn update_directive_verifier(
pool: &PgPool,
- chain_contract_id: Uuid,
-) -> Result<Option<ContractEvaluation>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvaluation>(
+ verifier_id: Uuid,
+ enabled: Option<bool>,
+ command: Option<&str>,
+ weight: Option<f64>,
+ required: Option<bool>,
+) -> Result<DirectiveVerifier, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveVerifier>(
r#"
- SELECT *
- FROM contract_evaluations
- WHERE chain_contract_id = $1
- ORDER BY evaluation_number DESC
- LIMIT 1
+ UPDATE directive_verifiers SET
+ enabled = COALESCE($2, enabled),
+ command = COALESCE($3, command),
+ weight = COALESCE($4, weight),
+ required = COALESCE($5, required),
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
"#,
)
- .bind(chain_contract_id)
- .fetch_optional(pool)
+ .bind(verifier_id)
+ .bind(enabled)
+ .bind(command)
+ .bind(weight)
+ .bind(required)
+ .fetch_one(pool)
.await
}
-/// Get the next evaluation number for a chain contract.
-pub async fn get_next_evaluation_number(
+/// Update verifier last run result.
+pub async fn update_verifier_result(
pool: &PgPool,
- chain_contract_id: Uuid,
-) -> Result<i32, sqlx::Error> {
- let result: Option<(i32,)> = sqlx::query_as(
+ verifier_id: Uuid,
+ result: serde_json::Value,
+) -> Result<DirectiveVerifier, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveVerifier>(
r#"
- SELECT COALESCE(MAX(evaluation_number), 0) + 1 as next_number
- FROM contract_evaluations
- WHERE chain_contract_id = $1
+ UPDATE directive_verifiers SET last_run_at = NOW(), last_result = $2, updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
"#,
)
- .bind(chain_contract_id)
- .fetch_optional(pool)
- .await?;
+ .bind(verifier_id)
+ .bind(result)
+ .fetch_one(pool)
+ .await
+}
- Ok(result.map(|(n,)| n).unwrap_or(1))
+// =============================================================================
+// Directive Approval Operations
+// =============================================================================
+
+/// Create an approval request.
+pub async fn create_approval_request(
+ pool: &PgPool,
+ directive_id: Uuid,
+ step_id: Option<Uuid>,
+ approval_type: &str,
+ description: &str,
+ context: Option<serde_json::Value>,
+ urgency: &str,
+ expires_at: Option<chrono::DateTime<Utc>>,
+) -> Result<DirectiveApproval, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveApproval>(
+ r#"
+ INSERT INTO directive_approvals (
+ directive_id, step_id, approval_type, description, context, urgency, expires_at
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(step_id)
+ .bind(approval_type)
+ .bind(description)
+ .bind(context)
+ .bind(urgency)
+ .bind(expires_at)
+ .fetch_one(pool)
+ .await
}
-/// Update chain contract evaluation status.
-pub async fn update_chain_contract_evaluation_status(
+/// Resolve an approval request.
+pub async fn resolve_approval(
pool: &PgPool,
- chain_contract_id: Uuid,
+ approval_id: Uuid,
status: &str,
- evaluation_id: Option<Uuid>,
- rework_feedback: Option<&str>,
-) -> Result<ChainContract, sqlx::Error> {
- sqlx::query_as::<_, ChainContract>(
- r#"
- UPDATE chain_contracts SET
- evaluation_status = $2,
- last_evaluation_id = COALESCE($3, last_evaluation_id),
- rework_feedback = COALESCE($4, rework_feedback),
- evaluation_retry_count = CASE
- WHEN $2 = 'rework' THEN evaluation_retry_count + 1
- ELSE evaluation_retry_count
- END,
- rework_started_at = CASE
- WHEN $2 = 'rework' THEN NOW()
- ELSE rework_started_at
- END
+ response: Option<&str>,
+ responded_by: Uuid,
+) -> Result<DirectiveApproval, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveApproval>(
+ r#"
+ UPDATE directive_approvals SET
+ status = $2,
+ response = $3,
+ responded_by = $4,
+ responded_at = NOW()
WHERE id = $1
RETURNING *
"#,
)
- .bind(chain_contract_id)
+ .bind(approval_id)
.bind(status)
- .bind(evaluation_id)
- .bind(rework_feedback)
+ .bind(response)
+ .bind(responded_by)
.fetch_one(pool)
.await
}
-/// Mark a chain contract's original completion time (before rework).
-pub async fn mark_chain_contract_original_completion(
+/// List pending approvals for a directive.
+pub async fn list_pending_approvals(
pool: &PgPool,
- chain_contract_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveApproval>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveApproval>(
r#"
- UPDATE chain_contracts SET
- original_completion_at = COALESCE(original_completion_at, NOW())
- WHERE id = $1
+ SELECT * FROM directive_approvals
+ WHERE directive_id = $1 AND status = 'pending'
+ ORDER BY
+ CASE urgency
+ WHEN 'critical' THEN 1
+ WHEN 'high' THEN 2
+ WHEN 'normal' THEN 3
+ ELSE 4
+ END,
+ created_at
"#,
)
- .bind(chain_contract_id)
- .execute(pool)
- .await?;
- Ok(())
+ .bind(directive_id)
+ .fetch_all(pool)
+ .await
}
-/// Get chain contract by contract ID.
-pub async fn get_chain_contract_by_contract_id(
+/// Get step by contract ID.
+pub async fn get_step_by_contract_id(
pool: &PgPool,
contract_id: Uuid,
-) -> Result<Option<ChainContract>, sqlx::Error> {
- sqlx::query_as::<_, ChainContract>(
- r#"
- SELECT *
- FROM chain_contracts
- WHERE contract_id = $1
- "#,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ "SELECT * FROM chain_steps WHERE contract_id = $1"
)
.bind(contract_id)
.fetch_optional(pool)
@@ -6682,103 +6034,9 @@ pub async fn get_chain_contract_by_contract_id(
}
// =============================================================================
-// Init Chain (Directive-Driven Chain Creation)
+// Helper Functions
// =============================================================================
-/// Initialize a directive-driven chain.
-/// Creates a directive contract and an empty chain linked to it.
-pub async fn init_chain_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: InitChainRequest,
-) -> Result<InitChainResponse, sqlx::Error> {
- // Create the directive contract
- // Note: "directive" contract type uses the "specification" phases by default
- let contract_req = CreateContractRequest {
- name: format!("Directive: {}", truncate_string(&req.goal, 50)),
- description: Some(req.goal.clone()),
- contract_type: Some("specification".to_string()), // Directive uses spec workflow
- template_id: None,
- initial_phase: Some("research".to_string()),
- phase_guard: Some(req.phase_guard),
- autonomous_loop: Some(false),
- local_only: Some(false),
- auto_merge_local: Some(false),
- };
-
- let contract = create_contract_for_owner(pool, owner_id, contract_req).await?;
-
- // Mark it as a chain directive
- sqlx::query("UPDATE contracts SET is_chain_directive = true WHERE id = $1")
- .bind(contract.id)
- .execute(pool)
- .await?;
-
- // Build repositories list from request
- let repositories = match (req.repository_url.as_ref(), req.local_path.as_ref()) {
- (Some(url), _) => Some(vec![AddChainRepositoryRequest {
- name: "Primary".to_string(),
- repository_url: Some(url.clone()),
- local_path: None,
- source_type: "remote".to_string(),
- is_primary: true,
- }]),
- (None, Some(path)) => Some(vec![AddChainRepositoryRequest {
- name: "Primary".to_string(),
- repository_url: None,
- local_path: Some(path.clone()),
- source_type: "local".to_string(),
- is_primary: true,
- }]),
- (None, None) => None,
- };
-
- // Create the chain with directive contract reference
- let chain_req = CreateChainRequest {
- name: truncate_string(&req.goal, 100),
- description: Some(req.goal),
- repositories,
- loop_enabled: Some(false),
- loop_max_iterations: None,
- loop_progress_check: None,
- contracts: None,
- };
-
- let chain = create_chain_for_owner(pool, owner_id, chain_req).await?;
-
- // Link directive contract to chain
- sqlx::query(
- r#"
- UPDATE chains SET directive_contract_id = $2 WHERE id = $1;
- UPDATE contracts SET spawned_chain_id = $1 WHERE id = $2;
- "#,
- )
- .bind(chain.id)
- .bind(contract.id)
- .execute(pool)
- .await?;
-
- // Create empty directive document
- create_chain_directive(
- pool,
- chain.id,
- CreateChainDirectiveRequest {
- requirements: Some(vec![]),
- acceptance_criteria: Some(vec![]),
- constraints: Some(vec![]),
- external_dependencies: Some(vec![]),
- source_type: Some("llm_generated".to_string()),
- },
- )
- .await?;
-
- Ok(InitChainResponse {
- chain_id: chain.id,
- directive_contract_id: contract.id,
- supervisor_task_id: contract.supervisor_task_id,
- })
-}
-
/// Helper to truncate string to max length
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {