diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 548 |
1 files changed, 539 insertions, 9 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2ecbc4a..48b0714 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,15 +6,19 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, - ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, - ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, - CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, - DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, - HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, - PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, - Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, - UpdateTaskRequest, UpdateTemplateRequest, + AddContractToChainRequest, Chain, ChainContract, ChainContractDetail, ChainEditorContract, + ChainEditorData, ChainEditorDeliverable, ChainEditorEdge, ChainEditorNode, ChainEditorTask, + ChainEvent, ChainGraphEdge, ChainGraphNode, ChainGraphResponse, ChainSummary, + ChainWithContracts, CheckpointPatch, CheckpointPatchInfo, Contract, + ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, + ContractSummary, ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, + CreateChainRequest, CreateContractRequest, CreateFileRequest, CreateTaskRequest, + CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, + DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, + MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition, + SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, + UpdateChainRequest, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, + UpdateTemplateRequest, }; /// Repository error types. @@ -4896,3 +4900,529 @@ pub async fn sync_supervisor_state( .fetch_optional(pool) .await } + +// ============================================================================= +// Chain Operations (DAG of contracts for multi-contract orchestration) +// ============================================================================= + +/// Create a new chain for a specific owner. +pub async fn create_chain_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateChainRequest, +) -> Result<Chain, sqlx::Error> { + let loop_enabled = req.loop_enabled.unwrap_or(false); + let loop_max_iterations = req.loop_max_iterations.unwrap_or(10); + + sqlx::query_as::<_, Chain>( + r#" + INSERT INTO chains (owner_id, name, description, repository_url, local_path, loop_enabled, loop_max_iterations, loop_progress_check) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.repository_url) + .bind(&req.local_path) + .bind(loop_enabled) + .bind(loop_max_iterations) + .bind(&req.loop_progress_check) + .fetch_one(pool) + .await +} + +/// Get a chain by ID, scoped to owner. +pub async fn get_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Chain>, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + SELECT * + FROM chains + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Get a chain by ID (no owner check - for internal use). +pub async fn get_chain(pool: &PgPool, id: Uuid) -> Result<Option<Chain>, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + SELECT * + FROM chains + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List chains for a specific owner. +pub async fn list_chains_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<ChainSummary>, sqlx::Error> { + sqlx::query_as::<_, ChainSummary>( + r#" + SELECT + c.id, + c.name, + c.description, + c.status, + c.loop_enabled, + c.loop_current_iteration, + COUNT(DISTINCT cc.contract_id) as contract_count, + COUNT(DISTINCT CASE WHEN con.status = 'completed' THEN cc.contract_id END) as completed_count, + c.version, + c.created_at + FROM chains c + LEFT JOIN chain_contracts cc ON cc.chain_id = c.id + LEFT JOIN contracts con ON con.id = cc.contract_id + WHERE c.owner_id = $1 + GROUP BY c.id + ORDER BY c.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a chain. +pub async fn update_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateChainRequest, +) -> Result<Chain, RepositoryError> { + // First get current version if optimistic locking requested + if let Some(expected_version) = req.version { + let current: Option<(i32,)> = sqlx::query_as( + "SELECT version FROM chains WHERE id = $1 AND owner_id = $2", + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some((actual_version,)) = current { + if actual_version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: actual_version, + }); + } + } + } + + let result = sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET + name = COALESCE($3, name), + description = COALESCE($4, description), + status = COALESCE($5, status), + loop_enabled = COALESCE($6, loop_enabled), + loop_max_iterations = COALESCE($7, loop_max_iterations), + loop_progress_check = COALESCE($8, loop_progress_check), + version = version + 1, + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.status) + .bind(req.loop_enabled) + .bind(req.loop_max_iterations) + .bind(&req.loop_progress_check) + .fetch_one(pool) + .await?; + + Ok(result) +} + +/// Delete (archive) a chain. +pub async fn delete_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE chains + SET status = 'archived', updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Add a contract to a chain. +pub async fn add_contract_to_chain( + pool: &PgPool, + chain_id: Uuid, + contract_id: Uuid, + depends_on: Vec<Uuid>, + order_index: i32, + editor_x: Option<f64>, + editor_y: Option<f64>, +) -> Result<ChainContract, sqlx::Error> { + // Also update the contract's chain_id + sqlx::query("UPDATE contracts SET chain_id = $1 WHERE id = $2") + .bind(chain_id) + .bind(contract_id) + .execute(pool) + .await?; + + sqlx::query_as::<_, ChainContract>( + r#" + INSERT INTO chain_contracts (chain_id, contract_id, depends_on, order_index, editor_x, editor_y) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (chain_id, contract_id) DO UPDATE SET + depends_on = EXCLUDED.depends_on, + order_index = EXCLUDED.order_index, + editor_x = EXCLUDED.editor_x, + editor_y = EXCLUDED.editor_y + RETURNING * + "#, + ) + .bind(chain_id) + .bind(contract_id) + .bind(&depends_on) + .bind(order_index) + .bind(editor_x) + .bind(editor_y) + .fetch_one(pool) + .await +} + +/// Remove a contract from a chain. +pub async fn remove_contract_from_chain( + pool: &PgPool, + chain_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + // Clear the contract's chain_id + sqlx::query("UPDATE contracts SET chain_id = NULL WHERE id = $1 AND chain_id = $2") + .bind(contract_id) + .bind(chain_id) + .execute(pool) + .await?; + + let result = sqlx::query( + r#" + DELETE FROM chain_contracts + WHERE chain_id = $1 AND contract_id = $2 + "#, + ) + .bind(chain_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// List contracts in a chain with their details. +pub async fn list_chain_contracts( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainContractDetail>, sqlx::Error> { + sqlx::query_as::<_, ChainContractDetail>( + r#" + SELECT + cc.id as chain_contract_id, + cc.contract_id, + c.name as contract_name, + c.status as contract_status, + c.phase as contract_phase, + cc.depends_on, + cc.order_index, + cc.editor_x, + cc.editor_y + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + ORDER BY cc.order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Get chain with all contracts for detail view. +pub async fn get_chain_with_contracts( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, +) -> Result<Option<ChainWithContracts>, sqlx::Error> { + let chain = get_chain_for_owner(pool, chain_id, owner_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + Ok(Some(ChainWithContracts { chain, contracts })) + } + None => Ok(None), + } +} + +/// Get chain graph structure for visualization. +pub async fn get_chain_graph( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Option<ChainGraphResponse>, sqlx::Error> { + let chain = get_chain(pool, chain_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + + let nodes: Vec<ChainGraphNode> = contracts + .iter() + .map(|c| ChainGraphNode { + id: c.chain_contract_id, + contract_id: c.contract_id, + name: c.contract_name.clone(), + status: c.contract_status.clone(), + phase: c.contract_phase.clone(), + x: c.editor_x.unwrap_or(0.0), + y: c.editor_y.unwrap_or(0.0), + }) + .collect(); + + let mut edges: Vec<ChainGraphEdge> = Vec::new(); + for contract in &contracts { + for dep_id in &contract.depends_on { + // Find the chain_contract_id for this dependency + if let Some(dep) = contracts.iter().find(|c| c.contract_id == *dep_id) { + edges.push(ChainGraphEdge { + from: dep.chain_contract_id, + to: contract.chain_contract_id, + }); + } + } + } + + Ok(Some(ChainGraphResponse { + chain_id: chain.id, + chain_name: chain.name, + chain_status: chain.status, + nodes, + edges, + })) + } + None => Ok(None), + } +} + +/// Record a chain event. +pub async fn record_chain_event( + pool: &PgPool, + chain_id: Uuid, + event_type: &str, + contract_id: Option<Uuid>, + event_data: Option<serde_json::Value>, +) -> Result<ChainEvent, sqlx::Error> { + sqlx::query_as::<_, ChainEvent>( + r#" + INSERT INTO chain_events (chain_id, event_type, contract_id, event_data) + VALUES ($1, $2, $3, $4) + RETURNING * + "#, + ) + .bind(chain_id) + .bind(event_type) + .bind(contract_id) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// List chain events. +pub async fn list_chain_events( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainEvent>, sqlx::Error> { + sqlx::query_as::<_, ChainEvent>( + r#" + SELECT * + FROM chain_events + WHERE chain_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Increment chain loop iteration. +pub async fn increment_chain_loop(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET loop_current_iteration = COALESCE(loop_current_iteration, 0) + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await +} + +/// Mark a chain as completed. +pub async fn complete_chain(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET status = 'completed', + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await +} + +/// Get contracts in a chain that have no pending dependencies (ready to start). +/// Returns contracts where all depends_on contracts are completed. +pub async fn get_ready_chain_contracts( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainContractDetail>, sqlx::Error> { + sqlx::query_as::<_, ChainContractDetail>( + r#" + SELECT + cc.id as chain_contract_id, + cc.contract_id, + c.name as contract_name, + c.status as contract_status, + c.phase as contract_phase, + cc.depends_on, + cc.order_index, + cc.editor_x, + cc.editor_y + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + AND c.status = 'active' + AND ( + -- No dependencies + cc.depends_on IS NULL + OR array_length(cc.depends_on, 1) IS NULL + OR array_length(cc.depends_on, 1) = 0 + -- Or all dependencies completed + OR NOT EXISTS ( + SELECT 1 + FROM unnest(cc.depends_on) AS dep_id + JOIN contracts dep ON dep.id = dep_id + WHERE dep.status != 'completed' + ) + ) + ORDER BY cc.order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Check if all contracts in a chain are completed. +pub async fn is_chain_complete(pool: &PgPool, chain_id: Uuid) -> Result<bool, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + AND c.status != 'completed' + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await?; + + Ok(result.0 == 0) +} + +/// Get chain editor data for the GUI editor. +pub async fn get_chain_editor_data( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, +) -> Result<Option<ChainEditorData>, sqlx::Error> { + let chain = get_chain_for_owner(pool, chain_id, owner_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + + // Build nodes + let nodes: Vec<ChainEditorNode> = contracts + .iter() + .map(|c| ChainEditorNode { + id: c.contract_id.to_string(), + x: c.editor_x.unwrap_or(0.0), + y: c.editor_y.unwrap_or(0.0), + contract: ChainEditorContract { + name: c.contract_name.clone(), + description: None, // Would need to join with full contract data + contract_type: "simple".to_string(), + phases: vec!["plan".to_string(), "execute".to_string()], + tasks: vec![], + deliverables: vec![], + }, + }) + .collect(); + + // Build edges + let edges: Vec<ChainEditorEdge> = contracts + .iter() + .flat_map(|c| { + c.depends_on.iter().map(move |dep_id| ChainEditorEdge { + from: dep_id.to_string(), + to: c.contract_id.to_string(), + }) + }) + .collect(); + + Ok(Some(ChainEditorData { + id: Some(chain.id), + name: chain.name, + description: chain.description, + repository_url: chain.repository_url, + local_path: chain.local_path, + loop_enabled: chain.loop_enabled, + loop_max_iterations: chain.loop_max_iterations, + loop_progress_check: chain.loop_progress_check, + nodes, + edges, + })) + } + None => Ok(None), + } +} |
