//! Repository pattern for file database operations. use chrono::Utc; use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use super::models::{ Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. #[derive(Debug)] pub enum RepositoryError { /// Database error Database(sqlx::Error), /// Version conflict (optimistic locking failure) VersionConflict { /// The version the client expected expected: i32, /// The actual current version in the database actual: i32, }, } impl From for RepositoryError { fn from(e: sqlx::Error) -> Self { RepositoryError::Database(e) } } impl std::fmt::Display for RepositoryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RepositoryError::Database(e) => write!(f, "Database error: {}", e), RepositoryError::VersionConflict { expected, actual } => { write!( f, "Version conflict: expected {}, actual {}", expected, actual ) } } } } impl std::error::Error for RepositoryError {} /// Generate a default name based on current timestamp. fn generate_default_name() -> String { let now = Utc::now(); now.format("Recording - %b %d %Y %H:%M:%S").to_string() } /// Internal request for creating files without contract association (e.g., audio transcription). /// User-facing file creation should use CreateFileRequest which requires contract_id. pub struct InternalCreateFileRequest { pub name: Option, pub description: Option, pub transcript: Vec, pub location: Option, } /// Create a new file record (internal use, no contract required). 
/// For user-facing file creation, use create_file_for_owner which requires a contract.
pub async fn create_file(
    pool: &PgPool,
    req: InternalCreateFileRequest,
) -> Result<File, sqlx::Error> {
    let name = req.name.unwrap_or_else(generate_default_name);
    // A transcript that fails to serialize is stored as JSON null rather than failing the insert.
    let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
    // New internal files start with an empty body (`[]`); built directly so nothing can panic.
    let body_json = serde_json::Value::Array(Vec::new());

    sqlx::query_as::<_, File>(
        r#"
        INSERT INTO files (name, description, transcript, location, summary, body)
        VALUES ($1, $2, $3, $4, NULL, $5)
        RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        "#,
    )
    .bind(&name)
    .bind(&req.description)
    .bind(&transcript_json)
    .bind(&req.location)
    .bind(&body_json)
    .fetch_one(pool)
    .await
}

/// Get a file by ID.
///
/// Returns `Ok(None)` when no row has the given `id`.
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
    sqlx::query_as::<_, File>(
        r#"
        SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        WHERE id = $1
        "#,
    )
    .bind(id)
    .fetch_optional(pool)
    .await
}

/// List all files, ordered by created_at DESC.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
    sqlx::query_as::<_, File>(
        r#"
        SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        ORDER BY created_at DESC
        "#,
    )
    .fetch_all(pool)
    .await
}

/// Update a file by ID with optimistic locking.
///
/// If `req.version` is provided, the update will only succeed if the current
/// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch.
///
/// If `req.version` is None (e.g., internal system updates), version checking is skipped.
pub async fn update_file( pool: &PgPool, id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first let existing = get_file(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID. pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total files. pub async fn count_files(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped File Functions // ============================================================================= /// Create a new file record for a specific owner. /// Files must belong to a contract - the contract_id is required and the phase is looked up. 
pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateFileRequest, ) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); // Use body from request (may be empty or contain template elements) let body_json = serde_json::to_value(&req.body).unwrap_or_default(); // Use provided contract_phase, or look up from contract's current phase let contract_phase: Option = if req.contract_phase.is_some() { req.contract_phase } else { sqlx::query_scalar( "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", ) .bind(req.contract_id) .bind(owner_id) .fetch_optional(pool) .await? }; sqlx::query_as::<_, File>( r#" INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) .bind(req.contract_id) .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .bind(&req.repo_file_path) .fetch_one(pool) .await } /// Get a file by ID, scoped to owner. pub async fn get_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all files for an owner, ordered by created_at DESC. 
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Database row type for file summary with contract info #[derive(Debug, sqlx::FromRow)] struct FileSummaryRow { id: Uuid, contract_id: Option, contract_name: Option, contract_phase: Option, name: String, description: Option, #[sqlx(json)] transcript: Vec, version: i32, repo_file_path: Option, repo_sync_status: Option, created_at: chrono::DateTime, updated_at: chrono::DateTime, } /// List file summaries for an owner with contract info (joined). pub async fn list_file_summaries_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { let rows = sqlx::query_as::<_, FileSummaryRow>( r#" SELECT f.id, f.contract_id, c.name as contract_name, f.contract_phase, f.name, f.description, f.transcript, f.version, f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at FROM files f LEFT JOIN contracts c ON f.contract_id = c.id WHERE f.owner_id = $1 ORDER BY f.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await?; Ok(rows .into_iter() .map(|row| { let duration = row .transcript .iter() .map(|t| t.end) .fold(0.0_f32, f32::max); FileSummary { id: row.id, contract_id: row.contract_id, contract_name: row.contract_name, contract_phase: row.contract_phase, name: row.name, description: row.description, transcript_count: row.transcript.len(), duration: if duration > 0.0 { Some(duration) } else { None }, version: row.version, repo_file_path: row.repo_file_path, repo_sync_status: row.repo_sync_status, created_at: row.created_at, updated_at: row.updated_at, } }) .collect()) } /// Update a file by ID with optimistic locking, scoped to owner. 
pub async fn update_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first (scoped to owner) let existing = get_file_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID, scoped to owner. pub async fn delete_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================= // Version History Functions // ============================================================================= /// Set the version source for the current transaction. /// This is used by the trigger to record who made the change. pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> { sqlx::query(&format!("SET LOCAL app.version_source = '{}'", source)) .execute(pool) .await?; Ok(()) } /// Set the change description for the current transaction. 
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> { // Escape single quotes for SQL let escaped = description.replace('\'', "''"); sqlx::query(&format!("SET LOCAL app.change_description = '{}'", escaped)) .execute(pool) .await?; Ok(()) } /// List all versions of a file, ordered by version DESC. pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result, sqlx::Error> { // First get the current version from the files table let current = get_file(pool, file_id).await?; let mut versions = sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 ORDER BY version DESC "#, ) .bind(file_id) .fetch_all(pool) .await?; // Add the current version as the first entry if it exists if let Some(file) = current { let current_version = FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), // Current version source change_description: None, created_at: file.updated_at, }; versions.insert(0, current_version); } Ok(versions) } /// Get a specific version of a file. pub async fn get_file_version( pool: &PgPool, file_id: Uuid, version: i32, ) -> Result, sqlx::Error> { // First check if this is the current version if let Some(file) = get_file(pool, file_id).await? 
{ if file.version == version { return Ok(Some(FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), change_description: None, created_at: file.updated_at, })); } } // Otherwise, look in the versions table sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 AND version = $2 "#, ) .bind(file_id) .bind(version) .fetch_optional(pool) .await } /// Restore a file to a previous version. /// This creates a new version with the content from the target version. pub async fn restore_file_version( pool: &PgPool, file_id: Uuid, target_version: i32, current_version: i32, ) -> Result, RepositoryError> { // Get the target version content let target = get_file_version(pool, file_id, target_version).await?; let Some(target) = target else { return Ok(None); }; // Set version source and description for the trigger set_version_source(pool, "system").await?; set_change_description(pool, &format!("Restored from version {}", target_version)).await?; // Update the file with the target version's content // This will trigger the save_file_version trigger to save the current state first let update_req = UpdateFileRequest { name: Some(target.name), description: target.description, transcript: None, summary: target.summary, body: Some(target.body), version: Some(current_version), repo_file_path: None, }; update_file(pool, file_id, update_req).await } /// Count versions for a file. 
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version ) .bind(file_id) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Task Functions // ============================================================================= /// Create a new task. /// /// If creating a subtask (parent_task_id is set) and repository settings are not provided, /// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, /// and target_repo_path from the parent task. Depth is calculated from parent and limited /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless /// explicitly configured. The supervisor controls when completion steps happen. /// /// Task spawning is now controlled by supervisors at the application level. /// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? 
.ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Subtasks inherit contract_id from parent let contract_id = parent.contract_id.unwrap_or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0, use contract_id from request ( 0, req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .fetch_one(pool) .await } /// Get a task by ID. 
pub async fn get_task(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent), ordered by created_at DESC. pub async fn list_tasks(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, ) .fetch_all(pool) .await } /// List subtasks of a parent task. pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent_id) .fetch_all(pool) .await } /// List all tasks in a contract (for supervisor tree view). pub async fn list_tasks_by_contract( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE contract_id = $1 AND owner_id = $2 ORDER BY is_supervisor DESC, depth ASC, created_at ASC "#, ) .bind(contract_id) .bind(owner_id) .fetch_all(pool) .await } /// Get pending tasks for a contract (non-supervisor tasks only). 
/// Includes tasks that were interrupted (retry candidates).
/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
pub async fn get_pending_tasks_for_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    // Ordering: interrupted tasks first (most recently interrupted on top, never-interrupted
    // tasks last), then higher priority, then oldest first.
    sqlx::query_as::<_, Task>(
        r#"
        SELECT *
        FROM tasks
        WHERE contract_id = $1
          AND owner_id = $2
          AND status = 'pending'
          AND is_supervisor = false
          AND retry_count < max_retries
        ORDER BY
            interrupted_at DESC NULLS LAST,
            priority DESC,
            created_at ASC
        "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .fetch_all(pool)
    .await
}

