//! Repository functions for database operations on files, file versions, tasks, and related records.
use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository,
ContractSummary, CreateCheckpointRequest, CreateContractRequest, CreateFileRequest,
CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary,
FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint,
TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateSupervisorStateRequest,
UpdateTaskRequest,
};
/// Errors returned by repository functions that use optimistic locking.
///
/// Plain database failures are wrapped in [`RepositoryError::Database`]; a
/// mismatch between the version supplied by the caller and the version stored
/// in the database is reported as [`RepositoryError::VersionConflict`].
#[derive(Debug)]
pub enum RepositoryError {
/// An error reported by `sqlx` while running a query.
Database(sqlx::Error),
/// Version conflict (optimistic locking failure): the row was found, but its
/// stored version differs from the one the caller expected.
VersionConflict {
/// The version the client expected
expected: i32,
/// The actual current version in the database
actual: i32,
},
}
impl From<sqlx::Error> for RepositoryError {
fn from(e: sqlx::Error) -> Self {
RepositoryError::Database(e)
}
}
impl std::fmt::Display for RepositoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RepositoryError::Database(e) => write!(f, "Database error: {}", e),
RepositoryError::VersionConflict { expected, actual } => {
write!(
f,
"Version conflict: expected {}, actual {}",
expected, actual
)
}
}
}
}
/// Marks `RepositoryError` as a standard error type. No methods are
/// overridden, so the trait's default implementations apply.
impl std::error::Error for RepositoryError {}
/// Build the fallback file name used when a request does not supply one.
///
/// The name embeds the current UTC time using the pattern
/// `Recording - %b %d %Y %H:%M:%S`.
fn generate_default_name() -> String {
    Utc::now()
        .format("Recording - %b %d %Y %H:%M:%S")
        .to_string()
}
/// Internal request for creating files without contract association (e.g., audio transcription).
/// User-facing file creation should use CreateFileRequest which requires contract_id.
///
/// Consumed by `create_file`.
pub struct InternalCreateFileRequest {
/// File name; when `None`, `create_file` generates a timestamp-based default.
pub name: Option<String>,
/// Optional description stored in the `description` column.
pub description: Option<String>,
/// Transcript entries; serialized to JSON before being stored.
pub transcript: Vec<super::models::TranscriptEntry>,
/// Optional value stored in the `location` column.
pub location: Option<String>,
}
/// Create a new file record (internal use, no contract required).
/// For user-facing file creation, use create_file_for_owner which requires a contract.
pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
sqlx::query_as::<_, File>(
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
.bind(&req.location)
.bind(&body_json)
.fetch_one(pool)
.await
}
/// Fetch a single file by its ID.
///
/// Returns `Ok(None)` when no row matches. The lookup is not scoped to an owner.
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
    const SELECT_FILE: &str = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#;

    let lookup = sqlx::query_as::<_, File>(SELECT_FILE).bind(id);
    lookup.fetch_optional(pool).await
}
/// Return every file row, newest first (`created_at DESC`).
///
/// The listing is not scoped to an owner.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
    const SELECT_ALL_FILES: &str = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#;

    let listing = sqlx::query_as::<_, File>(SELECT_ALL_FILES);
    listing.fetch_all(pool).await
}
/// Update a file by ID, optionally guarded by optimistic locking.
///
/// Fields left as `None` in `req` keep their currently stored values.
///
/// * `req.version = Some(v)`: the update only succeeds while the stored version
///   equals `v`; otherwise `RepositoryError::VersionConflict` is returned.
/// * `req.version = None` (e.g., internal system updates): version checking is skipped.
///
/// Returns `Ok(None)` when no file with `id` exists.
///
/// NOTE(review): the `UPDATE` statement does not modify `version` itself;
/// presumably a database trigger bumps it — confirm against the schema.
pub async fn update_file(
    pool: &PgPool,
    id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    const UPDATE_VERSIONED: &str = r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;
    const UPDATE_UNVERSIONED: &str = r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;

    // Load the stored row so unspecified fields can be carried over.
    let Some(stored) = get_file(pool, id).await? else {
        return Ok(None);
    };

    // Fail fast when the caller's version is already stale.
    match req.version {
        Some(expected) if expected != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let transcript = req.transcript.unwrap_or(stored.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(stored.summary);
    let body = req.body.unwrap_or(stored.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    // The versioned statement repeats the version check in its WHERE clause so
    // that a concurrent writer cannot slip in between the read and the write.
    let statement = match req.version {
        Some(_) => UPDATE_VERSIONED,
        None => UPDATE_UNVERSIONED,
    };
    let mut update = sqlx::query_as::<_, File>(statement)
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json);
    if let Some(expected) = req.version {
        update = update.bind(expected);
    }
    let updated = update.fetch_optional(pool).await?;

    // A versioned update that matched nothing lost a race: re-read the row to
    // report the version that is actually stored now.
    if let (None, Some(expected)) = (&updated, req.version) {
        if let Some(latest) = get_file(pool, id).await? {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: latest.version,
            });
        }
    }

    Ok(updated)
}
/// Delete the file with the given ID.
///
/// Returns `true` when a row was removed and `false` when nothing matched.
pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const DELETE_FILE: &str = r#"
DELETE FROM files
WHERE id = $1
"#;

    sqlx::query(DELETE_FILE)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Return the total number of rows in the `files` table.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
        .fetch_one(pool)
        .await?;
    Ok(total)
}
// =============================================================================
// Owner-Scoped File Functions
// =============================================================================
/// Insert a new file row owned by `owner_id` and attached to `req.contract_id`.
///
/// A missing name is replaced by a generated timestamp-based name. When the
/// request carries no `contract_phase`, the phase is read from the owner's
/// contract; if that lookup finds no row, the phase is stored as `NULL`.
/// The summary is always stored as `NULL`. Returns the inserted row.
pub async fn create_file_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateFileRequest,
) -> Result<File, sqlx::Error> {
    const SELECT_CONTRACT_PHASE: &str =
        "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2";
    const INSERT_OWNED_FILE: &str = r#"
INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;

    let file_name = req.name.unwrap_or_else(generate_default_name);
    let transcript_value = serde_json::to_value(&req.transcript).unwrap_or_default();
    // The request body may be empty or may already contain template elements.
    let body_value = serde_json::to_value(&req.body).unwrap_or_default();

    // An explicit phase wins; otherwise fall back to the contract's current phase.
    let phase: Option<String> = match req.contract_phase {
        Some(explicit) => Some(explicit),
        None => {
            sqlx::query_scalar(SELECT_CONTRACT_PHASE)
                .bind(req.contract_id)
                .bind(owner_id)
                .fetch_optional(pool)
                .await?
        }
    };

    sqlx::query_as::<_, File>(INSERT_OWNED_FILE)
        .bind(owner_id)
        .bind(req.contract_id)
        .bind(&phase)
        .bind(&file_name)
        .bind(&req.description)
        .bind(&transcript_value)
        .bind(&req.location)
        .bind(&body_value)
        .bind(&req.repo_file_path)
        .fetch_one(pool)
        .await
}
/// Fetch a single file by ID, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when the file does not exist or has a different owner.
pub async fn get_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<File>, sqlx::Error> {
    const SELECT_OWNED_FILE: &str = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, File>(SELECT_OWNED_FILE)
        .bind(id)
        .bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// Return every file belonging to `owner_id`, newest first (`created_at DESC`).
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
    const SELECT_OWNED_FILES: &str = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
"#;

    let listing = sqlx::query_as::<_, File>(SELECT_OWNED_FILES).bind(owner_id);
    listing.fetch_all(pool).await
}
/// Database row type for file summary with contract info.
///
/// Intermediate shape produced by the joined query in
/// `list_file_summaries_for_owner` before it is converted into `FileSummary`.
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
// File ID.
id: Uuid,
// Contract the file is attached to, if any.
contract_id: Option<Uuid>,
// Name of the joined contract; `None` when the LEFT JOIN finds no contract.
contract_name: Option<String>,
// Contract phase recorded on the file row.
contract_phase: Option<String>,
name: String,
description: Option<String>,
// Decoded from the JSON `transcript` column; only used to derive the entry
// count and duration of the summary.
#[sqlx(json)]
transcript: Vec<crate::db::models::TranscriptEntry>,
version: i32,
repo_file_path: Option<String>,
repo_sync_status: Option<String>,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
/// List lightweight summaries of an owner's files, newest first, including the
/// name of the joined contract.
///
/// For each file, `transcript_count` is the number of transcript entries and
/// `duration` is the largest `end` value among them (`None` when that maximum
/// is not greater than zero, e.g. for an empty transcript).
pub async fn list_file_summaries_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<FileSummary>, sqlx::Error> {
    const SELECT_SUMMARIES: &str = r#"
SELECT
f.id, f.contract_id, c.name as contract_name, f.contract_phase,
f.name, f.description, f.transcript, f.version,
f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
FROM files f
LEFT JOIN contracts c ON f.contract_id = c.id
WHERE f.owner_id = $1
ORDER BY f.created_at DESC
"#;

    let rows = sqlx::query_as::<_, FileSummaryRow>(SELECT_SUMMARIES)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;

    let mut summaries = Vec::with_capacity(rows.len());
    for row in rows {
        // Largest end timestamp across all transcript entries, starting at zero.
        let mut latest_end = 0.0_f32;
        for entry in &row.transcript {
            latest_end = f32::max(latest_end, entry.end);
        }

        summaries.push(FileSummary {
            id: row.id,
            contract_id: row.contract_id,
            contract_name: row.contract_name,
            contract_phase: row.contract_phase,
            name: row.name,
            description: row.description,
            transcript_count: row.transcript.len(),
            duration: (latest_end > 0.0).then_some(latest_end),
            version: row.version,
            repo_file_path: row.repo_file_path,
            repo_sync_status: row.repo_sync_status,
            created_at: row.created_at,
            updated_at: row.updated_at,
        });
    }

    Ok(summaries)
}
/// Update a file by ID, scoped to `owner_id`, optionally guarded by optimistic locking.
///
/// Fields left as `None` in `req` keep their currently stored values.
/// When `req.version` is `Some`, the update only succeeds while the stored
/// version matches; otherwise `RepositoryError::VersionConflict` is returned.
/// When `req.version` is `None`, version checking is skipped.
///
/// Returns `Ok(None)` when the file does not exist or belongs to another owner.
pub async fn update_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    const UPDATE_VERSIONED: &str = r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;
    const UPDATE_UNVERSIONED: &str = r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;

    // Load the stored row (owner-scoped) so unspecified fields can be carried over.
    let Some(stored) = get_file_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fail fast when the caller's version is already stale.
    match req.version {
        Some(expected) if expected != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let transcript = req.transcript.unwrap_or(stored.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(stored.summary);
    let body = req.body.unwrap_or(stored.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    // The versioned statement repeats the version check in its WHERE clause so
    // that a concurrent writer cannot slip in between the read and the write.
    let statement = match req.version {
        Some(_) => UPDATE_VERSIONED,
        None => UPDATE_UNVERSIONED,
    };
    let mut update = sqlx::query_as::<_, File>(statement)
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json);
    if let Some(expected) = req.version {
        update = update.bind(expected);
    }
    let updated = update.fetch_optional(pool).await?;

    // A versioned update that matched nothing lost a race: re-read the row to
    // report the version that is actually stored now.
    if let (None, Some(expected)) = (&updated, req.version) {
        if let Some(latest) = get_file_for_owner(pool, id, owner_id).await? {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: latest.version,
            });
        }
    }

    Ok(updated)
}
/// Delete the file with the given ID, but only if it belongs to `owner_id`.
///
/// Returns `true` when a row was removed and `false` when nothing matched.
pub async fn delete_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const DELETE_OWNED_FILE: &str = r#"
DELETE FROM files
WHERE id = $1 AND owner_id = $2
"#;

    sqlx::query(DELETE_OWNED_FILE)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
