diff options
| author | soryu <soryu@soryu.co> | 2026-02-03 22:01:29 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-03 22:01:37 +0000 |
| commit | cf0a25af1d2834bfe6c5ea892ce5769936e5a673 (patch) | |
| tree | 476ba326ac1752281a441b5c17d2b3be4b23a2a9 /makima/src/db | |
| parent | 8361916ce67f3d2ba191ebf27cb50e79cb42e39c (diff) | |
| download | soryu-cf0a25af1d2834bfe6c5ea892ce5769936e5a673.tar.gz soryu-cf0a25af1d2834bfe6c5ea892ce5769936e5a673.zip | |
Add makima chain mechanism
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 356 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 548 |
2 files changed, 895 insertions, 9 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index cef0a22..45ddb52 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1446,6 +1446,9 @@ pub struct Contract { /// Use `get_phase_config()` to get the parsed PhaseConfig. #[serde(skip_serializing_if = "Option::is_none")] pub phase_config: Option<serde_json::Value>, + /// Chain ID if this contract is part of a chain (DAG of contracts) + #[serde(skip_serializing_if = "Option::is_none")] + pub chain_id: Option<Uuid>, pub version: i32, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, @@ -2586,6 +2589,359 @@ pub struct HeartbeatHistoryQuery { } // ============================================================================= +// Chains (DAG of contracts for multi-contract orchestration) +// ============================================================================= + +/// Chain status determines the overall state of the chain +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum ChainStatus { + /// Chain is actively running + Active, + /// All contracts completed successfully + Completed, + /// Chain was manually archived + Archived, +} + +impl Default for ChainStatus { + fn default() -> Self { + ChainStatus::Active + } +} + +impl std::fmt::Display for ChainStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ChainStatus::Active => write!(f, "active"), + ChainStatus::Completed => write!(f, "completed"), + ChainStatus::Archived => write!(f, "archived"), + } + } +} + +impl std::str::FromStr for ChainStatus { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "active" => Ok(ChainStatus::Active), + "completed" => Ok(ChainStatus::Completed), + "archived" => Ok(ChainStatus::Archived), + _ => Err(format!("Invalid chain status: {}", s)), + } + } +} + +/// Chain - a directed acyclic graph (DAG) of contracts +/// Fits Makima's control theme - she controls through invisible chains +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Chain { + pub id: Uuid, + pub owner_id: Uuid, + pub name: String, + pub description: Option<String>, + pub status: String, + /// Whether loop mode is enabled for iterative execution + #[serde(default)] + pub loop_enabled: bool, + /// Maximum loop iterations (default: 10) + pub loop_max_iterations: Option<i32>, + /// Current loop iteration count + pub loop_current_iteration: Option<i32>, + /// Progress check prompt/criteria for evaluating loop completion + pub loop_progress_check: Option<String>, + /// Repository URL for contracts in this chain (optional) + pub repository_url: Option<String>, + /// Local path for contracts in this chain (optional) + pub local_path: Option<String>, + /// Version for optimistic locking + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +impl Chain { + /// Parse status string to ChainStatus enum + pub fn status_enum(&self) -> Result<ChainStatus, String> { + self.status.parse() + } +} + +/// Chain contract link - links contracts to chains with DAG dependency info +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainContract { + pub id: Uuid, + pub chain_id: Uuid, + pub contract_id: Uuid, + /// Contract IDs this contract depends on (DAG edges) + #[sqlx(default)] + pub depends_on: Vec<Uuid>, + /// Order for display/processing (topological sort order) + pub order_index: i32, + /// X position for GUI editor + pub editor_x: Option<f64>, + /// Y position for GUI editor + pub editor_y: Option<f64>, + pub created_at: DateTime<Utc>, +} + +/// Chain event for audit trail +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEvent { + pub id: Uuid, + pub chain_id: Uuid, + pub event_type: String, + pub contract_id: Option<Uuid>, + #[sqlx(json)] + pub event_data: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// Summary of a chain for list views +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainSummary { + pub id: Uuid, + pub name: String, + pub description: Option<String>, + pub status: String, + pub loop_enabled: bool, + pub loop_current_iteration: Option<i32>, + pub contract_count: i64, + pub completed_count: i64, + pub version: i32, + pub created_at: DateTime<Utc>, +} + +/// Chain with contracts for detail view +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainWithContracts { + #[serde(flatten)] + pub chain: Chain, + pub contracts: Vec<ChainContractDetail>, +} + +/// Contract detail within a chain (includes contract info + chain link info) +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainContractDetail { + pub chain_contract_id: Uuid, + pub contract_id: Uuid, + pub contract_name: String, + pub contract_status: String, + pub contract_phase: String, + #[sqlx(default)] + pub depends_on: Vec<Uuid>, + pub order_index: i32, + pub editor_x: Option<f64>, + pub editor_y: Option<f64>, +} + +/// DAG graph structure for visualization +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainGraphResponse { + pub chain_id: Uuid, + pub chain_name: String, + pub chain_status: String, + pub nodes: Vec<ChainGraphNode>, + pub edges: Vec<ChainGraphEdge>, +} + +/// Node in chain DAG graph +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainGraphNode { + pub id: Uuid, + pub contract_id: Uuid, + pub name: String, + pub status: String, + pub phase: String, + pub x: f64, + pub y: f64, +} + +/// Edge in chain DAG graph +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainGraphEdge { + pub from: Uuid, + pub to: Uuid, +} + +/// Response for chain list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainListResponse { + pub chains: Vec<ChainSummary>, + pub total: i64, +} + +/// Request payload for creating a new chain +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateChainRequest { + /// Name of the chain + pub name: String, + /// Optional description + pub description: Option<String>, + /// Repository URL for contracts in this chain + pub repository_url: Option<String>, + /// Local path for contracts in this chain + pub local_path: Option<String>, + /// Enable loop mode for iterative execution + #[serde(default)] + pub loop_enabled: Option<bool>, + /// Maximum loop iterations (default: 10) + pub loop_max_iterations: Option<i32>, + /// Progress check prompt for evaluating loop completion + pub loop_progress_check: Option<String>, + /// Contracts to create within this chain + pub contracts: Option<Vec<CreateChainContractRequest>>, +} + +/// Request to create a contract within a chain +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateChainContractRequest { + /// Name of the contract + pub name: String, + /// Optional description + pub description: Option<String>, + /// Contract type + #[serde(default)] + pub contract_type: Option<String>, + /// Initial phase + pub initial_phase: Option<String>, + /// Phases for the contract + pub phases: Option<Vec<String>>, + /// Names of contracts this depends on (resolved to IDs) + pub depends_on: Option<Vec<String>>, + /// Tasks to create in this contract + pub tasks: Option<Vec<CreateChainTaskRequest>>, + /// Deliverables for this contract + pub deliverables: Option<Vec<CreateChainDeliverableRequest>>, + /// Position in GUI editor + pub editor_x: Option<f64>, + pub editor_y: Option<f64>, +} + +/// Task definition within a chain contract +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateChainTaskRequest { + pub name: String, + pub plan: String, +} + +/// Deliverable definition within a chain contract +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateChainDeliverableRequest { + pub id: String, + pub name: String, + pub priority: Option<String>, +} + +/// Request to update an existing chain +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateChainRequest { + pub name: Option<String>, + pub description: Option<String>, + pub status: Option<String>, + pub loop_enabled: Option<bool>, + pub loop_max_iterations: Option<i32>, + pub loop_progress_check: Option<String>, + /// Version for optimistic locking + pub version: Option<i32>, +} + +/// Request to add a contract to a chain +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AddContractToChainRequest { + /// Existing contract ID to add + pub contract_id: Option<Uuid>, + /// Or create a new contract with this definition + pub new_contract: Option<CreateChainContractRequest>, + /// Contract IDs this depends on + pub depends_on: Option<Vec<Uuid>>, + /// Position in GUI editor + pub editor_x: Option<f64>, + pub editor_y: Option<f64>, +} + +/// Editor data model for GUI chain editor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorData { + pub id: Option<Uuid>, + pub name: String, + pub description: Option<String>, + pub repository_url: Option<String>, + pub local_path: Option<String>, + pub loop_enabled: bool, + pub loop_max_iterations: Option<i32>, + pub loop_progress_check: Option<String>, + pub nodes: Vec<ChainEditorNode>, + pub edges: Vec<ChainEditorEdge>, +} + +/// Node in chain editor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorNode { + pub id: String, + pub x: f64, + pub y: f64, + pub contract: ChainEditorContract, +} + +/// Contract data in chain editor node +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorContract { + pub name: String, + pub description: Option<String>, + #[serde(rename = "type")] + pub contract_type: String, + pub phases: Vec<String>, + pub tasks: Vec<ChainEditorTask>, + pub deliverables: Vec<ChainEditorDeliverable>, +} + +/// Task in chain editor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorTask { + pub name: String, + pub plan: String, +} + +/// Deliverable in chain editor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorDeliverable { + pub id: String, + pub name: String, + pub priority: String, +} + +/// Edge in chain editor +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChainEditorEdge { + pub from: String, + pub to: String, +} + +// ============================================================================= // Unit Tests // ============================================================================= diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2ecbc4a..48b0714 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,15 +6,19 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, - ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, - ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, - CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, - DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, - HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, - PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, - Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, - UpdateTaskRequest, UpdateTemplateRequest, + AddContractToChainRequest, Chain, ChainContract, ChainContractDetail, ChainEditorContract, + ChainEditorData, ChainEditorDeliverable, ChainEditorEdge, ChainEditorNode, ChainEditorTask, + ChainEvent, ChainGraphEdge, ChainGraphNode, ChainGraphResponse, ChainSummary, + ChainWithContracts, CheckpointPatch, CheckpointPatchInfo, Contract, + ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, + ContractSummary, ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, + CreateChainRequest, CreateContractRequest, CreateFileRequest, CreateTaskRequest, + CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, + DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, + MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition, + SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, + UpdateChainRequest, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, + UpdateTemplateRequest, }; /// Repository error types. @@ -4896,3 +4900,529 @@ pub async fn sync_supervisor_state( .fetch_optional(pool) .await } + +// ============================================================================= +// Chain Operations (DAG of contracts for multi-contract orchestration) +// ============================================================================= + +/// Create a new chain for a specific owner. +pub async fn create_chain_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateChainRequest, +) -> Result<Chain, sqlx::Error> { + let loop_enabled = req.loop_enabled.unwrap_or(false); + let loop_max_iterations = req.loop_max_iterations.unwrap_or(10); + + sqlx::query_as::<_, Chain>( + r#" + INSERT INTO chains (owner_id, name, description, repository_url, local_path, loop_enabled, loop_max_iterations, loop_progress_check) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.repository_url) + .bind(&req.local_path) + .bind(loop_enabled) + .bind(loop_max_iterations) + .bind(&req.loop_progress_check) + .fetch_one(pool) + .await +} + +/// Get a chain by ID, scoped to owner. +pub async fn get_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Chain>, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + SELECT * + FROM chains + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Get a chain by ID (no owner check - for internal use). +pub async fn get_chain(pool: &PgPool, id: Uuid) -> Result<Option<Chain>, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + SELECT * + FROM chains + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List chains for a specific owner. +pub async fn list_chains_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<ChainSummary>, sqlx::Error> { + sqlx::query_as::<_, ChainSummary>( + r#" + SELECT + c.id, + c.name, + c.description, + c.status, + c.loop_enabled, + c.loop_current_iteration, + COUNT(DISTINCT cc.contract_id) as contract_count, + COUNT(DISTINCT CASE WHEN con.status = 'completed' THEN cc.contract_id END) as completed_count, + c.version, + c.created_at + FROM chains c + LEFT JOIN chain_contracts cc ON cc.chain_id = c.id + LEFT JOIN contracts con ON con.id = cc.contract_id + WHERE c.owner_id = $1 + GROUP BY c.id + ORDER BY c.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a chain. +pub async fn update_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateChainRequest, +) -> Result<Chain, RepositoryError> { + // First get current version if optimistic locking requested + if let Some(expected_version) = req.version { + let current: Option<(i32,)> = sqlx::query_as( + "SELECT version FROM chains WHERE id = $1 AND owner_id = $2", + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some((actual_version,)) = current { + if actual_version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: actual_version, + }); + } + } + } + + let result = sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET + name = COALESCE($3, name), + description = COALESCE($4, description), + status = COALESCE($5, status), + loop_enabled = COALESCE($6, loop_enabled), + loop_max_iterations = COALESCE($7, loop_max_iterations), + loop_progress_check = COALESCE($8, loop_progress_check), + version = version + 1, + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.status) + .bind(req.loop_enabled) + .bind(req.loop_max_iterations) + .bind(&req.loop_progress_check) + .fetch_one(pool) + .await?; + + Ok(result) +} + +/// Delete (archive) a chain. +pub async fn delete_chain_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE chains + SET status = 'archived', updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Add a contract to a chain. +pub async fn add_contract_to_chain( + pool: &PgPool, + chain_id: Uuid, + contract_id: Uuid, + depends_on: Vec<Uuid>, + order_index: i32, + editor_x: Option<f64>, + editor_y: Option<f64>, +) -> Result<ChainContract, sqlx::Error> { + // Also update the contract's chain_id + sqlx::query("UPDATE contracts SET chain_id = $1 WHERE id = $2") + .bind(chain_id) + .bind(contract_id) + .execute(pool) + .await?; + + sqlx::query_as::<_, ChainContract>( + r#" + INSERT INTO chain_contracts (chain_id, contract_id, depends_on, order_index, editor_x, editor_y) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (chain_id, contract_id) DO UPDATE SET + depends_on = EXCLUDED.depends_on, + order_index = EXCLUDED.order_index, + editor_x = EXCLUDED.editor_x, + editor_y = EXCLUDED.editor_y + RETURNING * + "#, + ) + .bind(chain_id) + .bind(contract_id) + .bind(&depends_on) + .bind(order_index) + .bind(editor_x) + .bind(editor_y) + .fetch_one(pool) + .await +} + +/// Remove a contract from a chain. +pub async fn remove_contract_from_chain( + pool: &PgPool, + chain_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + // Clear the contract's chain_id + sqlx::query("UPDATE contracts SET chain_id = NULL WHERE id = $1 AND chain_id = $2") + .bind(contract_id) + .bind(chain_id) + .execute(pool) + .await?; + + let result = sqlx::query( + r#" + DELETE FROM chain_contracts + WHERE chain_id = $1 AND contract_id = $2 + "#, + ) + .bind(chain_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// List contracts in a chain with their details. +pub async fn list_chain_contracts( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainContractDetail>, sqlx::Error> { + sqlx::query_as::<_, ChainContractDetail>( + r#" + SELECT + cc.id as chain_contract_id, + cc.contract_id, + c.name as contract_name, + c.status as contract_status, + c.phase as contract_phase, + cc.depends_on, + cc.order_index, + cc.editor_x, + cc.editor_y + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + ORDER BY cc.order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Get chain with all contracts for detail view. +pub async fn get_chain_with_contracts( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, +) -> Result<Option<ChainWithContracts>, sqlx::Error> { + let chain = get_chain_for_owner(pool, chain_id, owner_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + Ok(Some(ChainWithContracts { chain, contracts })) + } + None => Ok(None), + } +} + +/// Get chain graph structure for visualization. +pub async fn get_chain_graph( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Option<ChainGraphResponse>, sqlx::Error> { + let chain = get_chain(pool, chain_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + + let nodes: Vec<ChainGraphNode> = contracts + .iter() + .map(|c| ChainGraphNode { + id: c.chain_contract_id, + contract_id: c.contract_id, + name: c.contract_name.clone(), + status: c.contract_status.clone(), + phase: c.contract_phase.clone(), + x: c.editor_x.unwrap_or(0.0), + y: c.editor_y.unwrap_or(0.0), + }) + .collect(); + + let mut edges: Vec<ChainGraphEdge> = Vec::new(); + for contract in &contracts { + for dep_id in &contract.depends_on { + // Find the chain_contract_id for this dependency + if let Some(dep) = contracts.iter().find(|c| c.contract_id == *dep_id) { + edges.push(ChainGraphEdge { + from: dep.chain_contract_id, + to: contract.chain_contract_id, + }); + } + } + } + + Ok(Some(ChainGraphResponse { + chain_id: chain.id, + chain_name: chain.name, + chain_status: chain.status, + nodes, + edges, + })) + } + None => Ok(None), + } +} + +/// Record a chain event. +pub async fn record_chain_event( + pool: &PgPool, + chain_id: Uuid, + event_type: &str, + contract_id: Option<Uuid>, + event_data: Option<serde_json::Value>, +) -> Result<ChainEvent, sqlx::Error> { + sqlx::query_as::<_, ChainEvent>( + r#" + INSERT INTO chain_events (chain_id, event_type, contract_id, event_data) + VALUES ($1, $2, $3, $4) + RETURNING * + "#, + ) + .bind(chain_id) + .bind(event_type) + .bind(contract_id) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// List chain events. +pub async fn list_chain_events( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainEvent>, sqlx::Error> { + sqlx::query_as::<_, ChainEvent>( + r#" + SELECT * + FROM chain_events + WHERE chain_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Increment chain loop iteration. +pub async fn increment_chain_loop(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET loop_current_iteration = COALESCE(loop_current_iteration, 0) + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await +} + +/// Mark a chain as completed. +pub async fn complete_chain(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> { + sqlx::query_as::<_, Chain>( + r#" + UPDATE chains + SET status = 'completed', + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await +} + +/// Get contracts in a chain that have no pending dependencies (ready to start). +/// Returns contracts where all depends_on contracts are completed. +pub async fn get_ready_chain_contracts( + pool: &PgPool, + chain_id: Uuid, +) -> Result<Vec<ChainContractDetail>, sqlx::Error> { + sqlx::query_as::<_, ChainContractDetail>( + r#" + SELECT + cc.id as chain_contract_id, + cc.contract_id, + c.name as contract_name, + c.status as contract_status, + c.phase as contract_phase, + cc.depends_on, + cc.order_index, + cc.editor_x, + cc.editor_y + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + AND c.status = 'active' + AND ( + -- No dependencies + cc.depends_on IS NULL + OR array_length(cc.depends_on, 1) IS NULL + OR array_length(cc.depends_on, 1) = 0 + -- Or all dependencies completed + OR NOT EXISTS ( + SELECT 1 + FROM unnest(cc.depends_on) AS dep_id + JOIN contracts dep ON dep.id = dep_id + WHERE dep.status != 'completed' + ) + ) + ORDER BY cc.order_index ASC + "#, + ) + .bind(chain_id) + .fetch_all(pool) + .await +} + +/// Check if all contracts in a chain are completed. +pub async fn is_chain_complete(pool: &PgPool, chain_id: Uuid) -> Result<bool, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM chain_contracts cc + JOIN contracts c ON c.id = cc.contract_id + WHERE cc.chain_id = $1 + AND c.status != 'completed' + "#, + ) + .bind(chain_id) + .fetch_one(pool) + .await?; + + Ok(result.0 == 0) +} + +/// Get chain editor data for the GUI editor. +pub async fn get_chain_editor_data( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, +) -> Result<Option<ChainEditorData>, sqlx::Error> { + let chain = get_chain_for_owner(pool, chain_id, owner_id).await?; + + match chain { + Some(chain) => { + let contracts = list_chain_contracts(pool, chain_id).await?; + + // Build nodes + let nodes: Vec<ChainEditorNode> = contracts + .iter() + .map(|c| ChainEditorNode { + id: c.contract_id.to_string(), + x: c.editor_x.unwrap_or(0.0), + y: c.editor_y.unwrap_or(0.0), + contract: ChainEditorContract { + name: c.contract_name.clone(), + description: None, // Would need to join with full contract data + contract_type: "simple".to_string(), + phases: vec!["plan".to_string(), "execute".to_string()], + tasks: vec![], + deliverables: vec![], + }, + }) + .collect(); + + // Build edges + let edges: Vec<ChainEditorEdge> = contracts + .iter() + .flat_map(|c| { + c.depends_on.iter().map(move |dep_id| ChainEditorEdge { + from: dep_id.to_string(), + to: c.contract_id.to_string(), + }) + }) + .collect(); + + Ok(Some(ChainEditorData { + id: Some(chain.id), + name: chain.name, + description: chain.description, + repository_url: chain.repository_url, + local_path: chain.local_path, + loop_enabled: chain.loop_enabled, + loop_max_iterations: chain.loop_max_iterations, + loop_progress_check: chain.loop_progress_check, + nodes, + edges, + })) + } + None => Ok(None), + } +} |
