//! Repository pattern for file database operations.
use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation,
MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest,
};
/// Repository error types.
#[derive(Debug)]
pub enum RepositoryError {
/// Database error
Database(sqlx::Error),
/// Version conflict (optimistic locking failure)
VersionConflict {
/// The version the client expected
expected: i32,
/// The actual current version in the database
actual: i32,
},
}
impl From<sqlx::Error> for RepositoryError {
fn from(e: sqlx::Error) -> Self {
RepositoryError::Database(e)
}
}
impl std::fmt::Display for RepositoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RepositoryError::Database(e) => write!(f, "Database error: {}", e),
RepositoryError::VersionConflict { expected, actual } => {
write!(
f,
"Version conflict: expected {}, actual {}",
expected, actual
)
}
}
}
}
impl std::error::Error for RepositoryError {}
/// Generate a default name based on current timestamp.
fn generate_default_name() -> String {
let now = Utc::now();
now.format("Recording - %b %d %Y %H:%M:%S").to_string()
}
/// Create a new file record.
pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
sqlx::query_as::<_, File>(
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
.bind(&req.location)
.bind(&body_json)
.fetch_one(pool)
.await
}
/// Get a file by ID.
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
WHERE id = $1
"#,
)
.bind(id)
.fetch_optional(pool)
.await
}
/// List all files, ordered by created_at DESC.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#,
)
.fetch_all(pool)
.await
}
/// Update a file by ID with optimistic locking.
///
/// If `req.version` is provided, the update will only succeed if the current
/// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch.
///
/// If `req.version` is None (e.g., internal system updates), version checking is skipped.
pub async fn update_file(
pool: &PgPool,
id: Uuid,
req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
// Get the existing file first
let existing = get_file(pool, id).await?;
let Some(existing) = existing else {
return Ok(None);
};
// Check version if provided (optimistic locking)
if let Some(expected_version) = req.version {
if existing.version != expected_version {
return Err(RepositoryError::VersionConflict {
expected: expected_version,
actual: existing.version,
});
}
}
// Apply updates
let name = req.name.unwrap_or(existing.name);
let description = req.description.or(existing.description);
let transcript = req.transcript.unwrap_or(existing.transcript);
let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
let summary = req.summary.or(existing.summary);
let body = req.body.unwrap_or(existing.body);
let body_json = serde_json::to_value(&body).unwrap_or_default();
// Update with version check in WHERE clause for race condition safety
let result = if req.version.is_some() {
sqlx::query_as::<_, File>(
r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
.bind(&summary)
.bind(&body_json)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
} else {
// No version check for internal updates
sqlx::query_as::<_, File>(
r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
.bind(&summary)
.bind(&body_json)
.fetch_optional(pool)
.await?
};
// If versioned update returned None, there was a race condition
if result.is_none() && req.version.is_some() {
// Re-fetch to get the actual version
if let Some(current) = get_file(pool, id).await? {
return Err(RepositoryError::VersionConflict {
expected: req.version.unwrap(),
actual: current.version,
});
}
}
Ok(result)
}
/// Delete a file by ID.
pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM files
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Count total files.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
.fetch_one(pool)
.await?;
Ok(result.0)
}
// =============================================================================
// Owner-Scoped File Functions
// =============================================================================
/// Create a new file record for a specific owner.
pub async fn create_file_for_owner(
pool: &PgPool,
owner_id: Uuid,
req: CreateFileRequest,
) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
sqlx::query_as::<_, File>(
r#"
INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, $5, NULL, $6)
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(owner_id)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
.bind(&req.location)
.bind(&body_json)
.fetch_one(pool)
.await
}
/// Get a file by ID, scoped to owner.
pub async fn get_file_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await
}
/// List all files for an owner, ordered by created_at DESC.
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
"#,
)
.bind(owner_id)
.fetch_all(pool)
.await
}
/// Update a file by ID with optimistic locking, scoped to owner.
pub async fn update_file_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
// Get the existing file first (scoped to owner)
let existing = get_file_for_owner(pool, id, owner_id).await?;
let Some(existing) = existing else {
return Ok(None);
};
// Check version if provided (optimistic locking)
if let Some(expected_version) = req.version {
if existing.version != expected_version {
return Err(RepositoryError::VersionConflict {
expected: expected_version,
actual: existing.version,
});
}
}
// Apply updates
let name = req.name.unwrap_or(existing.name);
let description = req.description.or(existing.description);
let transcript = req.transcript.unwrap_or(existing.transcript);
let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
let summary = req.summary.or(existing.summary);
let body = req.body.unwrap_or(existing.body);
let body_json = serde_json::to_value(&body).unwrap_or_default();
// Update with version check in WHERE clause for race condition safety
let result = if req.version.is_some() {
sqlx::query_as::<_, File>(
r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
.bind(owner_id)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
.bind(&summary)
.bind(&body_json)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
} else {
// No version check for internal updates
sqlx::query_as::<_, File>(
r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
.bind(owner_id)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
.bind(&summary)
.bind(&body_json)
.fetch_optional(pool)
.await?
};
// If versioned update returned None, there was a race condition
if result.is_none() && req.version.is_some() {
// Re-fetch to get the actual version
if let Some(current) = get_file_for_owner(pool, id, owner_id).await? {
return Err(RepositoryError::VersionConflict {
expected: req.version.unwrap(),
actual: current.version,
});
}
}
Ok(result)
}
/// Delete a file by ID, scoped to owner.
pub async fn delete_file_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM files
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
// =============================================================================
// Version History Functions
// =============================================================================
/// Set the version source for the current transaction.
/// This is used by the trigger to record who made the change.
pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> {
sqlx::query(&format!("SET LOCAL app.version_source = '{}'", source))
.execute(pool)
.await?;
Ok(())
}
/// Set the change description for the current transaction.
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> {
// Escape single quotes for SQL
let escaped = description.replace('\'', "''");
sqlx::query(&format!("SET LOCAL app.change_description = '{}'", escaped))
.execute(pool)
.await?;
Ok(())
}
/// List all versions of a file, ordered by version DESC.
pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result<Vec<FileVersion>, sqlx::Error> {
// First get the current version from the files table
let current = get_file(pool, file_id).await?;
let mut versions = sqlx::query_as::<_, FileVersion>(
r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1
ORDER BY version DESC
"#,
)
.bind(file_id)
.fetch_all(pool)
.await?;
// Add the current version as the first entry if it exists
if let Some(file) = current {
let current_version = FileVersion {
id: file.id,
file_id: file.id,
version: file.version,
name: file.name,
description: file.description,
summary: file.summary,
body: file.body,
source: "user".to_string(), // Current version source
change_description: None,
created_at: file.updated_at,
};
versions.insert(0, current_version);
}
Ok(versions)
}
/// Get a specific version of a file.
pub async fn get_file_version(
pool: &PgPool,
file_id: Uuid,
version: i32,
) -> Result<Option<FileVersion>, sqlx::Error> {
// First check if this is the current version
if let Some(file) = get_file(pool, file_id).await? {
if file.version == version {
return Ok(Some(FileVersion {
id: file.id,
file_id: file.id,
version: file.version,
name: file.name,
description: file.description,
summary: file.summary,
body: file.body,
source: "user".to_string(),
change_description: None,
created_at: file.updated_at,
}));
}
}
// Otherwise, look in the versions table
sqlx::query_as::<_, FileVersion>(
r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1 AND version = $2
"#,
)
.bind(file_id)
.bind(version)
.fetch_optional(pool)
.await
}
/// Restore a file to a previous version.
/// This creates a new version with the content from the target version.
pub async fn restore_file_version(
pool: &PgPool,
file_id: Uuid,
target_version: i32,
current_version: i32,
) -> Result<Option<File>, RepositoryError> {
// Get the target version content
let target = get_file_version(pool, file_id, target_version).await?;
let Some(target) = target else {
return Ok(None);
};
// Set version source and description for the trigger
set_version_source(pool, "system").await?;
set_change_description(pool, &format!("Restored from version {}", target_version)).await?;
// Update the file with the target version's content
// This will trigger the save_file_version trigger to save the current state first
let update_req = UpdateFileRequest {
name: Some(target.name),
description: target.description,
transcript: None,
summary: target.summary,
body: Some(target.body),
version: Some(current_version),
};
update_file(pool, file_id, update_req).await
}
/// Count versions for a file.
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sqlx::Error> {
let result: (i64,) = sqlx::query_as(
"SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version
)
.bind(file_id)
.fetch_one(pool)
.await?;
Ok(result.0)
}
// =============================================================================
// Task Functions
// =============================================================================
/// Create a new task.
///
/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
/// and target_repo_path from the parent task. Depth is calculated from parent and limited
/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
///
/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
/// explicitly configured. The orchestrator controls when completion steps happen.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit repo settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Validate max depth (must be < 2, i.e., 0 or 1 only)
// Orchestrators are at depth 0, subtasks at depth 1
// Subtasks cannot have their own children
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
new_depth
)));
}
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0
(
0,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
parent_task_id, depth, name, description, plan, priority,
repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING *
"#,
)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.fetch_one(pool)
.await
}
/// Get a task by ID.
pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
sqlx::query_as::<_, Task>(
r#"
SELECT *
FROM tasks
WHERE id = $1
"#,
)
.bind(id)
.fetch_optional(pool)
.await
}
/// List all top-level tasks (no parent), ordered by created_at DESC.
pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.fetch_all(pool)
.await
}
/// List subtasks of a parent task.
pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.bind(parent_id)
.fetch_all(pool)
.await
}
/// Update a task by ID with optimistic locking.
pub async fn update_task(
pool: &PgPool,
id: Uuid,
req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
// Get the existing task first
let existing = get_task(pool, id).await?;
let Some(existing) = existing else {
return Ok(None);
};
// Check version if provided (optimistic locking)
if let Some(expected_version) = req.version {
if existing.version != expected_version {
return Err(RepositoryError::VersionConflict {
expected: expected_version,
actual: existing.version,
});
}
}
// Apply updates
let name = req.name.unwrap_or(existing.name);
let description = req.description.or(existing.description);
let plan = req.plan.unwrap_or(existing.plan);
let status = req.status.unwrap_or(existing.status);
let priority = req.priority.unwrap_or(existing.priority);
let progress_summary = req.progress_summary.or(existing.progress_summary);
let last_output = req.last_output.or(existing.last_output);
let error_message = req.error_message.or(existing.error_message);
let merge_mode = req.merge_mode.or(existing.merge_mode);
let pr_url = req.pr_url.or(existing.pr_url);
let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
let completion_action = req.completion_action.or(existing.completion_action);
// Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing
let daemon_id = if req.clear_daemon_id {
None
} else {
req.daemon_id.or(existing.daemon_id)
};
// Update with version check in WHERE clause for race condition safety
let result = if req.version.is_some() {
sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1 AND version = $15
RETURNING *
"#,
)
.bind(id)
.bind(&name)
.bind(&description)
.bind(&plan)
.bind(&status)
.bind(priority)
.bind(&progress_summary)
.bind(&last_output)
.bind(&error_message)
.bind(&merge_mode)
.bind(&pr_url)
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
} else {
sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1
RETURNING *
"#,
)
.bind(id)
.bind(&name)
.bind(&description)
.bind(&plan)
.bind(&status)
.bind(priority)
.bind(&progress_summary)
.bind(&last_output)
.bind(&error_message)
.bind(&merge_mode)
.bind(&pr_url)
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
.fetch_optional(pool)
.await?
};
// If versioned update returned None, there was a race condition
if result.is_none() && req.version.is_some() {
if let Some(current) = get_task(pool, id).await? {
return Err(RepositoryError::VersionConflict {
expected: req.version.unwrap(),
actual: current.version,
});
}
}
Ok(result)
}
/// Delete a task by ID.
pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM tasks
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Count total tasks.
pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
let result: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL",
)
.fetch_one(pool)
.await?;
Ok(result.0)
}
// =============================================================================
// Owner-Scoped Task Functions
// =============================================================================
/// Create a new task for a specific owner.
pub async fn create_task_for_owner(
pool: &PgPool,
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit repo settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
new_depth
)));
}
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0
(
0,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
owner_id, parent_task_id, depth, name, description, plan, priority,
repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
RETURNING *
"#,
)
.bind(owner_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.fetch_one(pool)
.await
}
/// Get a task by ID, scoped to owner.
pub async fn get_task_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
sqlx::query_as::<_, Task>(
r#"
SELECT *
FROM tasks
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await
}
/// List all top-level tasks (no parent) for an owner, ordered by created_at DESC.
pub async fn list_tasks_for_owner(
pool: &PgPool,
owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.bind(owner_id)
.fetch_all(pool)
.await
}
/// List subtasks of a parent task, scoped to owner.
pub async fn list_subtasks_for_owner(
pool: &PgPool,
parent_id: Uuid,
owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.bind(owner_id)
.bind(parent_id)
.fetch_all(pool)
.await
}
/// Update a task by ID with optimistic locking, scoped to owner.
pub async fn update_task_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
// Get the existing task first (scoped to owner)
let existing = get_task_for_owner(pool, id, owner_id).await?;
let Some(existing) = existing else {
return Ok(None);
};
// Check version if provided (optimistic locking)
if let Some(expected_version) = req.version {
if existing.version != expected_version {
return Err(RepositoryError::VersionConflict {
expected: expected_version,
actual: existing.version,
});
}
}
// Apply updates
let name = req.name.unwrap_or(existing.name);
let description = req.description.or(existing.description);
let plan = req.plan.unwrap_or(existing.plan);
let status = req.status.unwrap_or(existing.status);
let priority = req.priority.unwrap_or(existing.priority);
let progress_summary = req.progress_summary.or(existing.progress_summary);
let last_output = req.last_output.or(existing.last_output);
let error_message = req.error_message.or(existing.error_message);
let merge_mode = req.merge_mode.or(existing.merge_mode);
let pr_url = req.pr_url.or(existing.pr_url);
let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
let completion_action = req.completion_action.or(existing.completion_action);
let daemon_id = if req.clear_daemon_id {
None
} else {
req.daemon_id.or(existing.daemon_id)
};
// Update with version check in WHERE clause for race condition safety
let result = if req.version.is_some() {
sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $16
RETURNING *
"#,
)
.bind(id)
.bind(owner_id)
.bind(&name)
.bind(&description)
.bind(&plan)
.bind(&status)
.bind(priority)
.bind(&progress_summary)
.bind(&last_output)
.bind(&error_message)
.bind(&merge_mode)
.bind(&pr_url)
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
} else {
sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#,
)
.bind(id)
.bind(owner_id)
.bind(&name)
.bind(&description)
.bind(&plan)
.bind(&status)
.bind(priority)
.bind(&progress_summary)
.bind(&last_output)
.bind(&error_message)
.bind(&merge_mode)
.bind(&pr_url)
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
.fetch_optional(pool)
.await?
};
// If versioned update returned None, there was a race condition
if result.is_none() && req.version.is_some() {
if let Some(current) = get_task_for_owner(pool, id, owner_id).await? {
return Err(RepositoryError::VersionConflict {
expected: req.version.unwrap(),
actual: current.version,
});
}
}
Ok(result)
}
/// Delete a task by ID, scoped to owner.
pub async fn delete_task_for_owner(
pool: &PgPool,
id: Uuid,
owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM tasks
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Update task status and record event.
pub async fn update_task_status(
pool: &PgPool,
id: Uuid,
new_status: &str,
event_data: Option<serde_json::Value>,
) -> Result<Option<Task>, sqlx::Error> {
// Get existing status
let existing = get_task(pool, id).await?;
let Some(existing) = existing else {
return Ok(None);
};
let previous_status = existing.status.clone();
// Update task status
let task = sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET status = $2, updated_at = NOW(),
started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
WHERE id = $1
RETURNING *
"#,
)
.bind(id)
.bind(new_status)
.fetch_optional(pool)
.await?;
// Record event
if task.is_some() {
let _ = create_task_event(
pool,
id,
"status_change",
Some(&previous_status),
Some(new_status),
event_data,
)
.await;
}
Ok(task)
}
// =============================================================================
// Task Event Functions
// =============================================================================
/// Create a task event.
pub async fn create_task_event(
pool: &PgPool,
task_id: Uuid,
event_type: &str,
previous_status: Option<&str>,
new_status: Option<&str>,
event_data: Option<serde_json::Value>,
) -> Result<TaskEvent, sqlx::Error> {
sqlx::query_as::<_, TaskEvent>(
r#"
INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#,
)
.bind(task_id)
.bind(event_type)
.bind(previous_status)
.bind(new_status)
.bind(event_data)
.fetch_one(pool)
.await
}
/// List events for a task.
pub async fn list_task_events(
pool: &PgPool,
task_id: Uuid,
limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
let limit = limit.unwrap_or(100);
sqlx::query_as::<_, TaskEvent>(
r#"
SELECT *
FROM task_events
WHERE task_id = $1
ORDER BY created_at DESC
LIMIT $2
"#,
)
.bind(task_id)
.bind(limit)
.fetch_all(pool)
.await
}
// =============================================================================
// Daemon Functions
// =============================================================================
/// Register a new daemon connection.
pub async fn register_daemon(
pool: &PgPool,
owner_id: Uuid,
connection_id: &str,
hostname: Option<&str>,
machine_id: Option<&str>,
max_concurrent_tasks: i32,
) -> Result<Daemon, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#,
)
.bind(owner_id)
.bind(connection_id)
.bind(hostname)
.bind(machine_id)
.bind(max_concurrent_tasks)
.fetch_one(pool)
.await
}
/// Get a daemon by ID.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE id = $1
"#,
)
.bind(id)
.fetch_optional(pool)
.await
}
/// Get a daemon by connection ID.
pub async fn get_daemon_by_connection(
pool: &PgPool,
connection_id: &str,
) -> Result<Option<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE connection_id = $1
"#,
)
.bind(connection_id)
.fetch_optional(pool)
.await
}
/// List all daemons.
pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
ORDER BY connected_at DESC
"#,
)
.fetch_all(pool)
.await
}
/// List daemons for a specific owner.
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE owner_id = $1
ORDER BY connected_at DESC
"#,
)
.bind(owner_id)
.fetch_all(pool)
.await
}
/// Get a daemon by ID for a specific owner.
pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await
}
/// Update daemon heartbeat.
pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE daemons
SET last_heartbeat_at = NOW(), status = 'connected'
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Update daemon status.
pub async fn update_daemon_status(
pool: &PgPool,
id: Uuid,
status: &str,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE daemons
SET status = $2,
disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
WHERE id = $1
"#,
)
.bind(id)
.bind(status)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Update daemon task count.
pub async fn update_daemon_task_count(
pool: &PgPool,
id: Uuid,
delta: i32,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE daemons
SET current_task_count = GREATEST(0, current_task_count + $2)
WHERE id = $1
"#,
)
.bind(id)
.bind(delta)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Delete a daemon by ID.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM daemons
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Delete a daemon by connection ID.
pub async fn delete_daemon_by_connection(
pool: &PgPool,
connection_id: &str,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM daemons
WHERE connection_id = $1
"#,
)
.bind(connection_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
/// Count connected daemons.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
let result: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM daemons WHERE status = 'connected'",
)
.fetch_one(pool)
.await?;
Ok(result.0)
}
// =============================================================================
// Sibling Awareness Functions
// =============================================================================
/// List sibling tasks (tasks with the same parent, excluding the given task).
pub async fn list_sibling_tasks(
pool: &PgPool,
task_id: Uuid,
parent_id: Option<Uuid>,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
match parent_id {
Some(parent) => {
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.bind(parent)
.bind(task_id)
.fetch_all(pool)
.await
}
None => {
// Top-level tasks (no parent) - siblings are other top-level tasks
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary, t.version, t.created_at, t.updated_at,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
FROM tasks t
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
)
.bind(task_id)
.fetch_all(pool)
.await
}
}
}
/// Get running sibling tasks (for context injection).
pub async fn get_running_siblings(
pool: &PgPool,
owner_id: Uuid,
task_id: Uuid,
parent_id: Option<Uuid>,
) -> Result<Vec<Task>, sqlx::Error> {
match parent_id {
Some(parent) => {
sqlx::query_as::<_, Task>(
r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id = $2
AND t.id != $3
AND t.status = 'running'
ORDER BY t.priority DESC
"#,
)
.bind(owner_id)
.bind(parent)
.bind(task_id)
.fetch_all(pool)
.await
}
None => {
sqlx::query_as::<_, Task>(
r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id IS NULL
AND t.id != $2
AND t.status = 'running'
ORDER BY t.priority DESC
"#,
)
.bind(owner_id)
.bind(task_id)
.fetch_all(pool)
.await
}
}
}
/// Get task with its siblings for context awareness.
pub async fn get_task_with_siblings(
pool: &PgPool,
id: Uuid,
) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
let task = get_task(pool, id).await?;
let Some(task) = task else {
return Ok(None);
};
let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
Ok(Some((task, siblings)))
}
// =============================================================================
// Task Output Persistence Functions
// =============================================================================
/// Save task output to the database.
/// This stores output in the task_events table with event_type='output'.
pub async fn save_task_output(
pool: &PgPool,
task_id: Uuid,
message_type: &str,
content: &str,
tool_name: Option<&str>,
tool_input: Option<serde_json::Value>,
is_error: Option<bool>,
cost_usd: Option<f64>,
duration_ms: Option<u64>,
) -> Result<TaskEvent, sqlx::Error> {
let event_data = serde_json::json!({
"messageType": message_type,
"content": content,
"toolName": tool_name,
"toolInput": tool_input,
"isError": is_error,
"costUsd": cost_usd,
"durationMs": duration_ms,
});
create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
}
/// Get task output from the database.
/// Retrieves all output events for a task, ordered by creation time.
pub async fn get_task_output(
pool: &PgPool,
task_id: Uuid,
limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
let limit = limit.unwrap_or(1000);
sqlx::query_as::<_, TaskEvent>(
r#"
SELECT *
FROM task_events
WHERE task_id = $1 AND event_type = 'output'
ORDER BY created_at ASC
LIMIT $2
"#,
)
.bind(task_id)
.bind(limit)
.fetch_all(pool)
.await
}
/// Update task completion status with error message.
/// Sets the task status to 'done' or 'failed' and records completion time.
pub async fn complete_task(
pool: &PgPool,
task_id: Uuid,
success: bool,
error_message: Option<&str>,
) -> Result<Option<Task>, sqlx::Error> {
let status = if success { "done" } else { "failed" };
let task = sqlx::query_as::<_, Task>(
r#"
UPDATE tasks
SET status = $2,
error_message = COALESCE($3, error_message),
completed_at = NOW(),
updated_at = NOW()
WHERE id = $1
RETURNING *
"#,
)
.bind(task_id)
.bind(status)
.bind(error_message)
.fetch_optional(pool)
.await?;
// Record completion event
if task.is_some() {
let event_data = serde_json::json!({
"success": success,
"errorMessage": error_message,
});
let _ = create_task_event(
pool,
task_id,
"complete",
Some("running"),
Some(status),
Some(event_data),
)
.await;
}
Ok(task)
}
// =============================================================================
// Mesh Chat History Functions
// =============================================================================
/// Get or create the active conversation for an owner.
pub async fn get_or_create_active_conversation(
pool: &PgPool,
owner_id: Uuid,
) -> Result<MeshChatConversation, sqlx::Error> {
// Try to get existing active conversation for this owner
let existing = sqlx::query_as::<_, MeshChatConversation>(
r#"
SELECT *
FROM mesh_chat_conversations
WHERE is_active = true AND owner_id = $1
LIMIT 1
"#,
)
.bind(owner_id)
.fetch_optional(pool)
.await?;
if let Some(conv) = existing {
return Ok(conv);
}
// Create new conversation
sqlx::query_as::<_, MeshChatConversation>(
r#"
INSERT INTO mesh_chat_conversations (owner_id, is_active)
VALUES ($1, true)
RETURNING *
"#,
)
.bind(owner_id)
.fetch_one(pool)
.await
}
/// List messages for a conversation.
pub async fn list_chat_messages(
pool: &PgPool,
conversation_id: Uuid,
limit: Option<i32>,
) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
let limit = limit.unwrap_or(100);
sqlx::query_as::<_, MeshChatMessageRecord>(
r#"
SELECT *
FROM mesh_chat_messages
WHERE conversation_id = $1
ORDER BY created_at ASC
LIMIT $2
"#,
)
.bind(conversation_id)
.bind(limit)
.fetch_all(pool)
.await
}
/// Add a message to a conversation.
#[allow(clippy::too_many_arguments)]
pub async fn add_chat_message(
pool: &PgPool,
conversation_id: Uuid,
role: &str,
content: &str,
context_type: &str,
context_task_id: Option<Uuid>,
tool_calls: Option<serde_json::Value>,
pending_questions: Option<serde_json::Value>,
) -> Result<MeshChatMessageRecord, sqlx::Error> {
sqlx::query_as::<_, MeshChatMessageRecord>(
r#"
INSERT INTO mesh_chat_messages
(conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
)
.bind(conversation_id)
.bind(role)
.bind(content)
.bind(context_type)
.bind(context_task_id)
.bind(tool_calls)
.bind(pending_questions)
.fetch_one(pool)
.await
}
/// Clear conversation (archive existing and create new).
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
// Mark existing as inactive for this owner
sqlx::query(
r#"
UPDATE mesh_chat_conversations
SET is_active = false, updated_at = NOW()
WHERE is_active = true AND owner_id = $1
"#,
)
.bind(owner_id)
.execute(pool)
.await?;
// Create new active conversation
get_or_create_active_conversation(pool, owner_id).await
}