// =============================================================================
// Version History Functions
// =============================================================================
/// Set the version source for the current transaction.
/// This is used by the trigger to record who made the change.
///
/// The value is passed as a bind parameter to `set_config(..., is_local => true)`,
/// the function form of `SET LOCAL`, so `source` is never spliced into the SQL
/// text and cannot break out of the string literal.
///
/// NOTE(review): this runs against the pool rather than an explicit
/// transaction, so the setting is presumably not visible to later statements
/// issued through the pool — confirm that callers get the intended effect.
pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.version_source', $1, true)")
        .bind(source)
        .execute(pool)
        .await?;
    Ok(())
}
/// Set the change description for the current transaction.
///
/// The value is passed as a bind parameter to `set_config(..., is_local => true)`,
/// the function form of `SET LOCAL`, instead of being escaped by hand and
/// spliced into the SQL text.
///
/// NOTE(review): this runs against the pool rather than an explicit
/// transaction, so the setting is presumably not visible to later statements
/// issued through the pool — confirm that callers get the intended effect.
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.change_description', $1, true)")
        .bind(description)
        .execute(pool)
        .await?;
    Ok(())
}
/// List all versions of a file, newest first.
///
/// Historical versions come from `file_versions` (ordered by `version DESC`).
/// When the file itself still exists, its live state is placed in front of
/// them as a synthetic entry with source `"user"`, no change description, and
/// the file's `updated_at` as its creation time.
pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result<Vec<FileVersion>, sqlx::Error> {
    const SELECT_HISTORY: &str = r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1
ORDER BY version DESC
"#;

    // Read the live row first, then the stored history.
    let live = get_file(pool, file_id).await?;
    let history = sqlx::query_as::<_, FileVersion>(SELECT_HISTORY)
        .bind(file_id)
        .fetch_all(pool)
        .await?;

    let Some(file) = live else {
        return Ok(history);
    };

    let mut versions = Vec::with_capacity(history.len() + 1);
    versions.push(FileVersion {
        id: file.id,
        file_id: file.id,
        version: file.version,
        name: file.name,
        description: file.description,
        summary: file.summary,
        body: file.body,
        source: "user".to_string(), // Source reported for the live version
        change_description: None,
        created_at: file.updated_at,
    });
    versions.extend(history);
    Ok(versions)
}
/// Fetch one specific version of a file.
///
/// If `version` is the file's live version, a synthetic `FileVersion` is built
/// from the `files` row (source `"user"`, no change description). Otherwise
/// the version is looked up in `file_versions`; `Ok(None)` means it was not
/// found there either.
pub async fn get_file_version(
    pool: &PgPool,
    file_id: Uuid,
    version: i32,
) -> Result<Option<FileVersion>, sqlx::Error> {
    const SELECT_VERSION: &str = r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1 AND version = $2
"#;

    match get_file(pool, file_id).await? {
        // The requested version is the one currently stored in `files`.
        Some(file) if file.version == version => Ok(Some(FileVersion {
            id: file.id,
            file_id: file.id,
            version: file.version,
            name: file.name,
            description: file.description,
            summary: file.summary,
            body: file.body,
            source: "user".to_string(),
            change_description: None,
            created_at: file.updated_at,
        })),
        // Anything else has to come from the history table.
        _ => {
            sqlx::query_as::<_, FileVersion>(SELECT_VERSION)
                .bind(file_id)
                .bind(version)
                .fetch_optional(pool)
                .await
        }
    }
}
/// Restore a file to a previous version.
/// This creates a new version with the content from the target version.
pub async fn restore_file_version(
pool: &PgPool,
file_id: Uuid,
target_version: i32,
current_version: i32,
) -> Result<Option<File>, RepositoryError> {
// Get the target version content
let target = get_file_version(pool, file_id, target_version).await?;
let Some(target) = target else {
return Ok(None);
};
// Set version source and description for the trigger
set_version_source(pool, "system").await?;
set_change_description(pool, &format!("Restored from version {}", target_version)).await?;
// Update the file with the target version's content
// This will trigger the save_file_version trigger to save the current state first
let update_req = UpdateFileRequest {
name: Some(target.name),
description: target.description,
transcript: None,
summary: target.summary,
body: Some(target.body),
version: Some(current_version),
repo_file_path: None,
};
update_file(pool, file_id, update_req).await
}
/// Count the versions of a file: the rows in `file_versions` plus one for the
/// current version. The `+ 1` is applied unconditionally inside the query.
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version
    )
    .bind(file_id)
    .fetch_one(pool)
    .await?;
    Ok(total)
}
// =============================================================================
// Task Functions
// =============================================================================
/// Create a new task.
///
/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
/// and target_repo_path from the parent task. Depth is calculated from parent and limited
/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
///
/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
/// explicitly configured. The supervisor controls when completion steps happen.
///
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Subtasks inherit contract_id from parent
let contract_id = parent.contract_id.unwrap_or(req.contract_id);
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0, use contract_id from request
(
0,
req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
.bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.fetch_one(pool)
.await
}
/// Fetch a single task by its ID.
///
/// Returns `Ok(None)` when no row matches. The lookup is not scoped to an owner.
pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
    const SELECT_TASK: &str = r#"
