summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-03 22:01:29 +0000
committersoryu <soryu@soryu.co>2026-02-03 22:01:37 +0000
commitcf0a25af1d2834bfe6c5ea892ce5769936e5a673 (patch)
tree476ba326ac1752281a441b5c17d2b3be4b23a2a9 /makima/src/db/repository.rs
parent8361916ce67f3d2ba191ebf27cb50e79cb42e39c (diff)
downloadsoryu-cf0a25af1d2834bfe6c5ea892ce5769936e5a673.tar.gz
soryu-cf0a25af1d2834bfe6c5ea892ce5769936e5a673.zip
Add makima chain mechanism
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs548
1 files changed, 539 insertions, 9 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 2ecbc4a..48b0714 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,15 +6,19 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
- ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
- ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest,
- CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment,
- DaemonWithCapacity, DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent,
- HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult,
- PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
- Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest,
- UpdateTaskRequest, UpdateTemplateRequest,
+ AddContractToChainRequest, Chain, ChainContract, ChainContractDetail, ChainEditorContract,
+ ChainEditorData, ChainEditorDeliverable, ChainEditorEdge, ChainEditorNode, ChainEditorTask,
+ ChainEvent, ChainGraphEdge, ChainGraphNode, ChainGraphResponse, ChainSummary,
+ ChainWithContracts, CheckpointPatch, CheckpointPatchInfo, Contract,
+ ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository,
+ ContractSummary, ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
+ CreateChainRequest, CreateContractRequest, CreateFileRequest, CreateTaskRequest,
+ CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
+ DeliverableDefinition, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
+ MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition,
+ SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary,
+ UpdateChainRequest, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest,
+ UpdateTemplateRequest,
};
/// Repository error types.
@@ -4896,3 +4900,529 @@ pub async fn sync_supervisor_state(
.fetch_optional(pool)
.await
}
+
+// =============================================================================
+// Chain Operations (DAG of contracts for multi-contract orchestration)
+// =============================================================================
+
+/// Create a new chain for a specific owner.
+pub async fn create_chain_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateChainRequest,
+) -> Result<Chain, sqlx::Error> {
+ let loop_enabled = req.loop_enabled.unwrap_or(false);
+ let loop_max_iterations = req.loop_max_iterations.unwrap_or(10);
+
+ sqlx::query_as::<_, Chain>(
+ r#"
+ INSERT INTO chains (owner_id, name, description, repository_url, local_path, loop_enabled, loop_max_iterations, loop_progress_check)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.repository_url)
+ .bind(&req.local_path)
+ .bind(loop_enabled)
+ .bind(loop_max_iterations)
+ .bind(&req.loop_progress_check)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a chain by ID, scoped to owner.
+pub async fn get_chain_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<Chain>, sqlx::Error> {
+ sqlx::query_as::<_, Chain>(
+ r#"
+ SELECT *
+ FROM chains
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a chain by ID (no owner check - for internal use).
+pub async fn get_chain(pool: &PgPool, id: Uuid) -> Result<Option<Chain>, sqlx::Error> {
+ sqlx::query_as::<_, Chain>(
+ r#"
+ SELECT *
+ FROM chains
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List chains for a specific owner.
+pub async fn list_chains_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<ChainSummary>, sqlx::Error> {
+ sqlx::query_as::<_, ChainSummary>(
+ r#"
+ SELECT
+ c.id,
+ c.name,
+ c.description,
+ c.status,
+ c.loop_enabled,
+ c.loop_current_iteration,
+ COUNT(DISTINCT cc.contract_id) as contract_count,
+ COUNT(DISTINCT CASE WHEN con.status = 'completed' THEN cc.contract_id END) as completed_count,
+ c.version,
+ c.created_at
+ FROM chains c
+ LEFT JOIN chain_contracts cc ON cc.chain_id = c.id
+ LEFT JOIN contracts con ON con.id = cc.contract_id
+ WHERE c.owner_id = $1
+ GROUP BY c.id
+ ORDER BY c.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a chain.
+pub async fn update_chain_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateChainRequest,
+) -> Result<Chain, RepositoryError> {
+ // First get current version if optimistic locking requested
+ if let Some(expected_version) = req.version {
+ let current: Option<(i32,)> = sqlx::query_as(
+ "SELECT version FROM chains WHERE id = $1 AND owner_id = $2",
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ if let Some((actual_version,)) = current {
+ if actual_version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: actual_version,
+ });
+ }
+ }
+ }
+
+ let result = sqlx::query_as::<_, Chain>(
+ r#"
+ UPDATE chains
+ SET
+ name = COALESCE($3, name),
+ description = COALESCE($4, description),
+ status = COALESCE($5, status),
+ loop_enabled = COALESCE($6, loop_enabled),
+ loop_max_iterations = COALESCE($7, loop_max_iterations),
+ loop_progress_check = COALESCE($8, loop_progress_check),
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.status)
+ .bind(req.loop_enabled)
+ .bind(req.loop_max_iterations)
+ .bind(&req.loop_progress_check)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result)
+}
+
+/// Delete (archive) a chain.
+pub async fn delete_chain_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE chains
+ SET status = 'archived', updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Add a contract to a chain.
+pub async fn add_contract_to_chain(
+ pool: &PgPool,
+ chain_id: Uuid,
+ contract_id: Uuid,
+ depends_on: Vec<Uuid>,
+ order_index: i32,
+ editor_x: Option<f64>,
+ editor_y: Option<f64>,
+) -> Result<ChainContract, sqlx::Error> {
+ // Also update the contract's chain_id
+ sqlx::query("UPDATE contracts SET chain_id = $1 WHERE id = $2")
+ .bind(chain_id)
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+
+ sqlx::query_as::<_, ChainContract>(
+ r#"
+ INSERT INTO chain_contracts (chain_id, contract_id, depends_on, order_index, editor_x, editor_y)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ ON CONFLICT (chain_id, contract_id) DO UPDATE SET
+ depends_on = EXCLUDED.depends_on,
+ order_index = EXCLUDED.order_index,
+ editor_x = EXCLUDED.editor_x,
+ editor_y = EXCLUDED.editor_y
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(contract_id)
+ .bind(&depends_on)
+ .bind(order_index)
+ .bind(editor_x)
+ .bind(editor_y)
+ .fetch_one(pool)
+ .await
+}
+
+/// Remove a contract from a chain.
+pub async fn remove_contract_from_chain(
+ pool: &PgPool,
+ chain_id: Uuid,
+ contract_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ // Clear the contract's chain_id
+ sqlx::query("UPDATE contracts SET chain_id = NULL WHERE id = $1 AND chain_id = $2")
+ .bind(contract_id)
+ .bind(chain_id)
+ .execute(pool)
+ .await?;
+
+ let result = sqlx::query(
+ r#"
+ DELETE FROM chain_contracts
+ WHERE chain_id = $1 AND contract_id = $2
+ "#,
+ )
+ .bind(chain_id)
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// List contracts in a chain with their details.
+pub async fn list_chain_contracts(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Vec<ChainContractDetail>, sqlx::Error> {
+ sqlx::query_as::<_, ChainContractDetail>(
+ r#"
+ SELECT
+ cc.id as chain_contract_id,
+ cc.contract_id,
+ c.name as contract_name,
+ c.status as contract_status,
+ c.phase as contract_phase,
+ cc.depends_on,
+ cc.order_index,
+ cc.editor_x,
+ cc.editor_y
+ FROM chain_contracts cc
+ JOIN contracts c ON c.id = cc.contract_id
+ WHERE cc.chain_id = $1
+ ORDER BY cc.order_index ASC
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get chain with all contracts for detail view.
+pub async fn get_chain_with_contracts(
+ pool: &PgPool,
+ chain_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<ChainWithContracts>, sqlx::Error> {
+ let chain = get_chain_for_owner(pool, chain_id, owner_id).await?;
+
+ match chain {
+ Some(chain) => {
+ let contracts = list_chain_contracts(pool, chain_id).await?;
+ Ok(Some(ChainWithContracts { chain, contracts }))
+ }
+ None => Ok(None),
+ }
+}
+
+/// Get chain graph structure for visualization.
+pub async fn get_chain_graph(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Option<ChainGraphResponse>, sqlx::Error> {
+ let chain = get_chain(pool, chain_id).await?;
+
+ match chain {
+ Some(chain) => {
+ let contracts = list_chain_contracts(pool, chain_id).await?;
+
+ let nodes: Vec<ChainGraphNode> = contracts
+ .iter()
+ .map(|c| ChainGraphNode {
+ id: c.chain_contract_id,
+ contract_id: c.contract_id,
+ name: c.contract_name.clone(),
+ status: c.contract_status.clone(),
+ phase: c.contract_phase.clone(),
+ x: c.editor_x.unwrap_or(0.0),
+ y: c.editor_y.unwrap_or(0.0),
+ })
+ .collect();
+
+ let mut edges: Vec<ChainGraphEdge> = Vec::new();
+ for contract in &contracts {
+ for dep_id in &contract.depends_on {
+ // Find the chain_contract_id for this dependency
+ if let Some(dep) = contracts.iter().find(|c| c.contract_id == *dep_id) {
+ edges.push(ChainGraphEdge {
+ from: dep.chain_contract_id,
+ to: contract.chain_contract_id,
+ });
+ }
+ }
+ }
+
+ Ok(Some(ChainGraphResponse {
+ chain_id: chain.id,
+ chain_name: chain.name,
+ chain_status: chain.status,
+ nodes,
+ edges,
+ }))
+ }
+ None => Ok(None),
+ }
+}
+
+/// Record a chain event.
+pub async fn record_chain_event(
+ pool: &PgPool,
+ chain_id: Uuid,
+ event_type: &str,
+ contract_id: Option<Uuid>,
+ event_data: Option<serde_json::Value>,
+) -> Result<ChainEvent, sqlx::Error> {
+ sqlx::query_as::<_, ChainEvent>(
+ r#"
+ INSERT INTO chain_events (chain_id, event_type, contract_id, event_data)
+ VALUES ($1, $2, $3, $4)
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .bind(event_type)
+ .bind(contract_id)
+ .bind(event_data)
+ .fetch_one(pool)
+ .await
+}
+
+/// List chain events.
+pub async fn list_chain_events(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Vec<ChainEvent>, sqlx::Error> {
+ sqlx::query_as::<_, ChainEvent>(
+ r#"
+ SELECT *
+ FROM chain_events
+ WHERE chain_id = $1
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Increment chain loop iteration.
+pub async fn increment_chain_loop(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> {
+ sqlx::query_as::<_, Chain>(
+ r#"
+ UPDATE chains
+ SET loop_current_iteration = COALESCE(loop_current_iteration, 0) + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Mark a chain as completed.
+pub async fn complete_chain(pool: &PgPool, chain_id: Uuid) -> Result<Chain, sqlx::Error> {
+ sqlx::query_as::<_, Chain>(
+ r#"
+ UPDATE chains
+ SET status = 'completed',
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get contracts in a chain that have no pending dependencies (ready to start).
+/// Returns contracts where all depends_on contracts are completed.
+pub async fn get_ready_chain_contracts(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<Vec<ChainContractDetail>, sqlx::Error> {
+ sqlx::query_as::<_, ChainContractDetail>(
+ r#"
+ SELECT
+ cc.id as chain_contract_id,
+ cc.contract_id,
+ c.name as contract_name,
+ c.status as contract_status,
+ c.phase as contract_phase,
+ cc.depends_on,
+ cc.order_index,
+ cc.editor_x,
+ cc.editor_y
+ FROM chain_contracts cc
+ JOIN contracts c ON c.id = cc.contract_id
+ WHERE cc.chain_id = $1
+ AND c.status = 'active'
+ AND (
+ -- No dependencies
+ cc.depends_on IS NULL
+ OR array_length(cc.depends_on, 1) IS NULL
+ OR array_length(cc.depends_on, 1) = 0
+ -- Or all dependencies completed
+ OR NOT EXISTS (
+ SELECT 1
+ FROM unnest(cc.depends_on) AS dep_id
+ JOIN contracts dep ON dep.id = dep_id
+ WHERE dep.status != 'completed'
+ )
+ )
+ ORDER BY cc.order_index ASC
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Check if all contracts in a chain are completed.
+pub async fn is_chain_complete(pool: &PgPool, chain_id: Uuid) -> Result<bool, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ r#"
+ SELECT COUNT(*)
+ FROM chain_contracts cc
+ JOIN contracts c ON c.id = cc.contract_id
+ WHERE cc.chain_id = $1
+ AND c.status != 'completed'
+ "#,
+ )
+ .bind(chain_id)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0 == 0)
+}
+
+/// Get chain editor data for the GUI editor.
+pub async fn get_chain_editor_data(
+ pool: &PgPool,
+ chain_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<ChainEditorData>, sqlx::Error> {
+ let chain = get_chain_for_owner(pool, chain_id, owner_id).await?;
+
+ match chain {
+ Some(chain) => {
+ let contracts = list_chain_contracts(pool, chain_id).await?;
+
+ // Build nodes
+ let nodes: Vec<ChainEditorNode> = contracts
+ .iter()
+ .map(|c| ChainEditorNode {
+ id: c.contract_id.to_string(),
+ x: c.editor_x.unwrap_or(0.0),
+ y: c.editor_y.unwrap_or(0.0),
+ contract: ChainEditorContract {
+ name: c.contract_name.clone(),
+ description: None, // Would need to join with full contract data
+ contract_type: "simple".to_string(),
+ phases: vec!["plan".to_string(), "execute".to_string()],
+ tasks: vec![],
+ deliverables: vec![],
+ },
+ })
+ .collect();
+
+ // Build edges
+ let edges: Vec<ChainEditorEdge> = contracts
+ .iter()
+ .flat_map(|c| {
+ c.depends_on.iter().map(move |dep_id| ChainEditorEdge {
+ from: dep_id.to_string(),
+ to: c.contract_id.to_string(),
+ })
+ })
+ .collect();
+
+ Ok(Some(ChainEditorData {
+ id: Some(chain.id),
+ name: chain.name,
+ description: chain.description,
+ repository_url: chain.repository_url,
+ local_path: chain.local_path,
+ loop_enabled: chain.loop_enabled,
+ loop_max_iterations: chain.loop_max_iterations,
+ loop_progress_check: chain.loop_progress_check,
+ nodes,
+ edges,
+ }))
+ }
+ None => Ok(None),
+ }
+}