//! Repository pattern for file database operations. use chrono::Utc; use sqlx::PgPool; use uuid::Uuid; use super::models::{ CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation, MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. #[derive(Debug)] pub enum RepositoryError { /// Database error Database(sqlx::Error), /// Version conflict (optimistic locking failure) VersionConflict { /// The version the client expected expected: i32, /// The actual current version in the database actual: i32, }, } impl From for RepositoryError { fn from(e: sqlx::Error) -> Self { RepositoryError::Database(e) } } impl std::fmt::Display for RepositoryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RepositoryError::Database(e) => write!(f, "Database error: {}", e), RepositoryError::VersionConflict { expected, actual } => { write!( f, "Version conflict: expected {}, actual {}", expected, actual ) } } } } impl std::error::Error for RepositoryError {} /// Generate a default name based on current timestamp. fn generate_default_name() -> String { let now = Utc::now(); now.format("Recording - %b %d %Y %H:%M:%S").to_string() } /// Create a new file record. pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::>(vec![]).unwrap(); sqlx::query_as::<_, File>( r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .fetch_one(pool) .await } /// Get a file by ID. 
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all files, ordered by created_at DESC. pub async fn list_files(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files ORDER BY created_at DESC "#, ) .fetch_all(pool) .await } /// Update a file by ID with optimistic locking. /// /// If `req.version` is provided, the update will only succeed if the current /// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch. /// /// If `req.version` is None (e.g., internal system updates), version checking is skipped. pub async fn update_file( pool: &PgPool, id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first let existing = get_file(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, 
description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID. pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total files. pub async fn count_files(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped File Functions // ============================================================================= /// Create a new file record for a specific owner. 
pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateFileRequest, ) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::>(vec![]).unwrap(); sqlx::query_as::<_, File>( r#" INSERT INTO files (owner_id, name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, $5, NULL, $6) RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(owner_id) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .fetch_one(pool) .await } /// Get a file by ID, scoped to owner. pub async fn get_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all files for an owner, ordered by created_at DESC. pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Update a file by ID with optimistic locking, scoped to owner. 
pub async fn update_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first (scoped to owner) let existing = get_file_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID, scoped to owner. pub async fn delete_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================= // Version History Functions // ============================================================================= /// Set the version source for the current transaction. /// This is used by the trigger to record who made the change. pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> { sqlx::query(&format!("SET LOCAL app.version_source = '{}'", source)) .execute(pool) .await?; Ok(()) } /// Set the change description for the current transaction. 
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> { // Escape single quotes for SQL let escaped = description.replace('\'', "''"); sqlx::query(&format!("SET LOCAL app.change_description = '{}'", escaped)) .execute(pool) .await?; Ok(()) } /// List all versions of a file, ordered by version DESC. pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result, sqlx::Error> { // First get the current version from the files table let current = get_file(pool, file_id).await?; let mut versions = sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 ORDER BY version DESC "#, ) .bind(file_id) .fetch_all(pool) .await?; // Add the current version as the first entry if it exists if let Some(file) = current { let current_version = FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), // Current version source change_description: None, created_at: file.updated_at, }; versions.insert(0, current_version); } Ok(versions) } /// Get a specific version of a file. pub async fn get_file_version( pool: &PgPool, file_id: Uuid, version: i32, ) -> Result, sqlx::Error> { // First check if this is the current version if let Some(file) = get_file(pool, file_id).await? 
{ if file.version == version { return Ok(Some(FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), change_description: None, created_at: file.updated_at, })); } } // Otherwise, look in the versions table sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 AND version = $2 "#, ) .bind(file_id) .bind(version) .fetch_optional(pool) .await } /// Restore a file to a previous version. /// This creates a new version with the content from the target version. pub async fn restore_file_version( pool: &PgPool, file_id: Uuid, target_version: i32, current_version: i32, ) -> Result, RepositoryError> { // Get the target version content let target = get_file_version(pool, file_id, target_version).await?; let Some(target) = target else { return Ok(None); }; // Set version source and description for the trigger set_version_source(pool, "system").await?; set_change_description(pool, &format!("Restored from version {}", target_version)).await?; // Update the file with the target version's content // This will trigger the save_file_version trigger to save the current state first let update_req = UpdateFileRequest { name: Some(target.name), description: target.description, transcript: None, summary: target.summary, body: Some(target.body), version: Some(current_version), }; update_file(pool, file_id, update_req).await } /// Count versions for a file. 
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version ) .bind(file_id) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Task Functions // ============================================================================= /// Create a new task. /// /// If creating a subtask (parent_task_id is set) and repository settings are not provided, /// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, /// and target_repo_path from the parent task. Depth is calculated from parent and limited /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless /// explicitly configured. The orchestrator controls when completion steps happen. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit repo settings let parent = get_task(pool, parent_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Validate max depth (must be < 2, i.e., 0 or 1 only) // Orchestrators are at depth 0, subtasks at depth 1 // Subtasks cannot have their own children if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). 
Subtasks cannot have children.", new_depth ))); } // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0 ( 0, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( parent_task_id, depth, name, description, plan, priority, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING * "#, ) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .fetch_one(pool) .await } /// Get a task by ID. pub async fn get_task(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent), ordered by created_at DESC. 
pub async fn list_tasks(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, ) .fetch_all(pool) .await } /// List subtasks of a parent task. pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent_id) .fetch_all(pool) .await } /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let 
pr_url = req.pr_url.or(existing.pr_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 AND version = $15 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task(pool, id).await? 
{ return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID. pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total tasks. pub async fn count_tasks(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", ) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped Task Functions // ============================================================================= /// Create a new task for a specific owner. pub async fn create_task_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateTaskRequest, ) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit repo settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Validate max depth if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). 
Subtasks cannot have children.", new_depth ))); } // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0 ( 0, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( owner_id, parent_task_id, depth, name, description, plan, priority, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) RETURNING * "#, ) .bind(owner_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .fetch_one(pool) .await } /// Get a task by ID, scoped to owner. 
pub async fn get_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. pub async fn list_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// List subtasks of a parent task, scoped to owner. pub async fn list_subtasks_for_owner( pool: &PgPool, parent_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(parent_id) .fetch_all(pool) .await } /// Update a task by ID with optimistic locking, scoped to owner. 
pub async fn update_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first (scoped to owner) let existing = get_task_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $16 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) 
.bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID, scoped to owner. pub async fn delete_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update task status and record event. 
pub async fn update_task_status( pool: &PgPool, id: Uuid, new_status: &str, event_data: Option, ) -> Result, sqlx::Error> { // Get existing status let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_status = existing.status.clone(); // Update task status let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, updated_at = NOW(), started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(new_status) .fetch_optional(pool) .await?; // Record event if task.is_some() { let _ = create_task_event( pool, id, "status_change", Some(&previous_status), Some(new_status), event_data, ) .await; } Ok(task) } // ============================================================================= // Task Event Functions // ============================================================================= /// Create a task event. pub async fn create_task_event( pool: &PgPool, task_id: Uuid, event_type: &str, previous_status: Option<&str>, new_status: Option<&str>, event_data: Option, ) -> Result { sqlx::query_as::<_, TaskEvent>( r#" INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING * "#, ) .bind(task_id) .bind(event_type) .bind(previous_status) .bind(new_status) .bind(event_data) .fetch_one(pool) .await } /// List events for a task. 
pub async fn list_task_events( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2 "#, ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } // ============================================================================= // Daemon Functions // ============================================================================= /// Register a new daemon connection. pub async fn register_daemon( pool: &PgPool, owner_id: Uuid, connection_id: &str, hostname: Option<&str>, machine_id: Option<&str>, max_concurrent_tasks: i32, ) -> Result { sqlx::query_as::<_, Daemon>( r#" INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks) VALUES ($1, $2, $3, $4, $5) RETURNING * "#, ) .bind(owner_id) .bind(connection_id) .bind(hostname) .bind(machine_id) .bind(max_concurrent_tasks) .fetch_one(pool) .await } /// Get a daemon by ID. pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// Get a daemon by connection ID. pub async fn get_daemon_by_connection( pool: &PgPool, connection_id: &str, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE connection_id = $1 "#, ) .bind(connection_id) .fetch_optional(pool) .await } /// List all daemons. pub async fn list_daemons(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons ORDER BY connected_at DESC "#, ) .fetch_all(pool) .await } /// List daemons for a specific owner. 
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE owner_id = $1 ORDER BY connected_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get a daemon by ID for a specific owner. pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update daemon heartbeat. pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" UPDATE daemons SET last_heartbeat_at = NOW(), status = 'connected' WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon status. pub async fn update_daemon_status( pool: &PgPool, id: Uuid, status: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = $2, disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END WHERE id = $1 "#, ) .bind(id) .bind(status) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon task count. pub async fn update_daemon_task_count( pool: &PgPool, id: Uuid, delta: i32, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET current_task_count = GREATEST(0, current_task_count + $2) WHERE id = $1 "#, ) .bind(id) .bind(delta) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete a daemon by ID. pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM daemons WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete a daemon by connection ID. 
///
/// Returns `true` if at least one row was deleted, `false` if no daemon has
/// this `connection_id`.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(r#" DELETE FROM daemons WHERE connection_id = $1 "#)
        .bind(connection_id)
        .execute(pool)
        .await?;

    Ok(outcome.rows_affected() > 0)
}

/// Count connected daemons.
///
/// Counts only rows whose `status` is `connected`, across all owners.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    // `COUNT(*)` is decoded as a one-column tuple.
    let (connected,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM daemons WHERE status = 'connected'")
            .fetch_one(pool)
            .await?;

    Ok(connected)
}

// =============================================================================
// Sibling Awareness Functions
// =============================================================================

/// List sibling tasks (tasks with the same parent, excluding the given task).
///
/// With `parent_id = Some(p)` the siblings are the other tasks whose
/// `parent_task_id` is `p`. With `parent_id = None` the siblings are the other
/// tasks that have no parent. In both cases `task_id` itself is excluded and the
/// result is ordered by priority, then creation time, both descending. Each row
/// includes `subtask_count`, computed by a correlated sub-select.
///
/// The queries are not filtered by owner.
///
/// # Errors
///
/// Returns the `sqlx::Error` produced by the query.
pub async fn list_sibling_tasks(
    pool: &PgPool,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    // Two statements are needed because `parent_task_id = NULL` never matches in
    // SQL; the parent-less case has to use `IS NULL`.
    match parent_id {
        Some(parent) => {
            sqlx::query_as::<_, TaskSummary>(
                r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#,
            )
            .bind(parent)
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
        None => {
            // Top-level tasks (no parent) - siblings are other top-level tasks
            sqlx::query_as::<_, TaskSummary>(
                r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, t.version, t.created_at, t.updated_at, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count FROM tasks t WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#,
            )
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
    }
}

/// Get running sibling tasks (for context injection).
pub async fn get_running_siblings( pool: &PgPool, owner_id: Uuid, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id = $2 AND t.id != $3 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND t.id != $2 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(task_id) .fetch_all(pool) .await } } } /// Get task with its siblings for context awareness. pub async fn get_task_with_siblings( pool: &PgPool, id: Uuid, ) -> Result)>, sqlx::Error> { let task = get_task(pool, id).await?; let Some(task) = task else { return Ok(None); }; let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?; Ok(Some((task, siblings))) } // ============================================================================= // Task Output Persistence Functions // ============================================================================= /// Save task output to the database. /// This stores output in the task_events table with event_type='output'. pub async fn save_task_output( pool: &PgPool, task_id: Uuid, message_type: &str, content: &str, tool_name: Option<&str>, tool_input: Option, is_error: Option, cost_usd: Option, duration_ms: Option, ) -> Result { let event_data = serde_json::json!({ "messageType": message_type, "content": content, "toolName": tool_name, "toolInput": tool_input, "isError": is_error, "costUsd": cost_usd, "durationMs": duration_ms, }); create_task_event(pool, task_id, "output", None, None, Some(event_data)).await } /// Get task output from the database. /// Retrieves all output events for a task, ordered by creation time. 
pub async fn get_task_output( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(1000); sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "#, ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } /// Update task completion status with error message. /// Sets the task status to 'done' or 'failed' and records completion time. pub async fn complete_task( pool: &PgPool, task_id: Uuid, success: bool, error_message: Option<&str>, ) -> Result, sqlx::Error> { let status = if success { "done" } else { "failed" }; let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, error_message = COALESCE($3, error_message), completed_at = NOW(), updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(task_id) .bind(status) .bind(error_message) .fetch_optional(pool) .await?; // Record completion event if task.is_some() { let event_data = serde_json::json!({ "success": success, "errorMessage": error_message, }); let _ = create_task_event( pool, task_id, "complete", Some("running"), Some(status), Some(event_data), ) .await; } Ok(task) } // ============================================================================= // Mesh Chat History Functions // ============================================================================= /// Get or create the active conversation for an owner. 
pub async fn get_or_create_active_conversation( pool: &PgPool, owner_id: Uuid, ) -> Result { // Try to get existing active conversation for this owner let existing = sqlx::query_as::<_, MeshChatConversation>( r#" SELECT * FROM mesh_chat_conversations WHERE is_active = true AND owner_id = $1 LIMIT 1 "#, ) .bind(owner_id) .fetch_optional(pool) .await?; if let Some(conv) = existing { return Ok(conv); } // Create new conversation sqlx::query_as::<_, MeshChatConversation>( r#" INSERT INTO mesh_chat_conversations (owner_id, is_active) VALUES ($1, true) RETURNING * "#, ) .bind(owner_id) .fetch_one(pool) .await } /// List messages for a conversation. pub async fn list_chat_messages( pool: &PgPool, conversation_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, MeshChatMessageRecord>( r#" SELECT * FROM mesh_chat_messages WHERE conversation_id = $1 ORDER BY created_at ASC LIMIT $2 "#, ) .bind(conversation_id) .bind(limit) .fetch_all(pool) .await } /// Add a message to a conversation. #[allow(clippy::too_many_arguments)] pub async fn add_chat_message( pool: &PgPool, conversation_id: Uuid, role: &str, content: &str, context_type: &str, context_task_id: Option, tool_calls: Option, pending_questions: Option, ) -> Result { sqlx::query_as::<_, MeshChatMessageRecord>( r#" INSERT INTO mesh_chat_messages (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#, ) .bind(conversation_id) .bind(role) .bind(content) .bind(context_type) .bind(context_task_id) .bind(tool_calls) .bind(pending_questions) .fetch_one(pool) .await } /// Clear conversation (archive existing and create new). 
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result { // Mark existing as inactive for this owner sqlx::query( r#" UPDATE mesh_chat_conversations SET is_active = false, updated_at = NOW() WHERE is_active = true AND owner_id = $1 "#, ) .bind(owner_id) .execute(pool) .await?; // Create new active conversation get_or_create_active_conversation(pool, owner_id).await }