SELECT *
FROM tasks
WHERE id = $1
"#;

    let lookup = sqlx::query_as::<_, Task>(SELECT_TASK).bind(id);
    lookup.fetch_optional(pool).await
}
/// List summaries of all top-level tasks (those without a parent).
///
/// Results are ordered by `priority DESC`, then `created_at DESC`. Each
/// summary carries the joined contract's name and phase and the number of
/// direct subtasks.
pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_TOP_LEVEL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let listing = sqlx::query_as::<_, TaskSummary>(SELECT_TOP_LEVEL);
    listing.fetch_all(pool).await
}
/// List summaries of the direct subtasks of `parent_id`.
///
/// Results are ordered by `priority DESC`, then `created_at DESC`.
pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_CHILDREN: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let listing = sqlx::query_as::<_, TaskSummary>(SELECT_CHILDREN).bind(parent_id);
    listing.fetch_all(pool).await
}
/// List every task of a contract that belongs to `owner_id` (for the supervisor tree view).
///
/// Supervisor tasks come first, then tasks by ascending depth, then oldest first.
pub async fn list_tasks_by_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    const SELECT_CONTRACT_TASKS: &str = r#"
SELECT * FROM tasks
WHERE contract_id = $1 AND owner_id = $2
ORDER BY is_supervisor DESC, depth ASC, created_at ASC
"#;

    let listing = sqlx::query_as::<_, Task>(SELECT_CONTRACT_TASKS)
        .bind(contract_id)
        .bind(owner_id);
    listing.fetch_all(pool).await
}
/// Update a task by ID, optionally guarded by optimistic locking.
///
/// Fields left as `None` in `req` keep their currently stored values. The
/// daemon assignment is cleared when `req.clear_daemon_id` is set; otherwise
/// `req.daemon_id` is used, falling back to the stored value.
///
/// When `req.version` is `Some`, the update only succeeds while the stored
/// version matches; otherwise `RepositoryError::VersionConflict` is returned.
/// When `req.version` is `None`, version checking is skipped.
///
/// Returns `Ok(None)` when no task with `id` exists.
///
/// NOTE(review): unlike `update_task_for_owner`, this function does not write
/// `repository_url` — confirm whether that difference is intentional.
pub async fn update_task(
    pool: &PgPool,
    id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    const UPDATE_VERSIONED: &str = r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1 AND version = $15
RETURNING *
"#;
    const UPDATE_UNVERSIONED: &str = r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1
RETURNING *
"#;

    // Load the stored row so unspecified fields can be carried over.
    let Some(stored) = get_task(pool, id).await? else {
        return Ok(None);
    };

    // Fail fast when the caller's version is already stale.
    match req.version {
        Some(expected) if expected != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let plan = req.plan.unwrap_or(stored.plan);
    let status = req.status.unwrap_or(stored.status);
    let priority = req.priority.unwrap_or(stored.priority);
    let progress_summary = req.progress_summary.or(stored.progress_summary);
    let last_output = req.last_output.or(stored.last_output);
    let error_message = req.error_message.or(stored.error_message);
    let merge_mode = req.merge_mode.or(stored.merge_mode);
    let pr_url = req.pr_url.or(stored.pr_url);
    let target_repo_path = req.target_repo_path.or(stored.target_repo_path);
    let completion_action = req.completion_action.or(stored.completion_action);
    // clear_daemon_id forces NULL; otherwise prefer the request, then the stored value.
    let daemon_id = match req.clear_daemon_id {
        true => None,
        false => req.daemon_id.or(stored.daemon_id),
    };

    // The versioned statement repeats the version check in its WHERE clause so
    // that a concurrent writer cannot slip in between the read and the write.
    let statement = match req.version {
        Some(_) => UPDATE_VERSIONED,
        None => UPDATE_UNVERSIONED,
    };
    let mut update = sqlx::query_as::<_, Task>(statement)
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action);
    if let Some(expected) = req.version {
        update = update.bind(expected);
    }
    let updated = update.fetch_optional(pool).await?;

    // A versioned update that matched nothing lost a race: re-read the row to
    // report the version that is actually stored now.
    if let (None, Some(expected)) = (&updated, req.version) {
        if let Some(latest) = get_task(pool, id).await? {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: latest.version,
            });
        }
    }

    Ok(updated)
}
/// Delete the task with the given ID.
///
/// Returns `true` when a row was removed and `false` when nothing matched.
pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const DELETE_TASK: &str = r#"
DELETE FROM tasks
WHERE id = $1
"#;

    sqlx::query(DELETE_TASK)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Count top-level tasks, i.e. tasks whose `parent_task_id` is NULL.
/// Subtasks are not included in the total.
pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL",
    )
    .fetch_one(pool)
    .await?;
    Ok(total)
}
// =============================================================================
// Owner-Scoped Task Functions
// =============================================================================
/// Create a new task for a specific owner.
pub async fn create_task_for_owner(
pool: &PgPool,
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
new_depth
)));
}
// Subtasks inherit contract_id from parent
let contract_id = parent.contract_id.unwrap_or(req.contract_id);
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0, use contract_id from request
(
0,
req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
RETURNING *
"#,
)
.bind(owner_id)
.bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.fetch_one(pool)
.await
}
/// Fetch a single task by ID, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when the task does not exist or has a different owner.
pub async fn get_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const SELECT_OWNED_TASK: &str = r#"
SELECT *
FROM tasks
WHERE id = $1 AND owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, Task>(SELECT_OWNED_TASK)
        .bind(id)
        .bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// List summaries of an owner's top-level tasks (those without a parent).
///
/// Results are ordered by `priority DESC`, then `created_at DESC`. Each
/// summary carries the joined contract's name and phase and the number of
/// direct subtasks.
pub async fn list_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_OWNED_TOP_LEVEL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let listing = sqlx::query_as::<_, TaskSummary>(SELECT_OWNED_TOP_LEVEL).bind(owner_id);
    listing.fetch_all(pool).await
}
/// List summaries of the direct subtasks of `parent_id` that belong to `owner_id`.
///
/// Results are ordered by `priority DESC`, then `created_at DESC`.
pub async fn list_subtasks_for_owner(
    pool: &PgPool,
    parent_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_OWNED_CHILDREN: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#;

    // Bind order follows the placeholders: $1 is the owner, $2 the parent.
    let listing = sqlx::query_as::<_, TaskSummary>(SELECT_OWNED_CHILDREN)
        .bind(owner_id)
        .bind(parent_id);
    listing.fetch_all(pool).await
}
/// Apply a partial update to a task owned by `owner_id`, with optional optimistic locking.
///
/// Every field left as `None` in `req` keeps its stored value, so optional columns
/// cannot be reset to NULL through this function; the one exception is `daemon_id`,
/// which is cleared when `req.clear_daemon_id` is set.
///
/// Returns `Ok(None)` when no task with this id exists for the owner.
///
/// # Errors
///
/// * [`RepositoryError::VersionConflict`] when `req.version` is supplied and does not
///   match the stored version, either at the initial read or at write time.
/// * [`RepositoryError::Database`] for any underlying sqlx failure.
pub async fn update_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    // NOTE(review): unlike `update_contract_for_owner`, neither statement below bumps
    // `version`; presumably something else (e.g. a trigger) does. Confirm.
    const UPDATE_IF_VERSION_MATCHES: &str = r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, repository_url = $16,
updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $17
RETURNING *
"#;
    const UPDATE_UNCONDITIONALLY: &str = r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, repository_url = $16,
updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;

    // Load the stored row; a missing row (or one owned by someone else) is "not found".
    let Some(stored) = get_task_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fail fast when the caller's expected version is already stale.
    let expected_version = req.version;
    match expected_version {
        Some(expected) if stored.version != expected => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let daemon_id = if req.clear_daemon_id {
        None
    } else {
        req.daemon_id.or(stored.daemon_id)
    };
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let plan = req.plan.unwrap_or(stored.plan);
    let status = req.status.unwrap_or(stored.status);
    let priority = req.priority.unwrap_or(stored.priority);
    let progress_summary = req.progress_summary.or(stored.progress_summary);
    let last_output = req.last_output.or(stored.last_output);
    let error_message = req.error_message.or(stored.error_message);
    let merge_mode = req.merge_mode.or(stored.merge_mode);
    let pr_url = req.pr_url.or(stored.pr_url);
    let repository_url = req.repository_url.or(stored.repository_url);
    let target_repo_path = req.target_repo_path.or(stored.target_repo_path);
    let completion_action = req.completion_action.or(stored.completion_action);

    // The versioned statement repeats the version check in its WHERE clause so a
    // concurrent writer between the read above and this write is detected.
    let updated = match expected_version {
        Some(expected) => {
            sqlx::query_as::<_, Task>(UPDATE_IF_VERSION_MATCHES)
                .bind(id)
                .bind(owner_id)
                .bind(&name)
                .bind(&description)
                .bind(&plan)
                .bind(&status)
                .bind(priority)
                .bind(&progress_summary)
                .bind(&last_output)
                .bind(&error_message)
                .bind(&merge_mode)
                .bind(&pr_url)
                .bind(daemon_id)
                .bind(&target_repo_path)
                .bind(&completion_action)
                .bind(&repository_url)
                .bind(expected)
                .fetch_optional(pool)
                .await?
        }
        None => {
            sqlx::query_as::<_, Task>(UPDATE_UNCONDITIONALLY)
                .bind(id)
                .bind(owner_id)
                .bind(&name)
                .bind(&description)
                .bind(&plan)
                .bind(&status)
                .bind(priority)
                .bind(&progress_summary)
                .bind(&last_output)
                .bind(&error_message)
                .bind(&merge_mode)
                .bind(&pr_url)
                .bind(daemon_id)
                .bind(&target_repo_path)
                .bind(&completion_action)
                .bind(&repository_url)
                .fetch_optional(pool)
                .await?
        }
    };

    // A versioned write that matched nothing while the row still exists means another
    // writer got in first; report the version that is stored now.
    if updated.is_none() {
        if let Some(expected) = expected_version {
            if let Some(latest) = get_task_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected,
                    actual: latest.version,
                });
            }
        }
    }

    Ok(updated)
}
/// Remove the task `id` if it belongs to `owner_id`.
///
/// Returns `true` when a row was deleted and `false` when nothing matched.
pub async fn delete_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM tasks
WHERE id = $1 AND owner_id = $2
"#;
    sqlx::query(sql)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Set a task's status and log a `status_change` event.
