diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 1400 |
1 files changed, 1337 insertions, 63 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index ce1e97d..3b911c2 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5,8 +5,12 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation, - MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, + Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, + ContractSummary, CreateCheckpointRequest, CreateContractRequest, CreateFileRequest, + CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, + FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, + TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateSupervisorStateRequest, + UpdateTaskRequest, }; /// Repository error types. @@ -52,8 +56,18 @@ fn generate_default_name() -> String { now.format("Recording - %b %d %Y %H:%M:%S").to_string() } -/// Create a new file record. -pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::Error> { +/// Internal request for creating files without contract association (e.g., audio transcription). +/// User-facing file creation should use CreateFileRequest which requires contract_id. +pub struct InternalCreateFileRequest { + pub name: Option<String>, + pub description: Option<String>, + pub transcript: Vec<super::models::TranscriptEntry>, + pub location: Option<String>, +} + +/// Create a new file record (internal use, no contract required). +/// For user-facing file creation, use create_file_for_owner which requires a contract. +pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); @@ -62,7 +76,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) @@ -78,7 +92,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, @@ -92,7 +106,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, @@ -144,7 +158,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -163,7 +177,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -219,6 +233,7 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { // ============================================================================= /// Create a new file record for a specific owner. +/// Files must belong to a contract - the contract_id is required and the phase is looked up. pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, @@ -226,21 +241,38 @@ pub async fn create_file_for_owner( ) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); - let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); + // Use body from request (may be empty or contain template elements) + let body_json = serde_json::to_value(&req.body).unwrap_or_default(); + + // Use provided contract_phase, or look up from contract's current phase + let contract_phase: Option<String> = if req.contract_phase.is_some() { + req.contract_phase + } else { + sqlx::query_scalar( + "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", + ) + .bind(req.contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await? + }; sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, name, description, transcript, location, summary, body) - VALUES ($1, $2, $3, $4, $5, NULL, $6) - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) + VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) + .bind(req.contract_id) + .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) + .bind(&req.repo_file_path) .fetch_one(pool) .await } @@ -253,7 +285,7 @@ pub async fn get_file_for_owner( ) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, @@ -268,7 +300,7 @@ pub async fn get_file_for_owner( pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC @@ -279,6 +311,72 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F .await } +/// Database row type for file summary with contract info +#[derive(Debug, sqlx::FromRow)] +struct FileSummaryRow { + id: Uuid, + contract_id: Option<Uuid>, + contract_name: Option<String>, + contract_phase: Option<String>, + name: String, + description: Option<String>, + #[sqlx(json)] + transcript: Vec<crate::db::models::TranscriptEntry>, + version: i32, + repo_file_path: Option<String>, + repo_sync_status: Option<String>, + created_at: chrono::DateTime<chrono::Utc>, + updated_at: chrono::DateTime<chrono::Utc>, +} + +/// List file summaries for an owner with contract info (joined). +pub async fn list_file_summaries_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<FileSummary>, sqlx::Error> { + let rows = sqlx::query_as::<_, FileSummaryRow>( + r#" + SELECT + f.id, f.contract_id, c.name as contract_name, f.contract_phase, + f.name, f.description, f.transcript, f.version, + f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at + FROM files f + LEFT JOIN contracts c ON f.contract_id = c.id + WHERE f.owner_id = $1 + ORDER BY f.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| { + let duration = row + .transcript + .iter() + .map(|t| t.end) + .fold(0.0_f32, f32::max); + FileSummary { + id: row.id, + contract_id: row.contract_id, + contract_name: row.contract_name, + contract_phase: row.contract_phase, + name: row.name, + description: row.description, + transcript_count: row.transcript.len(), + duration: if duration > 0.0 { Some(duration) } else { None }, + version: row.version, + repo_file_path: row.repo_file_path, + repo_sync_status: row.repo_sync_status, + created_at: row.created_at, + updated_at: row.updated_at, + } + }) + .collect()) +} + /// Update a file by ID with optimistic locking, scoped to owner. pub async fn update_file_for_owner( pool: &PgPool, @@ -318,7 +416,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -338,7 +436,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -511,6 +609,7 @@ pub async fn restore_file_version( summary: target.summary, body: Some(target.body), version: Some(current_version), + repo_file_path: None, }; update_file(pool, file_id, update_req).await @@ -540,26 +639,22 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless -/// explicitly configured. The orchestrator controls when completion steps happen. +/// explicitly configured. The supervisor controls when completion steps happen. +/// +/// Task spawning is now controlled by supervisors at the application level. +/// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { // Calculate depth and inherit settings from parent if applicable - let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit repo settings + // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - // Validate max depth (must be < 2, i.e., 0 or 1 only) - // Orchestrators are at depth 0, subtasks at depth 1 - // Subtasks cannot have their own children - if new_depth >= 2 { - return Err(sqlx::Error::Protocol(format!( - "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", - new_depth - ))); - } + // Subtasks inherit contract_id from parent + let contract_id = parent.contract_id.unwrap_or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); @@ -568,14 +663,15 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The orchestrator integrates subtask work from their worktrees. + // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0 + // Top-level task: depth 0, use contract_id from request ( 0, + req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -590,20 +686,22 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - parent_task_id, depth, name, description, plan, priority, - repository_url, base_branch, target_branch, merge_mode, + contract_id, parent_task_id, depth, name, description, plan, priority, + is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) + .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) + .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -635,10 +733,13 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, @@ -652,10 +753,13 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -665,6 +769,25 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum .await } +/// List all tasks in a contract (for supervisor tree view). +pub async fn list_tasks_by_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * FROM tasks + WHERE contract_id = $1 AND owner_id = $2 + ORDER BY is_supervisor DESC, depth ASC, created_at ASC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -817,9 +940,9 @@ pub async fn create_task_for_owner( req: CreateTaskRequest, ) -> Result<Task, sqlx::Error> { // Calculate depth and inherit settings from parent if applicable - let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit repo settings (must belong to same owner) + // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; @@ -833,6 +956,9 @@ pub async fn create_task_for_owner( ))); } + // Subtasks inherit contract_id from parent + let contract_id = parent.contract_id.unwrap_or(req.contract_id); + // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); @@ -843,11 +969,12 @@ pub async fn create_task_for_owner( // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0 + // Top-level task: depth 0, use contract_id from request ( 0, + req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -862,21 +989,23 @@ pub async fn create_task_for_owner( sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - owner_id, parent_task_id, depth, name, description, plan, priority, - repository_url, base_branch, target_branch, merge_mode, + owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, + is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING * "#, ) .bind(owner_id) + .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) + .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -916,10 +1045,13 @@ pub async fn list_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, @@ -938,10 +1070,13 @@ pub async fn list_subtasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -986,6 +1121,7 @@ pub async fn update_task_for_owner( let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); + let repository_url = req.repository_url.or(existing.repository_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let daemon_id = if req.clear_daemon_id { @@ -1002,8 +1138,9 @@ pub async fn update_task_for_owner( SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, - target_repo_path = $14, completion_action = $15, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $16 + target_repo_path = $14, completion_action = $15, repository_url = $16, + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $17 RETURNING * "#, ) @@ -1022,6 +1159,7 @@ pub async fn update_task_for_owner( .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) + .bind(&repository_url) .bind(req.version.unwrap()) .fetch_optional(pool) .await? @@ -1032,7 +1170,8 @@ pub async fn update_task_for_owner( SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, - target_repo_path = $14, completion_action = $15, updated_at = NOW() + target_repo_path = $14, completion_action = $15, repository_url = $16, + updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, @@ -1052,6 +1191,7 @@ pub async fn update_task_for_owner( .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) + .bind(&repository_url) .fetch_optional(pool) .await? }; @@ -1328,6 +1468,26 @@ pub async fn update_daemon_status( Ok(result.rows_affected() > 0) } +/// Mark daemon as disconnected by connection_id. +pub async fn disconnect_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET status = 'disconnected', + disconnected_at = NOW() + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + /// Update daemon task count. pub async fn update_daemon_task_count( pool: &PgPool, @@ -1393,6 +1553,25 @@ pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> { Ok(result.0) } +/// Delete stale daemons that haven't sent a heartbeat within the timeout. +/// Returns the number of deleted daemons. +pub async fn delete_stale_daemons( + pool: &PgPool, + timeout_seconds: i64, +) -> Result<u64, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1 + "#, + ) + .bind(timeout_seconds) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + // ============================================================================= // Sibling Awareness Functions // ============================================================================= @@ -1408,10 +1587,13 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1426,10 +1608,13 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1710,3 +1895,1092 @@ pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshCha // Create new active conversation get_or_create_active_conversation(pool, owner_id).await } + +// ============================================================================= +// Contract Chat History Functions +// ============================================================================= + +/// Get or create the active conversation for a contract. +pub async fn get_or_create_contract_conversation( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<ContractChatConversation, sqlx::Error> { + // Try to get existing active conversation for this contract + let existing = sqlx::query_as::<_, ContractChatConversation>( + r#" + SELECT * + FROM contract_chat_conversations + WHERE is_active = true AND contract_id = $1 AND owner_id = $2 + LIMIT 1 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some(conv) = existing { + return Ok(conv); + } + + // Create new conversation + sqlx::query_as::<_, ContractChatConversation>( + r#" + INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active) + VALUES ($1, $2, true) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_one(pool) + .await +} + +/// List messages for a contract conversation. +pub async fn list_contract_chat_messages( + pool: &PgPool, + conversation_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, ContractChatMessageRecord>( + r#" + SELECT * + FROM contract_chat_messages + WHERE conversation_id = $1 + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(conversation_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Add a message to a contract conversation. +pub async fn add_contract_chat_message( + pool: &PgPool, + conversation_id: Uuid, + role: &str, + content: &str, + tool_calls: Option<serde_json::Value>, + pending_questions: Option<serde_json::Value>, +) -> Result<ContractChatMessageRecord, sqlx::Error> { + sqlx::query_as::<_, ContractChatMessageRecord>( + r#" + INSERT INTO contract_chat_messages + (conversation_id, role, content, tool_calls, pending_questions) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(conversation_id) + .bind(role) + .bind(content) + .bind(tool_calls) + .bind(pending_questions) + .fetch_one(pool) + .await +} + +/// Clear contract conversation (archive existing and create new). +pub async fn clear_contract_conversation( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<ContractChatConversation, sqlx::Error> { + // Mark existing as inactive for this contract + sqlx::query( + r#" + UPDATE contract_chat_conversations + SET is_active = false, updated_at = NOW() + WHERE is_active = true AND contract_id = $1 AND owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + // Create new active conversation + get_or_create_contract_conversation(pool, contract_id, owner_id).await +} + +// ============================================================================= +// Contract Functions (Owner-Scoped) +// ============================================================================= + +/// Create a new contract for a specific owner. +pub async fn create_contract_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateContractRequest, +) -> Result<Contract, sqlx::Error> { + // Use provided initial_phase or default to "research" + let phase = req.initial_phase.as_deref().unwrap_or("research"); + + // Validate the phase + let valid_phases = ["research", "specify", "plan", "execute", "review"]; + if !valid_phases.contains(&phase) { + return Err(sqlx::Error::Protocol(format!( + "Invalid initial_phase '{}'. Must be one of: {}", + phase, + valid_phases.join(", ") + ))); + } + + sqlx::query_as::<_, Contract>( + r#" + INSERT INTO contracts (owner_id, name, description, phase) + VALUES ($1, $2, $3, $4) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(phase) + .fetch_one(pool) + .await +} + +/// Get a contract by ID, scoped to owner. +pub async fn get_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Contract>, sqlx::Error> { + sqlx::query_as::<_, Contract>( + r#" + SELECT * + FROM contracts + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all contracts for an owner, ordered by created_at DESC. +pub async fn list_contracts_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<ContractSummary>, sqlx::Error> { + sqlx::query_as::<_, ContractSummary>( + r#" + SELECT + c.id, c.name, c.description, c.phase, c.status, + c.version, c.created_at, + (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, + (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, + (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count + FROM contracts c + WHERE c.owner_id = $1 + ORDER BY c.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Get contract summary by ID. +pub async fn get_contract_summary_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<ContractSummary>, sqlx::Error> { + sqlx::query_as::<_, ContractSummary>( + r#" + SELECT + c.id, c.name, c.description, c.phase, c.status, + c.version, c.created_at, + (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, + (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, + (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count + FROM contracts c + WHERE c.id = $1 AND c.owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Update a contract by ID with optimistic locking, scoped to owner. +pub async fn update_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateContractRequest, +) -> Result<Option<Contract>, RepositoryError> { + let existing = get_contract_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let phase = req.phase.unwrap_or(existing.phase); + let status = req.status.unwrap_or(existing.status); + let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id); + + let result = if req.version.is_some() { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET name = $3, description = $4, phase = $5, status = $6, + supervisor_task_id = $7, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $8 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&phase) + .bind(&status) + .bind(supervisor_task_id) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET name = $3, description = $4, phase = $5, status = $6, + supervisor_task_id = $7, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&phase) + .bind(&status) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a contract by ID, scoped to owner. +pub async fn delete_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM contracts + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Change contract phase and record event. +pub async fn change_contract_phase_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + new_phase: &str, +) -> Result<Option<Contract>, sqlx::Error> { + // Get current phase + let existing = get_contract_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + let previous_phase = existing.phase.clone(); + + // Update phase + let contract = sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET phase = $3, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(new_phase) + .fetch_optional(pool) + .await?; + + // Record event + if contract.is_some() { + sqlx::query( + r#" + INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) + VALUES ($1, 'phase_change', $2, $3) + "#, + ) + .bind(id) + .bind(&previous_phase) + .bind(new_phase) + .execute(pool) + .await?; + } + + Ok(contract) +} + +// ============================================================================= +// Contract Repository Functions +// ============================================================================= + +/// List repositories for a contract. +pub async fn list_contract_repositories( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<ContractRepository>, sqlx::Error> { + sqlx::query_as::<_, ContractRepository>( + r#" + SELECT * + FROM contract_repositories + WHERE contract_id = $1 + ORDER BY is_primary DESC, created_at ASC + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Add a remote repository to a contract. +pub async fn add_remote_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + repository_url: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) + VALUES ($1, $2, $3, 'remote', 'ready', $4) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(repository_url) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Add a local repository to a contract. +pub async fn add_local_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + local_path: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) + VALUES ($1, $2, $3, 'local', 'ready', $4) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(local_path) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Create a managed repository (daemon will create it). +pub async fn create_managed_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) + VALUES ($1, $2, 'managed', 'pending', $3) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Delete a repository from a contract. +pub async fn delete_contract_repository( + pool: &PgPool, + repo_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM contract_repositories + WHERE id = $1 AND contract_id = $2 + "#, + ) + .bind(repo_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Set a repository as primary (and clear others). +pub async fn set_repository_primary( + pool: &PgPool, + repo_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + // Clear other primaries + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + + // Set this one as primary + let result = sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = true, updated_at = NOW() + WHERE id = $1 AND contract_id = $2 + "#, + ) + .bind(repo_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update managed repository status (used by daemon). +pub async fn update_managed_repository_status( + pool: &PgPool, + repo_id: Uuid, + status: &str, + repository_url: Option<&str>, +) -> Result<Option<ContractRepository>, sqlx::Error> { + sqlx::query_as::<_, ContractRepository>( + r#" + UPDATE contract_repositories + SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(repo_id) + .bind(status) + .bind(repository_url) + .fetch_optional(pool) + .await +} + +// ============================================================================= +// Contract Task Association Functions +// ============================================================================= + +/// Add a task to a contract. +pub async fn add_task_to_contract( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE tasks + SET contract_id = $2, updated_at = NOW() + WHERE id = $1 AND owner_id = $3 + "#, + ) + .bind(task_id) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Remove a task from a contract. +pub async fn remove_task_from_contract( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE tasks + SET contract_id = NULL, updated_at = NOW() + WHERE id = $1 AND contract_id = $2 AND owner_id = $3 + "#, + ) + .bind(task_id) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// List files in a contract. +pub async fn list_files_in_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<FileSummary>, sqlx::Error> { + // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields + let files = sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + FROM files + WHERE contract_id = $1 AND owner_id = $2 + ORDER BY created_at DESC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await?; + + Ok(files.into_iter().map(FileSummary::from).collect()) +} + +/// List tasks in a contract. +pub async fn list_tasks_in_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at + FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id + WHERE t.contract_id = $1 AND t.owner_id = $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await +} + +// ============================================================================= +// Contract Events +// ============================================================================= + +/// List events for a contract. +pub async fn list_contract_events( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<ContractEvent>, sqlx::Error> { + sqlx::query_as::<_, ContractEvent>( + r#" + SELECT * + FROM contract_events + WHERE contract_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Record a contract event. +pub async fn record_contract_event( + pool: &PgPool, + contract_id: Uuid, + event_type: &str, + event_data: Option<serde_json::Value>, +) -> Result<ContractEvent, sqlx::Error> { + sqlx::query_as::<_, ContractEvent>( + r#" + INSERT INTO contract_events (contract_id, event_type, event_data) + VALUES ($1, $2, $3) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(event_type) + .bind(event_data) + .fetch_one(pool) + .await +} + +// ============================================================================ +// Task Checkpoints +// ============================================================================ + +/// Create a checkpoint for a task. +pub async fn create_task_checkpoint( + pool: &PgPool, + task_id: Uuid, + commit_sha: &str, + branch_name: &str, + message: &str, + files_changed: Option<serde_json::Value>, + lines_added: Option<i32>, + lines_removed: Option<i32>, +) -> Result<TaskCheckpoint, sqlx::Error> { + // Get current checkpoint count and increment + let checkpoint_number: i32 = sqlx::query_scalar( + "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1", + ) + .bind(task_id) + .fetch_one(pool) + .await?; + + // Update task's checkpoint tracking + sqlx::query( + r#" + UPDATE tasks + SET last_checkpoint_sha = $1, + checkpoint_count = $2, + checkpoint_message = $3, + updated_at = NOW() + WHERE id = $4 + "#, + ) + .bind(commit_sha) + .bind(checkpoint_number) + .bind(message) + .bind(task_id) + .execute(pool) + .await?; + + sqlx::query_as::<_, TaskCheckpoint>( + r#" + INSERT INTO task_checkpoints ( + task_id, checkpoint_number, commit_sha, branch_name, message, + files_changed, lines_added, lines_removed + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING * + "#, + ) + .bind(task_id) + .bind(checkpoint_number) + .bind(commit_sha) + .bind(branch_name) + .bind(message) + .bind(files_changed) + .bind(lines_added) + .bind(lines_removed) + .fetch_one(pool) + .await +} + +/// Get a checkpoint by ID. +pub async fn get_task_checkpoint( + pool: &PgPool, + id: Uuid, +) -> Result<Option<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get a checkpoint by commit SHA. +pub async fn get_task_checkpoint_by_sha( + pool: &PgPool, + commit_sha: &str, +) -> Result<Option<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1") + .bind(commit_sha) + .fetch_optional(pool) + .await +} + +/// List checkpoints for a task. +pub async fn list_task_checkpoints( + pool: &PgPool, + task_id: Uuid, +) -> Result<Vec<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>( + "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC", + ) + .bind(task_id) + .fetch_all(pool) + .await +} + +// ============================================================================ +// Supervisor State +// ============================================================================ + +/// Create or update supervisor state for a contract. +pub async fn upsert_supervisor_state( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + conversation_history: serde_json::Value, + pending_task_ids: &[Uuid], + phase: &str, +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) + VALUES ($1, $2, $3, $4, $5, NOW()) + ON CONFLICT (contract_id) DO UPDATE SET + task_id = EXCLUDED.task_id, + conversation_history = EXCLUDED.conversation_history, + pending_task_ids = EXCLUDED.pending_task_ids, + phase = EXCLUDED.phase, + last_activity = NOW(), + updated_at = NOW() + RETURNING * + "#, + ) + .bind(contract_id) + .bind(task_id) + .bind(conversation_history) + .bind(pending_task_ids) + .bind(phase) + .fetch_one(pool) + .await +} + +/// Get supervisor state for a contract. +pub async fn get_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1") + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Get supervisor state by task ID. +pub async fn get_supervisor_state_by_task( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1") + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Update supervisor conversation history. +pub async fn update_supervisor_conversation( + pool: &PgPool, + contract_id: Uuid, + conversation_history: serde_json::Value, +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET conversation_history = $1, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(conversation_history) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Update supervisor pending tasks. +pub async fn update_supervisor_pending_tasks( + pool: &PgPool, + contract_id: Uuid, + pending_task_ids: &[Uuid], +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET pending_task_ids = $1, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(pending_task_ids) + .bind(contract_id) + .fetch_one(pool) + .await +} + +// ============================================================================ +// Contract Supervisor +// ============================================================================ + +/// Update contract's supervisor task ID. +pub async fn update_contract_supervisor( + pool: &PgPool, + contract_id: Uuid, + supervisor_task_id: Uuid, +) -> Result<Contract, sqlx::Error> { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET supervisor_task_id = $1, + updated_at = NOW() + WHERE id = $2 + RETURNING * + "#, + ) + .bind(supervisor_task_id) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Get the supervisor task for a contract. +pub async fn get_contract_supervisor_task( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT t.* FROM tasks t + JOIN contracts c ON c.supervisor_task_id = t.id + WHERE c.id = $1 + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +// ============================================================================ +// Task Tree Queries +// ============================================================================ + +/// Get full task tree for a contract. +pub async fn get_contract_task_tree( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + WITH RECURSIVE task_tree AS ( + -- Base case: root tasks (no parent) + SELECT * FROM tasks + WHERE contract_id = $1 AND parent_task_id IS NULL + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id + ) + SELECT * FROM task_tree + ORDER BY depth, created_at + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Get task tree from a specific root task. +pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + WITH RECURSIVE task_tree AS ( + -- Base case: the root task + SELECT * FROM tasks WHERE id = $1 + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id + ) + SELECT * FROM task_tree + ORDER BY depth, created_at + "#, + ) + .bind(root_task_id) + .fetch_all(pool) + .await +} + +// ============================================================================ +// Daemon Selection +// ============================================================================ + +/// Get daemons with capacity info for selection. +pub async fn get_available_daemons( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 AND status = 'connected' + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Create a daemon task assignment. +pub async fn create_daemon_task_assignment( + pool: &PgPool, + daemon_id: Uuid, + task_id: Uuid, +) -> Result<DaemonTaskAssignment, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + r#" + INSERT INTO daemon_task_assignments (daemon_id, task_id) + VALUES ($1, $2) + RETURNING * + "#, + ) + .bind(daemon_id) + .bind(task_id) + .fetch_one(pool) + .await +} + +/// Update daemon task assignment status. +pub async fn update_daemon_task_assignment_status( + pool: &PgPool, + task_id: Uuid, + status: &str, +) -> Result<DaemonTaskAssignment, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + r#" + UPDATE daemon_task_assignments + SET status = $1 + WHERE task_id = $2 + RETURNING * + "#, + ) + .bind(status) + .bind(task_id) + .fetch_one(pool) + .await +} + +/// Get daemon task assignment for a task. +pub async fn get_daemon_task_assignment( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + "SELECT * FROM daemon_task_assignments WHERE task_id = $1", + ) + .bind(task_id) + .fetch_optional(pool) + .await +} |
