diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 1393 |
1 files changed, 1370 insertions, 23 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 4137ba6..ce1e97d 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4,10 +4,10 @@ use chrono::Utc; use sqlx::PgPool; use uuid::Uuid; -use super::models::{CreateFileRequest, File, FileVersion, UpdateFileRequest}; - -/// Default owner ID for anonymous users. -pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002); +use super::models::{ + CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation, + MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, +}; /// Repository error types. #[derive(Debug)] @@ -60,12 +60,11 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, name, description, transcript, location, summary, body) - VALUES ($1, $2, $3, $4, $5, NULL, $6) + INSERT INTO files (name, description, transcript, location, summary, body) + VALUES ($1, $2, $3, $4, NULL, $5) RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&req.description) .bind(&transcript_json) @@ -81,26 +80,23 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files - WHERE id = $1 AND owner_id = $2 + WHERE id = $1 "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .fetch_optional(pool) .await } -/// List all files for the owner, ordered by created_at DESC. +/// List all files, ordered by created_at DESC. pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files - WHERE owner_id = $1 ORDER BY created_at DESC "#, ) - .bind(ANONYMOUS_OWNER_ID) .fetch_all(pool) .await } @@ -146,13 +142,12 @@ pub async fn update_file( sqlx::query_as::<_, File>( r#" UPDATE files - SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $8 + SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() + WHERE id = $1 AND version = $7 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&description) .bind(&transcript_json) @@ -166,13 +161,12 @@ pub async fn update_file( sqlx::query_as::<_, File>( r#" UPDATE files - SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 + SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() + WHERE id = $1 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&description) .bind(&transcript_json) @@ -201,21 +195,19 @@ pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { let result = sqlx::query( r#" DELETE FROM files - WHERE id = $1 AND owner_id = $2 + WHERE id = $1 "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .execute(pool) .await?; Ok(result.rows_affected() > 0) } -/// Count total files for owner. +/// Count total files. pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { - let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files WHERE owner_id = $1") - .bind(ANONYMOUS_OWNER_ID) + let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; @@ -223,6 +215,178 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { } // ============================================================================= +// Owner-Scoped File Functions +// ============================================================================= + +/// Create a new file record for a specific owner. +pub async fn create_file_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateFileRequest, +) -> Result<File, sqlx::Error> { + let name = req.name.unwrap_or_else(generate_default_name); + let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); + let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); + + sqlx::query_as::<_, File>( + r#" + INSERT INTO files (owner_id, name, description, transcript, location, summary, body) + VALUES ($1, $2, $3, $4, $5, NULL, $6) + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(owner_id) + .bind(&name) + .bind(&req.description) + .bind(&transcript_json) + .bind(&req.location) + .bind(&body_json) + .fetch_one(pool) + .await +} + +/// Get a file by ID, scoped to owner. +pub async fn get_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<File>, sqlx::Error> { + sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + FROM files + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all files for an owner, ordered by created_at DESC. +pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { + sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + FROM files + WHERE owner_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a file by ID with optimistic locking, scoped to owner. +pub async fn update_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateFileRequest, +) -> Result<Option<File>, RepositoryError> { + // Get the existing file first (scoped to owner) + let existing = get_file_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let transcript = req.transcript.unwrap_or(existing.transcript); + let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); + let summary = req.summary.or(existing.summary); + let body = req.body.unwrap_or(existing.body); + let body_json = serde_json::to_value(&body).unwrap_or_default(); + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $8 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + // No version check for internal updates + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + // Re-fetch to get the actual version + if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a file by ID, scoped to owner. +pub async fn delete_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM files + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= // Version History Functions // ============================================================================= @@ -363,3 +527,1186 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq Ok(result.0) } + +// ============================================================================= +// Task Functions +// ============================================================================= + +/// Create a new task. +/// +/// If creating a subtask (parent_task_id is set) and repository settings are not provided, +/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, +/// and target_repo_path from the parent task. Depth is calculated from parent and limited +/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). +/// +/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless +/// explicitly configured. The orchestrator controls when completion steps happen. +pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { + // Calculate depth and inherit settings from parent if applicable + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + if let Some(parent_id) = req.parent_task_id { + // Fetch parent task to get depth and inherit repo settings + let parent = get_task(pool, parent_id).await? + .ok_or_else(|| sqlx::Error::RowNotFound)?; + + let new_depth = parent.depth + 1; + + // Validate max depth (must be < 2, i.e., 0 or 1 only) + // Orchestrators are at depth 0, subtasks at depth 1 + // Subtasks cannot have their own children + if new_depth >= 2 { + return Err(sqlx::Error::Protocol(format!( + "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", + new_depth + ))); + } + + // Inherit repo settings if not provided + let repo_url = req.repository_url.clone().or(parent.repository_url); + let base_branch = req.base_branch.clone().or(parent.base_branch); + let target_branch = req.target_branch.clone().or(parent.target_branch); + let merge_mode = req.merge_mode.clone().or(parent.merge_mode); + let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); + // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. + // The orchestrator integrates subtask work from their worktrees. + let completion_action = req.completion_action.clone(); + + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + } else { + // Top-level task: depth 0 + ( + 0, + req.repository_url.clone(), + req.base_branch.clone(), + req.target_branch.clone(), + req.merge_mode.clone(), + req.target_repo_path.clone(), + req.completion_action.clone(), + ) + }; + + let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + + sqlx::query_as::<_, Task>( + r#" + INSERT INTO tasks ( + parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, + target_repo_path, completion_action, continue_from_task_id, copy_files + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + RETURNING * + "#, + ) + .bind(req.parent_task_id) + .bind(depth) + .bind(&req.name) + .bind(&req.description) + .bind(&req.plan) + .bind(req.priority) + .bind(&repo_url) + .bind(&base_branch) + .bind(&target_branch) + .bind(&merge_mode) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(&req.continue_from_task_id) + .bind(©_files_json) + .fetch_one(pool) + .await +} + +/// Get a task by ID. +pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List all top-level tasks (no parent), ordered by created_at DESC. +pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id IS NULL + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .fetch_all(pool) + .await +} + +/// List subtasks of a parent task. +pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id = $1 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(parent_id) + .fetch_all(pool) + .await +} + +/// Update a task by ID with optimistic locking. +pub async fn update_task( + pool: &PgPool, + id: Uuid, + req: UpdateTaskRequest, +) -> Result<Option<Task>, RepositoryError> { + // Get the existing task first + let existing = get_task(pool, id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let plan = req.plan.unwrap_or(existing.plan); + let status = req.status.unwrap_or(existing.status); + let priority = req.priority.unwrap_or(existing.priority); + let progress_summary = req.progress_summary.or(existing.progress_summary); + let last_output = req.last_output.or(existing.last_output); + let error_message = req.error_message.or(existing.error_message); + let merge_mode = req.merge_mode.or(existing.merge_mode); + let pr_url = req.pr_url.or(existing.pr_url); + let target_repo_path = req.target_repo_path.or(existing.target_repo_path); + let completion_action = req.completion_action.or(existing.completion_action); + // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing + let daemon_id = if req.clear_daemon_id { + None + } else { + req.daemon_id.or(existing.daemon_id) + }; + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $2, description = $3, plan = $4, status = $5, priority = $6, + progress_summary = $7, last_output = $8, error_message = $9, + merge_mode = $10, pr_url = $11, daemon_id = $12, + target_repo_path = $13, completion_action = $14, updated_at = NOW() + WHERE id = $1 AND version = $15 + RETURNING * + "#, + ) + .bind(id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $2, description = $3, plan = $4, status = $5, priority = $6, + progress_summary = $7, last_output = $8, error_message = $9, + merge_mode = $10, pr_url = $11, daemon_id = $12, + target_repo_path = $13, completion_action = $14, updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_task(pool, id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a task by ID. +pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Count total tasks. +pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", + ) + .fetch_one(pool) + .await?; + + Ok(result.0) +} + +// ============================================================================= +// Owner-Scoped Task Functions +// ============================================================================= + +/// Create a new task for a specific owner. +pub async fn create_task_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateTaskRequest, +) -> Result<Task, sqlx::Error> { + // Calculate depth and inherit settings from parent if applicable + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + if let Some(parent_id) = req.parent_task_id { + // Fetch parent task to get depth and inherit repo settings (must belong to same owner) + let parent = get_task_for_owner(pool, parent_id, owner_id).await? + .ok_or_else(|| sqlx::Error::RowNotFound)?; + + let new_depth = parent.depth + 1; + + // Validate max depth + if new_depth >= 2 { + return Err(sqlx::Error::Protocol(format!( + "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", + new_depth + ))); + } + + // Inherit repo settings if not provided + let repo_url = req.repository_url.clone().or(parent.repository_url); + let base_branch = req.base_branch.clone().or(parent.base_branch); + let target_branch = req.target_branch.clone().or(parent.target_branch); + let merge_mode = req.merge_mode.clone().or(parent.merge_mode); + let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); + // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. + // The orchestrator integrates subtask work from their worktrees. + let completion_action = req.completion_action.clone(); + + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + } else { + // Top-level task: depth 0 + ( + 0, + req.repository_url.clone(), + req.base_branch.clone(), + req.target_branch.clone(), + req.merge_mode.clone(), + req.target_repo_path.clone(), + req.completion_action.clone(), + ) + }; + + let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + + sqlx::query_as::<_, Task>( + r#" + INSERT INTO tasks ( + owner_id, parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, + target_repo_path, completion_action, continue_from_task_id, copy_files + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(req.parent_task_id) + .bind(depth) + .bind(&req.name) + .bind(&req.description) + .bind(&req.plan) + .bind(req.priority) + .bind(&repo_url) + .bind(&base_branch) + .bind(&target_branch) + .bind(&merge_mode) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(&req.continue_from_task_id) + .bind(©_files_json) + .fetch_one(pool) + .await +} + +/// Get a task by ID, scoped to owner. +pub async fn get_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. +pub async fn list_tasks_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.owner_id = $1 AND t.parent_task_id IS NULL + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// List subtasks of a parent task, scoped to owner. +pub async fn list_subtasks_for_owner( + pool: &PgPool, + parent_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.owner_id = $1 AND t.parent_task_id = $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(owner_id) + .bind(parent_id) + .fetch_all(pool) + .await +} + +/// Update a task by ID with optimistic locking, scoped to owner. +pub async fn update_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateTaskRequest, +) -> Result<Option<Task>, RepositoryError> { + // Get the existing task first (scoped to owner) + let existing = get_task_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let plan = req.plan.unwrap_or(existing.plan); + let status = req.status.unwrap_or(existing.status); + let priority = req.priority.unwrap_or(existing.priority); + let progress_summary = req.progress_summary.or(existing.progress_summary); + let last_output = req.last_output.or(existing.last_output); + let error_message = req.error_message.or(existing.error_message); + let merge_mode = req.merge_mode.or(existing.merge_mode); + let pr_url = req.pr_url.or(existing.pr_url); + let target_repo_path = req.target_repo_path.or(existing.target_repo_path); + let completion_action = req.completion_action.or(existing.completion_action); + let daemon_id = if req.clear_daemon_id { + None + } else { + req.daemon_id.or(existing.daemon_id) + }; + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $3, description = $4, plan = $5, status = $6, priority = $7, + progress_summary = $8, last_output = $9, error_message = $10, + merge_mode = $11, pr_url = $12, daemon_id = $13, + target_repo_path = $14, completion_action = $15, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $16 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $3, description = $4, plan = $5, status = $6, priority = $7, + progress_summary = $8, last_output = $9, error_message = $10, + merge_mode = $11, pr_url = $12, daemon_id = $13, + target_repo_path = $14, completion_action = $15, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_task_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a task by ID, scoped to owner. +pub async fn delete_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update task status and record event. +pub async fn update_task_status( + pool: &PgPool, + id: Uuid, + new_status: &str, + event_data: Option<serde_json::Value>, +) -> Result<Option<Task>, sqlx::Error> { + // Get existing status + let existing = get_task(pool, id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + let previous_status = existing.status.clone(); + + // Update task status + let task = sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = $2, updated_at = NOW(), + started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(new_status) + .fetch_optional(pool) + .await?; + + // Record event + if task.is_some() { + let _ = create_task_event( + pool, + id, + "status_change", + Some(&previous_status), + Some(new_status), + event_data, + ) + .await; + } + + Ok(task) +} + +// ============================================================================= +// Task Event Functions +// ============================================================================= + +/// Create a task event. +pub async fn create_task_event( + pool: &PgPool, + task_id: Uuid, + event_type: &str, + previous_status: Option<&str>, + new_status: Option<&str>, + event_data: Option<serde_json::Value>, +) -> Result<TaskEvent, sqlx::Error> { + sqlx::query_as::<_, TaskEvent>( + r#" + INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(task_id) + .bind(event_type) + .bind(previous_status) + .bind(new_status) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// List events for a task. +pub async fn list_task_events( + pool: &PgPool, + task_id: Uuid, + limit: Option<i64>, +) -> Result<Vec<TaskEvent>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * + FROM task_events + WHERE task_id = $1 + ORDER BY created_at DESC + LIMIT $2 + "#, + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +// ============================================================================= +// Daemon Functions +// ============================================================================= + +/// Register a new daemon connection. +pub async fn register_daemon( + pool: &PgPool, + owner_id: Uuid, + connection_id: &str, + hostname: Option<&str>, + machine_id: Option<&str>, + max_concurrent_tasks: i32, +) -> Result<Daemon, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(connection_id) + .bind(hostname) + .bind(machine_id) + .bind(max_concurrent_tasks) + .fetch_one(pool) + .await +} + +/// Get a daemon by ID. +pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get a daemon by connection ID. +pub async fn get_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .fetch_optional(pool) + .await +} + +/// List all daemons. +pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + ORDER BY connected_at DESC + "#, + ) + .fetch_all(pool) + .await +} + +/// List daemons for a specific owner. +pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE owner_id = $1 + ORDER BY connected_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Get a daemon by ID for a specific owner. +pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Update daemon heartbeat. +pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET last_heartbeat_at = NOW(), status = 'connected' + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update daemon status. +pub async fn update_daemon_status( + pool: &PgPool, + id: Uuid, + status: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET status = $2, + disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END + WHERE id = $1 + "#, + ) + .bind(id) + .bind(status) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update daemon task count. +pub async fn update_daemon_task_count( + pool: &PgPool, + id: Uuid, + delta: i32, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET current_task_count = GREATEST(0, current_task_count + $2) + WHERE id = $1 + "#, + ) + .bind(id) + .bind(delta) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Delete a daemon by ID. +pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Delete a daemon by connection ID. +pub async fn delete_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Count connected daemons. +pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM daemons WHERE status = 'connected'", + ) + .fetch_one(pool) + .await?; + + Ok(result.0) +} + +// ============================================================================= +// Sibling Awareness Functions +// ============================================================================= + +/// List sibling tasks (tasks with the same parent, excluding the given task). +pub async fn list_sibling_tasks( + pool: &PgPool, + task_id: Uuid, + parent_id: Option<Uuid>, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + match parent_id { + Some(parent) => { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id = $1 AND t.id != $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(parent) + .bind(task_id) + .fetch_all(pool) + .await + } + None => { + // Top-level tasks (no parent) - siblings are other top-level tasks + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id IS NULL AND t.id != $1 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(task_id) + .fetch_all(pool) + .await + } + } +} + +/// Get running sibling tasks (for context injection). +pub async fn get_running_siblings( + pool: &PgPool, + owner_id: Uuid, + task_id: Uuid, + parent_id: Option<Uuid>, +) -> Result<Vec<Task>, sqlx::Error> { + match parent_id { + Some(parent) => { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks t + WHERE t.owner_id = $1 + AND t.parent_task_id = $2 + AND t.id != $3 + AND t.status = 'running' + ORDER BY t.priority DESC + "#, + ) + .bind(owner_id) + .bind(parent) + .bind(task_id) + .fetch_all(pool) + .await + } + None => { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks t + WHERE t.owner_id = $1 + AND t.parent_task_id IS NULL + AND t.id != $2 + AND t.status = 'running' + ORDER BY t.priority DESC + "#, + ) + .bind(owner_id) + .bind(task_id) + .fetch_all(pool) + .await + } + } +} + +/// Get task with its siblings for context awareness. +pub async fn get_task_with_siblings( + pool: &PgPool, + id: Uuid, +) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> { + let task = get_task(pool, id).await?; + let Some(task) = task else { + return Ok(None); + }; + + let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?; + Ok(Some((task, siblings))) +} + +// ============================================================================= +// Task Output Persistence Functions +// ============================================================================= + +/// Save task output to the database. +/// This stores output in the task_events table with event_type='output'. +pub async fn save_task_output( + pool: &PgPool, + task_id: Uuid, + message_type: &str, + content: &str, + tool_name: Option<&str>, + tool_input: Option<serde_json::Value>, + is_error: Option<bool>, + cost_usd: Option<f64>, + duration_ms: Option<u64>, +) -> Result<TaskEvent, sqlx::Error> { + let event_data = serde_json::json!({ + "messageType": message_type, + "content": content, + "toolName": tool_name, + "toolInput": tool_input, + "isError": is_error, + "costUsd": cost_usd, + "durationMs": duration_ms, + }); + + create_task_event(pool, task_id, "output", None, None, Some(event_data)).await +} + +/// Get task output from the database. +/// Retrieves all output events for a task, ordered by creation time. +pub async fn get_task_output( + pool: &PgPool, + task_id: Uuid, + limit: Option<i64>, +) -> Result<Vec<TaskEvent>, sqlx::Error> { + let limit = limit.unwrap_or(1000); + sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * + FROM task_events + WHERE task_id = $1 AND event_type = 'output' + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Update task completion status with error message. +/// Sets the task status to 'done' or 'failed' and records completion time. +pub async fn complete_task( + pool: &PgPool, + task_id: Uuid, + success: bool, + error_message: Option<&str>, +) -> Result<Option<Task>, sqlx::Error> { + let status = if success { "done" } else { "failed" }; + + let task = sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = $2, + error_message = COALESCE($3, error_message), + completed_at = NOW(), + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(task_id) + .bind(status) + .bind(error_message) + .fetch_optional(pool) + .await?; + + // Record completion event + if task.is_some() { + let event_data = serde_json::json!({ + "success": success, + "errorMessage": error_message, + }); + let _ = create_task_event( + pool, + task_id, + "complete", + Some("running"), + Some(status), + Some(event_data), + ) + .await; + } + + Ok(task) +} + +// ============================================================================= +// Mesh Chat History Functions +// ============================================================================= + +/// Get or create the active conversation for an owner. +pub async fn get_or_create_active_conversation( + pool: &PgPool, + owner_id: Uuid, +) -> Result<MeshChatConversation, sqlx::Error> { + // Try to get existing active conversation for this owner + let existing = sqlx::query_as::<_, MeshChatConversation>( + r#" + SELECT * + FROM mesh_chat_conversations + WHERE is_active = true AND owner_id = $1 + LIMIT 1 + "#, + ) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some(conv) = existing { + return Ok(conv); + } + + // Create new conversation + sqlx::query_as::<_, MeshChatConversation>( + r#" + INSERT INTO mesh_chat_conversations (owner_id, is_active) + VALUES ($1, true) + RETURNING * + "#, + ) + .bind(owner_id) + .fetch_one(pool) + .await +} + +/// List messages for a conversation. +pub async fn list_chat_messages( + pool: &PgPool, + conversation_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, MeshChatMessageRecord>( + r#" + SELECT * + FROM mesh_chat_messages + WHERE conversation_id = $1 + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(conversation_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Add a message to a conversation. +#[allow(clippy::too_many_arguments)] +pub async fn add_chat_message( + pool: &PgPool, + conversation_id: Uuid, + role: &str, + content: &str, + context_type: &str, + context_task_id: Option<Uuid>, + tool_calls: Option<serde_json::Value>, + pending_questions: Option<serde_json::Value>, +) -> Result<MeshChatMessageRecord, sqlx::Error> { + sqlx::query_as::<_, MeshChatMessageRecord>( + r#" + INSERT INTO mesh_chat_messages + (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "#, + ) + .bind(conversation_id) + .bind(role) + .bind(content) + .bind(context_type) + .bind(context_task_id) + .bind(tool_calls) + .bind(pending_questions) + .fetch_one(pool) + .await +} + +/// Clear conversation (archive existing and create new). +pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> { + // Mark existing as inactive for this owner + sqlx::query( + r#" + UPDATE mesh_chat_conversations + SET is_active = false, updated_at = NOW() + WHERE is_active = true AND owner_id = $1 + "#, + ) + .bind(owner_id) + .execute(pool) + .await?; + + // Create new active conversation + get_or_create_active_conversation(pool, owner_id).await +} |