///
/// The task is looked up by `id` alone (no owner filter). Alongside the status:
/// * `started_at` is stamped the first time the status becomes `'running'`;
/// * `completed_at` is stamped whenever the status is `'done'`, `'failed'` or `'merged'`.
///
/// Returns `Ok(None)` when the task does not exist. Writing the event is
/// best-effort: a failure there is discarded and the updated task is still returned.
pub async fn update_task_status(
    pool: &PgPool,
    id: Uuid,
    new_status: &str,
    event_data: Option<serde_json::Value>,
) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET status = $2, updated_at = NOW(),
started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
WHERE id = $1
RETURNING *
"#;

    // Remember the status the task had before this call for the event record.
    let Some(before) = get_task(pool, id).await? else {
        return Ok(None);
    };
    let previous_status = before.status;

    let updated = sqlx::query_as::<_, Task>(sql)
        .bind(id)
        .bind(new_status)
        .fetch_optional(pool)
        .await?;

    if updated.is_some() {
        // Deliberately ignore event-logging errors; the status change already happened.
        let _ = create_task_event(
            pool,
            id,
            "status_change",
            Some(&previous_status),
            Some(new_status),
            event_data,
        )
        .await;
    }

    Ok(updated)
}
// =============================================================================
// Task Event Functions
// =============================================================================
/// Insert one row into `task_events` and return it as stored.
///
/// `previous_status`, `new_status` and `event_data` are optional and are written
/// as NULL when absent.
pub async fn create_task_event(
    pool: &PgPool,
    task_id: Uuid,
    event_type: &str,
    previous_status: Option<&str>,
    new_status: Option<&str>,
    event_data: Option<serde_json::Value>,
) -> Result<TaskEvent, sqlx::Error> {
    let sql = r#"
INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;
    let inserted = sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(event_type)
        .bind(previous_status)
        .bind(new_status)
        .bind(event_data)
        .fetch_one(pool)
        .await?;
    Ok(inserted)
}
/// Fetch the most recent events of a task, newest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_task_events(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM task_events
WHERE task_id = $1
ORDER BY created_at DESC
LIMIT $2
"#;
    let max_rows = match limit {
        Some(requested) => requested,
        None => 100,
    };
    let events = sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;
    Ok(events)
}
// =============================================================================
// Daemon Functions
// =============================================================================
/// Insert a `daemons` row for a newly connected daemon and return it.
///
/// `hostname` and `machine_id` are optional and stored as NULL when absent.
/// All columns not listed in the insert take their database defaults.
pub async fn register_daemon(
    pool: &PgPool,
    owner_id: Uuid,
    connection_id: &str,
    hostname: Option<&str>,
    machine_id: Option<&str>,
    max_concurrent_tasks: i32,
) -> Result<Daemon, sqlx::Error> {
    let sql = r#"
INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;
    let daemon = sqlx::query_as::<_, Daemon>(sql)
        .bind(owner_id)
        .bind(connection_id)
        .bind(hostname)
        .bind(machine_id)
        .bind(max_concurrent_tasks)
        .fetch_one(pool)
        .await?;
    Ok(daemon)
}
/// Look up a daemon by primary key, without any owner filter.
///
/// Returns `Ok(None)` when no such daemon exists.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM daemons
WHERE id = $1
"#;
    let found = sqlx::query_as::<_, Daemon>(sql)
        .bind(id)
        .fetch_optional(pool)
        .await?;
    Ok(found)
}
/// Look up a daemon by its `connection_id`.
///
/// Returns `Ok(None)` when no daemon has that connection id.
pub async fn get_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<Option<Daemon>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM daemons
WHERE connection_id = $1
"#;
    let found = sqlx::query_as::<_, Daemon>(sql)
        .bind(connection_id)
        .fetch_optional(pool)
        .await?;
    Ok(found)
}
/// Fetch every daemon row regardless of owner, most recently connected first.
pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM daemons
ORDER BY connected_at DESC
"#;
    let daemons = sqlx::query_as::<_, Daemon>(sql).fetch_all(pool).await?;
    Ok(daemons)
}
/// List daemons for a specific owner.
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE owner_id = $1
ORDER BY connected_at DESC
"#,
)
.bind(owner_id)
.fetch_all(pool)
.await
}
/// Get a daemon by ID for a specific owner.
pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await
}
/// Record a heartbeat for daemon `id`.
///
/// Stamps `last_heartbeat_at` with the current time and also forces the status
/// back to `'connected'`. Returns `true` when a daemon row was updated.
pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE daemons
SET last_heartbeat_at = NOW(), status = 'connected'
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Overwrite the status of daemon `id`.
///
/// When the new status is `'disconnected'`, `disconnected_at` is stamped with the
/// current time; for any other status it is left untouched. Returns `true` when
/// a daemon row was updated.
pub async fn update_daemon_status(
    pool: &PgPool,
    id: Uuid,
    status: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE daemons
SET status = $2,
disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(id)
        .bind(status)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Flag the daemon identified by `connection_id` as `'disconnected'` and stamp
/// `disconnected_at` with the current time.
///
/// Returns `true` when at least one row matched the connection id.
pub async fn disconnect_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE daemons
SET status = 'disconnected',
disconnected_at = NOW()
WHERE connection_id = $1
"#;
    sqlx::query(sql)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Adjust a daemon's `current_task_count` by `delta` (which may be negative).
///
/// The stored counter is clamped so it never drops below zero. Returns `true`
/// when a daemon row was updated.
pub async fn update_daemon_task_count(
    pool: &PgPool,
    id: Uuid,
    delta: i32,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE daemons
SET current_task_count = GREATEST(0, current_task_count + $2)
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(id)
        .bind(delta)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Remove the daemon row with primary key `id`.
///
/// Returns `true` when a row was deleted and `false` when nothing matched.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM daemons
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Remove the daemon row(s) registered under `connection_id`.
///
/// Returns `true` when at least one row was deleted.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM daemons
WHERE connection_id = $1
"#;
    sqlx::query(sql)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Count the daemons whose status is currently `'connected'`, across all owners.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let sql = "SELECT COUNT(*) FROM daemons WHERE status = 'connected'";
    // The query yields exactly one single-column row.
    let (connected,): (i64,) = sqlx::query_as(sql).fetch_one(pool).await?;
    Ok(connected)
}
/// Purge daemons whose last heartbeat is older than `timeout_seconds` seconds.
///
/// Rows with a NULL `last_heartbeat_at` never satisfy the comparison and are
/// therefore kept. Returns how many rows were deleted.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    let sql = r#"
DELETE FROM daemons
WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
"#;
    sqlx::query(sql)
        .bind(timeout_seconds)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}