/// Mark a task as pending for retry after daemon failure.
/// Increments retry count and adds the failed daemon to exclusion list.
///
/// Returns `Ok(None)` when nothing was updated: either the task does not exist or it has
/// already used up `max_retries`.
pub async fn mark_task_for_retry(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    sqlx::query_as::<_, Task>(
        r#"
        UPDATE tasks
        SET status = 'pending',
            daemon_id = NULL,
            retry_count = retry_count + 1,
            failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
            last_active_daemon_id = $2,
            interrupted_at = NOW(),
            error_message = 'Daemon disconnected, awaiting retry',
            updated_at = NOW()
        WHERE id = $1
          AND retry_count < max_retries
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(failed_daemon_id)
    .fetch_optional(pool)
    .await
}

/// Mark a task as permanently failed (exceeded retry limit).
///
/// The failed daemon is recorded and the retry count incremented. The update is
/// unconditional, and a `task_id` that matches no row is not reported as an error.
pub async fn mark_task_permanently_failed(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        UPDATE tasks
        SET status = 'failed',
            daemon_id = NULL,
            retry_count = retry_count + 1,
            failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
            last_active_daemon_id = $2,
            error_message = 'Task failed: exceeded maximum retry attempts',
            updated_at = NOW()
        WHERE id = $1
        "#,
    )
    .bind(task_id)
    .bind(failed_daemon_id)
    .execute(pool)
    .await?;

    Ok(())
}

/// Update a task by ID with optimistic locking.
pub async fn update_task( pool: &PgPool, id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 AND version = $15 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) 
.bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID. pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total tasks. pub async fn count_tasks(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", ) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped Task Functions // ============================================================================= /// Create a new task for a specific owner. 
pub async fn create_task_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateTaskRequest, ) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Validate max depth if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", new_depth ))); } // Subtasks inherit contract_id from parent let contract_id = parent.contract_id.unwrap_or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The orchestrator integrates subtask work from their worktrees. 
let completion_action = req.completion_action.clone(); (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0, use contract_id from request ( 0, req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING * "#, ) .bind(owner_id) .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .fetch_one(pool) .await } /// Get a task by ID, scoped to owner. pub async fn get_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. 
pub async fn list_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// List subtasks of a parent task, scoped to owner. pub async fn list_subtasks_for_owner( pool: &PgPool, parent_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(parent_id) .fetch_all(pool) .await } /// Update a task by ID with optimistic locking, scoped to owner. 
pub async fn update_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first (scoped to owner) let existing = get_task_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let repository_url = req.repository_url.or(existing.repository_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $17 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) 
.bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID, scoped to owner. pub async fn delete_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update task status and record event. 
pub async fn update_task_status( pool: &PgPool, id: Uuid, new_status: &str, event_data: Option, ) -> Result, sqlx::Error> { // Get existing status let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_status = existing.status.clone(); // Update task status let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, updated_at = NOW(), started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(new_status) .fetch_optional(pool) .await?; // Record event if task.is_some() { let _ = create_task_event( pool, id, "status_change", Some(&previous_status), Some(new_status), event_data, ) .await; } Ok(task) } // ============================================================================= // Task Event Functions // ============================================================================= /// Create a task event. pub async fn create_task_event( pool: &PgPool, task_id: Uuid, event_type: &str, previous_status: Option<&str>, new_status: Option<&str>, event_data: Option, ) -> Result { sqlx::query_as::<_, TaskEvent>( r#" INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING * "#, ) .bind(task_id) .bind(event_type) .bind(previous_status) .bind(new_status) .bind(event_data) .fetch_one(pool) .await } /// List events for a task. 
///
/// Events are returned newest first (`created_at DESC`). `limit` caps the
/// number of rows and defaults to 100 when `None`.
pub async fn list_task_events(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    sqlx::query_as::<_, TaskEvent>(
        r#"
        SELECT *
        FROM task_events
        WHERE task_id = $1
        ORDER BY created_at DESC
        LIMIT $2
        "#,
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

// =============================================================================
// Daemon Functions
// =============================================================================

/// Register a new daemon connection.
///
/// Inserts a row into `daemons` for `owner_id` and returns it as stored.
/// `hostname` and `machine_id` are optional and stored as NULL when `None`.
pub async fn register_daemon(
    pool: &PgPool,
    owner_id: Uuid,
    connection_id: &str,
    hostname: Option<&str>,
    machine_id: Option<&str>,
    max_concurrent_tasks: i32,
) -> Result {
    sqlx::query_as::<_, Daemon>(
        r#"
        INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#,
    )
    .bind(owner_id)
    .bind(connection_id)
    .bind(hostname)
    .bind(machine_id)
    .bind(max_concurrent_tasks)
    .fetch_one(pool)
    .await
}

/// Get a daemon by ID.
///
/// Not scoped to an owner; yields `None` when no daemon has this ID.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Daemon>(
        r#"
        SELECT *
        FROM daemons
        WHERE id = $1
        "#,
    )
    .bind(id)
    .fetch_optional(pool)
    .await
}

/// Get a daemon by connection ID.
///
/// Yields `None` when no daemon row carries this `connection_id`.
pub async fn get_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Daemon>(
        r#"
        SELECT *
        FROM daemons
        WHERE connection_id = $1
        "#,
    )
    .bind(connection_id)
    .fetch_optional(pool)
    .await
}

/// List all daemons.
///
/// Covers every owner, most recently connected first (`connected_at DESC`).
pub async fn list_daemons(pool: &PgPool) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Daemon>(
        r#"
        SELECT *
        FROM daemons
        ORDER BY connected_at DESC
        "#,
    )
    .fetch_all(pool)
    .await
}

/// List daemons for a specific owner.
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE owner_id = $1 ORDER BY connected_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get a daemon by ID for a specific owner. pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update daemon heartbeat. pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" UPDATE daemons SET last_heartbeat_at = NOW(), status = 'connected' WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon status. pub async fn update_daemon_status( pool: &PgPool, id: Uuid, status: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = $2, disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END WHERE id = $1 "#, ) .bind(id) .bind(status) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Mark daemon as disconnected by connection_id. pub async fn disconnect_daemon_by_connection( pool: &PgPool, connection_id: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = 'disconnected', disconnected_at = NOW() WHERE connection_id = $1 "#, ) .bind(connection_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon task count. pub async fn update_daemon_task_count( pool: &PgPool, id: Uuid, delta: i32, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET current_task_count = GREATEST(0, current_task_count + $2) WHERE id = $1 "#, ) .bind(id) .bind(delta) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete a daemon by ID. 
///
/// Returns `Ok(true)` when a row was removed, `Ok(false)` otherwise.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
        DELETE FROM daemons
        WHERE id = $1
        "#;

    sqlx::query(SQL)
        .bind(id)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() > 0)
}

