summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs1393
1 files changed, 1370 insertions, 23 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 4137ba6..ce1e97d 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -4,10 +4,10 @@ use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;
-use super::models::{CreateFileRequest, File, FileVersion, UpdateFileRequest};
-
-/// Default owner ID for anonymous users.
-pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002);
+use super::models::{
+ CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation,
+ MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest,
+};
/// Repository error types.
#[derive(Debug)]
@@ -60,12 +60,11 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
- VALUES ($1, $2, $3, $4, $5, NULL, $6)
+ INSERT INTO files (name, description, transcript, location, summary, body)
+ VALUES ($1, $2, $3, $4, NULL, $5)
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
@@ -81,26 +80,23 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
- WHERE id = $1 AND owner_id = $2
+ WHERE id = $1
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.fetch_optional(pool)
.await
}
-/// List all files for the owner, ordered by created_at DESC.
+/// List all files, ordered by created_at DESC.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
- WHERE owner_id = $1
ORDER BY created_at DESC
"#,
)
- .bind(ANONYMOUS_OWNER_ID)
.fetch_all(pool)
.await
}
@@ -146,13 +142,12 @@ pub async fn update_file(
sqlx::query_as::<_, File>(
r#"
UPDATE files
- SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $8
+ SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
+ WHERE id = $1 AND version = $7
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
@@ -166,13 +161,12 @@ pub async fn update_file(
sqlx::query_as::<_, File>(
r#"
UPDATE files
- SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
+ SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
+ WHERE id = $1
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
@@ -201,21 +195,19 @@ pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM files
- WHERE id = $1 AND owner_id = $2
+ WHERE id = $1
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
-/// Count total files for owner.
+/// Count total files.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
- let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files WHERE owner_id = $1")
- .bind(ANONYMOUS_OWNER_ID)
+ let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
.fetch_one(pool)
.await?;
@@ -223,6 +215,178 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
}
// =============================================================================
+// Owner-Scoped File Functions
+// =============================================================================
+
+/// Create a new file record for a specific owner.
+pub async fn create_file_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateFileRequest,
+) -> Result<File, sqlx::Error> {
+ let name = req.name.unwrap_or_else(generate_default_name);
+ let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
+ let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
+
+ sqlx::query_as::<_, File>(
+ r#"
+ INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
+ VALUES ($1, $2, $3, $4, $5, NULL, $6)
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&req.description)
+ .bind(&transcript_json)
+ .bind(&req.location)
+ .bind(&body_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a file by ID, scoped to owner.
+pub async fn get_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<File>, sqlx::Error> {
+ sqlx::query_as::<_, File>(
+ r#"
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ FROM files
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all files for an owner, ordered by created_at DESC.
+pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
+ sqlx::query_as::<_, File>(
+ r#"
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ FROM files
+ WHERE owner_id = $1
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a file by ID with optimistic locking, scoped to owner.
+pub async fn update_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateFileRequest,
+) -> Result<Option<File>, RepositoryError> {
+ // Get the existing file first (scoped to owner)
+ let existing = get_file_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let transcript = req.transcript.unwrap_or(existing.transcript);
+ let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
+ let summary = req.summary.or(existing.summary);
+ let body = req.body.unwrap_or(existing.body);
+ let body_json = serde_json::to_value(&body).unwrap_or_default();
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $8
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ // No version check for internal updates
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ // Re-fetch to get the actual version
+ if let Some(current) = get_file_for_owner(pool, id, owner_id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a file by ID, scoped to owner.
+pub async fn delete_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM files
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
// Version History Functions
// =============================================================================
@@ -363,3 +527,1186 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
Ok(result.0)
}
+
+// =============================================================================
+// Task Functions
+// =============================================================================
+
+/// Create a new task.
+///
+/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
+/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
+/// and target_repo_path from the parent task. Depth is calculated from parent and limited
+/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
+///
+/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
+/// explicitly configured. The orchestrator controls when completion steps happen.
+pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
+ // Calculate depth and inherit settings from parent if applicable
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ if let Some(parent_id) = req.parent_task_id {
+ // Fetch parent task to get depth and inherit repo settings
+ let parent = get_task(pool, parent_id).await?
+ .ok_or_else(|| sqlx::Error::RowNotFound)?;
+
+ let new_depth = parent.depth + 1;
+
+ // Validate max depth (must be < 2, i.e., 0 or 1 only)
+ // Orchestrators are at depth 0, subtasks at depth 1
+ // Subtasks cannot have their own children
+ if new_depth >= 2 {
+ return Err(sqlx::Error::Protocol(format!(
+ "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
+ new_depth
+ )));
+ }
+
+ // Inherit repo settings if not provided
+ let repo_url = req.repository_url.clone().or(parent.repository_url);
+ let base_branch = req.base_branch.clone().or(parent.base_branch);
+ let target_branch = req.target_branch.clone().or(parent.target_branch);
+ let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
+ let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
+ // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
+ // The orchestrator integrates subtask work from their worktrees.
+ let completion_action = req.completion_action.clone();
+
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ } else {
+ // Top-level task: depth 0
+ (
+ 0,
+ req.repository_url.clone(),
+ req.base_branch.clone(),
+ req.target_branch.clone(),
+ req.merge_mode.clone(),
+ req.target_repo_path.clone(),
+ req.completion_action.clone(),
+ )
+ };
+
+ let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+
+ sqlx::query_as::<_, Task>(
+ r#"
+ INSERT INTO tasks (
+ parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
+ target_repo_path, completion_action, continue_from_task_id, copy_files
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
+ RETURNING *
+ "#,
+ )
+ .bind(req.parent_task_id)
+ .bind(depth)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.plan)
+ .bind(req.priority)
+ .bind(&repo_url)
+ .bind(&base_branch)
+ .bind(&target_branch)
+ .bind(&merge_mode)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(&req.continue_from_task_id)
+ .bind(&copy_files_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a task by ID.
+pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all top-level tasks (no parent), ordered by created_at DESC.
+pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id IS NULL
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// List subtasks of a parent task.
+pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id = $1
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(parent_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a task by ID with optimistic locking.
+pub async fn update_task(
+ pool: &PgPool,
+ id: Uuid,
+ req: UpdateTaskRequest,
+) -> Result<Option<Task>, RepositoryError> {
+ // Get the existing task first
+ let existing = get_task(pool, id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let plan = req.plan.unwrap_or(existing.plan);
+ let status = req.status.unwrap_or(existing.status);
+ let priority = req.priority.unwrap_or(existing.priority);
+ let progress_summary = req.progress_summary.or(existing.progress_summary);
+ let last_output = req.last_output.or(existing.last_output);
+ let error_message = req.error_message.or(existing.error_message);
+ let merge_mode = req.merge_mode.or(existing.merge_mode);
+ let pr_url = req.pr_url.or(existing.pr_url);
+ let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
+ let completion_action = req.completion_action.or(existing.completion_action);
+ // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing
+ let daemon_id = if req.clear_daemon_id {
+ None
+ } else {
+ req.daemon_id.or(existing.daemon_id)
+ };
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
+ progress_summary = $7, last_output = $8, error_message = $9,
+ merge_mode = $10, pr_url = $11, daemon_id = $12,
+ target_repo_path = $13, completion_action = $14, updated_at = NOW()
+ WHERE id = $1 AND version = $15
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
+ progress_summary = $7, last_output = $8, error_message = $9,
+ merge_mode = $10, pr_url = $11, daemon_id = $12,
+ target_repo_path = $13, completion_action = $14, updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ if let Some(current) = get_task(pool, id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a task by ID.
+pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM tasks
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Count total tasks.
+pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL",
+ )
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0)
+}
+
+// =============================================================================
+// Owner-Scoped Task Functions
+// =============================================================================
+
+/// Create a new task for a specific owner.
+pub async fn create_task_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateTaskRequest,
+) -> Result<Task, sqlx::Error> {
+ // Calculate depth and inherit settings from parent if applicable
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ if let Some(parent_id) = req.parent_task_id {
+ // Fetch parent task to get depth and inherit repo settings (must belong to same owner)
+ let parent = get_task_for_owner(pool, parent_id, owner_id).await?
+ .ok_or_else(|| sqlx::Error::RowNotFound)?;
+
+ let new_depth = parent.depth + 1;
+
+ // Validate max depth
+ if new_depth >= 2 {
+ return Err(sqlx::Error::Protocol(format!(
+ "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
+ new_depth
+ )));
+ }
+
+ // Inherit repo settings if not provided
+ let repo_url = req.repository_url.clone().or(parent.repository_url);
+ let base_branch = req.base_branch.clone().or(parent.base_branch);
+ let target_branch = req.target_branch.clone().or(parent.target_branch);
+ let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
+ let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
+ // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
+ // The orchestrator integrates subtask work from their worktrees.
+ let completion_action = req.completion_action.clone();
+
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ } else {
+ // Top-level task: depth 0
+ (
+ 0,
+ req.repository_url.clone(),
+ req.base_branch.clone(),
+ req.target_branch.clone(),
+ req.merge_mode.clone(),
+ req.target_repo_path.clone(),
+ req.completion_action.clone(),
+ )
+ };
+
+ let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+
+ sqlx::query_as::<_, Task>(
+ r#"
+ INSERT INTO tasks (
+ owner_id, parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
+ target_repo_path, completion_action, continue_from_task_id, copy_files
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(req.parent_task_id)
+ .bind(depth)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.plan)
+ .bind(req.priority)
+ .bind(&repo_url)
+ .bind(&base_branch)
+ .bind(&target_branch)
+ .bind(&merge_mode)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(&req.continue_from_task_id)
+ .bind(&copy_files_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a task by ID, scoped to owner.
+pub async fn get_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all top-level tasks (no parent) for an owner, ordered by created_at DESC.
+pub async fn list_tasks_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// List subtasks of a parent task, scoped to owner.
+pub async fn list_subtasks_for_owner(
+ pool: &PgPool,
+ parent_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.owner_id = $1 AND t.parent_task_id = $2
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(parent_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a task by ID with optimistic locking, scoped to owner.
+pub async fn update_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateTaskRequest,
+) -> Result<Option<Task>, RepositoryError> {
+ // Get the existing task first (scoped to owner)
+ let existing = get_task_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let plan = req.plan.unwrap_or(existing.plan);
+ let status = req.status.unwrap_or(existing.status);
+ let priority = req.priority.unwrap_or(existing.priority);
+ let progress_summary = req.progress_summary.or(existing.progress_summary);
+ let last_output = req.last_output.or(existing.last_output);
+ let error_message = req.error_message.or(existing.error_message);
+ let merge_mode = req.merge_mode.or(existing.merge_mode);
+ let pr_url = req.pr_url.or(existing.pr_url);
+ let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
+ let completion_action = req.completion_action.or(existing.completion_action);
+ let daemon_id = if req.clear_daemon_id {
+ None
+ } else {
+ req.daemon_id.or(existing.daemon_id)
+ };
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
+ progress_summary = $8, last_output = $9, error_message = $10,
+ merge_mode = $11, pr_url = $12, daemon_id = $13,
+ target_repo_path = $14, completion_action = $15, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $16
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
+ progress_summary = $8, last_output = $9, error_message = $10,
+ merge_mode = $11, pr_url = $12, daemon_id = $13,
+ target_repo_path = $14, completion_action = $15, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ if let Some(current) = get_task_for_owner(pool, id, owner_id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a task by ID, scoped to owner.
+pub async fn delete_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM tasks
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update task status and record event.
+pub async fn update_task_status(
+ pool: &PgPool,
+ id: Uuid,
+ new_status: &str,
+ event_data: Option<serde_json::Value>,
+) -> Result<Option<Task>, sqlx::Error> {
+ // Get existing status
+ let existing = get_task(pool, id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ let previous_status = existing.status.clone();
+
+ // Update task status
+ let task = sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = $2, updated_at = NOW(),
+ started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await?;
+
+ // Record event
+ if task.is_some() {
+ let _ = create_task_event(
+ pool,
+ id,
+ "status_change",
+ Some(&previous_status),
+ Some(new_status),
+ event_data,
+ )
+ .await;
+ }
+
+ Ok(task)
+}
+
+// =============================================================================
+// Task Event Functions
+// =============================================================================
+
+/// Create a task event.
+pub async fn create_task_event(
+ pool: &PgPool,
+ task_id: Uuid,
+ event_type: &str,
+ previous_status: Option<&str>,
+ new_status: Option<&str>,
+ event_data: Option<serde_json::Value>,
+) -> Result<TaskEvent, sqlx::Error> {
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(event_type)
+ .bind(previous_status)
+ .bind(new_status)
+ .bind(event_data)
+ .fetch_one(pool)
+ .await
+}
+
+/// List events for a task.
+pub async fn list_task_events(
+ pool: &PgPool,
+ task_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<TaskEvent>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ SELECT *
+ FROM task_events
+ WHERE task_id = $1
+ ORDER BY created_at DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+// =============================================================================
+// Daemon Functions
+// =============================================================================
+
+/// Register a new daemon connection.
+pub async fn register_daemon(
+ pool: &PgPool,
+ owner_id: Uuid,
+ connection_id: &str,
+ hostname: Option<&str>,
+ machine_id: Option<&str>,
+ max_concurrent_tasks: i32,
+) -> Result<Daemon, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(connection_id)
+ .bind(hostname)
+ .bind(machine_id)
+ .bind(max_concurrent_tasks)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a daemon by ID.
+pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a daemon by connection ID.
+pub async fn get_daemon_by_connection(
+ pool: &PgPool,
+ connection_id: &str,
+) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE connection_id = $1
+ "#,
+ )
+ .bind(connection_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all daemons.
+pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ ORDER BY connected_at DESC
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// List daemons for a specific owner.
+pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE owner_id = $1
+ ORDER BY connected_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get a daemon by ID for a specific owner.
+pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update daemon heartbeat.
+pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET last_heartbeat_at = NOW(), status = 'connected'
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update daemon status.
+pub async fn update_daemon_status(
+ pool: &PgPool,
+ id: Uuid,
+ status: &str,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET status = $2,
+ disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .bind(status)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update daemon task count.
+pub async fn update_daemon_task_count(
+ pool: &PgPool,
+ id: Uuid,
+ delta: i32,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET current_task_count = GREATEST(0, current_task_count + $2)
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .bind(delta)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Delete a daemon by ID.
+pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM daemons
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Delete a daemon by connection ID.
+pub async fn delete_daemon_by_connection(
+ pool: &PgPool,
+ connection_id: &str,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM daemons
+ WHERE connection_id = $1
+ "#,
+ )
+ .bind(connection_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Count connected daemons.
+pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ "SELECT COUNT(*) FROM daemons WHERE status = 'connected'",
+ )
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0)
+}
+
+// =============================================================================
+// Sibling Awareness Functions
+// =============================================================================
+
+/// List sibling tasks (tasks with the same parent, excluding the given task).
+pub async fn list_sibling_tasks(
+ pool: &PgPool,
+ task_id: Uuid,
+ parent_id: Option<Uuid>,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ match parent_id {
+ Some(parent) => {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id = $1 AND t.id != $2
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(parent)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ None => {
+ // Top-level tasks (no parent) - siblings are other top-level tasks
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id IS NULL AND t.id != $1
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ }
+}
+
+/// Get running sibling tasks (for context injection).
+pub async fn get_running_siblings(
+ pool: &PgPool,
+ owner_id: Uuid,
+ task_id: Uuid,
+ parent_id: Option<Uuid>,
+) -> Result<Vec<Task>, sqlx::Error> {
+ match parent_id {
+ Some(parent) => {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks t
+ WHERE t.owner_id = $1
+ AND t.parent_task_id = $2
+ AND t.id != $3
+ AND t.status = 'running'
+ ORDER BY t.priority DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(parent)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ None => {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks t
+ WHERE t.owner_id = $1
+ AND t.parent_task_id IS NULL
+ AND t.id != $2
+ AND t.status = 'running'
+ ORDER BY t.priority DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ }
+}
+
+/// Get task with its siblings for context awareness.
+pub async fn get_task_with_siblings(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
+ let task = get_task(pool, id).await?;
+ let Some(task) = task else {
+ return Ok(None);
+ };
+
+ let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
+ Ok(Some((task, siblings)))
+}
+
+// =============================================================================
+// Task Output Persistence Functions
+// =============================================================================
+
+/// Save task output to the database.
+/// This stores output in the task_events table with event_type='output'.
+pub async fn save_task_output(
+ pool: &PgPool,
+ task_id: Uuid,
+ message_type: &str,
+ content: &str,
+ tool_name: Option<&str>,
+ tool_input: Option<serde_json::Value>,
+ is_error: Option<bool>,
+ cost_usd: Option<f64>,
+ duration_ms: Option<u64>,
+) -> Result<TaskEvent, sqlx::Error> {
+ let event_data = serde_json::json!({
+ "messageType": message_type,
+ "content": content,
+ "toolName": tool_name,
+ "toolInput": tool_input,
+ "isError": is_error,
+ "costUsd": cost_usd,
+ "durationMs": duration_ms,
+ });
+
+ create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
+}
+
+/// Get task output from the database.
+/// Retrieves all output events for a task, ordered by creation time.
+pub async fn get_task_output(
+ pool: &PgPool,
+ task_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<TaskEvent>, sqlx::Error> {
+ let limit = limit.unwrap_or(1000);
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ SELECT *
+ FROM task_events
+ WHERE task_id = $1 AND event_type = 'output'
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#,
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update task completion status with error message.
+/// Sets the task status to 'done' or 'failed' and records completion time.
+pub async fn complete_task(
+ pool: &PgPool,
+ task_id: Uuid,
+ success: bool,
+ error_message: Option<&str>,
+) -> Result<Option<Task>, sqlx::Error> {
+ let status = if success { "done" } else { "failed" };
+
+ let task = sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = $2,
+ error_message = COALESCE($3, error_message),
+ completed_at = NOW(),
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(status)
+ .bind(error_message)
+ .fetch_optional(pool)
+ .await?;
+
+ // Record completion event
+ if task.is_some() {
+ let event_data = serde_json::json!({
+ "success": success,
+ "errorMessage": error_message,
+ });
+ let _ = create_task_event(
+ pool,
+ task_id,
+ "complete",
+ Some("running"),
+ Some(status),
+ Some(event_data),
+ )
+ .await;
+ }
+
+ Ok(task)
+}
+
+// =============================================================================
+// Mesh Chat History Functions
+// =============================================================================
+
+/// Get or create the active conversation for an owner.
+pub async fn get_or_create_active_conversation(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<MeshChatConversation, sqlx::Error> {
+ // Try to get existing active conversation for this owner
+ let existing = sqlx::query_as::<_, MeshChatConversation>(
+ r#"
+ SELECT *
+ FROM mesh_chat_conversations
+ WHERE is_active = true AND owner_id = $1
+ LIMIT 1
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ if let Some(conv) = existing {
+ return Ok(conv);
+ }
+
+ // Create new conversation
+ sqlx::query_as::<_, MeshChatConversation>(
+ r#"
+ INSERT INTO mesh_chat_conversations (owner_id, is_active)
+ VALUES ($1, true)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// List messages for a conversation.
+pub async fn list_chat_messages(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ limit: Option<i32>,
+) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, MeshChatMessageRecord>(
+ r#"
+ SELECT *
+ FROM mesh_chat_messages
+ WHERE conversation_id = $1
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Add a message to a conversation.
+#[allow(clippy::too_many_arguments)]
+pub async fn add_chat_message(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ role: &str,
+ content: &str,
+ context_type: &str,
+ context_task_id: Option<Uuid>,
+ tool_calls: Option<serde_json::Value>,
+ pending_questions: Option<serde_json::Value>,
+) -> Result<MeshChatMessageRecord, sqlx::Error> {
+ sqlx::query_as::<_, MeshChatMessageRecord>(
+ r#"
+ INSERT INTO mesh_chat_messages
+ (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(role)
+ .bind(content)
+ .bind(context_type)
+ .bind(context_task_id)
+ .bind(tool_calls)
+ .bind(pending_questions)
+ .fetch_one(pool)
+ .await
+}
+
+/// Clear conversation (archive existing and create new).
+pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
+ // Mark existing as inactive for this owner
+ sqlx::query(
+ r#"
+ UPDATE mesh_chat_conversations
+ SET is_active = false, updated_at = NOW()
+ WHERE is_active = true AND owner_id = $1
+ "#,
+ )
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ // Create new active conversation
+ get_or_create_active_conversation(pool, owner_id).await
+}