// =============================================================================
// Sibling Awareness Functions
// =============================================================================
/// List sibling tasks: tasks sharing the same parent, excluding `task_id` itself.
///
/// * `parent_id = Some(p)`: returns the other direct children of `p`.
/// * `parent_id = None`: returns the other top-level tasks **of the same owner**
///   as `task_id`. The owner is resolved with a subquery on `task_id`, so no
///   extra parameter is needed. Without this restriction the query would return
///   every top-level task of every owner. If `task_id` does not exist, the
///   subquery yields NULL and the result is empty.
///
/// Rows are sorted by descending priority, then by newest `created_at` first.
pub async fn list_sibling_tasks(
    pool: &PgPool,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SIBLINGS_UNDER_PARENT: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#;
    // Top-level siblings are limited to the owner of the reference task so that
    // one owner's root tasks are never exposed to another.
    const TOP_LEVEL_SIBLINGS: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
AND t.owner_id = (SELECT owner_id FROM tasks WHERE id = $1)
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let query = match parent_id {
        Some(parent) => sqlx::query_as::<_, TaskSummary>(SIBLINGS_UNDER_PARENT)
            .bind(parent)
            .bind(task_id),
        None => sqlx::query_as::<_, TaskSummary>(TOP_LEVEL_SIBLINGS).bind(task_id),
    };
    query.fetch_all(pool).await
}
/// Fetch the full rows of sibling tasks that are currently `'running'`.
///
/// Siblings are the owner's other tasks that share the same parent; with
/// `parent_id = None` they are the owner's other top-level tasks. The task
/// `task_id` itself is excluded. Rows are sorted by descending priority.
pub async fn get_running_siblings(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<Task>, sqlx::Error> {
    const RUNNING_UNDER_PARENT: &str = r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id = $2
AND t.id != $3
AND t.status = 'running'
ORDER BY t.priority DESC
"#;
    const RUNNING_TOP_LEVEL: &str = r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id IS NULL
AND t.id != $2
AND t.status = 'running'
ORDER BY t.priority DESC
"#;

    // Pick the statement and its bindings first, then run it once.
    let query = match parent_id {
        Some(parent) => sqlx::query_as::<_, Task>(RUNNING_UNDER_PARENT)
            .bind(owner_id)
            .bind(parent)
            .bind(task_id),
        None => sqlx::query_as::<_, Task>(RUNNING_TOP_LEVEL)
            .bind(owner_id)
            .bind(task_id),
    };
    query.fetch_all(pool).await
}
/// Load a task together with summaries of its siblings.
///
/// Returns `Ok(None)` when the task does not exist; otherwise the task and the
/// result of `list_sibling_tasks` for the task's own parent.
pub async fn get_task_with_siblings(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
    match get_task(pool, id).await? {
        None => Ok(None),
        Some(task) => {
            let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
            Ok(Some((task, siblings)))
        }
    }
}
// =============================================================================
// Task Output Persistence Functions
// =============================================================================
/// Persist one piece of task output as a `task_events` row with
/// `event_type = 'output'`.
///
/// The arguments are packed into the event's JSON payload under the camelCase keys
/// `messageType`, `content`, `toolName`, `toolInput`, `isError`, `costUsd` and
/// `durationMs`; absent optional values are serialized as JSON `null`. The event's
/// `previous_status` and `new_status` are left NULL.
///
/// Returns the inserted event.
// Each argument maps one-to-one onto a payload key, so the long parameter list is
// intentional (same treatment as `add_chat_message`).
#[allow(clippy::too_many_arguments)]
pub async fn save_task_output(
    pool: &PgPool,
    task_id: Uuid,
    message_type: &str,
    content: &str,
    tool_name: Option<&str>,
    tool_input: Option<serde_json::Value>,
    is_error: Option<bool>,
    cost_usd: Option<f64>,
    duration_ms: Option<u64>,
) -> Result<TaskEvent, sqlx::Error> {
    let payload = serde_json::json!({
        "messageType": message_type,
        "content": content,
        "toolName": tool_name,
        "toolInput": tool_input,
        "isError": is_error,
        "costUsd": cost_usd,
        "durationMs": duration_ms,
    });
    create_task_event(pool, task_id, "output", None, None, Some(payload)).await
}
/// Fetch the stored output events (`event_type = 'output'`) of a task, oldest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 1000.
/// Because the sort is ascending, the cap keeps the earliest events.
pub async fn get_task_output(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM task_events
WHERE task_id = $1 AND event_type = 'output'
ORDER BY created_at ASC
LIMIT $2
"#;
    let max_rows = match limit {
        Some(requested) => requested,
        None => 1000,
    };
    let events = sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;
    Ok(events)
}
/// Mark a task as finished and log a `complete` event.
///
/// The status becomes `'done'` when `success` is true and `'failed'` otherwise;
/// `completed_at` and `updated_at` are stamped with the current time. A supplied
/// `error_message` replaces the stored one, while `None` leaves it unchanged.
///
/// Returns `Ok(None)` when the task does not exist. The event always records
/// `"running"` as the previous status (it is not read from the row), and a
/// failure to write the event is discarded.
pub async fn complete_task(
    pool: &PgPool,
    task_id: Uuid,
    success: bool,
    error_message: Option<&str>,
) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET status = $2,
error_message = COALESCE($3, error_message),
completed_at = NOW(),
updated_at = NOW()
WHERE id = $1
RETURNING *
"#;
    let final_status = match success {
        true => "done",
        false => "failed",
    };

    let finished = sqlx::query_as::<_, Task>(sql)
        .bind(task_id)
        .bind(final_status)
        .bind(error_message)
        .fetch_optional(pool)
        .await?;

    if finished.is_some() {
        let payload = serde_json::json!({
            "success": success,
            "errorMessage": error_message,
        });
        // Best-effort: the task is already completed, so event errors are ignored.
        let _ = create_task_event(
            pool,
            task_id,
            "complete",
            Some("running"),
            Some(final_status),
            Some(payload),
        )
        .await;
    }

    Ok(finished)
}
// =============================================================================
// Mesh Chat History Functions
// =============================================================================
/// Return the owner's active mesh chat conversation, creating one if none exists.
///
/// If several active conversations exist, an arbitrary one is returned (the
/// lookup uses `LIMIT 1` without an ordering).
///
/// NOTE(review): the lookup and the insert are separate statements, so two
/// concurrent callers could presumably both insert; confirm whether a database
/// constraint prevents that.
pub async fn get_or_create_active_conversation(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<MeshChatConversation, sqlx::Error> {
    const FIND_ACTIVE: &str = r#"
SELECT *
FROM mesh_chat_conversations
WHERE is_active = true AND owner_id = $1
LIMIT 1
"#;
    const INSERT_ACTIVE: &str = r#"
INSERT INTO mesh_chat_conversations (owner_id, is_active)
VALUES ($1, true)
RETURNING *
"#;

    let active = sqlx::query_as::<_, MeshChatConversation>(FIND_ACTIVE)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    match active {
        Some(conversation) => Ok(conversation),
        None => {
            sqlx::query_as::<_, MeshChatConversation>(INSERT_ACTIVE)
                .bind(owner_id)
                .fetch_one(pool)
                .await
        }
    }
}
/// Fetch the messages of a mesh chat conversation, oldest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM mesh_chat_messages
WHERE conversation_id = $1
ORDER BY created_at ASC
LIMIT $2
"#;
    let max_rows = match limit {
        Some(requested) => requested,
        None => 100,
    };
    let messages = sqlx::query_as::<_, MeshChatMessageRecord>(sql)
        .bind(conversation_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;
    Ok(messages)
}
/// Append one message to a mesh chat conversation and return the stored row.
///
/// `context_task_id`, `tool_calls` and `pending_questions` are optional and are
/// written as NULL when absent.
#[allow(clippy::too_many_arguments)]
pub async fn add_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    context_type: &str,
    context_task_id: Option<Uuid>,
    tool_calls: Option<serde_json::Value>,
    pending_questions: Option<serde_json::Value>,
) -> Result<MeshChatMessageRecord, sqlx::Error> {
    let sql = r#"
INSERT INTO mesh_chat_messages
(conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#;
    let stored = sqlx::query_as::<_, MeshChatMessageRecord>(sql)
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(context_type)
        .bind(context_task_id)
        .bind(tool_calls)
        .bind(pending_questions)
        .fetch_one(pool)
        .await?;
    Ok(stored)
}
/// Clear conversation (archive existing and create new).
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
// Mark existing as inactive for this owner
sqlx::query(
r#"
UPDATE mesh_chat_conversations
SET is_active = false, updated_at = NOW()
WHERE is_active = true AND owner_id = $1
"#,
)
.bind(owner_id)
.execute(pool)
.await?;
// Create new active conversation
get_or_create_active_conversation(pool, owner_id).await
}
// =============================================================================
// Contract Chat History Functions
// =============================================================================
/// Return the active chat conversation for a contract and owner, creating one if
/// none exists.
///
/// If several active conversations exist, an arbitrary one is returned (the
/// lookup uses `LIMIT 1` without an ordering).
///
/// NOTE(review): the lookup and the insert are separate statements, so two
/// concurrent callers could presumably both insert; confirm whether a database
/// constraint prevents that.
pub async fn get_or_create_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<ContractChatConversation, sqlx::Error> {
    const FIND_ACTIVE: &str = r#"
SELECT *
FROM contract_chat_conversations
WHERE is_active = true AND contract_id = $1 AND owner_id = $2
LIMIT 1
"#;
    const INSERT_ACTIVE: &str = r#"
INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
VALUES ($1, $2, true)
RETURNING *
"#;

    let active = sqlx::query_as::<_, ContractChatConversation>(FIND_ACTIVE)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    match active {
        Some(conversation) => Ok(conversation),
        None => {
            sqlx::query_as::<_, ContractChatConversation>(INSERT_ACTIVE)
                .bind(contract_id)
                .bind(owner_id)
                .fetch_one(pool)
                .await
        }
    }
}
/// Fetch the messages of a contract chat conversation, oldest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_contract_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM contract_chat_messages
WHERE conversation_id = $1
ORDER BY created_at ASC
LIMIT $2
"#;
    let max_rows = match limit {
        Some(requested) => requested,
        None => 100,
    };
    let messages = sqlx::query_as::<_, ContractChatMessageRecord>(sql)
        .bind(conversation_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;
    Ok(messages)
}
/// Append one message to a contract chat conversation and return the stored row.
///
/// `tool_calls` and `pending_questions` are optional and are written as NULL
/// when absent.
pub async fn add_contract_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    tool_calls: Option<serde_json::Value>,
    pending_questions: Option<serde_json::Value>,
) -> Result<ContractChatMessageRecord, sqlx::Error> {
    let sql = r#"
INSERT INTO contract_chat_messages
(conversation_id, role, content, tool_calls, pending_questions)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;
    let stored = sqlx::query_as::<_, ContractChatMessageRecord>(sql)
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(tool_calls)
        .bind(pending_questions)
        .fetch_one(pool)
        .await?;
    Ok(stored)
}
/// Start a fresh chat conversation for a contract and owner.
///
/// Every currently active conversation for that pair is flagged inactive (rows
/// and messages are kept), then the active conversation is obtained via
/// `get_or_create_contract_conversation`. The two steps are separate statements,
/// not one transaction.
pub async fn clear_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<ContractChatConversation, sqlx::Error> {
    let archive_sql = r#"
UPDATE contract_chat_conversations
SET is_active = false, updated_at = NOW()
WHERE is_active = true AND contract_id = $1 AND owner_id = $2
"#;
    sqlx::query(archive_sql)
        .bind(contract_id)
        .bind(owner_id)
        .execute(pool)
        .await?;

    let fresh = get_or_create_contract_conversation(pool, contract_id, owner_id).await?;
    Ok(fresh)
}
// =============================================================================
// Contract Functions (Owner-Scoped)
// =============================================================================
/// Insert a new contract owned by `owner_id` and return the stored row.
///
/// The starting phase is `req.initial_phase`, or `"research"` when none is given.
///
/// # Errors
///
/// Returns `sqlx::Error::Protocol` (without touching the database) when the phase
/// is not one of `research`, `specify`, `plan`, `execute`, `review`; any other
/// error comes from the insert itself.
pub async fn create_contract_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateContractRequest,
) -> Result<Contract, sqlx::Error> {
    const VALID_PHASES: [&str; 5] = ["research", "specify", "plan", "execute", "review"];
    let sql = r#"
INSERT INTO contracts (owner_id, name, description, phase)
VALUES ($1, $2, $3, $4)
RETURNING *
"#;

    let phase = match req.initial_phase.as_deref() {
        Some(requested) => requested,
        None => "research",
    };

    // Reject unknown phases before issuing the insert.
    let is_known = VALID_PHASES.iter().any(|candidate| *candidate == phase);
    if !is_known {
        return Err(sqlx::Error::Protocol(format!(
            "Invalid initial_phase '{}'. Must be one of: {}",
            phase,
            VALID_PHASES.join(", ")
        )));
    }

    let created = sqlx::query_as::<_, Contract>(sql)
        .bind(owner_id)
        .bind(&req.name)
        .bind(&req.description)
        .bind(phase)
        .fetch_one(pool)
        .await?;
    Ok(created)
}
/// Look up the contract `id`, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when the contract does not exist or is owned by someone else.
pub async fn get_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Contract>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM contracts
WHERE id = $1 AND owner_id = $2
"#;
    let found = sqlx::query_as::<_, Contract>(sql)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
    Ok(found)
}
/// Fetch summaries of every contract owned by `owner_id`, newest first.
///
/// Each summary includes the number of files, tasks and repositories linked to
/// the contract, computed by correlated subqueries.
pub async fn list_contracts_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<ContractSummary>, sqlx::Error> {
    let sql = r#"
SELECT
c.id, c.name, c.description, c.phase, c.status,
c.version, c.created_at,
(SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
(SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
(SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
FROM contracts c
WHERE c.owner_id = $1
ORDER BY c.created_at DESC
"#;
    let summaries = sqlx::query_as::<_, ContractSummary>(sql)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(summaries)
}
/// Fetch the summary (including file, task and repository counts) of the
/// contract `id`, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when the contract does not exist or is owned by someone else.
pub async fn get_contract_summary_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<ContractSummary>, sqlx::Error> {
    let sql = r#"
SELECT
c.id, c.name, c.description, c.phase, c.status,
c.version, c.created_at,
(SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
(SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
(SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
FROM contracts c
WHERE c.id = $1 AND c.owner_id = $2
"#;
    let summary = sqlx::query_as::<_, ContractSummary>(sql)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
    Ok(summary)
}
/// Apply a partial update to a contract owned by `owner_id`, with optional
/// optimistic locking.
///
/// Fields left as `None` in `req` keep their stored value, so `description` and
/// `supervisor_task_id` cannot be reset to NULL here. Every successful update
/// increments `version` and refreshes `updated_at`.
///
/// Returns `Ok(None)` when no contract with this id exists for the owner.
///
/// # Errors
///
/// * [`RepositoryError::VersionConflict`] when `req.version` is supplied and does not
///   match the stored version, either at the initial read or at write time.
/// * [`RepositoryError::Database`] for any underlying sqlx failure.
pub async fn update_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateContractRequest,
) -> Result<Option<Contract>, RepositoryError> {
    const UPDATE_IF_VERSION_MATCHES: &str = r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
supervisor_task_id = $7, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
RETURNING *
"#;
    const UPDATE_UNCONDITIONALLY: &str = r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
supervisor_task_id = $7, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;

    let Some(stored) = get_contract_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fail fast when the caller's expected version is already stale.
    let expected_version = req.version;
    match expected_version {
        Some(expected) if stored.version != expected => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let phase = req.phase.unwrap_or(stored.phase);
    let status = req.status.unwrap_or(stored.status);
    let supervisor_task_id = req.supervisor_task_id.or(stored.supervisor_task_id);

    // The versioned statement re-checks the version in its WHERE clause so that a
    // concurrent writer between the read above and this write is detected.
    let updated = match expected_version {
        Some(expected) => {
            sqlx::query_as::<_, Contract>(UPDATE_IF_VERSION_MATCHES)
                .bind(id)
                .bind(owner_id)
                .bind(&name)
                .bind(&description)
                .bind(&phase)
                .bind(&status)
                .bind(supervisor_task_id)
                .bind(expected)
                .fetch_optional(pool)
                .await?
        }
        None => {
            sqlx::query_as::<_, Contract>(UPDATE_UNCONDITIONALLY)
                .bind(id)
                .bind(owner_id)
                .bind(&name)
                .bind(&description)
                .bind(&phase)
                .bind(&status)
                .bind(supervisor_task_id)
                .fetch_optional(pool)
                .await?
        }
    };

    // A versioned write that matched nothing while the row still exists means another
    // writer got in first; report the version that is stored now.
    if updated.is_none() {
        if let Some(expected) = expected_version {
            if let Some(latest) = get_contract_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected,
                    actual: latest.version,
                });
            }
        }
    }

    Ok(updated)
}
/// Remove the contract `id` if it belongs to `owner_id`.
///
/// Returns `true` when a row was deleted and `false` when nothing matched.
pub async fn delete_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM contracts
WHERE id = $1 AND owner_id = $2
"#;
    sqlx::query(sql)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Move a contract owned by `owner_id` to `new_phase` and log a `phase_change`
/// event holding the previous and new phase.
///
/// The update increments `version` and refreshes `updated_at`. `new_phase` is
/// written as given; no validation happens here.
///
/// Returns `Ok(None)` when the contract does not exist for this owner.
///
/// # Errors
///
/// Any sqlx failure is returned. The phase update and the event insert are
/// separate statements, so an error from the event insert is reported even
/// though the phase has already been changed.
pub async fn change_contract_phase_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    new_phase: &str,
) -> Result<Option<Contract>, sqlx::Error> {
    const UPDATE_PHASE: &str = r#"
UPDATE contracts
SET phase = $3, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;
    const INSERT_EVENT: &str = r#"
INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
VALUES ($1, 'phase_change', $2, $3)
"#;

    // Capture the phase the contract is in before the change.
    let Some(before) = get_contract_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };
    let previous_phase = before.phase;

    let updated = sqlx::query_as::<_, Contract>(UPDATE_PHASE)
        .bind(id)
        .bind(owner_id)
        .bind(new_phase)
        .fetch_optional(pool)
        .await?;

    // Nothing was updated (row vanished in the meantime): no event to record.
    if updated.is_none() {
        return Ok(None);
    }

    sqlx::query(INSERT_EVENT)
        .bind(id)
        .bind(&previous_phase)
        .bind(new_phase)
        .execute(pool)
        .await?;

    Ok(updated)
}
// =============================================================================
// Contract Repository Functions
// =============================================================================
/// Fetch the repositories attached to a contract.
///
/// The primary repository is listed first; the rest follow in creation order
/// (oldest first). No owner check is performed here.
pub async fn list_contract_repositories(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<ContractRepository>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM contract_repositories
WHERE contract_id = $1
ORDER BY is_primary DESC, created_at ASC
"#;
    let repositories = sqlx::query_as::<_, ContractRepository>(sql)
        .bind(contract_id)
        .fetch_all(pool)
        .await?;
    Ok(repositories)
}
/// Attach a remote repository (`source_type = 'remote'`, `status = 'ready'`) to a
/// contract and return the stored row.
///
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. The demotion and the insert are separate statements, not one
/// transaction.
pub async fn add_remote_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    repository_url: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    const DEMOTE_PRIMARIES: &str = r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#;
    const INSERT_REMOTE: &str = r#"
INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
VALUES ($1, $2, $3, 'remote', 'ready', $4)
RETURNING *
"#;

    if is_primary {
        sqlx::query(DEMOTE_PRIMARIES)
            .bind(contract_id)
            .execute(pool)
            .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(INSERT_REMOTE)
        .bind(contract_id)
        .bind(name)
        .bind(repository_url)
        .bind(is_primary)
        .fetch_one(pool)
        .await?;
    Ok(repository)
}
/// Attach a local repository (`source_type = 'local'`, `status = 'ready'`) located
/// at `local_path` to a contract and return the stored row.
///
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. The demotion and the insert are separate statements, not one
/// transaction.
pub async fn add_local_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    local_path: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    const DEMOTE_PRIMARIES: &str = r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#;
    const INSERT_LOCAL: &str = r#"
INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
VALUES ($1, $2, $3, 'local', 'ready', $4)
RETURNING *
"#;

    if is_primary {
        sqlx::query(DEMOTE_PRIMARIES)
            .bind(contract_id)
            .execute(pool)
            .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(INSERT_LOCAL)
        .bind(contract_id)
        .bind(name)
        .bind(local_path)
        .bind(is_primary)
        .fetch_one(pool)
        .await?;
    Ok(repository)
}
/// Register a managed repository (`source_type = 'managed'`) for a contract and
/// return the stored row.
///
/// The row starts in status `'pending'` with no URL or path set by this insert.
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. The demotion and the insert are separate statements, not one
/// transaction.
pub async fn create_managed_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    const DEMOTE_PRIMARIES: &str = r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#;
    const INSERT_MANAGED: &str = r#"
INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
VALUES ($1, $2, 'managed', 'pending', $3)
RETURNING *
"#;

    if is_primary {
        sqlx::query(DEMOTE_PRIMARIES)
            .bind(contract_id)
            .execute(pool)
            .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(INSERT_MANAGED)
        .bind(contract_id)
        .bind(name)
        .bind(is_primary)
        .fetch_one(pool)
        .await?;
    Ok(repository)
}
/// Detach the repository `repo_id` from the contract `contract_id`.
///
/// Both ids must match the same row. Returns `true` when a row was deleted and
/// `false` when nothing matched.
pub async fn delete_contract_repository(
    pool: &PgPool,
    repo_id: Uuid,
    contract_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM contract_repositories
WHERE id = $1 AND contract_id = $2
"#;
    sqlx::query(sql)
        .bind(repo_id)
        .bind(contract_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Make `repo_id` the primary repository of `contract_id`, demoting any other
/// primary of that contract.
///
/// Returns `true` when the repository exists in the contract and is now primary,
/// `false` when no such repository exists. In the `false` case nothing is
/// modified: the demotion step only runs if the target row exists, so a wrong
/// `repo_id` can no longer leave the contract without a primary repository.
///
/// The demotion and the promotion are two statements rather than one so that a
/// row is never primary alongside the new one mid-statement. They are not wrapped
/// in a transaction.
pub async fn set_repository_primary(
    pool: &PgPool,
    repo_id: Uuid,
    contract_id: Uuid,
) -> Result<bool, sqlx::Error> {
    // Demote the other primaries, but only when the target repository really
    // belongs to this contract; the target itself is left alone.
    sqlx::query(
        r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true AND id != $2
AND EXISTS (
SELECT 1 FROM contract_repositories WHERE id = $2 AND contract_id = $1
)
"#,
    )
    .bind(contract_id)
    .bind(repo_id)
    .execute(pool)
    .await?;

    // Promote the target repository.
    let promoted = sqlx::query(
        r#"
UPDATE contract_repositories
SET is_primary = true, updated_at = NOW()
WHERE id = $1 AND contract_id = $2
"#,
    )
    .bind(repo_id)
    .bind(contract_id)
    .execute(pool)
    .await?;

    Ok(promoted.rows_affected() > 0)
}
/// Set the status of the repository `repo_id` and optionally record its URL.
///
/// A supplied `repository_url` replaces the stored one; `None` leaves the stored
/// URL unchanged. The row is matched by id only (neither `source_type` nor the
/// contract is checked). Returns the updated row, or `Ok(None)` when no
/// repository has this id.
pub async fn update_managed_repository_status(
    pool: &PgPool,
    repo_id: Uuid,
    status: &str,
    repository_url: Option<&str>,
) -> Result<Option<ContractRepository>, sqlx::Error> {
    let sql = r#"
UPDATE contract_repositories
SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
WHERE id = $1
RETURNING *
"#;
    let updated = sqlx::query_as::<_, ContractRepository>(sql)
        .bind(repo_id)
        .bind(status)
        .bind(repository_url)
        .fetch_optional(pool)
        .await?;
    Ok(updated)
}
// =============================================================================
// Contract Task Association Functions
// =============================================================================
/// Attach the task `task_id` to the contract `contract_id`.
///
/// Both the task and the contract must belong to `owner_id`. Requiring the
/// contract to be owned by the caller prevents linking a task to someone else's
/// contract, whose name and phase would then surface through the task listing
/// joins.
///
/// Returns `true` when the task was updated, and `false` when the task or the
/// contract does not exist for this owner.
pub async fn add_task_to_contract(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let result = sqlx::query(
        r#"
UPDATE tasks
SET contract_id = $2, updated_at = NOW()
WHERE id = $1 AND owner_id = $3
AND EXISTS (SELECT 1 FROM contracts WHERE id = $2 AND owner_id = $3)
"#,
    )
    .bind(task_id)
    .bind(contract_id)
    .bind(owner_id)
    .execute(pool)
    .await?;
    Ok(result.rows_affected() > 0)
}
/// Detach the task `task_id` from the contract `contract_id`.
///
/// The task's `contract_id` is set to NULL only if the task belongs to
/// `owner_id` and is currently linked to that exact contract. Returns `true`
/// when a row was updated.
pub async fn remove_task_from_contract(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET contract_id = NULL, updated_at = NOW()
WHERE id = $1 AND contract_id = $2 AND owner_id = $3
"#;
    // Placeholder order is ($1 = task, $2 = contract, $3 = owner).
    sqlx::query(sql)
        .bind(task_id)
        .bind(contract_id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Fetch summaries of the files attached to a contract, newest first.
///
/// Only files belonging to `owner_id` are returned. Full `File` rows are loaded
/// and then converted with `FileSummary::from`.
pub async fn list_files_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<FileSummary>, sqlx::Error> {
    let sql = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE contract_id = $1 AND owner_id = $2
ORDER BY created_at DESC
"#;
    sqlx::query_as::<_, File>(sql)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await
        .map(|rows| {
            rows.into_iter()
                .map(FileSummary::from)
                .collect::<Vec<FileSummary>>()
        })
}
/// List task summaries for a contract.
///
/// Returns the tasks matching `contract_id` and `owner_id`, each joined with
/// its contract's name and phase. `subtask_count` is the number of tasks whose
/// `parent_task_id` points at the row. Results are ordered by priority
/// (highest first), then by creation time (newest first).
pub async fn list_tasks_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.contract_id = $1 AND t.owner_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let summaries = sqlx::query_as::<_, TaskSummary>(SQL)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(summaries)
}
// =============================================================================
// Contract Events
// =============================================================================
/// Fetch every event recorded for a contract, newest first.
///
/// Note that the query filters on `contract_id` only; no owner check is
/// performed here.
pub async fn list_contract_events(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<ContractEvent>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_events
WHERE contract_id = $1
ORDER BY created_at DESC
"#;

    let events = sqlx::query_as::<_, ContractEvent>(SQL)
        .bind(contract_id)
        .fetch_all(pool)
        .await?;
    Ok(events)
}
/// Append an event to a contract's event log.
///
/// Inserts a `contract_events` row with the given `event_type` and optional
/// JSON `event_data`, and returns the stored row.
pub async fn record_contract_event(
    pool: &PgPool,
    contract_id: Uuid,
    event_type: &str,
    event_data: Option<serde_json::Value>,
) -> Result<ContractEvent, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO contract_events (contract_id, event_type, event_data)
VALUES ($1, $2, $3)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, ContractEvent>(SQL)
        .bind(contract_id)
        .bind(event_type)
        .bind(event_data);

    insert.fetch_one(pool).await
}
// ============================================================================
// Task Checkpoints
// ============================================================================
/// Create a checkpoint for a task.
///
/// Allocates the next `checkpoint_number` for `task_id` (one past the current
/// maximum, starting at 1), mirrors the checkpoint onto the task row
/// (`last_checkpoint_sha`, `checkpoint_count`, `checkpoint_message`,
/// `updated_at`) and inserts the new `task_checkpoints` row, which is returned.
///
/// All statements run inside a single transaction, so a failure in any step
/// leaves neither the task row nor the checkpoint table modified. The task row
/// is locked first so that concurrent calls for the same task cannot both read
/// the same maximum and allocate the same `checkpoint_number`.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if any statement or the commit fails;
/// in that case the transaction is rolled back when it is dropped.
pub async fn create_task_checkpoint(
    pool: &PgPool,
    task_id: Uuid,
    commit_sha: &str,
    branch_name: &str,
    message: &str,
    files_changed: Option<serde_json::Value>,
    lines_added: Option<i32>,
    lines_removed: Option<i32>,
) -> Result<TaskCheckpoint, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Take a row lock on the task so checkpoint numbering is serialized per
    // task. If the task does not exist this locks nothing and the later
    // statements behave exactly as they did before.
    sqlx::query("SELECT id FROM tasks WHERE id = $1 FOR UPDATE")
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

    // Get current checkpoint count and increment
    let checkpoint_number: i32 = sqlx::query_scalar(
        "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_one(&mut *tx)
    .await?;

    // Update task's checkpoint tracking
    sqlx::query(
        r#"
UPDATE tasks
SET last_checkpoint_sha = $1,
checkpoint_count = $2,
checkpoint_message = $3,
updated_at = NOW()
WHERE id = $4
"#,
    )
    .bind(commit_sha)
    .bind(checkpoint_number)
    .bind(message)
    .bind(task_id)
    .execute(&mut *tx)
    .await?;

    let checkpoint = sqlx::query_as::<_, TaskCheckpoint>(
        r#"
INSERT INTO task_checkpoints (
task_id, checkpoint_number, commit_sha, branch_name, message,
files_changed, lines_added, lines_removed
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#,
    )
    .bind(task_id)
    .bind(checkpoint_number)
    .bind(commit_sha)
    .bind(branch_name)
    .bind(message)
    .bind(files_changed)
    .bind(lines_added)
    .bind(lines_removed)
    .fetch_one(&mut *tx)
    .await?;

    // Only make the task update and the checkpoint row visible together.
    tx.commit().await?;
    Ok(checkpoint)
}
/// Look up a single checkpoint by its primary id.
///
/// Returns `Ok(None)` when no `task_checkpoints` row has that id.
pub async fn get_task_checkpoint(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM task_checkpoints WHERE id = $1";

    let found = sqlx::query_as::<_, TaskCheckpoint>(SQL)
        .bind(id)
        .fetch_optional(pool)
        .await?;
    Ok(found)
}
/// Look up a checkpoint by the commit SHA it recorded.
///
/// The query matches on `commit_sha` alone and is not scoped to a task.
/// Returns `Ok(None)` when no row matches.
pub async fn get_task_checkpoint_by_sha(
    pool: &PgPool,
    commit_sha: &str,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM task_checkpoints WHERE commit_sha = $1";

    let lookup = sqlx::query_as::<_, TaskCheckpoint>(SQL).bind(commit_sha);
    lookup.fetch_optional(pool).await
}
/// List all checkpoints of a task, highest `checkpoint_number` first.
pub async fn list_task_checkpoints(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str =
        "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC";

    let checkpoints = sqlx::query_as::<_, TaskCheckpoint>(SQL)
        .bind(task_id)
        .fetch_all(pool)
        .await?;
    Ok(checkpoints)
}
// ============================================================================
// Supervisor State
// ============================================================================
/// Insert the supervisor state for a contract, or overwrite the existing one.
///
/// The upsert is keyed on `contract_id`: when a row for the contract already
/// exists, its `task_id`, `conversation_history`, `pending_task_ids` and
/// `phase` are replaced with the supplied values. `last_activity` is set to
/// `NOW()` in both cases; `updated_at` is set on the update path.
///
/// Returns the row as stored after the statement.
pub async fn upsert_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    conversation_history: serde_json::Value,
    pending_task_ids: &[Uuid],
    phase: &str,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (contract_id) DO UPDATE SET
task_id = EXCLUDED.task_id,
conversation_history = EXCLUDED.conversation_history,
pending_task_ids = EXCLUDED.pending_task_ids,
phase = EXCLUDED.phase,
last_activity = NOW(),
updated_at = NOW()
RETURNING *
"#;

    let upsert = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(contract_id)
        .bind(task_id)
        .bind(conversation_history)
        .bind(pending_task_ids)
        .bind(phase);

    let state = upsert.fetch_one(pool).await?;
    Ok(state)
}
/// Fetch the supervisor state stored for a contract.
///
/// Returns `Ok(None)` when the contract has no `supervisor_states` row.
pub async fn get_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM supervisor_states WHERE contract_id = $1";

    let state = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(contract_id)
        .fetch_optional(pool)
        .await?;
    Ok(state)
}
/// Fetch the supervisor state whose `task_id` matches the given task.
///
/// Returns `Ok(None)` when no `supervisor_states` row references the task.
pub async fn get_supervisor_state_by_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM supervisor_states WHERE task_id = $1";

    let lookup = sqlx::query_as::<_, SupervisorState>(SQL).bind(task_id);
    lookup.fetch_optional(pool).await
}
/// Replace the stored conversation history of a contract's supervisor.
///
/// Also bumps `last_activity` and `updated_at` to `NOW()` and returns the
/// updated row.
///
/// # Errors
///
/// The row is read back with `fetch_one`, so a contract without supervisor
/// state surfaces as `sqlx::Error::RowNotFound` rather than `None`.
pub async fn update_supervisor_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    conversation_history: serde_json::Value,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET conversation_history = $1,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let state = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(conversation_history)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;
    Ok(state)
}
/// Replace the list of pending task ids tracked by a contract's supervisor.
///
/// Also bumps `last_activity` and `updated_at` to `NOW()` and returns the
/// updated row.
///
/// # Errors
///
/// The row is read back with `fetch_one`, so a contract without supervisor
/// state surfaces as `sqlx::Error::RowNotFound` rather than `None`.
pub async fn update_supervisor_pending_tasks(
    pool: &PgPool,
    contract_id: Uuid,
    pending_task_ids: &[Uuid],
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET pending_task_ids = $1,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(pending_task_ids)
        .bind(contract_id);

    update.fetch_one(pool).await
}
// ============================================================================
// Contract Supervisor
// ============================================================================
/// Point a contract at its supervisor task.
///
/// Sets `supervisor_task_id` and refreshes `updated_at` on the contract, then
/// returns the updated contract row.
///
/// # Errors
///
/// The row is read back with `fetch_one`, so an unknown `contract_id`
/// surfaces as `sqlx::Error::RowNotFound`.
pub async fn update_contract_supervisor(
    pool: &PgPool,
    contract_id: Uuid,
    supervisor_task_id: Uuid,
) -> Result<Contract, sqlx::Error> {
    const SQL: &str = r#"
UPDATE contracts
SET supervisor_task_id = $1,
updated_at = NOW()
WHERE id = $2
RETURNING *
"#;

    // Placeholder order differs from the parameter order: $1 is the task.
    let contract = sqlx::query_as::<_, Contract>(SQL)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;
    Ok(contract)
}
/// Resolve the supervisor task of a contract.
///
/// Joins `contracts.supervisor_task_id` to `tasks.id` and returns the matching
/// task. Yields `Ok(None)` when the join produces no row.
pub async fn get_contract_supervisor_task(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
SELECT t.* FROM tasks t
JOIN contracts c ON c.supervisor_task_id = t.id
WHERE c.id = $1
"#;

    let supervisor = sqlx::query_as::<_, Task>(SQL)
        .bind(contract_id)
        .fetch_optional(pool)
        .await?;
    Ok(supervisor)
}
// ============================================================================
// Task Tree Queries
// ============================================================================
/// Load the complete task tree of a contract as a flat list.
///
/// The recursive query starts from the contract's root tasks (those with no
/// `parent_task_id`) and follows `parent_task_id` links downwards. Descendants
/// are matched through the parent link only, not by their own `contract_id`.
/// Rows are ordered by `depth`, then `created_at`.
pub async fn get_contract_task_tree(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    const SQL: &str = r#"
WITH RECURSIVE task_tree AS (
-- Base case: root tasks (no parent)
SELECT * FROM tasks
WHERE contract_id = $1 AND parent_task_id IS NULL
UNION ALL
-- Recursive case: children of current level
SELECT t.* FROM tasks t
JOIN task_tree tt ON t.parent_task_id = tt.id
)
SELECT * FROM task_tree
ORDER BY depth, created_at
"#;

    let tree = sqlx::query_as::<_, Task>(SQL)
        .bind(contract_id)
        .fetch_all(pool)
        .await?;
    Ok(tree)
}
/// Load a task and all of its descendants as a flat list.
///
/// The recursive query starts at `root_task_id` and follows `parent_task_id`
/// links downwards. Rows are ordered by `depth`, then `created_at`. An unknown
/// root id yields an empty list.
pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
    const SQL: &str = r#"