/// Delete a daemon by connection ID.
///
/// Returns `Ok(true)` when at least one row was removed, `Ok(false)` otherwise.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
        DELETE FROM daemons
        WHERE connection_id = $1
        "#;

    sqlx::query(SQL)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() > 0)
}

/// Count connected daemons.
///
/// Only rows whose status is `connected` are counted.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (connected,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM daemons WHERE status = 'connected'")
            .fetch_one(pool)
            .await?;

    Ok(connected)
}

/// Delete stale daemons that haven't sent a heartbeat within the timeout.
/// Returns the number of deleted daemons.
///
/// A daemon is stale when its `last_heartbeat_at` is more than
/// `timeout_seconds` seconds before the database's current time.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    const SQL: &str = r#"
        DELETE FROM daemons
        WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
        "#;

    sqlx::query(SQL)
        .bind(timeout_seconds)
        .execute(pool)
        .await
        .map(|done| done.rows_affected())
}

// =============================================================================
// Sibling Awareness Functions
// =============================================================================

/// List sibling tasks (tasks with the same parent, excluding the given task).
pub async fn list_sibling_tasks( pool: &PgPool, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { // Top-level tasks (no parent) - siblings are other top-level tasks sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(task_id) .fetch_all(pool) .await } } } /// Get running sibling tasks (for context injection). 
///
/// Siblings are tasks of the same owner that share `parent_id` (or are
/// top-level when `parent_id` is `None`), excluding `task_id` itself, whose
/// status is `running`. Results are ordered by priority, highest first.
pub async fn get_running_siblings(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Uuid,
    parent_id: Option,
) -> Result, sqlx::Error> {
    match parent_id {
        Some(parent) => {
            sqlx::query_as::<_, Task>(
                r#"
                SELECT *
                FROM tasks t
                WHERE t.owner_id = $1
                  AND t.parent_task_id = $2
                  AND t.id != $3
                  AND t.status = 'running'
                ORDER BY t.priority DESC
                "#,
            )
            .bind(owner_id)
            .bind(parent)
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
        None => {
            // Top-level task: siblings are matched with `IS NULL` instead of `= $n`.
            sqlx::query_as::<_, Task>(
                r#"
                SELECT *
                FROM tasks t
                WHERE t.owner_id = $1
                  AND t.parent_task_id IS NULL
                  AND t.id != $2
                  AND t.status = 'running'
                ORDER BY t.priority DESC
                "#,
            )
            .bind(owner_id)
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
    }
}

/// Get task with its siblings for context awareness.
///
/// Yields `Ok(None)` when the task does not exist. The lookup goes through
/// `get_task` and `list_sibling_tasks`, neither of which takes an owner.
pub async fn get_task_with_siblings(
    pool: &PgPool,
    id: Uuid,
) -> Result)>, sqlx::Error> {
    let task = get_task(pool, id).await?;
    let Some(task) = task else {
        return Ok(None);
    };

    let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
    Ok(Some((task, siblings)))
}

// =============================================================================
// Task Output Persistence Functions
// =============================================================================

/// Save task output to the database.
/// This stores output in the task_events table with event_type='output'.
///
/// All arguments after `task_id` are packed into the event's JSON payload
/// under camelCase keys (`messageType`, `content`, `toolName`, `toolInput`,
/// `isError`, `costUsd`, `durationMs`); no status transition is recorded.
pub async fn save_task_output(
    pool: &PgPool,
    task_id: Uuid,
    message_type: &str,
    content: &str,
    tool_name: Option<&str>,
    tool_input: Option,
    is_error: Option,
    cost_usd: Option,
    duration_ms: Option,
) -> Result {
    let event_data = serde_json::json!({
        "messageType": message_type,
        "content": content,
        "toolName": tool_name,
        "toolInput": tool_input,
        "isError": is_error,
        "costUsd": cost_usd,
        "durationMs": duration_ms,
    });

    // Output events carry neither a previous nor a new status.
    create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
}

/// Get task output from the database.
/// Retrieves output events for a task, oldest first, capped at `limit` rows (1000 when `None`).
///
/// Only `task_events` rows with `event_type = 'output'` are returned. Because
/// the ordering is ascending, the cap keeps the earliest rows when a task has
/// more output than `limit`.
pub async fn get_task_output(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(1000);

    sqlx::query_as::<_, TaskEvent>(
        r#"
        SELECT *
        FROM task_events
        WHERE task_id = $1 AND event_type = 'output'
        ORDER BY created_at ASC
        LIMIT $2
        "#,
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

/// Update task completion status with error message.
/// Sets the task status to 'done' or 'failed' and records completion time.
///
/// `error_message` replaces the stored message only when it is `Some`; with
/// `None` the existing message is kept (`COALESCE`). After a successful update
/// a `complete` event is written on a best-effort basis: a failure to record
/// it is ignored. Yields `Ok(None)` when the task does not exist.
pub async fn complete_task(
    pool: &PgPool,
    task_id: Uuid,
    success: bool,
    error_message: Option<&str>,
) -> Result, sqlx::Error> {
    let status = if success { "done" } else { "failed" };

    let task = sqlx::query_as::<_, Task>(
        r#"
        UPDATE tasks
        SET status = $2,
            error_message = COALESCE($3, error_message),
            completed_at = NOW(),
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(status)
    .bind(error_message)
    .fetch_optional(pool)
    .await?;

    // Record completion event
    if task.is_some() {
        let event_data = serde_json::json!({
            "success": success,
            "errorMessage": error_message,
        });
        // NOTE(review): the previous status is hard-coded to "running" and is
        // not read from the row, so the event can misreport the transition if
        // the task was in another state. Confirm this is acceptable.
        let _ = create_task_event(
            pool,
            task_id,
            "complete",
            Some("running"),
            Some(status),
            Some(event_data),
        )
        .await;
    }

    Ok(task)
}

// =============================================================================
// Mesh Chat History Functions
// =============================================================================

/// Get or create the active conversation for an owner.
///
/// Returns the owner's conversation with `is_active = true` if one exists,
/// otherwise inserts a fresh active conversation and returns it.
///
/// NOTE(review): the lookup and the insert are separate statements, so two
/// concurrent callers could both insert. Confirm whether a unique constraint
/// guards against that.
pub async fn get_or_create_active_conversation(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result {
    // Try to get existing active conversation for this owner
    let existing = sqlx::query_as::<_, MeshChatConversation>(
        r#"
        SELECT *
        FROM mesh_chat_conversations
        WHERE is_active = true AND owner_id = $1
        LIMIT 1
        "#,
    )
    .bind(owner_id)
    .fetch_optional(pool)
    .await?;

    if let Some(conv) = existing {
        return Ok(conv);
    }

    // Create new conversation
    sqlx::query_as::<_, MeshChatConversation>(
        r#"
        INSERT INTO mesh_chat_conversations (owner_id, is_active)
        VALUES ($1, true)
        RETURNING *
        "#,
    )
    .bind(owner_id)
    .fetch_one(pool)
    .await
}

/// List messages for a conversation.
///
/// Messages are returned oldest first (`created_at ASC`), capped at `limit`
/// rows (100 when `None`). Because the ordering is ascending, the cap keeps
/// the earliest messages of a longer conversation.
pub async fn list_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    sqlx::query_as::<_, MeshChatMessageRecord>(
        r#"
        SELECT *
        FROM mesh_chat_messages
        WHERE conversation_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
    )
    .bind(conversation_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

/// Add a message to a conversation.
///
/// Inserts one row into `mesh_chat_messages` and returns it as stored.
/// `context_task_id`, `tool_calls` and `pending_questions` are optional and
/// stored as NULL when `None`.
#[allow(clippy::too_many_arguments)]
pub async fn add_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    context_type: &str,
    context_task_id: Option,
    tool_calls: Option,
    pending_questions: Option,
) -> Result {
    sqlx::query_as::<_, MeshChatMessageRecord>(
        r#"
        INSERT INTO mesh_chat_messages
            (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING *
        "#,
    )
    .bind(conversation_id)
    .bind(role)
    .bind(content)
    .bind(context_type)
    .bind(context_task_id)
    .bind(tool_calls)
    .bind(pending_questions)
    .fetch_one(pool)
    .await
}

/// Clear conversation (archive existing and create new).
///
/// Every active conversation of the owner is flagged inactive (rows are kept,
/// not deleted); the conversation returned afterwards comes from
/// `get_or_create_active_conversation`. The two steps are separate statements.
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result {
    // Mark existing as inactive for this owner
    sqlx::query(
        r#"
        UPDATE mesh_chat_conversations
        SET is_active = false, updated_at = NOW()
        WHERE is_active = true AND owner_id = $1
        "#,
    )
    .bind(owner_id)
    .execute(pool)
    .await?;

    // Create new active conversation
    get_or_create_active_conversation(pool, owner_id).await
}

// =============================================================================
// Contract Chat History Functions
// =============================================================================

/// Get or create the active conversation for a contract.
///
/// Looks for a conversation with `is_active = true` matching both
/// `contract_id` and `owner_id`; if none exists, a fresh active one is
/// inserted and returned.
///
/// NOTE(review): lookup and insert are separate statements, so concurrent
/// callers could both insert. Confirm whether a unique constraint prevents it.
pub async fn get_or_create_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result {
    // Try to get existing active conversation for this contract
    let existing = sqlx::query_as::<_, ContractChatConversation>(
        r#"
        SELECT *
        FROM contract_chat_conversations
        WHERE is_active = true AND contract_id = $1 AND owner_id = $2
        LIMIT 1
        "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await?;

    if let Some(conv) = existing {
        return Ok(conv);
    }

    // Create new conversation
    sqlx::query_as::<_, ContractChatConversation>(
        r#"
        INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
        VALUES ($1, $2, true)
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .fetch_one(pool)
    .await
}

/// List messages for a contract conversation.
///
/// Messages are returned oldest first (`created_at ASC`), capped at `limit`
/// rows (100 when `None`).
pub async fn list_contract_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    sqlx::query_as::<_, ContractChatMessageRecord>(
        r#"
        SELECT *
        FROM contract_chat_messages
        WHERE conversation_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
    )
    .bind(conversation_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

/// Add a message to a contract conversation.
///
/// Inserts one row into `contract_chat_messages` and returns it as stored.
/// `tool_calls` and `pending_questions` are stored as NULL when `None`.
pub async fn add_contract_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    tool_calls: Option,
    pending_questions: Option,
) -> Result {
    sqlx::query_as::<_, ContractChatMessageRecord>(
        r#"
        INSERT INTO contract_chat_messages
            (conversation_id, role, content, tool_calls, pending_questions)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#,
    )
    .bind(conversation_id)
    .bind(role)
    .bind(content)
    .bind(tool_calls)
    .bind(pending_questions)
    .fetch_one(pool)
    .await
}

/// Clear contract conversation (archive existing and create new).
///
/// Active conversations matching both `contract_id` and `owner_id` are flagged
/// inactive (rows are kept); the conversation returned afterwards comes from
/// `get_or_create_contract_conversation`. The two steps are separate statements.
pub async fn clear_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result {
    // Mark existing as inactive for this contract
    sqlx::query(
        r#"
        UPDATE contract_chat_conversations
        SET is_active = false, updated_at = NOW()
        WHERE is_active = true AND contract_id = $1 AND owner_id = $2
        "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .execute(pool)
    .await?;

    // Create new active conversation
    get_or_create_contract_conversation(pool, contract_id, owner_id).await
}

// =============================================================================
// Contract Functions (Owner-Scoped)
// =============================================================================

/// Create a new contract for a specific owner.
///
/// `req.contract_type` defaults to `"simple"` and must be `"simple"` or
/// `"specification"`. `req.initial_phase` defaults to the first phase of the
/// chosen type (`"plan"` for simple, `"research"` for specification) and must
/// be one of that type's phases. `autonomous_loop` and `phase_guard` default
/// to `false`.
///
/// An invalid contract type or initial phase is reported as
/// `sqlx::Error::Protocol` before any statement is executed.
pub async fn create_contract_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateContractRequest,
) -> Result {
    // Default contract type is "simple"
    let contract_type = req.contract_type.as_deref().unwrap_or("simple");

    // Validate contract type
    let valid_types = ["simple", "specification"];
    if !valid_types.contains(&contract_type) {
        return Err(sqlx::Error::Protocol(format!(
            "Invalid contract_type '{}'. Must be one of: {}",
            contract_type,
            valid_types.join(", ")
        )));
    }

    // Determine valid phases based on contract type
    let (valid_phases, default_phase): (&[&str], &str) = match contract_type {
        "simple" => (&["plan", "execute"], "plan"),
        "specification" => (&["research", "specify", "plan", "execute", "review"], "research"),
        // Not reachable after the validation above; the match on `&str`
        // still needs a catch-all arm.
        _ => (&["plan", "execute"], "plan"),
    };

    // Use provided initial_phase or default based on contract type
    let phase = req.initial_phase.as_deref().unwrap_or(default_phase);

    // Validate the phase is valid for this contract type
    if !valid_phases.contains(&phase) {
        return Err(sqlx::Error::Protocol(format!(
            "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
            phase,
            contract_type,
            valid_phases.join(", ")
        )));
    }

    let autonomous_loop = req.autonomous_loop.unwrap_or(false);
    let phase_guard = req.phase_guard.unwrap_or(false);

    sqlx::query_as::<_, Contract>(
        r#"
        INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING *
        "#,
    )
    .bind(owner_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(contract_type)
    .bind(phase)
    .bind(autonomous_loop)
    .bind(phase_guard)
    .fetch_one(pool)
    .await
}

/// Get a contract by ID, scoped to owner.
///
/// Yields `None` when no contract matches both `id` and `owner_id`.
pub async fn get_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Contract>(
        r#"
        SELECT *
        FROM contracts
        WHERE id = $1 AND owner_id = $2
        "#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await
}

/// List all contracts for an owner, ordered by created_at DESC.
pub async fn list_contracts_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractSummary>( r#" SELECT c.id, c.name, c.description, c.contract_type, c.phase, c.status, c.supervisor_task_id, c.version, c.created_at, (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count FROM contracts c WHERE c.owner_id = $1 ORDER BY c.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get contract summary by ID. pub async fn get_contract_summary_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractSummary>( r#" SELECT c.id, c.name, c.description, c.contract_type, c.phase, c.status, c.supervisor_task_id, c.version, c.created_at, (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count FROM contracts c WHERE c.id = $1 AND c.owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update a contract by ID with optimistic locking, scoped to owner. 
pub async fn update_contract_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateContractRequest, ) -> Result, RepositoryError> { let existing = get_contract_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let phase = req.phase.unwrap_or(existing.phase); let status = req.status.unwrap_or(existing.status); let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id); let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop); let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard); let result = if req.version.is_some() { sqlx::query_as::<_, Contract>( r#" UPDATE contracts SET name = $3, description = $4, phase = $5, status = $6, supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $10 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&phase) .bind(&status) .bind(supervisor_task_id) .bind(autonomous_loop) .bind(phase_guard) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Contract>( r#" UPDATE contracts SET name = $3, description = $4, phase = $5, status = $6, supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&phase) .bind(&status) .bind(supervisor_task_id) .bind(autonomous_loop) .bind(phase_guard) .fetch_optional(pool) .await? 
}; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a contract by ID, scoped to owner. pub async fn delete_contract_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM contracts WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Change contract phase and record event. pub async fn change_contract_phase_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, new_phase: &str, ) -> Result, sqlx::Error> { // Get current phase let existing = get_contract_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_phase = existing.phase.clone(); // Update phase let contract = sqlx::query_as::<_, Contract>( r#" UPDATE contracts SET phase = $3, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(new_phase) .fetch_optional(pool) .await?; // Record event if contract.is_some() { sqlx::query( r#" INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) VALUES ($1, 'phase_change', $2, $3) "#, ) .bind(id) .bind(&previous_phase) .bind(new_phase) .execute(pool) .await?; } Ok(contract) } // ============================================================================= // Contract Repository Functions // ============================================================================= /// List repositories for a contract. 
pub async fn list_contract_repositories( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractRepository>( r#" SELECT * FROM contract_repositories WHERE contract_id = $1 ORDER BY is_primary DESC, created_at ASC "#, ) .bind(contract_id) .fetch_all(pool) .await } /// Add a remote repository to a contract. pub async fn add_remote_repository( pool: &PgPool, contract_id: Uuid, name: &str, repository_url: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) VALUES ($1, $2, $3, 'remote', 'ready', $4) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(repository_url) .bind(is_primary) .fetch_one(pool) .await } /// Add a local repository to a contract. pub async fn add_local_repository( pool: &PgPool, contract_id: Uuid, name: &str, local_path: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) VALUES ($1, $2, $3, 'local', 'ready', $4) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(local_path) .bind(is_primary) .fetch_one(pool) .await } /// Create a managed repository (daemon will create it). 
pub async fn create_managed_repository( pool: &PgPool, contract_id: Uuid, name: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) VALUES ($1, $2, 'managed', 'pending', $3) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(is_primary) .fetch_one(pool) .await } /// Delete a repository from a contract. pub async fn delete_contract_repository( pool: &PgPool, repo_id: Uuid, contract_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM contract_repositories WHERE id = $1 AND contract_id = $2 "#, ) .bind(repo_id) .bind(contract_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Set a repository as primary (and clear others). pub async fn set_repository_primary( pool: &PgPool, repo_id: Uuid, contract_id: Uuid, ) -> Result { // Clear other primaries sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; // Set this one as primary let result = sqlx::query( r#" UPDATE contract_repositories SET is_primary = true, updated_at = NOW() WHERE id = $1 AND contract_id = $2 "#, ) .bind(repo_id) .bind(contract_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update managed repository status (used by daemon). 
pub async fn update_managed_repository_status( pool: &PgPool, repo_id: Uuid, status: &str, repository_url: Option<&str>, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractRepository>( r#" UPDATE contract_repositories SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(repo_id) .bind(status) .bind(repository_url) .fetch_optional(pool) .await } // ============================================================================= // Contract Task Association Functions // ============================================================================= /// Add a task to a contract. pub async fn add_task_to_contract( pool: &PgPool, contract_id: Uuid, task_id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE tasks SET contract_id = $2, updated_at = NOW() WHERE id = $1 AND owner_id = $3 "#, ) .bind(task_id) .bind(contract_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Remove a task from a contract. pub async fn remove_task_from_contract( pool: &PgPool, contract_id: Uuid, task_id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE tasks SET contract_id = NULL, updated_at = NOW() WHERE id = $1 AND contract_id = $2 AND owner_id = $3 "#, ) .bind(task_id) .bind(contract_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// List files in a contract. 
pub async fn list_files_in_contract( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields let files = sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE contract_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#, ) .bind(contract_id) .bind(owner_id) .fetch_all(pool) .await?; Ok(files.into_iter().map(FileSummary::from).collect()) } /// List tasks in a contract. pub async fn list_tasks_in_contract( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.contract_id = $1 AND t.owner_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(contract_id) .bind(owner_id) .fetch_all(pool) .await } /// Minimal task info for worktree cleanup operations. #[derive(Debug, Clone, sqlx::FromRow)] pub struct TaskWorktreeInfo { pub id: Uuid, pub daemon_id: Option, pub overlay_path: Option, } /// List tasks in a contract with their daemon/worktree info. /// Used for cleaning up worktrees when a contract is completed or deleted. 
pub async fn list_contract_tasks_with_worktree_info( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskWorktreeInfo>( r#" SELECT id, daemon_id, overlay_path FROM tasks WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL) "#, ) .bind(contract_id) .fetch_all(pool) .await } // ============================================================================= // Contract Events // ============================================================================= /// List events for a contract. pub async fn list_contract_events( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractEvent>( r#" SELECT * FROM contract_events WHERE contract_id = $1 ORDER BY created_at DESC "#, ) .bind(contract_id) .fetch_all(pool) .await } /// Record a contract event. pub async fn record_contract_event( pool: &PgPool, contract_id: Uuid, event_type: &str, event_data: Option, ) -> Result { sqlx::query_as::<_, ContractEvent>( r#" INSERT INTO contract_events (contract_id, event_type, event_data) VALUES ($1, $2, $3) RETURNING * "#, ) .bind(contract_id) .bind(event_type) .bind(event_data) .fetch_one(pool) .await } // ============================================================================ // Task Checkpoints // ============================================================================ /// Create a checkpoint for a task. 
pub async fn create_task_checkpoint( pool: &PgPool, task_id: Uuid, commit_sha: &str, branch_name: &str, message: &str, files_changed: Option, lines_added: Option, lines_removed: Option, ) -> Result { // Get current checkpoint count and increment let checkpoint_number: i32 = sqlx::query_scalar( "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1", ) .bind(task_id) .fetch_one(pool) .await?; // Update task's checkpoint tracking sqlx::query( r#" UPDATE tasks SET last_checkpoint_sha = $1, checkpoint_count = $2, checkpoint_message = $3, updated_at = NOW() WHERE id = $4 "#, ) .bind(commit_sha) .bind(checkpoint_number) .bind(message) .bind(task_id) .execute(pool) .await?; sqlx::query_as::<_, TaskCheckpoint>( r#" INSERT INTO task_checkpoints ( task_id, checkpoint_number, commit_sha, branch_name, message, files_changed, lines_added, lines_removed ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING * "#, ) .bind(task_id) .bind(checkpoint_number) .bind(commit_sha) .bind(branch_name) .bind(message) .bind(files_changed) .bind(lines_added) .bind(lines_removed) .fetch_one(pool) .await } /// Get a checkpoint by ID. pub async fn get_task_checkpoint( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1") .bind(id) .fetch_optional(pool) .await } /// Get a checkpoint by commit SHA. pub async fn get_task_checkpoint_by_sha( pool: &PgPool, commit_sha: &str, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1") .bind(commit_sha) .fetch_optional(pool) .await } /// List checkpoints for a task. 
pub async fn list_task_checkpoints( pool: &PgPool, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskCheckpoint>( "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC", ) .bind(task_id) .fetch_all(pool) .await } // ============================================================================ // Supervisor State // ============================================================================ /// Create or update supervisor state for a contract. pub async fn upsert_supervisor_state( pool: &PgPool, contract_id: Uuid, task_id: Uuid, conversation_history: serde_json::Value, pending_task_ids: &[Uuid], phase: &str, ) -> Result { sqlx::query_as::<_, SupervisorState>( r#" INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) VALUES ($1, $2, $3, $4, $5, NOW()) ON CONFLICT (contract_id) DO UPDATE SET task_id = EXCLUDED.task_id, conversation_history = EXCLUDED.conversation_history, pending_task_ids = EXCLUDED.pending_task_ids, phase = EXCLUDED.phase, last_activity = NOW(), updated_at = NOW() RETURNING * "#, ) .bind(contract_id) .bind(task_id) .bind(conversation_history) .bind(pending_task_ids) .bind(phase) .fetch_one(pool) .await } /// Get supervisor state for a contract. pub async fn get_supervisor_state( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1") .bind(contract_id) .fetch_optional(pool) .await } /// Get supervisor state by task ID. pub async fn get_supervisor_state_by_task( pool: &PgPool, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1") .bind(task_id) .fetch_optional(pool) .await } /// Update supervisor conversation history. 
pub async fn update_supervisor_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    conversation_history: serde_json::Value,
) -> Result<SupervisorState, sqlx::Error> {
    // `fetch_one` turns "no state for this contract" into `RowNotFound`.
    let sql = r#" UPDATE supervisor_states SET conversation_history = $1, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#;

    let state = sqlx::query_as::<_, SupervisorState>(sql)
        .bind(conversation_history)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;

    Ok(state)
}

/// Replace the pending task IDs of a contract's supervisor state.
///
/// Also refreshes `last_activity` and `updated_at`. Fails with
/// `sqlx::Error::RowNotFound` if the contract has no supervisor state.
pub async fn update_supervisor_pending_tasks(
    pool: &PgPool,
    contract_id: Uuid,
    pending_task_ids: &[Uuid],
) -> Result<SupervisorState, sqlx::Error> {
    let sql = r#" UPDATE supervisor_states SET pending_task_ids = $1, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#;

    let state = sqlx::query_as::<_, SupervisorState>(sql)
        .bind(pending_task_ids)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;

    Ok(state)
}

// ============================================================================
// Contract Supervisor
// ============================================================================

/// Point a contract at its supervisor task and return the updated contract.
///
/// Fails with `sqlx::Error::RowNotFound` if the contract does not exist.
pub async fn update_contract_supervisor(
    pool: &PgPool,
    contract_id: Uuid,
    supervisor_task_id: Uuid,
) -> Result<Contract, sqlx::Error> {
    let sql = r#" UPDATE contracts SET supervisor_task_id = $1, updated_at = NOW() WHERE id = $2 RETURNING * "#;

    let contract = sqlx::query_as::<_, Contract>(sql)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;

    Ok(contract)
}

/// Fetch the task referenced by a contract's `supervisor_task_id`.
///
/// Returns `None` when the join finds no such task.
pub async fn get_contract_supervisor_task(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#" SELECT t.* FROM tasks t JOIN contracts c ON c.supervisor_task_id = t.id WHERE c.id = $1 "#;

    let task = sqlx::query_as::<_, Task>(sql)
        .bind(contract_id)
        .fetch_optional(pool)
        .await?;

    Ok(task)
}

// ============================================================================
// Task Tree Queries
// ============================================================================

/// Get full task tree for a contract.
pub async fn get_contract_task_tree( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" WITH RECURSIVE task_tree AS ( -- Base case: root tasks (no parent) SELECT * FROM tasks WHERE contract_id = $1 AND parent_task_id IS NULL UNION ALL -- Recursive case: children of current level SELECT t.* FROM tasks t JOIN task_tree tt ON t.parent_task_id = tt.id ) SELECT * FROM task_tree ORDER BY depth, created_at "#, ) .bind(contract_id) .fetch_all(pool) .await } /// Get task tree from a specific root task. pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" WITH RECURSIVE task_tree AS ( -- Base case: the root task SELECT * FROM tasks WHERE id = $1 UNION ALL -- Recursive case: children of current level SELECT t.* FROM tasks t JOIN task_tree tt ON t.parent_task_id = tt.id ) SELECT * FROM task_tree ORDER BY depth, created_at "#, ) .bind(root_task_id) .fetch_all(pool) .await } // ============================================================================ // Daemon Selection // ============================================================================ /// Get daemons with capacity info for selection. pub async fn get_available_daemons( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get daemons with capacity info for selection, excluding specified daemon IDs. /// Used for task retry to avoid reassigning to daemons that have already failed. 
pub async fn get_available_daemons_excluding( pool: &PgPool, owner_id: Uuid, exclude_daemon_ids: &[Uuid], ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' AND id != ALL($2) ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .bind(exclude_daemon_ids) .fetch_all(pool) .await } /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, daemon_id: Uuid, task_id: Uuid, ) -> Result { sqlx::query_as::<_, DaemonTaskAssignment>( r#" INSERT INTO daemon_task_assignments (daemon_id, task_id) VALUES ($1, $2) RETURNING * "#, ) .bind(daemon_id) .bind(task_id) .fetch_one(pool) .await } /// Update daemon task assignment status. pub async fn update_daemon_task_assignment_status( pool: &PgPool, task_id: Uuid, status: &str, ) -> Result { sqlx::query_as::<_, DaemonTaskAssignment>( r#" UPDATE daemon_task_assignments SET status = $1 WHERE task_id = $2 RETURNING * "#, ) .bind(status) .bind(task_id) .fetch_one(pool) .await } /// Get daemon task assignment for a task. pub async fn get_daemon_task_assignment( pool: &PgPool, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonTaskAssignment>( "SELECT * FROM daemon_task_assignments WHERE task_id = $1", ) .bind(task_id) .fetch_optional(pool) .await } // ============================================================================ // Repository History Functions // ============================================================================ use super::models::RepositoryHistoryEntry; /// List all repository history entries for an owner, ordered by use_count DESC, last_used_at DESC. 
pub async fn list_repository_history_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, RepositoryHistoryEntry>( r#" SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at FROM repository_history WHERE owner_id = $1 ORDER BY use_count DESC, last_used_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get repository suggestions for an owner, optionally filtered by source type and query. pub async fn get_repository_suggestions( pool: &PgPool, owner_id: Uuid, source_type: Option<&str>, query: Option<&str>, limit: i32, ) -> Result, sqlx::Error> { // Build query dynamically based on filters let mut sql = String::from( r#" SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at FROM repository_history WHERE owner_id = $1 "#, ); let mut param_idx = 2; if source_type.is_some() { sql.push_str(&format!(" AND source_type = ${}", param_idx)); param_idx += 1; } if query.is_some() { sql.push_str(&format!( " AND (LOWER(name) LIKE ${} OR LOWER(COALESCE(repository_url, '')) LIKE ${} OR LOWER(COALESCE(local_path, '')) LIKE ${})", param_idx, param_idx, param_idx )); param_idx += 1; } sql.push_str(&format!( " ORDER BY use_count DESC, last_used_at DESC LIMIT ${}", param_idx )); // Build and execute query with the appropriate bindings let mut query_builder = sqlx::query_as::<_, RepositoryHistoryEntry>(&sql).bind(owner_id); if let Some(st) = source_type { query_builder = query_builder.bind(st); } if let Some(q) = query { let search_pattern = format!("%{}%", q.to_lowercase()); query_builder = query_builder.bind(search_pattern); } query_builder = query_builder.bind(limit); query_builder.fetch_all(pool).await } /// Add or update a repository history entry. /// If an entry with the same URL (for remote) or path (for local) already exists, /// increment use_count and update last_used_at and name. /// Otherwise, create a new entry. 
pub async fn add_or_update_repository_history( pool: &PgPool, owner_id: Uuid, name: &str, repository_url: Option<&str>, local_path: Option<&str>, source_type: &str, ) -> Result { // Use UPSERT (INSERT ... ON CONFLICT) if source_type == "remote" { let url = repository_url.ok_or_else(|| { sqlx::Error::Protocol("repository_url required for remote type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, $3, NULL, $4, 1, NOW()) ON CONFLICT (owner_id, repository_url) WHERE source_type = 'remote' AND repository_url IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(url) .bind(source_type) .fetch_one(pool) .await } else if source_type == "local" { let path = local_path.ok_or_else(|| { sqlx::Error::Protocol("local_path required for local type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, NULL, $3, $4, 1, NOW()) ON CONFLICT (owner_id, local_path) WHERE source_type = 'local' AND local_path IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(path) .bind(source_type) .fetch_one(pool) .await } else { Err(sqlx::Error::Protocol(format!( "Invalid source_type: {}", source_type ))) } } /// Delete a repository history entry. /// Returns true if an entry was deleted, false if not found. 
pub async fn delete_repository_history( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM repository_history WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================ // Conversation Snapshots // ============================================================================ /// Create a new conversation snapshot pub async fn create_conversation_snapshot( pool: &PgPool, task_id: Uuid, checkpoint_id: Option, snapshot_type: &str, message_count: i32, conversation_state: serde_json::Value, metadata: Option, ) -> Result { sqlx::query_as::<_, ConversationSnapshot>( r#" INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING * "# ) .bind(task_id) .bind(checkpoint_id) .bind(snapshot_type) .bind(message_count) .bind(conversation_state) .bind(metadata) .fetch_one(pool) .await } /// Get a conversation snapshot by ID pub async fn get_conversation_snapshot( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE id = $1" ) .bind(id) .fetch_optional(pool) .await } /// Get conversation snapshot at a specific checkpoint pub async fn get_conversation_at_checkpoint( pool: &PgPool, checkpoint_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1" ) .bind(checkpoint_id) .fetch_optional(pool) .await } /// List conversation snapshots for a task pub async fn list_conversation_snapshots( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER 
BY created_at DESC LIMIT $2" ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } /// Delete conversation snapshots older than retention period pub async fn cleanup_old_snapshots( pool: &PgPool, retention_days: i32, ) -> Result { let result = sqlx::query( "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1" ) .bind(retention_days) .execute(pool) .await?; Ok(result.rows_affected()) } // ============================================================================ // History Events // ============================================================================ /// Record a new history event #[allow(clippy::too_many_arguments)] pub async fn record_history_event( pool: &PgPool, owner_id: Uuid, contract_id: Option, task_id: Option, event_type: &str, event_subtype: Option<&str>, phase: Option<&str>, event_data: serde_json::Value, ) -> Result { sqlx::query_as::<_, HistoryEvent>( r#" INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "# ) .bind(owner_id) .bind(contract_id) .bind(task_id) .bind(event_type) .bind(event_subtype) .bind(phase) .bind(event_data) .fetch_one(pool) .await } /// Get contract history timeline pub async fn get_contract_history( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let mut query = String::from( "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" ); let mut count_query = String::from( "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" ); let mut param_count = 2; if filters.phase.is_some() { param_count += 1; query.push_str(&format!(" AND phase = ${}" , param_count)); count_query.push_str(&format!(" AND phase = ${}", param_count)); } if filters.from.is_some() { param_count += 1; query.push_str(&format!(" AND created_at >= ${}", param_count)); 
count_query.push_str(&format!(" AND created_at >= ${}", param_count)); } if filters.to.is_some() { param_count += 1; query.push_str(&format!(" AND created_at <= ${}", param_count)); count_query.push_str(&format!(" AND created_at <= ${}", param_count)); } query.push_str(" ORDER BY created_at DESC"); query.push_str(&format!(" LIMIT {}", limit)); // Build and execute the query dynamically let mut q = sqlx::query_as::<_, HistoryEvent>(&query) .bind(contract_id) .bind(owner_id); if let Some(ref phase) = filters.phase { q = q.bind(phase); } if let Some(ref from) = filters.from { q = q.bind(from); } if let Some(ref to) = filters.to { q = q.bind(to); } let events = q.fetch_all(pool).await?; // Get total count let mut cq = sqlx::query_scalar::<_, i64>(&count_query) .bind(contract_id) .bind(owner_id); if let Some(ref phase) = filters.phase { cq = cq.bind(phase); } if let Some(ref from) = filters.from { cq = cq.bind(from); } if let Some(ref to) = filters.to { cq = cq.bind(to); } let count = cq.fetch_one(pool).await?; Ok((events, count)) } /// Get task history pub async fn get_task_history( pool: &PgPool, task_id: Uuid, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let events = sqlx::query_as::<_, HistoryEvent>( r#" SELECT * FROM history_events WHERE task_id = $1 AND owner_id = $2 ORDER BY created_at DESC LIMIT $3 "# ) .bind(task_id) .bind(owner_id) .bind(limit) .fetch_all(pool) .await?; let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2" ) .bind(task_id) .bind(owner_id) .fetch_one(pool) .await?; Ok((events, count)) } /// Get unified timeline for an owner pub async fn get_timeline( pool: &PgPool, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let events = sqlx::query_as::<_, HistoryEvent>( r#" SELECT * FROM history_events WHERE owner_id = $1 ORDER BY 
created_at DESC LIMIT $2 "# ) .bind(owner_id) .bind(limit) .fetch_all(pool) .await?; let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM history_events WHERE owner_id = $1" ) .bind(owner_id) .fetch_one(pool) .await?; Ok((events, count)) } // ============================================================================ // Task Conversation Retrieval // ============================================================================ // Helper struct for parsing task output events #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct TaskOutputEvent { message_type: String, content: Option, tool_name: Option, tool_input: Option, is_error: Option, cost_usd: Option, } /// Get task conversation messages (reconstructed from task_events) pub async fn get_task_conversation( pool: &PgPool, task_id: Uuid, include_tool_calls: bool, include_tool_results: bool, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(1000); // Get output events that represent conversation turns let events = sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "# ) .bind(task_id) .bind(limit) .fetch_all(pool) .await?; // Convert task events to conversation messages let mut messages = Vec::new(); for event in events { if let Some(data) = event.event_data { // Parse the event data to extract message info if let Ok(output) = serde_json::from_value::(data.clone()) { let should_include = match output.message_type.as_str() { "tool_use" => include_tool_calls, "tool_result" => include_tool_results, _ => true, }; if should_include { messages.push(ConversationMessage { id: event.id.to_string(), role: match output.message_type.as_str() { "assistant" => "assistant".to_string(), "tool_use" => "assistant".to_string(), "tool_result" => "tool".to_string(), "system" => "system".to_string(), "error" => "system".to_string(), _ => "user".to_string(), }, content: output.content.unwrap_or_default(), 
timestamp: event.created_at, tool_calls: None, tool_name: output.tool_name, tool_input: output.tool_input, tool_result: None, is_error: output.is_error, token_count: None, cost_usd: output.cost_usd.map(|c| c as f64), }); } } } } Ok(messages) } /// Get supervisor conversation (from supervisor_states) pub async fn get_supervisor_conversation_full( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { get_supervisor_state(pool, contract_id).await }