WITH RECURSIVE task_tree AS (
-- Base case: the root task
SELECT * FROM tasks WHERE id = $1
UNION ALL
-- Recursive case: children of current level
SELECT t.* FROM tasks t
JOIN task_tree tt ON t.parent_task_id = tt.id
)
SELECT * FROM task_tree
ORDER BY depth, created_at
"#;

    let subtree = sqlx::query_as::<_, Task>(SQL).bind(root_task_id);
    subtree.fetch_all(pool).await
}
// ============================================================================
// Daemon Selection
// ============================================================================
/// List an owner's connected daemons, best candidate first.
///
/// Only daemons with `status = 'connected'` are returned. They are ranked by:
/// 1. `capacity_score`, highest first (a missing score counts as 100),
/// 2. free slots (`max_concurrent_tasks - current_task_count`), most first,
/// 3. `task_queue_length`, shortest first (a missing length counts as 0).
///
/// Daemons with no free slots are not filtered out; they simply rank lower on
/// the second criterion.
pub async fn get_available_daemons(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, owner_id, connection_id, hostname, machine_id,
max_concurrent_tasks, current_task_count,
capacity_score, task_queue_length, supports_migration,
status, last_heartbeat_at, connected_at
FROM daemons
WHERE owner_id = $1 AND status = 'connected'
ORDER BY
COALESCE(capacity_score, 100) DESC,
(max_concurrent_tasks - current_task_count) DESC,
COALESCE(task_queue_length, 0) ASC
"#;

    let candidates = sqlx::query_as::<_, DaemonWithCapacity>(SQL)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(candidates)
}
/// Record that a task has been assigned to a daemon.
///
/// Inserts a `daemon_task_assignments` row containing only `daemon_id` and
/// `task_id`, and returns the stored row.
pub async fn create_daemon_task_assignment(
    pool: &PgPool,
    daemon_id: Uuid,
    task_id: Uuid,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO daemon_task_assignments (daemon_id, task_id)
VALUES ($1, $2)
RETURNING *
"#;

    let assignment = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(daemon_id)
        .bind(task_id)
        .fetch_one(pool)
        .await?;
    Ok(assignment)
}
/// Set the status of the daemon assignment belonging to a task.
///
/// Returns the updated assignment row.
///
/// # Errors
///
/// The row is read back with `fetch_one`, so a task without an assignment
/// surfaces as `sqlx::Error::RowNotFound`.
pub async fn update_daemon_task_assignment_status(
    pool: &PgPool,
    task_id: Uuid,
    status: &str,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemon_task_assignments
SET status = $1
WHERE task_id = $2
RETURNING *
"#;

    // Placeholder order differs from the parameter order: $1 is the status.
    let update = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(status)
        .bind(task_id);

    update.fetch_one(pool).await
}
/// Fetch the daemon assignment recorded for a task.
///
/// Returns `Ok(None)` when the task has no `daemon_task_assignments` row.
pub async fn get_daemon_task_assignment(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM daemon_task_assignments WHERE task_id = $1";

    let assignment = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(task_id)
        .fetch_optional(pool)
        .await?;
    Ok(assignment)
}