//! Repository pattern for file database operations.
use chrono::Utc;
use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
CreateContractRequest, CreateFileRequest, CreateTaskRequest,
CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
DeliverableDefinition, Directive, DirectiveStep, DirectiveSummary,
CreateDirectiveRequest, CreateDirectiveStepRequest, DirectiveGoalHistory,
UpdateDirectiveRequest, UpdateDirectiveStepRequest,
CreateOrderRequest, Order, UpdateOrderRequest,
CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
UserSetting,
};
/// Errors returned by the repository functions in this module.
///
/// Driver-level failures are wrapped in `Database`; the versioned update
/// functions additionally report optimistic-locking mismatches through
/// `VersionConflict`.
#[derive(Debug)]
pub enum RepositoryError {
/// Underlying `sqlx` error (this is what `?` produces via `From<sqlx::Error>`).
Database(sqlx::Error),
/// Version conflict (optimistic locking failure)
VersionConflict {
/// The version the client expected
expected: i32,
/// The actual current version in the database
actual: i32,
},
}
impl From<sqlx::Error> for RepositoryError {
fn from(e: sqlx::Error) -> Self {
RepositoryError::Database(e)
}
}
impl std::fmt::Display for RepositoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RepositoryError::Database(e) => write!(f, "Database error: {}", e),
RepositoryError::VersionConflict { expected, actual } => {
write!(
f,
"Version conflict: expected {}, actual {}",
expected, actual
)
}
}
}
}
impl std::error::Error for RepositoryError {}
/// Build the fallback file name from the current UTC time.
///
/// The result has the shape `Recording - <Mon> <DD> <YYYY> <HH:MM:SS>`.
fn generate_default_name() -> String {
    Utc::now()
        .format("Recording - %b %d %Y %H:%M:%S")
        .to_string()
}
/// Internal request for creating files without contract association (e.g., audio transcription).
/// User-facing file creation should use CreateFileRequest which requires contract_id.
///
/// Consumed by `create_file`, which inserts a row with no owner or contract columns set.
pub struct InternalCreateFileRequest {
/// File name; when `None`, `create_file` falls back to a timestamp-based default name.
pub name: Option<String>,
/// Optional description, stored as-is.
pub description: Option<String>,
/// Transcript entries; serialized to JSON before being stored.
pub transcript: Vec<super::models::TranscriptEntry>,
/// Optional location string, stored as-is.
pub location: Option<String>,
}
/// Create a new file record (internal use, no contract required).
/// For user-facing file creation, use create_file_for_owner which requires a contract.
pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
sqlx::query_as::<_, File>(
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
.bind(&req.location)
.bind(&body_json)
.fetch_one(pool)
.await
}
/// Fetch a single file row by primary key.
///
/// Returns `Ok(None)` when no row in `files` has the given `id`. No owner
/// filter is applied; `get_file_for_owner` is the owner-scoped variant.
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
    let sql = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#;
    let lookup = sqlx::query_as::<_, File>(sql).bind(id);
    lookup.fetch_optional(pool).await
}
/// Return every row in `files`, newest first (`created_at DESC`).
///
/// No owner filter and no pagination are applied.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
    let sql = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#;
    let listing = sqlx::query_as::<_, File>(sql);
    listing.fetch_all(pool).await
}
/// Update a file by ID, optionally guarded by optimistic locking.
///
/// * When `req.version` is `Some(v)`, the update only applies if the stored
///   version equals `v`; otherwise `RepositoryError::VersionConflict` is returned.
///   The check is done twice: once against the row read up front, and again in
///   the `WHERE` clause of the `UPDATE` to catch concurrent writers.
/// * When `req.version` is `None` (e.g. internal system updates), no version
///   check is performed.
///
/// Fields left as `None` in the request keep their stored value, so optional
/// columns cannot be cleared through this function. `location` and
/// `repo_file_path` are not written here.
///
/// Returns `Ok(None)` if the file does not exist.
///
/// NOTE(review): the statement itself never changes `version`; presumably a
/// database trigger bumps it — confirm against the schema.
pub async fn update_file(
    pool: &PgPool,
    id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    // Load the current row; a missing file is not an error.
    let Some(stored) = get_file(pool, id).await? else {
        return Ok(None);
    };

    // Fast-path conflict detection before doing any work.
    let expected = req.version;
    match expected {
        Some(wanted) if wanted != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected: wanted,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let transcript = req.transcript.unwrap_or(stored.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(stored.summary);
    let body = req.body.unwrap_or(stored.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    // Unversioned (internal) update: no guard in the WHERE clause.
    let Some(expected_version) = expected else {
        let updated = sqlx::query_as::<_, File>(
            r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .fetch_optional(pool)
        .await?;
        return Ok(updated);
    };

    // Versioned update: the WHERE clause re-checks the version atomically.
    let updated = sqlx::query_as::<_, File>(
        r#"
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
    )
    .bind(id)
    .bind(&name)
    .bind(&description)
    .bind(&transcript_json)
    .bind(&summary)
    .bind(&body_json)
    .bind(expected_version)
    .fetch_optional(pool)
    .await?;

    // No row updated: someone else changed the file in between. Re-read to
    // report the version that is actually stored now (if the row still exists).
    if updated.is_none() {
        if let Some(latest) = get_file(pool, id).await? {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: latest.version,
            });
        }
    }
    Ok(updated)
}
/// Remove the file row with the given `id`.
///
/// Returns `true` when a row was deleted and `false` when no row matched.
pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM files
WHERE id = $1
"#;
    let outcome = sqlx::query(sql).bind(id).execute(pool).await?;
    Ok(outcome.rows_affected() != 0)
}
/// Return the total number of rows in `files`.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
        .fetch_one(pool)
        .await?;
    Ok(total)
}
// =============================================================================
// Owner-Scoped File Functions
// =============================================================================
/// Insert a file row owned by `owner_id` and attached to `req.contract_id`.
///
/// * `name` defaults to a timestamp-based name when absent.
/// * `transcript` and `body` are stored as JSON; a serialization failure
///   results in JSON `null` being stored.
/// * `contract_phase` is taken from the request when given; otherwise it is
///   read from the contract's current `phase`, restricted to contracts owned
///   by `owner_id`.
///
/// NOTE(review): if that lookup finds no contract, the phase is simply stored
/// as `NULL` and the insert still proceeds; when the phase is supplied by the
/// caller no contract lookup happens at all. Confirm that contract ownership is
/// validated by the caller or by a database constraint.
pub async fn create_file_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateFileRequest,
) -> Result<File, sqlx::Error> {
    let file_name = req.name.unwrap_or_else(generate_default_name);
    let transcript_value = serde_json::to_value(&req.transcript).unwrap_or_default();
    // The request body may be empty or pre-populated with template elements.
    let body_value = serde_json::to_value(&req.body).unwrap_or_default();

    // Prefer the explicit phase; fall back to the contract's current one.
    let phase: Option<String> = match req.contract_phase {
        given @ Some(_) => given,
        None => {
            sqlx::query_scalar("SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2")
                .bind(req.contract_id)
                .bind(owner_id)
                .fetch_optional(pool)
                .await?
        }
    };

    let sql = r#"
INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#;

    sqlx::query_as::<_, File>(sql)
        .bind(owner_id)
        .bind(req.contract_id)
        .bind(&phase)
        .bind(&file_name)
        .bind(&req.description)
        .bind(&transcript_value)
        .bind(&req.location)
        .bind(&body_value)
        .bind(&req.repo_file_path)
        .fetch_one(pool)
        .await
}
/// Fetch a file by ID, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` both when the file does not exist and when it is owned
/// by someone else.
pub async fn get_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<File>, sqlx::Error> {
    let sql = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#;
    let lookup = sqlx::query_as::<_, File>(sql).bind(id).bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// Return every file owned by `owner_id`, newest first (`created_at DESC`).
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
    let sql = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
"#;
    let listing = sqlx::query_as::<_, File>(sql).bind(owner_id);
    listing.fetch_all(pool).await
}
/// Database row type for file summary with contract info.
///
/// Decoded from the joined `files` / `contracts` query in
/// `list_file_summaries_for_owner` and then mapped into `FileSummary`.
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
/// File primary key (`f.id`).
id: Uuid,
/// Contract the file is attached to, if any.
contract_id: Option<Uuid>,
/// Name of the joined contract (`c.name`); `None` when the LEFT JOIN finds no contract.
contract_name: Option<String>,
/// Phase recorded on the file row itself (`f.contract_phase`).
contract_phase: Option<String>,
/// File name.
name: String,
/// Optional file description.
description: Option<String>,
/// Transcript entries, decoded from the JSON column; only used to derive
/// the entry count and the duration of the summary.
#[sqlx(json)]
transcript: Vec<crate::db::models::TranscriptEntry>,
/// Version number of the file row.
version: i32,
/// Path of the file in the repository, if set.
repo_file_path: Option<String>,
/// Repository sync status string, if set.
repo_sync_status: Option<String>,
/// Row creation timestamp.
created_at: chrono::DateTime<chrono::Utc>,
/// Last-update timestamp.
updated_at: chrono::DateTime<chrono::Utc>,
}
/// List lightweight summaries of an owner's files, joined with contract names.
///
/// Rows are ordered newest first (`f.created_at DESC`). For each file:
///
/// * `transcript_count` is the number of transcript entries;
/// * `duration` is the largest `end` value among the transcript entries, or
///   `None` when that maximum is not greater than zero (including the empty
///   transcript case).
pub async fn list_file_summaries_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<FileSummary>, sqlx::Error> {
    let sql = r#"
SELECT
f.id, f.contract_id, c.name as contract_name, f.contract_phase,
f.name, f.description, f.transcript, f.version,
f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
FROM files f
LEFT JOIN contracts c ON f.contract_id = c.id
WHERE f.owner_id = $1
ORDER BY f.created_at DESC
"#;
    let fetched = sqlx::query_as::<_, FileSummaryRow>(sql)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;

    let mut summaries = Vec::with_capacity(fetched.len());
    for row in fetched {
        // Latest end timestamp across all entries, starting from zero.
        let mut longest = 0.0_f32;
        for entry in &row.transcript {
            longest = longest.max(entry.end);
        }
        let duration = if longest > 0.0 { Some(longest) } else { None };
        let transcript_count = row.transcript.len();

        summaries.push(FileSummary {
            id: row.id,
            contract_id: row.contract_id,
            contract_name: row.contract_name,
            contract_phase: row.contract_phase,
            name: row.name,
            description: row.description,
            transcript_count,
            duration,
            version: row.version,
            repo_file_path: row.repo_file_path,
            repo_sync_status: row.repo_sync_status,
            created_at: row.created_at,
            updated_at: row.updated_at,
        });
    }
    Ok(summaries)
}
/// Owner-scoped variant of the optimistic-locking file update.
///
/// Every read and write is restricted to rows whose `owner_id` matches, so a
/// file owned by someone else behaves exactly like a missing file (`Ok(None)`).
///
/// * `req.version = Some(v)`: the update applies only if the stored version is
///   `v`; otherwise `RepositoryError::VersionConflict` is returned. The version
///   is checked up front and again inside the `UPDATE`'s `WHERE` clause.
/// * `req.version = None`: no version check.
///
/// Request fields left as `None` keep their stored value.
pub async fn update_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    // Load the current row (owner-scoped); a missing file is not an error.
    let Some(stored) = get_file_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fast-path conflict detection.
    let expected = req.version;
    match expected {
        Some(wanted) if wanted != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected: wanted,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let transcript = req.transcript.unwrap_or(stored.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(stored.summary);
    let body = req.body.unwrap_or(stored.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    // Unversioned (internal) update.
    let Some(expected_version) = expected else {
        let updated = sqlx::query_as::<_, File>(
            r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .fetch_optional(pool)
        .await?;
        return Ok(updated);
    };

    // Versioned update: the WHERE clause re-checks the version atomically.
    let updated = sqlx::query_as::<_, File>(
        r#"
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
    )
    .bind(id)
    .bind(owner_id)
    .bind(&name)
    .bind(&description)
    .bind(&transcript_json)
    .bind(&summary)
    .bind(&body_json)
    .bind(expected_version)
    .fetch_optional(pool)
    .await?;

    // No row updated: a concurrent writer got there first. Re-read to report
    // the version that is stored now (if the row still exists).
    if updated.is_none() {
        if let Some(latest) = get_file_for_owner(pool, id, owner_id).await? {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: latest.version,
            });
        }
    }
    Ok(updated)
}
/// Remove a file, but only if it belongs to `owner_id`.
///
/// Returns `true` when a row was deleted; `false` when the file does not exist
/// or is owned by someone else.
pub async fn delete_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM files
WHERE id = $1 AND owner_id = $2
"#;
    let outcome = sqlx::query(sql)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await?;
    Ok(outcome.rows_affected() != 0)
}
// =============================================================================
// Version History Functions
// =============================================================================
/// Set the transaction-local `app.version_source` setting.
///
/// This is used by the trigger to record who made the change.
///
/// The value is passed as a bound parameter to `set_config(..., is_local => true)`,
/// the function form of `SET LOCAL`, so `source` can never be interpreted as
/// SQL regardless of the characters it contains.
///
/// NOTE(review): the statement runs on whichever connection the pool hands
/// out, and a transaction-local setting only lasts for the transaction it is
/// set in. Confirm that the subsequent write actually shares that transaction,
/// otherwise the trigger will not observe this value.
pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.version_source', $1, true)")
        .bind(source)
        .execute(pool)
        .await?;
    Ok(())
}
/// Set the transaction-local `app.change_description` setting.
///
/// The description is passed as a bound parameter to
/// `set_config(..., is_local => true)`, the function form of `SET LOCAL`.
/// Binding replaces manual quote escaping, so the text is stored verbatim and
/// can never be interpreted as SQL.
///
/// NOTE(review): the statement runs on whichever connection the pool hands
/// out, and a transaction-local setting only lasts for the transaction it is
/// set in. Confirm that the subsequent write actually shares that transaction.
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.change_description', $1, true)")
        .bind(description)
        .execute(pool)
        .await?;
    Ok(())
}
/// List every known version of a file, newest first.
///
/// Historical versions come from `file_versions` (ordered by `version DESC`).
/// If the file itself still exists, its live state is synthesized into a
/// `FileVersion` and placed at the front of the list. That synthesized entry
/// reuses the file's ID as its own `id`, reports `"user"` as its source, has
/// no change description, and uses the file's `updated_at` as `created_at`.
pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result<Vec<FileVersion>, sqlx::Error> {
    // Read the live row first, then the stored history.
    let live = get_file(pool, file_id).await?;

    let sql = r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1
ORDER BY version DESC
"#;
    let history = sqlx::query_as::<_, FileVersion>(sql)
        .bind(file_id)
        .fetch_all(pool)
        .await?;

    let Some(file) = live else {
        return Ok(history);
    };

    let head = FileVersion {
        id: file.id,
        file_id: file.id,
        version: file.version,
        name: file.name,
        description: file.description,
        summary: file.summary,
        body: file.body,
        source: "user".to_string(), // Current version source
        change_description: None,
        created_at: file.updated_at,
    };

    let mut combined = Vec::with_capacity(history.len() + 1);
    combined.push(head);
    combined.extend(history);
    Ok(combined)
}
/// Look up one specific version of a file.
///
/// If `version` is the file's live version, the result is synthesized from the
/// `files` row (source `"user"`, no change description, `created_at` taken from
/// the file's `updated_at`). Otherwise the version is looked up in
/// `file_versions`. Returns `Ok(None)` when neither place has it.
pub async fn get_file_version(
    pool: &PgPool,
    file_id: Uuid,
    version: i32,
) -> Result<Option<FileVersion>, sqlx::Error> {
    // The live row wins when its version number matches.
    let live_match = get_file(pool, file_id)
        .await?
        .filter(|file| file.version == version);
    if let Some(file) = live_match {
        let snapshot = FileVersion {
            id: file.id,
            file_id: file.id,
            version: file.version,
            name: file.name,
            description: file.description,
            summary: file.summary,
            body: file.body,
            source: "user".to_string(),
            change_description: None,
            created_at: file.updated_at,
        };
        return Ok(Some(snapshot));
    }

    // Fall back to the stored history.
    let sql = r#"
SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
FROM file_versions
WHERE file_id = $1 AND version = $2
"#;
    sqlx::query_as::<_, FileVersion>(sql)
        .bind(file_id)
        .bind(version)
        .fetch_optional(pool)
        .await
}
/// Restore a file to a previous version.
/// This creates a new version with the content from the target version.
pub async fn restore_file_version(
pool: &PgPool,
file_id: Uuid,
target_version: i32,
current_version: i32,
) -> Result<Option<File>, RepositoryError> {
// Get the target version content
let target = get_file_version(pool, file_id, target_version).await?;
let Some(target) = target else {
return Ok(None);
};
// Set version source and description for the trigger
set_version_source(pool, "system").await?;
set_change_description(pool, &format!("Restored from version {}", target_version)).await?;
// Update the file with the target version's content
// This will trigger the save_file_version trigger to save the current state first
let update_req = UpdateFileRequest {
name: Some(target.name),
description: target.description,
transcript: None,
summary: target.summary,
body: Some(target.body),
version: Some(current_version),
repo_file_path: None,
};
update_file(pool, file_id, update_req).await
}
/// Count the versions of a file: stored history rows plus one for the live version.
///
/// The `+ 1` is applied unconditionally, so the result is `1` even when no
/// history rows exist for `file_id`.
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sqlx::Error> {
    // +1 for current version
    let sql = "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1";
    let (total,): (i64,) = sqlx::query_as(sql)
        .bind(file_id)
        .fetch_one(pool)
        .await?;
    Ok(total)
}
// =============================================================================
// Task Functions
// =============================================================================
/// Create a new task.
///
/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
/// and target_repo_path from the parent task. Depth is calculated from parent and limited
/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
///
/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
/// explicitly configured. The supervisor controls when completion steps happen.
///
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
let contract_id = parent.contract_id.or(req.contract_id);
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
branched_from_task_id, conversation_state, supervisor_worktree_task_id
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
RETURNING *
"#,
)
.bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
.bind(&req.supervisor_worktree_task_id)
.fetch_one(pool)
.await
}
/// Fetch a single task row by primary key.
///
/// Returns `Ok(None)` when no task has the given `id`. No owner filter is
/// applied; `get_task_for_owner` is the owner-scoped variant.
pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#"
SELECT *
FROM tasks
WHERE id = $1
"#;
    let lookup = sqlx::query_as::<_, Task>(sql).bind(id);
    lookup.fetch_optional(pool).await
}
/// List summaries of all visible top-level tasks (tasks without a parent).
///
/// Hidden tasks are excluded (a `NULL` `hidden` flag counts as not hidden).
/// Results are ordered by `priority DESC`, then `created_at DESC`. Each summary
/// carries the joined contract's name, phase and status (when a contract is
/// attached) and the number of direct subtasks.
pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let sql = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#;
    let listing = sqlx::query_as::<_, TaskSummary>(sql);
    listing.fetch_all(pool).await
}
/// List summaries of the direct children of `parent_id`.
///
/// Unlike `list_tasks`, hidden subtasks are not filtered out. Results are
/// ordered by `priority DESC`, then `created_at DESC`.
pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let sql = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#;
    let listing = sqlx::query_as::<_, TaskSummary>(sql).bind(parent_id);
    listing.fetch_all(pool).await
}
/// List every task of a contract that belongs to `owner_id` (for the supervisor tree view).
///
/// Supervisor tasks come first, then tasks by increasing depth, then by
/// creation time (oldest first).
pub async fn list_tasks_by_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    let sql = r#"
SELECT * FROM tasks
WHERE contract_id = $1 AND owner_id = $2
ORDER BY is_supervisor DESC, depth ASC, created_at ASC
"#;
    let listing = sqlx::query_as::<_, Task>(sql)
        .bind(contract_id)
        .bind(owner_id);
    listing.fetch_all(pool).await
}
/// Fetch the runnable, non-supervisor tasks of a contract owned by `owner_id`.
///
/// A task qualifies when its status is `pending` and its `retry_count` is still
/// below `max_retries`. Previously interrupted tasks (retry candidates) sort
/// first, most recently interrupted at the top; the remainder is ordered by
/// `priority DESC`, then `created_at ASC`.
pub async fn get_pending_tasks_for_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    let sql = r#"
SELECT t.* FROM tasks t
WHERE t.contract_id = $1 AND t.owner_id = $2
AND t.status = 'pending'
AND t.retry_count < t.max_retries
AND t.is_supervisor = false
ORDER BY
t.interrupted_at DESC NULLS LAST,
t.priority DESC,
t.created_at ASC
"#;
    let pending = sqlx::query_as::<_, Task>(sql)
        .bind(contract_id)
        .bind(owner_id);
    pending.fetch_all(pool).await
}
/// Find every contract that still has runnable pending tasks.
///
/// Returns distinct `(contract_id, owner_id)` pairs for non-supervisor tasks
/// that are `pending`, attached to a contract, and below their retry limit.
/// Pairs are ordered by owner, then contract.
pub async fn get_all_pending_task_contracts(
    pool: &PgPool,
) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
    let sql = r#"
SELECT DISTINCT t.contract_id, t.owner_id
FROM tasks t
WHERE t.contract_id IS NOT NULL
AND t.status = 'pending'
AND t.retry_count < t.max_retries
AND t.is_supervisor = false
ORDER BY t.owner_id, t.contract_id
"#;
    let pairs = sqlx::query_as::<_, (Uuid, Uuid)>(sql);
    pairs.fetch_all(pool).await
}
/// Put a task back into `pending` after its daemon failed.
///
/// Clears the daemon assignment, increments `retry_count`, appends
/// `failed_daemon_id` to the task's list of failed daemons, records it as the
/// last active daemon, stamps `interrupted_at`, and sets a fixed error message.
///
/// The update only applies while `retry_count < max_retries`. Returns
/// `Ok(None)` when the task does not exist or has already used up its retries;
/// otherwise the updated task.
pub async fn mark_task_for_retry(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET status = 'pending',
daemon_id = NULL,
retry_count = retry_count + 1,
failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
last_active_daemon_id = $2,
interrupted_at = NOW(),
error_message = 'Daemon disconnected, awaiting retry',
updated_at = NOW()
WHERE id = $1
AND retry_count < max_retries
RETURNING *
"#;
    let requeue = sqlx::query_as::<_, Task>(sql)
        .bind(task_id)
        .bind(failed_daemon_id);
    requeue.fetch_optional(pool).await
}
/// Mark a task as `failed` because it ran out of retries.
///
/// Clears the daemon assignment, increments `retry_count`, appends
/// `failed_daemon_id` to the task's list of failed daemons, records it as the
/// last active daemon, and sets a fixed error message. The update is
/// unconditional on the retry counters, and a non-existent `task_id` is not
/// reported as an error.
pub async fn mark_task_permanently_failed(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET status = 'failed',
daemon_id = NULL,
retry_count = retry_count + 1,
failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
last_active_daemon_id = $2,
error_message = 'Task failed: exceeded maximum retry attempts',
updated_at = NOW()
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(task_id)
        .bind(failed_daemon_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Update a task by ID, optionally guarded by optimistic locking.
///
/// * `req.version = Some(v)`: the update applies only if the stored version is
///   `v`; otherwise `RepositoryError::VersionConflict` is returned. The version
///   is checked up front and again inside the `UPDATE`'s `WHERE` clause.
/// * `req.version = None`: no version check.
///
/// Request fields left as `None` keep their stored value, so optional columns
/// cannot be cleared this way. The exception is the daemon assignment: setting
/// `req.clear_daemon_id` stores `NULL` regardless of `req.daemon_id`.
///
/// Returns `Ok(None)` if the task does not exist.
pub async fn update_task(
    pool: &PgPool,
    id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    // Load the current row; a missing task is not an error.
    let Some(stored) = get_task(pool, id).await? else {
        return Ok(None);
    };

    // Fast-path conflict detection.
    let expected = req.version;
    match expected {
        Some(wanted) if wanted != stored.version => {
            return Err(RepositoryError::VersionConflict {
                expected: wanted,
                actual: stored.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(stored.name);
    let description = req.description.or(stored.description);
    let plan = req.plan.unwrap_or(stored.plan);
    let status = req.status.unwrap_or(stored.status);
    let priority = req.priority.unwrap_or(stored.priority);
    let progress_summary = req.progress_summary.or(stored.progress_summary);
    let last_output = req.last_output.or(stored.last_output);
    let error_message = req.error_message.or(stored.error_message);
    let merge_mode = req.merge_mode.or(stored.merge_mode);
    let pr_url = req.pr_url.or(stored.pr_url);
    let target_repo_path = req.target_repo_path.or(stored.target_repo_path);
    let completion_action = req.completion_action.or(stored.completion_action);
    // An explicit clear wins; otherwise take the new daemon or keep the old one.
    let daemon_id = match req.clear_daemon_id {
        true => None,
        false => req.daemon_id.or(stored.daemon_id),
    };

    // Unversioned update: no guard in the WHERE clause.
    let Some(expected_version) = expected else {
        let updated = sqlx::query_as::<_, Task>(
            r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1
RETURNING *
"#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .fetch_optional(pool)
        .await?;
        return Ok(updated);
    };

    // Versioned update: the WHERE clause re-checks the version atomically.
    let updated = sqlx::query_as::<_, Task>(
        r#"
UPDATE tasks
SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
progress_summary = $7, last_output = $8, error_message = $9,
merge_mode = $10, pr_url = $11, daemon_id = $12,
target_repo_path = $13, completion_action = $14, updated_at = NOW()
WHERE id = $1 AND version = $15
RETURNING *
"#,
    )
    .bind(id)
    .bind(&name)
    .bind(&description)
    .bind(&plan)
    .bind(&status)
    .bind(priority)
    .bind(&progress_summary)
    .bind(&last_output)
    .bind(&error_message)
    .bind(&merge_mode)
    .bind(&pr_url)
    .bind(daemon_id)
    .bind(&target_repo_path)
    .bind(&completion_action)
    .bind(expected_version)
    .fetch_optional(pool)
    .await?;

    // No row updated: a concurrent writer got there first. Re-read to report
    // the version that is stored now (if the row still exists).
    if updated.is_none() {
        if let Some(latest) = get_task(pool, id).await? {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: latest.version,
            });
        }
    }
    Ok(updated)
}
/// Remove the task row with the given `id`.
///
/// Returns `true` when a row was deleted and `false` when no row matched.
pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
DELETE FROM tasks
WHERE id = $1
"#;
    let outcome = sqlx::query(sql).bind(id).execute(pool).await?;
    Ok(outcome.rows_affected() != 0)
}
/// Count top-level tasks.
///
/// Only rows without a parent (`parent_task_id IS NULL`) are counted; subtasks
/// are not included in the total.
pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let sql = "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL";
    let (total,): (i64,) = sqlx::query_as(sql)
        .fetch_one(pool)
        .await?;
    Ok(total)
}
// =============================================================================
// Owner-Scoped Task Functions
// =============================================================================
/// Create a new task for a specific owner.
pub async fn create_task_for_owner(
pool: &PgPool,
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
// Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
// Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
new_depth
)));
}
// Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
let contract_id = parent.contract_id.or(req.contract_id);
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
// The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
(new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
// Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
req.merge_mode.clone(),
req.target_repo_path.clone(),
req.completion_action.clone(),
)
};
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
branched_from_task_id, conversation_state, supervisor_worktree_task_id,
directive_id, directive_step_id
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
RETURNING *
"#,
)
.bind(owner_id)
.bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
.bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
.bind(&merge_mode)
.bind(&target_repo_path)
.bind(&completion_action)
.bind(&req.continue_from_task_id)
.bind(©_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
.bind(&req.supervisor_worktree_task_id)
.bind(&req.directive_id)
.bind(&req.directive_step_id)
.fetch_one(pool)
.await
}
/// Fetch a single task by ID, restricted to the given owner.
///
/// Returns `Ok(None)` when no row matches both `id` and `owner_id`.
pub async fn get_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM tasks
WHERE id = $1 AND owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, Task>(SQL).bind(id).bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// List an owner's top-level tasks (those without a parent) as summaries.
///
/// Rows are sorted by priority (highest first) and then by `created_at`
/// (newest first). Tasks flagged as hidden are left out; a NULL `hidden`
/// column is treated as not hidden.
pub async fn list_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let listing = sqlx::query_as::<_, TaskSummary>(SQL).bind(owner_id);
    listing.fetch_all(pool).await
}
/// List the direct children of `parent_id` that belong to `owner_id`.
///
/// Rows are sorted by priority (highest first) and then by `created_at`
/// (newest first). Unlike the top-level listing, this query does not filter
/// on the `hidden` flag.
pub async fn list_subtasks_for_owner(
    pool: &PgPool,
    parent_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#;

    // Note the placeholder order: $1 is the owner, $2 is the parent.
    let listing = sqlx::query_as::<_, TaskSummary>(SQL)
        .bind(owner_id)
        .bind(parent_id);
    listing.fetch_all(pool).await
}
/// Update a task by ID with optimistic locking, scoped to owner.
///
/// The current row is loaded first; every field left as `None` in `req` keeps
/// its existing value. Because optional columns are merged with `Option::or`,
/// a request cannot reset them to NULL. The exception is `daemon_id`, which is
/// cleared when `req.clear_daemon_id` is true.
///
/// If `req.version` is provided it is checked twice: once against the row that
/// was just loaded, and again in the `WHERE` clause of the `UPDATE` so that a
/// concurrent change between the read and the write is detected.
///
/// NOTE(review): neither UPDATE statement modifies the `version` column;
/// presumably it is bumped elsewhere (e.g. by a database trigger) — confirm.
///
/// # Returns
/// `Ok(None)` if no task with this `id` exists for `owner_id`.
///
/// # Errors
/// - [`RepositoryError::VersionConflict`] if `req.version` does not match the stored version.
/// - [`RepositoryError::Database`] for any underlying query failure.
pub async fn update_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    const UPDATE_WITH_VERSION_SQL: &str = r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, repository_url = $16,
hidden = $17, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $18
RETURNING *
"#;
    const UPDATE_SQL: &str = r#"
UPDATE tasks
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
target_repo_path = $14, completion_action = $15, repository_url = $16,
hidden = $17, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;

    // Get the existing task first (scoped to owner)
    let Some(existing) = get_task_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Check version if provided (optimistic locking)
    if let Some(expected) = req.version {
        if existing.version != expected {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: existing.version,
            });
        }
    }

    // Apply updates: request values win, otherwise keep what is stored.
    let name = req.name.unwrap_or(existing.name);
    let description = req.description.or(existing.description);
    let plan = req.plan.unwrap_or(existing.plan);
    let status = req.status.unwrap_or(existing.status);
    let priority = req.priority.unwrap_or(existing.priority);
    let progress_summary = req.progress_summary.or(existing.progress_summary);
    let last_output = req.last_output.or(existing.last_output);
    let error_message = req.error_message.or(existing.error_message);
    let merge_mode = req.merge_mode.or(existing.merge_mode);
    let pr_url = req.pr_url.or(existing.pr_url);
    let repository_url = req.repository_url.or(existing.repository_url);
    let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
    let completion_action = req.completion_action.or(existing.completion_action);
    let hidden = req.hidden.unwrap_or(existing.hidden);
    let daemon_id = if req.clear_daemon_id {
        None
    } else {
        req.daemon_id.or(existing.daemon_id)
    };

    // Update with version check in WHERE clause for race condition safety.
    // Parameters $1..$17 are shared by both statements; $18 exists only in the
    // versioned one and is bound conditionally below.
    let sql = if req.version.is_some() {
        UPDATE_WITH_VERSION_SQL
    } else {
        UPDATE_SQL
    };
    let mut query = sqlx::query_as::<_, Task>(sql)
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .bind(&repository_url)
        .bind(hidden);
    if let Some(expected) = req.version {
        query = query.bind(expected);
    }
    let result = query.fetch_optional(pool).await?;

    // If versioned update returned None, there was a race condition: the row
    // changed (or vanished) between the initial read and the UPDATE.
    if result.is_none() {
        if let Some(expected) = req.version {
            if let Some(current) = get_task_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected,
                    actual: current.version,
                });
            }
        }
    }

    Ok(result)
}
/// Remove a task by ID, restricted to the given owner.
///
/// Returns `true` when at least one row was deleted, `false` when nothing
/// matched both `id` and `owner_id`.
pub async fn delete_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM tasks
WHERE id = $1 AND owner_id = $2
"#;

    sqlx::query(SQL)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Set a task's status and record a `status_change` event for the transition.
///
/// The update also stamps `started_at` the first time the status becomes
/// `running`, and `completed_at` whenever the status becomes `done`, `failed`
/// or `merged`.
///
/// Returns `Ok(None)` if the task does not exist. Recording the event is
/// best-effort: a failure to insert it is ignored and does not affect the
/// returned task.
pub async fn update_task_status(
    pool: &PgPool,
    id: Uuid,
    new_status: &str,
    event_data: Option<serde_json::Value>,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
UPDATE tasks
SET status = $2, updated_at = NOW(),
started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
WHERE id = $1
RETURNING *
"#;

    // Read the status before the update so the event can describe the transition.
    let previous_status = match get_task(pool, id).await? {
        Some(current) => current.status,
        None => return Ok(None),
    };

    let updated = sqlx::query_as::<_, Task>(SQL)
        .bind(id)
        .bind(new_status)
        .fetch_optional(pool)
        .await?;

    if updated.is_some() {
        // The event insert result is deliberately discarded.
        let _ = create_task_event(
            pool,
            id,
            "status_change",
            Some(&previous_status),
            Some(new_status),
            event_data,
        )
        .await;
    }

    Ok(updated)
}
// =============================================================================
// Task Event Functions
// =============================================================================
/// Insert a row into `task_events` and return the stored event.
///
/// `previous_status`, `new_status` and `event_data` are optional and are
/// written as NULL when `None`.
pub async fn create_task_event(
    pool: &PgPool,
    task_id: Uuid,
    event_type: &str,
    previous_status: Option<&str>,
    new_status: Option<&str>,
    event_data: Option<serde_json::Value>,
) -> Result<TaskEvent, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, TaskEvent>(SQL)
        .bind(task_id)
        .bind(event_type)
        .bind(previous_status)
        .bind(new_status)
        .bind(event_data);
    insert.fetch_one(pool).await
}
/// List a task's events, newest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_task_events(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM task_events
WHERE task_id = $1
ORDER BY created_at DESC
LIMIT $2
"#;

    let max_rows = limit.unwrap_or(100);
    sqlx::query_as::<_, TaskEvent>(SQL)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}
// =============================================================================
// Daemon Functions
// =============================================================================
/// Insert a new daemon row for `owner_id` and return it.
///
/// `hostname` and `machine_id` are optional and stored as NULL when `None`.
pub async fn register_daemon(
    pool: &PgPool,
    owner_id: Uuid,
    connection_id: &str,
    hostname: Option<&str>,
    machine_id: Option<&str>,
    max_concurrent_tasks: i32,
) -> Result<Daemon, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, Daemon>(SQL)
        .bind(owner_id)
        .bind(connection_id)
        .bind(hostname)
        .bind(machine_id)
        .bind(max_concurrent_tasks);
    insert.fetch_one(pool).await
}
/// Fetch a daemon by its ID, without any owner restriction.
///
/// Returns `Ok(None)` when no such daemon exists.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM daemons
WHERE id = $1
"#;

    let lookup = sqlx::query_as::<_, Daemon>(SQL).bind(id);
    lookup.fetch_optional(pool).await
}
/// Fetch the daemon registered under the given connection ID.
///
/// Returns `Ok(None)` when no daemon has that `connection_id`.
pub async fn get_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<Option<Daemon>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM daemons
WHERE connection_id = $1
"#;

    let lookup = sqlx::query_as::<_, Daemon>(SQL).bind(connection_id);
    lookup.fetch_optional(pool).await
}
/// List every daemon row, most recently connected first.
pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM daemons
ORDER BY connected_at DESC
"#;

    let listing = sqlx::query_as::<_, Daemon>(SQL);
    listing.fetch_all(pool).await
}
/// List daemons for a specific owner.
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE owner_id = $1
ORDER BY connected_at DESC
"#,
)
.bind(owner_id)
.fetch_all(pool)
.await
}
/// Get a daemon by ID for a specific owner.
pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
sqlx::query_as::<_, Daemon>(
r#"
SELECT *
FROM daemons
WHERE id = $1 AND owner_id = $2
"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await
}
/// Record a heartbeat for a daemon.
///
/// Sets `last_heartbeat_at` to the database's current time and forces the
/// status to `connected`. Returns `true` if a row was updated.
pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemons
SET last_heartbeat_at = NOW(), status = 'connected'
WHERE id = $1
"#;

    sqlx::query(SQL)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Set a daemon's status.
///
/// When the new status is `disconnected`, `disconnected_at` is also set to the
/// database's current time; for any other status it is left untouched.
/// Returns `true` if a row was updated.
pub async fn update_daemon_status(
    pool: &PgPool,
    id: Uuid,
    status: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemons
SET status = $2,
disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
WHERE id = $1
"#;

    sqlx::query(SQL)
        .bind(id)
        .bind(status)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Mark the daemon with the given connection ID as disconnected.
///
/// Sets the status to `disconnected` and stamps `disconnected_at` with the
/// database's current time. Returns `true` if a row was updated.
pub async fn disconnect_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemons
SET status = 'disconnected',
disconnected_at = NOW()
WHERE connection_id = $1
"#;

    sqlx::query(SQL)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Adjust a daemon's `current_task_count` by `delta`.
///
/// `delta` may be negative; the stored count is clamped so it never drops
/// below zero. Returns `true` if a row was updated.
pub async fn update_daemon_task_count(
    pool: &PgPool,
    id: Uuid,
    delta: i32,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemons
SET current_task_count = GREATEST(0, current_task_count + $2)
WHERE id = $1
"#;

    sqlx::query(SQL)
        .bind(id)
        .bind(delta)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Remove a daemon row by ID.
///
/// Returns `true` when a row was deleted, `false` when no daemon had that ID.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM daemons
WHERE id = $1
"#;

    sqlx::query(SQL)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Remove the daemon row(s) registered under the given connection ID.
///
/// Returns `true` when at least one row was deleted.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM daemons
WHERE connection_id = $1
"#;

    sqlx::query(SQL)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Count the daemons whose status is `connected`.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    const SQL: &str = "SELECT COUNT(*) FROM daemons WHERE status = 'connected'";

    let (connected,): (i64,) = sqlx::query_as(SQL).fetch_one(pool).await?;
    Ok(connected)
}
/// Purge daemons whose last heartbeat is older than `timeout_seconds`.
///
/// A daemon is removed when its `last_heartbeat_at` is earlier than the
/// database's current time minus the timeout. Returns how many rows were
/// deleted.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM daemons
WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
"#;

    sqlx::query(SQL)
        .bind(timeout_seconds)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}
// =============================================================================
// Sibling Awareness Functions
// =============================================================================
/// List the siblings of `task_id` as summaries.
///
/// With `Some(parent)`, siblings are the other tasks sharing that parent.
/// With `None`, siblings are the other tasks that have no parent at all.
/// In both cases `task_id` itself is excluded, and rows are sorted by priority
/// (highest first) and then by `created_at` (newest first).
///
/// NOTE(review): neither query filters on `owner_id`, so with `parent_id = None`
/// every other top-level task in the table is returned — confirm callers expect this.
pub async fn list_sibling_tasks(
    pool: &PgPool,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SAME_PARENT_SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#;
    const TOP_LEVEL_SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#;

    if let Some(parent) = parent_id {
        return sqlx::query_as::<_, TaskSummary>(SAME_PARENT_SQL)
            .bind(parent)
            .bind(task_id)
            .fetch_all(pool)
            .await;
    }

    // Top-level tasks (no parent) - siblings are other top-level tasks
    sqlx::query_as::<_, TaskSummary>(TOP_LEVEL_SQL)
        .bind(task_id)
        .fetch_all(pool)
        .await
}
/// Fetch the owner's sibling tasks that are currently `running`.
///
/// Siblings share `parent_id` with the given task (or, when `parent_id` is
/// `None`, are the owner's other parentless tasks). `task_id` itself is
/// excluded. Results are sorted by priority, highest first.
pub async fn get_running_siblings(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<Task>, sqlx::Error> {
    const SAME_PARENT_SQL: &str = r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id = $2
AND t.id != $3
AND t.status = 'running'
ORDER BY t.priority DESC
"#;
    const TOP_LEVEL_SQL: &str = r#"
SELECT *
FROM tasks t
WHERE t.owner_id = $1
AND t.parent_task_id IS NULL
AND t.id != $2
AND t.status = 'running'
ORDER BY t.priority DESC
"#;

    if let Some(parent) = parent_id {
        return sqlx::query_as::<_, Task>(SAME_PARENT_SQL)
            .bind(owner_id)
            .bind(parent)
            .bind(task_id)
            .fetch_all(pool)
            .await;
    }

    sqlx::query_as::<_, Task>(TOP_LEVEL_SQL)
        .bind(owner_id)
        .bind(task_id)
        .fetch_all(pool)
        .await
}
/// Load a task together with the summaries of its siblings.
///
/// Returns `Ok(None)` if the task does not exist. Siblings are resolved with
/// [`list_sibling_tasks`] using the task's own `parent_task_id`.
pub async fn get_task_with_siblings(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
    match get_task(pool, id).await? {
        None => Ok(None),
        Some(task) => {
            let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
            Ok(Some((task, siblings)))
        }
    }
}
// =============================================================================
// Task Output Persistence Functions
// =============================================================================
/// Save task output to the database.
///
/// The output is stored as a row in `task_events` with `event_type = 'output'`
/// and no previous/new status. All arguments other than `pool` and `task_id`
/// are packed into the event's JSON payload under camelCase keys
/// (`messageType`, `content`, `toolName`, `toolInput`, `isError`, `costUsd`,
/// `durationMs`); `None` values are serialized as JSON `null`.
///
/// Returns the stored event.
#[allow(clippy::too_many_arguments)]
pub async fn save_task_output(
    pool: &PgPool,
    task_id: Uuid,
    message_type: &str,
    content: &str,
    tool_name: Option<&str>,
    tool_input: Option<serde_json::Value>,
    is_error: Option<bool>,
    cost_usd: Option<f64>,
    duration_ms: Option<u64>,
) -> Result<TaskEvent, sqlx::Error> {
    let event_data = serde_json::json!({
        "messageType": message_type,
        "content": content,
        "toolName": tool_name,
        "toolInput": tool_input,
        "isError": is_error,
        "costUsd": cost_usd,
        "durationMs": duration_ms,
    });

    create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
}
/// Read back the stored output events of a task, oldest first.
///
/// Only `task_events` rows with `event_type = 'output'` are returned. At most
/// `limit` rows are fetched; when `limit` is `None` the cap is 1000.
pub async fn get_task_output(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM task_events
WHERE task_id = $1 AND event_type = 'output'
ORDER BY created_at ASC
LIMIT $2
"#;

    let max_rows = limit.unwrap_or(1000);
    sqlx::query_as::<_, TaskEvent>(SQL)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}
/// Mark a task as finished and record a `complete` event.
///
/// The status becomes `done` when `success` is true and `failed` otherwise;
/// `completed_at` and `updated_at` are set to the database's current time.
/// `error_message` overwrites the stored message only when it is `Some`.
///
/// Returns `Ok(None)` if the task does not exist. The event insert is
/// best-effort (its result is discarded), and the event always records
/// `running` as the previous status regardless of the task's actual prior
/// status.
pub async fn complete_task(
    pool: &PgPool,
    task_id: Uuid,
    success: bool,
    error_message: Option<&str>,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
UPDATE tasks
SET status = $2,
error_message = COALESCE($3, error_message),
completed_at = NOW(),
updated_at = NOW()
WHERE id = $1
RETURNING *
"#;

    let final_status = match success {
        true => "done",
        false => "failed",
    };

    let finished = sqlx::query_as::<_, Task>(SQL)
        .bind(task_id)
        .bind(final_status)
        .bind(error_message)
        .fetch_optional(pool)
        .await?;

    // Record completion event
    if finished.is_some() {
        let payload = serde_json::json!({
            "success": success,
            "errorMessage": error_message,
        });
        let _ = create_task_event(
            pool,
            task_id,
            "complete",
            Some("running"),
            Some(final_status),
            Some(payload),
        )
        .await;
    }

    Ok(finished)
}
// =============================================================================
// Mesh Chat History Functions
// =============================================================================
/// Return the owner's active mesh chat conversation, creating one if needed.
///
/// An existing row with `is_active = true` is returned as-is; otherwise a new
/// active conversation is inserted. The lookup and the insert are separate
/// statements and are not wrapped in a transaction.
pub async fn get_or_create_active_conversation(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<MeshChatConversation, sqlx::Error> {
    const FIND_ACTIVE_SQL: &str = r#"
SELECT *
FROM mesh_chat_conversations
WHERE is_active = true AND owner_id = $1
LIMIT 1
"#;
    const CREATE_SQL: &str = r#"
INSERT INTO mesh_chat_conversations (owner_id, is_active)
VALUES ($1, true)
RETURNING *
"#;

    let active = sqlx::query_as::<_, MeshChatConversation>(FIND_ACTIVE_SQL)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    match active {
        Some(conversation) => Ok(conversation),
        None => {
            sqlx::query_as::<_, MeshChatConversation>(CREATE_SQL)
                .bind(owner_id)
                .fetch_one(pool)
                .await
        }
    }
}
/// List the messages of a mesh chat conversation, oldest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM mesh_chat_messages
WHERE conversation_id = $1
ORDER BY created_at ASC
LIMIT $2
"#;

    let max_rows = limit.unwrap_or(100);
    sqlx::query_as::<_, MeshChatMessageRecord>(SQL)
        .bind(conversation_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}
/// Append a message to a mesh chat conversation and return the stored row.
///
/// `context_task_id`, `tool_calls` and `pending_questions` are optional and
/// written as NULL when `None`.
#[allow(clippy::too_many_arguments)]
pub async fn add_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    context_type: &str,
    context_task_id: Option<Uuid>,
    tool_calls: Option<serde_json::Value>,
    pending_questions: Option<serde_json::Value>,
) -> Result<MeshChatMessageRecord, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO mesh_chat_messages
(conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, MeshChatMessageRecord>(SQL)
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(context_type)
        .bind(context_task_id)
        .bind(tool_calls)
        .bind(pending_questions);
    insert.fetch_one(pool).await
}
/// Clear conversation (archive existing and create new).
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
// Mark existing as inactive for this owner
sqlx::query(
r#"
UPDATE mesh_chat_conversations
SET is_active = false, updated_at = NOW()
WHERE is_active = true AND owner_id = $1
"#,
)
.bind(owner_id)
.execute(pool)
.await?;
// Create new active conversation
get_or_create_active_conversation(pool, owner_id).await
}
// =============================================================================
// Contract Chat History Functions
// =============================================================================
/// Return the active chat conversation of a contract, creating one if needed.
///
/// The lookup matches on `contract_id`, `owner_id` and `is_active = true`.
/// If nothing is found a new active conversation is inserted. The lookup and
/// the insert are separate statements and are not wrapped in a transaction.
pub async fn get_or_create_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<ContractChatConversation, sqlx::Error> {
    const FIND_ACTIVE_SQL: &str = r#"
SELECT *
FROM contract_chat_conversations
WHERE is_active = true AND contract_id = $1 AND owner_id = $2
LIMIT 1
"#;
    const CREATE_SQL: &str = r#"
INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
VALUES ($1, $2, true)
RETURNING *
"#;

    let active = sqlx::query_as::<_, ContractChatConversation>(FIND_ACTIVE_SQL)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    match active {
        Some(conversation) => Ok(conversation),
        None => {
            sqlx::query_as::<_, ContractChatConversation>(CREATE_SQL)
                .bind(contract_id)
                .bind(owner_id)
                .fetch_one(pool)
                .await
        }
    }
}
/// List the messages of a contract chat conversation, oldest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_contract_chat_messages(
    pool: &PgPool,
    conversation_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_chat_messages
WHERE conversation_id = $1
ORDER BY created_at ASC
LIMIT $2
"#;

    let max_rows = limit.unwrap_or(100);
    sqlx::query_as::<_, ContractChatMessageRecord>(SQL)
        .bind(conversation_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}
/// Append a message to a contract chat conversation and return the stored row.
///
/// `tool_calls` and `pending_questions` are optional and written as NULL when
/// `None`.
pub async fn add_contract_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    tool_calls: Option<serde_json::Value>,
    pending_questions: Option<serde_json::Value>,
) -> Result<ContractChatMessageRecord, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO contract_chat_messages
(conversation_id, role, content, tool_calls, pending_questions)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, ContractChatMessageRecord>(SQL)
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(tool_calls)
        .bind(pending_questions);
    insert.fetch_one(pool).await
}
/// Start a fresh chat conversation for a contract.
///
/// Every currently active conversation for this contract and owner is flagged
/// inactive (archived, not deleted), then
/// [`get_or_create_contract_conversation`] is called to obtain the new active one.
pub async fn clear_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<ContractChatConversation, sqlx::Error> {
    const ARCHIVE_SQL: &str = r#"
UPDATE contract_chat_conversations
SET is_active = false, updated_at = NOW()
WHERE is_active = true AND contract_id = $1 AND owner_id = $2
"#;

    // Mark existing as inactive for this contract
    sqlx::query(ARCHIVE_SQL)
        .bind(contract_id)
        .bind(owner_id)
        .execute(pool)
        .await?;

    // With nothing active left, this ends up creating a new conversation.
    get_or_create_contract_conversation(pool, contract_id, owner_id).await
}
// =============================================================================
// Contract Type Template Functions (Owner-Scoped)
// =============================================================================
/// Insert a contract type template owned by `owner_id` and return it.
///
/// `phases` and `deliverables` are serialized to JSON before being stored.
/// If serializing `phases` fails the default JSON value is stored instead; if
/// serializing `deliverables` fails (or none were given) NULL is stored.
pub async fn create_template_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateTemplateRequest,
) -> Result<ContractTypeTemplateRecord, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#;

    let phases_json = serde_json::to_value(&req.phases).unwrap_or_default();
    let deliverables_json = req
        .deliverables
        .as_ref()
        .and_then(|d| serde_json::to_value(d).ok());

    sqlx::query_as::<_, ContractTypeTemplateRecord>(SQL)
        .bind(owner_id)
        .bind(&req.name)
        .bind(&req.description)
        .bind(phases_json)
        .bind(&req.default_phase)
        .bind(deliverables_json)
        .fetch_one(pool)
        .await
}
/// Fetch a contract type template by ID, restricted to the given owner.
///
/// Returns `Ok(None)` when no row matches both `id` and `owner_id`.
pub async fn get_template_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_type_templates
WHERE id = $1 AND owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, ContractTypeTemplateRecord>(SQL)
        .bind(id)
        .bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// Fetch a contract type template by ID without checking who owns it.
///
/// Intended for internal use; callers that serve user requests should prefer
/// the owner-scoped lookup. Returns `Ok(None)` when no such template exists.
pub async fn get_template_by_id(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_type_templates
WHERE id = $1
"#;

    let lookup = sqlx::query_as::<_, ContractTypeTemplateRecord>(SQL).bind(id);
    lookup.fetch_optional(pool).await
}
/// List every contract type template owned by `owner_id`, sorted by name
/// in ascending order.
pub async fn list_templates_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_type_templates
WHERE owner_id = $1
ORDER BY name ASC
"#;

    let listing = sqlx::query_as::<_, ContractTypeTemplateRecord>(SQL).bind(owner_id);
    listing.fetch_all(pool).await
}
/// Update a contract type template for an owner.
///
/// Only the fields that are `Some` in `req` are written; `updated_at` is always
/// refreshed and `version` is always incremented by one. The statement is
/// assembled dynamically, so the placeholder numbering depends on which fields
/// are present.
///
/// If `req.version` is provided, the update only applies when the stored
/// version matches (optimistic locking).
///
/// # Returns
/// `Ok(None)` if no template with this `id` exists for `owner_id`.
///
/// # Errors
/// - [`RepositoryError::VersionConflict`] if `req.version` was given, no row
///   was updated, and the template still exists.
/// - [`RepositoryError::Database`] for any underlying query failure.
pub async fn update_template_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateTemplateRequest,
) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> {
    // Optional columns in placeholder order. The bind calls further down MUST
    // follow exactly this order.
    let optional_columns = [
        ("name", req.name.is_some()),
        ("description", req.description.is_some()),
        ("phases", req.phases.is_some()),
        ("default_phase", req.default_phase.is_some()),
        ("deliverables", req.deliverables.is_some()),
    ];

    // Build dynamic update query
    let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()");
    let mut param_idx = 3; // $1 = id, $2 = owner_id
    for &(column, is_set) in optional_columns.iter() {
        if is_set {
            query.push_str(&format!(", {} = ${}", column, param_idx));
            param_idx += 1;
        }
    }

    query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2");
    // Optimistic locking: the version placeholder comes after all column placeholders.
    if req.version.is_some() {
        query.push_str(&format!(" AND version = ${}", param_idx));
    }
    query.push_str(" RETURNING *");

    let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query)
        .bind(id)
        .bind(owner_id);

    if let Some(ref name) = req.name {
        sql_query = sql_query.bind(name);
    }
    if let Some(ref description) = req.description {
        sql_query = sql_query.bind(description);
    }
    if let Some(ref phases) = req.phases {
        // A serialization failure falls back to the default JSON value.
        sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default());
    }
    if let Some(ref default_phase) = req.default_phase {
        sql_query = sql_query.bind(default_phase);
    }
    if let Some(ref deliverables) = req.deliverables {
        sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default());
    }
    if let Some(version) = req.version {
        sql_query = sql_query.bind(version);
    }

    let result = sql_query.fetch_optional(pool).await?;

    // No row updated under a version constraint: distinguish "template does not
    // exist" (return None) from "version mismatch" (return a conflict).
    if result.is_none() {
        if let Some(expected) = req.version {
            if let Some(current) = get_template_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected,
                    actual: current.version,
                });
            }
        }
    }

    Ok(result)
}
/// Remove a contract type template by ID, restricted to the given owner.
///
/// Returns `true` when a row was deleted, `false` when nothing matched both
/// `id` and `owner_id`.
pub async fn delete_template_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM contract_type_templates
WHERE id = $1 AND owner_id = $2
"#;

    sqlx::query(SQL)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Derive a [`PhaseConfig`] from a stored contract type template.
///
/// The template's phases and default phase are copied over unchanged; a
/// template without deliverables yields the default (empty) deliverables value.
pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig {
    let phases = template.phases.clone();
    let default_phase = template.default_phase.clone();
    let deliverables = template.deliverables.clone().unwrap_or_default();

    PhaseConfig {
        phases,
        default_phase,
        deliverables,
    }
}
/// Helper function to build PhaseConfig for built-in contract types.
///
/// Recognized values of `contract_type`:
/// - `"simple"`: phases `plan` → `execute`, default phase `plan`.
/// - `"specification"`: phases `research` → `specify` → `plan` → `execute` →
///   `review`, default phase `research`.
/// - anything else (including `"execute"`): a single `execute` phase with no
///   deliverables.
///
/// For `simple` and `specification`, every phase has exactly one deliverable
/// with priority `"required"`.
pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig {
    // Builds one phase entry; `order` is the phase's position in the workflow.
    let phase = |id: &str, name: &str, order| PhaseDefinition {
        id: id.to_string(),
        name: name.to_string(),
        order,
    };
    // Builds the (phase id, deliverables) map entry for a phase that has a
    // single required deliverable.
    let required = |phase_id: &str, id: &str, name: &str| {
        (
            phase_id.to_string(),
            vec![DeliverableDefinition {
                id: id.to_string(),
                name: name.to_string(),
                priority: "required".to_string(),
            }],
        )
    };

    match contract_type {
        "simple" => PhaseConfig {
            phases: vec![phase("plan", "Plan", 0), phase("execute", "Execute", 1)],
            default_phase: "plan".to_string(),
            deliverables: [
                required("plan", "plan-document", "Plan"),
                required("execute", "pull-request", "Pull Request"),
            ]
            .into_iter()
            .collect(),
        },
        "specification" => PhaseConfig {
            phases: vec![
                phase("research", "Research", 0),
                phase("specify", "Specify", 1),
                phase("plan", "Plan", 2),
                phase("execute", "Execute", 3),
                phase("review", "Review", 4),
            ],
            default_phase: "research".to_string(),
            deliverables: [
                required("research", "research-notes", "Research Notes"),
                required("specify", "requirements-document", "Requirements Document"),
                required("plan", "plan-document", "Plan"),
                required("execute", "pull-request", "Pull Request"),
                required("review", "release-notes", "Release Notes"),
            ]
            .into_iter()
            .collect(),
        },
        // "execute" and any unrecognized contract type share the single-phase config.
        _ => PhaseConfig {
            phases: vec![phase("execute", "Execute", 0)],
            default_phase: "execute".to_string(),
            deliverables: std::collections::HashMap::new(),
        },
    }
}
// =============================================================================
// Contract Functions (Owner-Scoped)
// =============================================================================
/// Create a new contract for a specific owner.
/// Supports both built-in contract types (simple, specification, execute) and custom templates.
pub async fn create_contract_for_owner(
pool: &PgPool,
owner_id: Uuid,
req: CreateContractRequest,
) -> Result<Contract, sqlx::Error> {
// Determine phase configuration based on template_id or contract_type
let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) =
if let Some(template_id) = req.template_id {
// Look up the custom template
let template = get_template_by_id(pool, template_id)
.await?
.ok_or_else(|| {
sqlx::Error::Protocol(format!("Template not found: {}", template_id))
})?;
let config = build_phase_config_from_template(&template);
let default = config.default_phase.clone();
// For custom templates, store the template name as the contract_type
(config, template.name.clone(), default)
} else {
// Use built-in contract type
let contract_type = req.contract_type.as_deref().unwrap_or("simple");
// Validate contract type
let valid_types = ["simple", "specification", "execute"];
if !valid_types.contains(&contract_type) {
return Err(sqlx::Error::Protocol(format!(
"Invalid contract_type '{}'. Must be one of: {} or provide a template_id",
contract_type,
valid_types.join(", ")
)));
}
let config = build_phase_config_for_builtin(contract_type);
let default = config.default_phase.clone();
(config, contract_type.to_string(), default)
};
// Get valid phase IDs from the configuration
let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect();
// Use provided initial_phase or default based on contract type/template
let phase = req.initial_phase.as_deref().unwrap_or(&default_phase);
// Validate the phase is valid for this contract type/template
if !valid_phase_ids.contains(&phase.to_string()) {
return Err(sqlx::Error::Protocol(format!(
"Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
phase,
contract_type_str,
valid_phase_ids.join(", ")
)));
}
let autonomous_loop = req.autonomous_loop.unwrap_or(false);
let phase_guard = req.phase_guard.unwrap_or(false);
let local_only = req.local_only.unwrap_or(false);
let auto_merge_local = req.auto_merge_local.unwrap_or(false);
// Serialize phase_config to JSON
let phase_config_json = serde_json::to_value(&phase_config).ok();
sqlx::query_as::<_, Contract>(
r#"
INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING *
"#,
)
.bind(owner_id)
.bind(&req.name)
.bind(&req.description)
.bind(&contract_type_str)
.bind(phase)
.bind(autonomous_loop)
.bind(phase_guard)
.bind(local_only)
.bind(auto_merge_local)
.bind(phase_config_json)
.fetch_one(pool)
.await
}
/// Fetch a single contract, visible only to its owner.
///
/// Yields `Ok(None)` when no row matches both `id` and `owner_id`.
pub async fn get_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Contract>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contracts
WHERE id = $1 AND owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, Contract>(SQL).bind(id).bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// Return summaries of every contract owned by `owner_id`, newest first.
///
/// Each summary carries the number of files, tasks and repositories attached to
/// the contract, computed by correlated sub-queries.
pub async fn list_contracts_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<ContractSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
c.id, c.name, c.description, c.contract_type, c.phase, c.status,
c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
(SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
(SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
(SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
FROM contracts c
WHERE c.owner_id = $1
ORDER BY c.created_at DESC
"#;

    let listing = sqlx::query_as::<_, ContractSummary>(SQL).bind(owner_id);
    listing.fetch_all(pool).await
}
/// Fetch the summary (including file/task/repository counts) of one contract.
///
/// Yields `Ok(None)` when no contract matches both `id` and `owner_id`.
pub async fn get_contract_summary_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<ContractSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
c.id, c.name, c.description, c.contract_type, c.phase, c.status,
c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
(SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
(SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
(SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
FROM contracts c
WHERE c.id = $1 AND c.owner_id = $2
"#;

    let lookup = sqlx::query_as::<_, ContractSummary>(SQL)
        .bind(id)
        .bind(owner_id);
    lookup.fetch_optional(pool).await
}
/// Apply a partial update to an owner's contract, with optional optimistic locking.
///
/// Fields absent from `req` keep their stored value. When `req.version` is set it
/// must equal the stored version, both at the initial read and at write time
/// (the UPDATE re-checks it in its `WHERE` clause). Every successful update
/// increments `version` and refreshes `updated_at`.
///
/// Returns `Ok(None)` when the contract does not exist for this owner, and
/// `RepositoryError::VersionConflict` when the expected version is stale.
pub async fn update_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateContractRequest,
) -> Result<Option<Contract>, RepositoryError> {
    const VERSIONED_SQL: &str = r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $12
RETURNING *
"#;
    const UNVERSIONED_SQL: &str = r#"
UPDATE contracts
SET name = $3, description = $4, phase = $5, status = $6,
supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;

    let Some(current) = get_contract_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Early optimistic-lock check against the row we just read.
    let expected_version = req.version;
    match expected_version {
        Some(expected) if expected != current.version => {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: current.version,
            });
        }
        _ => {}
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(current.name);
    let description = req.description.or(current.description);
    let phase = req.phase.unwrap_or(current.phase);
    let status = req.status.unwrap_or(current.status);
    let supervisor_task_id = req.supervisor_task_id.or(current.supervisor_task_id);
    let autonomous_loop = req.autonomous_loop.unwrap_or(current.autonomous_loop);
    let phase_guard = req.phase_guard.unwrap_or(current.phase_guard);
    let local_only = req.local_only.unwrap_or(current.local_only);
    let auto_merge_local = req.auto_merge_local.unwrap_or(current.auto_merge_local);

    // Both statements share parameters $1..$11; only the versioned one has $12.
    let sql = if expected_version.is_some() {
        VERSIONED_SQL
    } else {
        UNVERSIONED_SQL
    };
    let mut statement = sqlx::query_as::<_, Contract>(sql)
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&phase)
        .bind(&status)
        .bind(supervisor_task_id)
        .bind(autonomous_loop)
        .bind(phase_guard)
        .bind(local_only)
        .bind(auto_merge_local);
    if let Some(expected) = expected_version {
        statement = statement.bind(expected);
    }
    let updated = statement.fetch_optional(pool).await?;

    // A versioned update that matched nothing means another writer got in
    // between our read and our write; report the version that is stored now.
    if let (None, Some(expected)) = (&updated, expected_version) {
        if let Some(latest) = get_contract_for_owner(pool, id, owner_id).await? {
            return Err(RepositoryError::VersionConflict {
                expected,
                actual: latest.version,
            });
        }
    }

    Ok(updated)
}
/// Remove a contract belonging to `owner_id`.
///
/// Returns `true` when a row was deleted, `false` when nothing matched.
pub async fn delete_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM contracts
WHERE id = $1 AND owner_id = $2
"#;

    sqlx::query(SQL)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Change contract phase and record a `phase_change` event.
///
/// This is the simple version: it performs no version check and does not
/// validate `new_phase` against the contract's phase configuration. Use
/// `change_contract_phase_with_version` for conflict detection and validation.
///
/// The read of the current phase, the phase update and the event insert run in
/// one transaction with the contract row locked (`SELECT ... FOR UPDATE`), so the
/// recorded `previous_phase` is the phase that was actually replaced and a failed
/// event insert no longer leaves the phase changed without an event.
///
/// Returns `Ok(None)` when the contract does not exist for this owner.
pub async fn change_contract_phase_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    new_phase: &str,
) -> Result<Option<Contract>, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Lock the row and read the phase we are about to replace.
    let existing = sqlx::query_as::<_, Contract>(
        r#"
SELECT *
FROM contracts
WHERE id = $1 AND owner_id = $2
FOR UPDATE
"#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await?;
    let Some(existing) = existing else {
        tx.rollback().await?;
        return Ok(None);
    };
    let previous_phase = existing.phase;

    // Update phase; the row is locked, so exactly one row is returned.
    let contract = sqlx::query_as::<_, Contract>(
        r#"
UPDATE contracts
SET phase = $3, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#,
    )
    .bind(id)
    .bind(owner_id)
    .bind(new_phase)
    .fetch_one(&mut *tx)
    .await?;

    // Record event in the same transaction as the update.
    sqlx::query(
        r#"
INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
VALUES ($1, 'phase_change', $2, $3)
"#,
    )
    .bind(id)
    .bind(&previous_phase)
    .bind(new_phase)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(Some(contract))
}
/// Move a contract to `new_phase`, detecting version conflicts and invalid phases.
///
/// Runs in a single transaction that locks the contract row with
/// `SELECT ... FOR UPDATE`. The outcome is reported through `PhaseChangeResult`:
/// - `NotFound`: no contract matches `id` and `owner_id`;
/// - `VersionConflict`: `expected_version` was given and differs from the stored one;
/// - `ValidationFailed`: `new_phase` is not among `Contract::valid_phase_ids()`;
/// - `Success`: phase updated, version incremented and a `phase_change` event recorded.
///
/// Every non-success outcome rolls the transaction back before returning.
pub async fn change_contract_phase_with_version(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    new_phase: &str,
    expected_version: Option<i32>,
) -> Result<PhaseChangeResult, sqlx::Error> {
    const LOCK_SQL: &str = r#"
SELECT *
FROM contracts
WHERE id = $1 AND owner_id = $2
FOR UPDATE
"#;
    const UPDATE_SQL: &str = r#"
UPDATE contracts
SET phase = $3, version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;
    const EVENT_SQL: &str = r#"
INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
VALUES ($1, 'phase_change', $2, $3)
"#;

    let mut txn = pool.begin().await?;

    // Read the current state while holding the row lock.
    let locked: Option<Contract> = sqlx::query_as::<_, Contract>(LOCK_SQL)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(&mut *txn)
        .await?;
    let current = match locked {
        Some(contract) => contract,
        None => {
            txn.rollback().await?;
            return Ok(PhaseChangeResult::NotFound);
        }
    };

    // Optimistic-lock check, only when the caller supplied a version.
    match expected_version {
        Some(expected) if current.version != expected => {
            txn.rollback().await?;
            return Ok(PhaseChangeResult::VersionConflict {
                expected,
                actual: current.version,
                current_phase: current.phase,
            });
        }
        _ => {}
    }

    // The target phase must be one the contract knows about.
    let allowed_phases = current.valid_phase_ids();
    let requested = new_phase.to_string();
    if !allowed_phases.contains(&requested) {
        txn.rollback().await?;
        let reason = format!(
            "Invalid phase '{}' for contract type '{}'",
            new_phase, current.contract_type
        );
        let hint = format!("Phase must be one of: {}", allowed_phases.join(", "));
        return Ok(PhaseChangeResult::ValidationFailed {
            reason,
            missing_requirements: vec![hint],
        });
    }

    let previous_phase = current.phase.clone();

    // Write the new phase and bump the version.
    let updated = sqlx::query_as::<_, Contract>(UPDATE_SQL)
        .bind(id)
        .bind(owner_id)
        .bind(new_phase)
        .fetch_one(&mut *txn)
        .await?;

    // Log the transition in the same transaction.
    sqlx::query(EVENT_SQL)
        .bind(id)
        .bind(&previous_phase)
        .bind(new_phase)
        .execute(&mut *txn)
        .await?;

    txn.commit().await?;
    Ok(PhaseChangeResult::Success(updated))
}
// =============================================================================
// Contract Repository Functions
// =============================================================================
/// Return all repositories attached to a contract.
///
/// Primary repositories sort first; the rest follow in creation order.
pub async fn list_contract_repositories(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<ContractRepository>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_repositories
WHERE contract_id = $1
ORDER BY is_primary DESC, created_at ASC
"#;

    let listing = sqlx::query_as::<_, ContractRepository>(SQL).bind(contract_id);
    listing.fetch_all(pool).await
}
/// Add a remote repository to a contract.
///
/// The row is inserted with `source_type = 'remote'` and `status = 'ready'`.
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. Demotion and insert run in one transaction, so a failed insert
/// does not leave the contract without a primary repository.
pub async fn add_remote_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    repository_url: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // If is_primary, clear other primaries first.
    if is_primary {
        sqlx::query(
            r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#,
        )
        .bind(contract_id)
        .execute(&mut *tx)
        .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(
        r#"
INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
VALUES ($1, $2, $3, 'remote', 'ready', $4)
RETURNING *
"#,
    )
    .bind(contract_id)
    .bind(name)
    .bind(repository_url)
    .bind(is_primary)
    .fetch_one(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(repository)
}
/// Add a local repository to a contract.
///
/// The row is inserted with `source_type = 'local'` and `status = 'ready'`.
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. Demotion and insert run in one transaction, so a failed insert
/// does not leave the contract without a primary repository.
pub async fn add_local_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    local_path: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // If is_primary, clear other primaries first.
    if is_primary {
        sqlx::query(
            r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#,
        )
        .bind(contract_id)
        .execute(&mut *tx)
        .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(
        r#"
INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
VALUES ($1, $2, $3, 'local', 'ready', $4)
RETURNING *
"#,
    )
    .bind(contract_id)
    .bind(name)
    .bind(local_path)
    .bind(is_primary)
    .fetch_one(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(repository)
}
/// Create a managed repository (daemon will create it).
///
/// The row is inserted with `source_type = 'managed'` and `status = 'pending'`.
/// When `is_primary` is true, any existing primary repository of the contract is
/// demoted first. Demotion and insert run in one transaction, so a failed insert
/// does not leave the contract without a primary repository.
pub async fn create_managed_repository(
    pool: &PgPool,
    contract_id: Uuid,
    name: &str,
    is_primary: bool,
) -> Result<ContractRepository, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // If is_primary, clear other primaries first.
    if is_primary {
        sqlx::query(
            r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#,
        )
        .bind(contract_id)
        .execute(&mut *tx)
        .await?;
    }

    let repository = sqlx::query_as::<_, ContractRepository>(
        r#"
INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
VALUES ($1, $2, 'managed', 'pending', $3)
RETURNING *
"#,
    )
    .bind(contract_id)
    .bind(name)
    .bind(is_primary)
    .fetch_one(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(repository)
}
/// Detach a repository from a contract by deleting its row.
///
/// The row must match both `repo_id` and `contract_id`. Returns `true` when a
/// row was deleted, `false` when nothing matched.
pub async fn delete_contract_repository(
    pool: &PgPool,
    repo_id: Uuid,
    contract_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM contract_repositories
WHERE id = $1 AND contract_id = $2
"#;

    sqlx::query(SQL)
        .bind(repo_id)
        .bind(contract_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Set a repository as primary (and clear others).
///
/// Both steps run in one transaction. If `repo_id` does not belong to
/// `contract_id`, the transaction is rolled back so the contract keeps its
/// current primary repository, and `Ok(false)` is returned. Returns `Ok(true)`
/// once the repository has been promoted.
pub async fn set_repository_primary(
    pool: &PgPool,
    repo_id: Uuid,
    contract_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Clear other primaries.
    sqlx::query(
        r#"
UPDATE contract_repositories
SET is_primary = false, updated_at = NOW()
WHERE contract_id = $1 AND is_primary = true
"#,
    )
    .bind(contract_id)
    .execute(&mut *tx)
    .await?;

    // Set this one as primary.
    let result = sqlx::query(
        r#"
UPDATE contract_repositories
SET is_primary = true, updated_at = NOW()
WHERE id = $1 AND contract_id = $2
"#,
    )
    .bind(repo_id)
    .bind(contract_id)
    .execute(&mut *tx)
    .await?;

    let promoted = result.rows_affected() > 0;
    if promoted {
        tx.commit().await?;
    } else {
        // Target not found: undo the demotion instead of leaving no primary.
        tx.rollback().await?;
    }
    Ok(promoted)
}
/// Set the status of a repository row and, optionally, its URL.
///
/// When `repository_url` is `None` the stored URL is left as it is
/// (`COALESCE($3, repository_url)`). Returns the updated row, or `Ok(None)` when
/// no repository has this id.
pub async fn update_managed_repository_status(
    pool: &PgPool,
    repo_id: Uuid,
    status: &str,
    repository_url: Option<&str>,
) -> Result<Option<ContractRepository>, sqlx::Error> {
    const SQL: &str = r#"
UPDATE contract_repositories
SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
WHERE id = $1
RETURNING *
"#;

    let update = sqlx::query_as::<_, ContractRepository>(SQL)
        .bind(repo_id)
        .bind(status)
        .bind(repository_url);
    update.fetch_optional(pool).await
}
// =============================================================================
// Contract Task Association Functions
// =============================================================================
/// Attach a task to a contract by setting the task's `contract_id`.
///
/// Only a task owned by `owner_id` is touched. Returns `true` when a task row
/// was updated.
///
/// NOTE(review): ownership of the contract itself is not checked by this query;
/// presumably callers verify it beforehand.
pub async fn add_task_to_contract(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE tasks
SET contract_id = $2, updated_at = NOW()
WHERE id = $1 AND owner_id = $3
"#;

    sqlx::query(SQL)
        .bind(task_id)
        .bind(contract_id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Detach a task from a contract by resetting the task's `contract_id` to NULL.
///
/// The task must be owned by `owner_id` and currently attached to `contract_id`.
/// Returns `true` when a task row was updated.
pub async fn remove_task_from_contract(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
UPDATE tasks
SET contract_id = NULL, updated_at = NOW()
WHERE id = $1 AND contract_id = $2 AND owner_id = $3
"#;

    sqlx::query(SQL)
        .bind(task_id)
        .bind(contract_id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Return summaries of the files attached to a contract, newest first.
///
/// Rows are loaded as full `File` records and then converted with
/// `FileSummary::from`, because `FileSummary` cannot be read straight from a row.
pub async fn list_files_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<FileSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE contract_id = $1 AND owner_id = $2
ORDER BY created_at DESC
"#;

    let rows = sqlx::query_as::<_, File>(SQL)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;

    let mut summaries = Vec::with_capacity(rows.len());
    for file in rows {
        summaries.push(FileSummary::from(file));
    }
    Ok(summaries)
}
/// Return summaries of an owner's tasks inside a contract.
///
/// Results are ordered by priority (highest first), then by creation time
/// (newest first). Each summary includes the contract's name, phase and status
/// and the number of direct subtasks.
pub async fn list_tasks_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
c.status as contract_status,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.contract_id = $1 AND t.owner_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#;

    let listing = sqlx::query_as::<_, TaskSummary>(SQL)
        .bind(contract_id)
        .bind(owner_id);
    listing.fetch_all(pool).await
}
/// Minimal task info for worktree cleanup operations.
///
/// Rows of this type are produced by `list_contract_tasks_with_worktree_info`,
/// which selects exactly these four columns from the `tasks` table.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TaskWorktreeInfo {
/// Task ID (`tasks.id`).
pub id: Uuid,
/// Value of `tasks.daemon_id`; `None` when the column is NULL.
/// Presumably the daemon that ran the task — confirm against the schema.
pub daemon_id: Option<Uuid>,
/// Value of `tasks.overlay_path`; `None` when the column is NULL.
pub overlay_path: Option<String>,
/// If set, this task shares the worktree of the specified supervisor task.
/// Should NOT have its worktree deleted during cleanup.
pub supervisor_worktree_task_id: Option<Uuid>,
}
/// Return worktree-related info for the tasks of a contract.
///
/// Only tasks with a daemon assigned or an overlay path recorded are returned.
/// Used for cleaning up worktrees when a contract is completed or deleted.
pub async fn list_contract_tasks_with_worktree_info(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id
FROM tasks
WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL)
"#;

    let listing = sqlx::query_as::<_, TaskWorktreeInfo>(SQL).bind(contract_id);
    listing.fetch_all(pool).await
}
// =============================================================================
// Contract Events
// =============================================================================
/// Return every event recorded for a contract, most recent first.
pub async fn list_contract_events(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<ContractEvent>, sqlx::Error> {
    const SQL: &str = r#"
SELECT *
FROM contract_events
WHERE contract_id = $1
ORDER BY created_at DESC
"#;

    let listing = sqlx::query_as::<_, ContractEvent>(SQL).bind(contract_id);
    listing.fetch_all(pool).await
}
/// Insert an event row for a contract and return the stored record.
///
/// `event_data` is an optional JSON payload; `None` is stored as NULL.
pub async fn record_contract_event(
    pool: &PgPool,
    contract_id: Uuid,
    event_type: &str,
    event_data: Option<serde_json::Value>,
) -> Result<ContractEvent, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO contract_events (contract_id, event_type, event_data)
VALUES ($1, $2, $3)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, ContractEvent>(SQL)
        .bind(contract_id)
        .bind(event_type)
        .bind(event_data);
    insert.fetch_one(pool).await
}
// ============================================================================
// Task Checkpoints
// ============================================================================
/// Create a checkpoint for a task.
///
/// The checkpoint number is one more than the task's highest existing checkpoint
/// number (starting at 1). The task row is updated with the new
/// `last_checkpoint_sha`, `checkpoint_count` and `checkpoint_message`, and the
/// checkpoint row is inserted and returned.
///
/// All steps run in one transaction that first locks the task row, so concurrent
/// calls for the same task cannot compute the same checkpoint number, and a
/// failed insert does not leave the task's counters pointing at a checkpoint
/// that was never stored.
#[allow(clippy::too_many_arguments)] // public signature kept as-is for callers
pub async fn create_task_checkpoint(
    pool: &PgPool,
    task_id: Uuid,
    commit_sha: &str,
    branch_name: &str,
    message: &str,
    files_changed: Option<serde_json::Value>,
    lines_added: Option<i32>,
    lines_removed: Option<i32>,
) -> Result<TaskCheckpoint, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Serialize checkpoint creation per task. A missing task is not an error
    // here; the statements below then behave as they did before.
    sqlx::query("SELECT id FROM tasks WHERE id = $1 FOR UPDATE")
        .bind(task_id)
        .fetch_optional(&mut *tx)
        .await?;

    // Get current checkpoint count and increment.
    let checkpoint_number: i32 = sqlx::query_scalar(
        "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_one(&mut *tx)
    .await?;

    // Update task's checkpoint tracking.
    sqlx::query(
        r#"
UPDATE tasks
SET last_checkpoint_sha = $1,
checkpoint_count = $2,
checkpoint_message = $3,
updated_at = NOW()
WHERE id = $4
"#,
    )
    .bind(commit_sha)
    .bind(checkpoint_number)
    .bind(message)
    .bind(task_id)
    .execute(&mut *tx)
    .await?;

    let checkpoint = sqlx::query_as::<_, TaskCheckpoint>(
        r#"
INSERT INTO task_checkpoints (
task_id, checkpoint_number, commit_sha, branch_name, message,
files_changed, lines_added, lines_removed
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#,
    )
    .bind(task_id)
    .bind(checkpoint_number)
    .bind(commit_sha)
    .bind(branch_name)
    .bind(message)
    .bind(files_changed)
    .bind(lines_added)
    .bind(lines_removed)
    .fetch_one(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(checkpoint)
}
/// Look up one checkpoint by its primary key; `Ok(None)` when it does not exist.
pub async fn get_task_checkpoint(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM task_checkpoints WHERE id = $1";

    let lookup = sqlx::query_as::<_, TaskCheckpoint>(SQL).bind(id);
    lookup.fetch_optional(pool).await
}
/// Look up a checkpoint by the commit SHA it recorded; `Ok(None)` when no
/// checkpoint has that SHA.
pub async fn get_task_checkpoint_by_sha(
    pool: &PgPool,
    commit_sha: &str,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM task_checkpoints WHERE commit_sha = $1";

    let lookup = sqlx::query_as::<_, TaskCheckpoint>(SQL).bind(commit_sha);
    lookup.fetch_optional(pool).await
}
/// Return all checkpoints of a task, highest checkpoint number first.
pub async fn list_task_checkpoints(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<TaskCheckpoint>, sqlx::Error> {
    const SQL: &str =
        "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC";

    let listing = sqlx::query_as::<_, TaskCheckpoint>(SQL).bind(task_id);
    listing.fetch_all(pool).await
}
// ============================================================================
// Supervisor State
// ============================================================================
/// Insert the supervisor state of a contract, or overwrite it if one exists.
///
/// The conflict target is `contract_id`. On conflict the task id, conversation
/// history, pending task ids and phase are replaced and the activity timestamps
/// are refreshed. Returns the stored row.
pub async fn upsert_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    conversation_history: serde_json::Value,
    pending_task_ids: &[Uuid],
    phase: &str,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (contract_id) DO UPDATE SET
task_id = EXCLUDED.task_id,
conversation_history = EXCLUDED.conversation_history,
pending_task_ids = EXCLUDED.pending_task_ids,
phase = EXCLUDED.phase,
last_activity = NOW(),
updated_at = NOW()
RETURNING *
"#;

    let upsert = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(contract_id)
        .bind(task_id)
        .bind(conversation_history)
        .bind(pending_task_ids)
        .bind(phase);
    upsert.fetch_one(pool).await
}
/// Load the supervisor state stored for a contract; `Ok(None)` when there is none.
pub async fn get_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM supervisor_states WHERE contract_id = $1";

    let lookup = sqlx::query_as::<_, SupervisorState>(SQL).bind(contract_id);
    lookup.fetch_optional(pool).await
}
/// Load the supervisor state whose `task_id` matches; `Ok(None)` when there is none.
pub async fn get_supervisor_state_by_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM supervisor_states WHERE task_id = $1";

    let lookup = sqlx::query_as::<_, SupervisorState>(SQL).bind(task_id);
    lookup.fetch_optional(pool).await
}
/// Replace the stored conversation history of a contract's supervisor.
///
/// Also refreshes `last_activity` and `updated_at`. Uses `fetch_one`, so a
/// contract without a supervisor state row produces an error.
pub async fn update_supervisor_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    conversation_history: serde_json::Value,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET conversation_history = $1,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(conversation_history)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Replace the list of pending task ids of a contract's supervisor.
///
/// Also refreshes `last_activity` and `updated_at`. Uses `fetch_one`, so a
/// contract without a supervisor state row produces an error.
pub async fn update_supervisor_pending_tasks(
    pool: &PgPool,
    contract_id: Uuid,
    pending_task_ids: &[Uuid],
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET pending_task_ids = $1,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(pending_task_ids)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Store the supervisor's state, current activity, progress and error message.
///
/// Called at key save points: LLM response, task spawn, question asked, phase
/// change. `None` for `current_activity` or `error_message` writes NULL. Also
/// refreshes `last_activity` and `updated_at`. Uses `fetch_one`, so a contract
/// without a supervisor state row produces an error.
pub async fn update_supervisor_detailed_state(
    pool: &PgPool,
    contract_id: Uuid,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    error_message: Option<&str>,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET state = $1,
current_activity = $2,
progress = $3,
error_message = $4,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $5
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(state)
        .bind(current_activity)
        .bind(progress)
        .bind(error_message)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Append `task_id` to the supervisor's `spawned_task_ids` array.
///
/// Uses `array_append`, so no de-duplication takes place. Also refreshes
/// `last_activity` and `updated_at`. Uses `fetch_one`, so a contract without a
/// supervisor state row produces an error.
pub async fn add_supervisor_spawned_task(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET spawned_task_ids = array_append(spawned_task_ids, $1),
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(task_id)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Concatenate `question` onto the supervisor's `pending_questions` JSON value
/// and switch the supervisor to the `waiting_for_user` state.
///
/// Also refreshes `last_activity` and `updated_at`. Uses `fetch_one`, so a
/// contract without a supervisor state row produces an error.
pub async fn add_supervisor_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question: serde_json::Value,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET pending_questions = pending_questions || $1::jsonb,
state = 'waiting_for_user',
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(question)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Drop the pending question whose `id` equals `question_id`.
///
/// `pending_questions` is rebuilt from the elements whose `id` differs from
/// `question_id`; an empty result is stored as `[]`. Also refreshes
/// `last_activity` and `updated_at`. Uses `fetch_one`, so a contract without a
/// supervisor state row produces an error.
///
/// NOTE(review): elements without an `id` key compare as NULL in the filter and
/// therefore appear to be dropped as well — confirm this is intended.
pub async fn remove_supervisor_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question_id: Uuid,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET pending_questions = (
SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb)
FROM jsonb_array_elements(pending_questions) elem
WHERE (elem->>'id')::uuid != $1
),
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(question_id)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Persist every tracked field of a supervisor's state in one statement.
///
/// Inserts a row for `contract_id`, or on conflict overwrites the task id,
/// conversation history, pending task ids, phase, state, current activity,
/// progress, error message, spawned task ids and pending questions. The activity
/// timestamps are refreshed either way. Intended for major save points.
pub async fn save_supervisor_state_full(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    conversation_history: serde_json::Value,
    pending_task_ids: &[Uuid],
    phase: &str,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    error_message: Option<&str>,
    spawned_task_ids: &[Uuid],
    pending_questions: serde_json::Value,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO supervisor_states (
contract_id, task_id, conversation_history, pending_task_ids, phase,
state, current_activity, progress, error_message, spawned_task_ids,
pending_questions, last_activity
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
ON CONFLICT (contract_id) DO UPDATE SET
task_id = EXCLUDED.task_id,
conversation_history = EXCLUDED.conversation_history,
pending_task_ids = EXCLUDED.pending_task_ids,
phase = EXCLUDED.phase,
state = EXCLUDED.state,
current_activity = EXCLUDED.current_activity,
progress = EXCLUDED.progress,
error_message = EXCLUDED.error_message,
spawned_task_ids = EXCLUDED.spawned_task_ids,
pending_questions = EXCLUDED.pending_questions,
last_activity = NOW(),
updated_at = NOW()
RETURNING *
"#;

    // Bind order follows the $1..$11 placeholders above.
    let upsert = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(contract_id)
        .bind(task_id)
        .bind(conversation_history)
        .bind(pending_task_ids)
        .bind(phase)
        .bind(state)
        .bind(current_activity)
        .bind(progress)
        .bind(error_message)
        .bind(spawned_task_ids)
        .bind(pending_questions);
    upsert.fetch_one(pool).await
}
/// Record that a supervisor was restored after a crash or interruption.
///
/// Increments `restoration_count`, stamps `last_restored_at`, stores
/// `restoration_source`, resets `state` to `initializing` and clears
/// `error_message`. Also refreshes `last_activity` and `updated_at`. Uses
/// `fetch_one`, so a contract without a supervisor state row produces an error.
pub async fn mark_supervisor_restored(
    pool: &PgPool,
    contract_id: Uuid,
    restoration_source: &str,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET restoration_count = restoration_count + 1,
last_restored_at = NOW(),
restoration_source = $1,
state = 'initializing',
error_message = NULL,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(restoration_source)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Return the supervisor states of an owner's active contracts that still have
/// pending questions, most recently active first.
///
/// Intended for re-delivering questions after a restoration.
pub async fn get_supervisors_with_pending_questions(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<SupervisorState>, sqlx::Error> {
    const SQL: &str = r#"
SELECT ss.*
FROM supervisor_states ss
JOIN contracts c ON c.id = ss.contract_id
WHERE c.owner_id = $1
AND ss.pending_questions != '[]'::jsonb
AND c.status = 'active'
ORDER BY ss.last_activity DESC
"#;

    let listing = sqlx::query_as::<_, SupervisorState>(SQL).bind(owner_id);
    listing.fetch_all(pool).await
}
/// Load the complete supervisor state row of a contract for restoration.
///
/// This is a plain `SELECT *` by `contract_id`; validation of the loaded state
/// is left to the caller. Yields `Ok(None)` when no state row exists.
pub async fn get_supervisor_state_for_restoration(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = r#"
SELECT * FROM supervisor_states WHERE contract_id = $1
"#;

    let lookup = sqlx::query_as::<_, SupervisorState>(SQL).bind(contract_id);
    lookup.fetch_optional(pool).await
}
/// Look up the current status of a set of tasks.
///
/// Returns a map of task_id -> (status, updated_at) for every id in `task_ids`
/// that exists in the `tasks` table; unknown ids are simply absent from the map.
/// An empty `task_ids` slice yields an empty map without querying the database.
///
/// # Errors
///
/// Returns the database error, or the decode error if a column cannot be read
/// as the expected type (instead of panicking).
pub async fn validate_spawned_tasks(
    pool: &PgPool,
    task_ids: &[Uuid],
) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> {
    use sqlx::Row;

    if task_ids.is_empty() {
        return Ok(std::collections::HashMap::new());
    }

    let rows = sqlx::query(
        r#"
SELECT id, status, updated_at
FROM tasks
WHERE id = ANY($1)
"#,
    )
    .bind(task_ids)
    .fetch_all(pool)
    .await?;

    let mut result = std::collections::HashMap::with_capacity(rows.len());
    for row in rows {
        // try_get propagates a decode/column error; Row::get would panic.
        let id: Uuid = row.try_get("id")?;
        let status: String = row.try_get("status")?;
        let updated_at: chrono::DateTime<Utc> = row.try_get("updated_at")?;
        result.insert(id, (status, updated_at));
    }
    Ok(result)
}
/// Record a phase change on the supervisor state of a contract.
///
/// Sets `phase` to `new_phase`, `state` to `working` and `current_activity` to
/// `"Phase changed to <new_phase>"`. Also refreshes `last_activity` and
/// `updated_at`. Uses `fetch_one`, so a contract without a supervisor state row
/// produces an error.
pub async fn update_supervisor_phase(
    pool: &PgPool,
    contract_id: Uuid,
    new_phase: &str,
) -> Result<SupervisorState, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET phase = $1,
state = 'working',
current_activity = 'Phase changed to ' || $1,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $2
RETURNING *
"#;

    let update = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(new_phase)
        .bind(contract_id);
    update.fetch_one(pool).await
}
/// Lightweight supervisor state refresh performed on each heartbeat.
///
/// Writes `state`, `current_activity`, `progress` and `pending_task_ids` and
/// refreshes the activity timestamps. The updated row is not returned, and a
/// contract without a supervisor state row is not treated as an error.
pub async fn update_supervisor_heartbeat_state(
    pool: &PgPool,
    contract_id: Uuid,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    pending_task_ids: &[Uuid],
) -> Result<(), sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET state = $1,
current_activity = $2,
progress = $3,
pending_task_ids = $4,
last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $5
"#;

    sqlx::query(SQL)
        .bind(state)
        .bind(current_activity)
        .bind(progress)
        .bind(pending_task_ids)
        .bind(contract_id)
        .execute(pool)
        .await
        .map(|_| ())
}
// ============================================================================
// Supervisor Heartbeats
// ============================================================================
/// Insert a heartbeat row for a supervisor task and return the stored record.
///
/// The row is timestamped with the database's `NOW()`. Heartbeats form a
/// historical record used for monitoring and dead-supervisor detection.
pub async fn create_supervisor_heartbeat(
    pool: &PgPool,
    supervisor_task_id: Uuid,
    contract_id: Uuid,
    state: &str,
    phase: &str,
    current_activity: Option<&str>,
    progress: i32,
    pending_task_ids: &[Uuid],
) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO supervisor_heartbeats (
supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
RETURNING *
"#;

    let insert = sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .bind(state)
        .bind(phase)
        .bind(current_activity)
        .bind(progress)
        .bind(pending_task_ids);
    insert.fetch_one(pool).await
}
/// Fetch the most recent heartbeat (by `timestamp`) of a supervisor task.
///
/// Returns `Ok(None)` when the task has no heartbeat rows.
pub async fn get_latest_supervisor_heartbeat(
    pool: &PgPool,
    supervisor_task_id: Uuid,
) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT * FROM supervisor_heartbeats
WHERE supervisor_task_id = $1
ORDER BY timestamp DESC
LIMIT 1
"#;

    let latest = sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .fetch_optional(pool)
        .await?;
    Ok(latest)
}
/// Fetch up to `limit` heartbeats of a supervisor task, newest first.
pub async fn get_supervisor_heartbeats(
    pool: &PgPool,
    supervisor_task_id: Uuid,
    limit: i64,
) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT * FROM supervisor_heartbeats
WHERE supervisor_task_id = $1
ORDER BY timestamp DESC
LIMIT $2
"#;

    let heartbeats = sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .bind(limit)
        .fetch_all(pool)
        .await?;
    Ok(heartbeats)
}
/// Fetch up to `limit` heartbeats recorded for a contract, newest first.
///
/// Rows are matched on `contract_id` only, regardless of which supervisor
/// task produced them.
pub async fn get_contract_supervisor_heartbeats(
    pool: &PgPool,
    contract_id: Uuid,
    limit: i64,
) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
SELECT * FROM supervisor_heartbeats
WHERE contract_id = $1
ORDER BY timestamp DESC
LIMIT $2
"#;

    let heartbeats = sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(contract_id)
        .bind(limit)
        .fetch_all(pool)
        .await?;
    Ok(heartbeats)
}
/// Delete heartbeats whose `timestamp` is more than `ttl_hours` hours old.
///
/// The TTL is sent as text and turned into an interval server-side
/// (`'<n> hours'::INTERVAL`). The caller always supplies the TTL; this
/// function applies no default of its own.
///
/// Returns the number of deleted rows.
pub async fn cleanup_old_heartbeats(
    pool: &PgPool,
    ttl_hours: i64,
) -> Result<u64, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM supervisor_heartbeats
WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
"#;

    let ttl_text = ttl_hours.to_string();
    let outcome = sqlx::query(SQL).bind(ttl_text).execute(pool).await?;
    Ok(outcome.rows_affected())
}
/// Find running supervisors whose latest heartbeat is older than the timeout.
///
/// For every `supervisor_task_id` the newest heartbeat is selected
/// (`DISTINCT ON ... ORDER BY timestamp DESC`); it is reported when the
/// matching task has `status = 'running'` and the heartbeat is more than
/// `timeout_seconds` seconds old. Supervisors that never wrote a heartbeat do
/// not appear, because the query starts from `supervisor_heartbeats`.
///
/// Returns a list of `(supervisor_task_id, contract_id, last_heartbeat_timestamp)`.
///
/// # Errors
/// Returns the `sqlx::Error` from running the query or from decoding a column;
/// decoding failures are propagated instead of panicking.
pub async fn find_stale_supervisors(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
    use sqlx::Row;

    const SQL: &str = r#"
WITH latest_heartbeats AS (
SELECT DISTINCT ON (supervisor_task_id)
supervisor_task_id,
contract_id,
timestamp
FROM supervisor_heartbeats
ORDER BY supervisor_task_id, timestamp DESC
)
SELECT
lh.supervisor_task_id,
lh.contract_id,
lh.timestamp
FROM latest_heartbeats lh
JOIN tasks t ON t.id = lh.supervisor_task_id
WHERE t.status = 'running'
AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
"#;

    let rows = sqlx::query(SQL)
        .bind(timeout_seconds.to_string())
        .fetch_all(pool)
        .await?;

    let mut stale = Vec::with_capacity(rows.len());
    for row in &rows {
        // `try_get` reports a missing column or a type mismatch as an `Err`,
        // whereas `get` would panic.
        let supervisor_task_id: Uuid = row.try_get("supervisor_task_id")?;
        let contract_id: Uuid = row.try_get("contract_id")?;
        let timestamp: chrono::DateTime<Utc> = row.try_get("timestamp")?;
        stale.push((supervisor_task_id, contract_id, timestamp));
    }
    Ok(stale)
}
// ============================================================================
// Contract Supervisor
// ============================================================================
/// Point a contract at its supervisor task.
///
/// Sets `contracts.supervisor_task_id`, bumps `updated_at`, and returns the
/// updated contract. A missing contract surfaces as an error because the
/// statement runs through `fetch_one`.
pub async fn update_contract_supervisor(
    pool: &PgPool,
    contract_id: Uuid,
    supervisor_task_id: Uuid,
) -> Result<Contract, sqlx::Error> {
    const SQL: &str = r#"
UPDATE contracts
SET supervisor_task_id = $1,
updated_at = NOW()
WHERE id = $2
RETURNING *
"#;

    let contract = sqlx::query_as::<_, Contract>(SQL)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .fetch_one(pool)
        .await?;
    Ok(contract)
}
/// Record `deliverable_id` as completed for `phase` on a contract.
///
/// `contracts.completed_deliverables` is treated as a JSONB object keyed by
/// phase, each value being an array of deliverable ids. The id is appended to
/// the array under `phase`; the array is created when the key is missing.
///
/// # Errors
/// The `WHERE` clause skips the update when the phase array already contains
/// `deliverable_id`. In that case, and when the contract does not exist, no
/// row is returned and `fetch_one` yields an error.
pub async fn mark_deliverable_complete(
    pool: &PgPool,
    contract_id: Uuid,
    phase: &str,
    deliverable_id: &str,
) -> Result<Contract, sqlx::Error> {
    // jsonb_set(..., true) creates the phase key if needed; COALESCE supplies
    // an empty array to append to when the phase has no entries yet.
    const SQL: &str = r#"
UPDATE contracts
SET completed_deliverables = jsonb_set(
completed_deliverables,
ARRAY[$2::text],
COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text),
true
),
updated_at = NOW()
WHERE id = $1
AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3)
RETURNING *
"#;

    let contract = sqlx::query_as::<_, Contract>(SQL)
        .bind(contract_id)
        .bind(phase)
        .bind(deliverable_id)
        .fetch_one(pool)
        .await?;
    Ok(contract)
}
/// Drop the completed-deliverables entry of one phase on a contract.
///
/// Removes the `phase` key from the `completed_deliverables` JSONB object
/// (the `-` operator), bumps `updated_at`, and returns the updated contract.
/// A missing contract surfaces as an error via `fetch_one`.
pub async fn clear_phase_deliverables(
    pool: &PgPool,
    contract_id: Uuid,
    phase: &str,
) -> Result<Contract, sqlx::Error> {
    const SQL: &str = r#"
UPDATE contracts
SET completed_deliverables = completed_deliverables - $2,
updated_at = NOW()
WHERE id = $1
RETURNING *
"#;

    let contract = sqlx::query_as::<_, Contract>(SQL)
        .bind(contract_id)
        .bind(phase)
        .fetch_one(pool)
        .await?;
    Ok(contract)
}
/// Load the task referenced by a contract's `supervisor_task_id`.
///
/// Returns `Ok(None)` when the join produces no row (unknown contract, or no
/// task matching its `supervisor_task_id`).
pub async fn get_contract_supervisor_task(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
SELECT t.* FROM tasks t
JOIN contracts c ON c.supervisor_task_id = t.id
WHERE c.id = $1
"#;

    let supervisor = sqlx::query_as::<_, Task>(SQL)
        .bind(contract_id)
        .fetch_optional(pool)
        .await?;
    Ok(supervisor)
}
// ============================================================================
// Task Tree Queries
// ============================================================================
/// Load every task reachable from the root tasks of a contract.
///
/// A recursive CTE starts at the contract's tasks with no parent and follows
/// `parent_task_id` links downwards. Children are matched on the parent link
/// alone, without re-checking `contract_id`. Rows come back ordered by
/// `depth`, then `created_at`.
pub async fn get_contract_task_tree(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    const SQL: &str = r#"
WITH RECURSIVE task_tree AS (
-- Base case: root tasks (no parent)
SELECT * FROM tasks
WHERE contract_id = $1 AND parent_task_id IS NULL
UNION ALL
-- Recursive case: children of current level
SELECT t.* FROM tasks t
JOIN task_tree tt ON t.parent_task_id = tt.id
)
SELECT * FROM task_tree
ORDER BY depth, created_at
"#;

    let tree = sqlx::query_as::<_, Task>(SQL)
        .bind(contract_id)
        .fetch_all(pool)
        .await?;
    Ok(tree)
}
/// Load a task together with all of its descendants.
///
/// A recursive CTE starts at `root_task_id` and follows `parent_task_id`
/// links downwards. Rows come back ordered by `depth`, then `created_at`. An
/// unknown root id yields an empty vector.
pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
    const SQL: &str = r#"
WITH RECURSIVE task_tree AS (
-- Base case: the root task
SELECT * FROM tasks WHERE id = $1
UNION ALL
-- Recursive case: children of current level
SELECT t.* FROM tasks t
JOIN task_tree tt ON t.parent_task_id = tt.id
)
SELECT * FROM task_tree
ORDER BY depth, created_at
"#;

    let tree = sqlx::query_as::<_, Task>(SQL)
        .bind(root_task_id)
        .fetch_all(pool)
        .await?;
    Ok(tree)
}
// ============================================================================
// Daemon Selection
// ============================================================================
/// List an owner's connected daemons, best candidate first.
///
/// Only daemons with `status = 'connected'` are returned. Ordering is:
/// highest `capacity_score` (missing scores count as 100), then most free
/// slots (`max_concurrent_tasks - current_task_count`), then shortest
/// `task_queue_length` (missing counts as 0).
pub async fn get_available_daemons(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, owner_id, connection_id, hostname, machine_id,
max_concurrent_tasks, current_task_count,
capacity_score, task_queue_length, supports_migration,
status, last_heartbeat_at, connected_at
FROM daemons
WHERE owner_id = $1 AND status = 'connected'
ORDER BY
COALESCE(capacity_score, 100) DESC,
(max_concurrent_tasks - current_task_count) DESC,
COALESCE(task_queue_length, 0) ASC
"#;

    let daemons = sqlx::query_as::<_, DaemonWithCapacity>(SQL)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(daemons)
}
/// List an owner's connected daemons, skipping the given daemon IDs.
///
/// Same columns and ordering as `get_available_daemons`, with daemons whose
/// id appears in `exclude_daemon_ids` filtered out (`id != ALL($2)`).
/// Intended for task retry, so that a task is not handed back to a daemon
/// that already failed it.
pub async fn get_available_daemons_excluding(
    pool: &PgPool,
    owner_id: Uuid,
    exclude_daemon_ids: &[Uuid],
) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, owner_id, connection_id, hostname, machine_id,
max_concurrent_tasks, current_task_count,
capacity_score, task_queue_length, supports_migration,
status, last_heartbeat_at, connected_at
FROM daemons
WHERE owner_id = $1
AND status = 'connected'
AND id != ALL($2)
ORDER BY
COALESCE(capacity_score, 100) DESC,
(max_concurrent_tasks - current_task_count) DESC,
COALESCE(task_queue_length, 0) ASC
"#;

    let daemons = sqlx::query_as::<_, DaemonWithCapacity>(SQL)
        .bind(owner_id)
        .bind(exclude_daemon_ids)
        .fetch_all(pool)
        .await?;
    Ok(daemons)
}
/// Insert a `daemon_task_assignments` row linking a daemon to a task.
///
/// Only `daemon_id` and `task_id` are supplied; every other column takes its
/// database default. The inserted row is returned.
pub async fn create_daemon_task_assignment(
    pool: &PgPool,
    daemon_id: Uuid,
    task_id: Uuid,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO daemon_task_assignments (daemon_id, task_id)
VALUES ($1, $2)
RETURNING *
"#;

    let assignment = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(daemon_id)
        .bind(task_id)
        .fetch_one(pool)
        .await?;
    Ok(assignment)
}
/// Set the `status` of the assignment that belongs to `task_id`.
///
/// Returns the updated row. When no assignment exists for the task,
/// `fetch_one` yields an error.
pub async fn update_daemon_task_assignment_status(
    pool: &PgPool,
    task_id: Uuid,
    status: &str,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    const SQL: &str = r#"
UPDATE daemon_task_assignments
SET status = $1
WHERE task_id = $2
RETURNING *
"#;

    let assignment = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(status)
        .bind(task_id)
        .fetch_one(pool)
        .await?;
    Ok(assignment)
}
/// Look up the daemon assignment of a task, if there is one.
pub async fn get_daemon_task_assignment(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM daemon_task_assignments WHERE task_id = $1";

    let assignment = sqlx::query_as::<_, DaemonTaskAssignment>(SQL)
        .bind(task_id)
        .fetch_optional(pool)
        .await?;
    Ok(assignment)
}
// ============================================================================
// Repository History Functions
// ============================================================================
use super::models::RepositoryHistoryEntry;
/// List every repository history entry of an owner.
///
/// Most-used entries come first (`use_count DESC`), ties broken by the most
/// recent `last_used_at`.
pub async fn list_repository_history_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<RepositoryHistoryEntry>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
FROM repository_history
WHERE owner_id = $1
ORDER BY use_count DESC, last_used_at DESC
"#;

    let entries = sqlx::query_as::<_, RepositoryHistoryEntry>(SQL)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(entries)
}
/// Suggest repositories from an owner's history.
///
/// * `source_type` - when present, only entries of that source type match.
/// * `query` - when present, a case-insensitive substring match against
///   `name`, `repository_url` and `local_path` (the text is lower-cased and
///   wrapped in `%...%` for `LIKE`).
/// * `limit` - maximum number of rows.
///
/// Results are ordered by `use_count DESC, last_used_at DESC`.
///
/// NOTE(review): `%` and `_` inside `query` are passed to `LIKE` unescaped,
/// so they act as wildcards — confirm this is intended.
pub async fn get_repository_suggestions(
    pool: &PgPool,
    owner_id: Uuid,
    source_type: Option<&str>,
    query: Option<&str>,
    limit: i32,
) -> Result<Vec<RepositoryHistoryEntry>, sqlx::Error> {
    // Phase 1: assemble the SQL text. `$1` is always the owner; optional
    // filters claim the following placeholder numbers in order.
    let mut sql = String::from(
        r#"
SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
FROM repository_history
WHERE owner_id = $1
"#,
    );
    let mut next_placeholder = 2;

    if source_type.is_some() {
        sql.push_str(&format!(" AND source_type = ${}", next_placeholder));
        next_placeholder += 1;
    }

    if query.is_some() {
        // The same placeholder is referenced three times: one bound pattern
        // is compared against all three columns.
        sql.push_str(&format!(
            " AND (LOWER(name) LIKE ${0} OR LOWER(COALESCE(repository_url, '')) LIKE ${0} OR LOWER(COALESCE(local_path, '')) LIKE ${0})",
            next_placeholder
        ));
        next_placeholder += 1;
    }

    sql.push_str(&format!(
        " ORDER BY use_count DESC, last_used_at DESC LIMIT ${}",
        next_placeholder
    ));

    // Phase 2: bind values in exactly the order the placeholders were handed out.
    let like_pattern = query.map(|text| format!("%{}%", text.to_lowercase()));

    let mut statement = sqlx::query_as::<_, RepositoryHistoryEntry>(&sql).bind(owner_id);
    if let Some(kind) = source_type {
        statement = statement.bind(kind);
    }
    if let Some(pattern) = like_pattern {
        statement = statement.bind(pattern);
    }

    statement.bind(limit).fetch_all(pool).await
}
/// Add or update a repository history entry.
/// If an entry with the same URL (for remote) or path (for local) already exists,
/// increment use_count and update last_used_at and name.
/// Otherwise, create a new entry.
pub async fn add_or_update_repository_history(
pool: &PgPool,
owner_id: Uuid,
name: &str,
repository_url: Option<&str>,
local_path: Option<&str>,
source_type: &str,
) -> Result<RepositoryHistoryEntry, sqlx::Error> {
// Use UPSERT (INSERT ... ON CONFLICT)
if source_type == "remote" {
let url = repository_url.ok_or_else(|| {
sqlx::Error::Protocol("repository_url required for remote type".to_string())
})?;
sqlx::query_as::<_, RepositoryHistoryEntry>(
r#"
INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at)
VALUES ($1, $2, $3, NULL, $4, 1, NOW())
ON CONFLICT (owner_id, repository_url) WHERE source_type = 'remote' AND repository_url IS NOT NULL
DO UPDATE SET
name = EXCLUDED.name,
use_count = repository_history.use_count + 1,
last_used_at = NOW()
RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
"#,
)
.bind(owner_id)
.bind(name)
.bind(url)
.bind(source_type)
.fetch_one(pool)
.await
} else if source_type == "local" {
let path = local_path.ok_or_else(|| {
sqlx::Error::Protocol("local_path required for local type".to_string())
})?;
sqlx::query_as::<_, RepositoryHistoryEntry>(
r#"
INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at)
VALUES ($1, $2, NULL, $3, $4, 1, NOW())
ON CONFLICT (owner_id, local_path) WHERE source_type = 'local' AND local_path IS NOT NULL
DO UPDATE SET
name = EXCLUDED.name,
use_count = repository_history.use_count + 1,
last_used_at = NOW()
RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
"#,
)
.bind(owner_id)
.bind(name)
.bind(path)
.bind(source_type)
.fetch_one(pool)
.await
} else {
Err(sqlx::Error::Protocol(format!(
"Invalid source_type: {}",
source_type
)))
}
}
/// Remove one repository history entry belonging to `owner_id`.
///
/// Both the entry id and the owner must match, so one owner cannot delete
/// another owner's entry. Returns `true` when a row was deleted and `false`
/// when nothing matched.
pub async fn delete_repository_history(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM repository_history
WHERE id = $1 AND owner_id = $2
"#;

    let outcome = sqlx::query(SQL)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await?;
    let deleted_any = outcome.rows_affected() > 0;
    Ok(deleted_any)
}
// ============================================================================
// Conversation Snapshots
// ============================================================================
/// Store a conversation snapshot for a task and return the inserted row.
///
/// `checkpoint_id` and `metadata` are optional and stored as `NULL` when
/// absent. `conversation_state` is bound as a JSON value.
pub async fn create_conversation_snapshot(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option<Uuid>,
    snapshot_type: &str,
    message_count: i32,
    conversation_state: serde_json::Value,
    metadata: Option<serde_json::Value>,
) -> Result<ConversationSnapshot, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#;

    let snapshot = sqlx::query_as::<_, ConversationSnapshot>(SQL)
        .bind(task_id)
        .bind(checkpoint_id)
        .bind(snapshot_type)
        .bind(message_count)
        .bind(conversation_state)
        .bind(metadata)
        .fetch_one(pool)
        .await?;
    Ok(snapshot)
}
/// Look up a conversation snapshot by its primary id.
///
/// Returns `Ok(None)` when no snapshot has that id.
pub async fn get_conversation_snapshot(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM conversation_snapshots WHERE id = $1";

    let snapshot = sqlx::query_as::<_, ConversationSnapshot>(SQL)
        .bind(id)
        .fetch_optional(pool)
        .await?;
    Ok(snapshot)
}
/// Fetch the newest conversation snapshot attached to a checkpoint.
///
/// When several snapshots share the checkpoint, the one with the latest
/// `created_at` wins. Returns `Ok(None)` when there is none.
pub async fn get_conversation_at_checkpoint(
    pool: &PgPool,
    checkpoint_id: Uuid,
) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1";

    let snapshot = sqlx::query_as::<_, ConversationSnapshot>(SQL)
        .bind(checkpoint_id)
        .fetch_optional(pool)
        .await?;
    Ok(snapshot)
}
/// List the conversation snapshots of a task, newest first.
///
/// At most `limit` rows are returned; a `limit` of `None` means 100.
pub async fn list_conversation_snapshots(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<ConversationSnapshot>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2";

    let max_rows = limit.unwrap_or(100);
    let snapshots = sqlx::query_as::<_, ConversationSnapshot>(SQL)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;
    Ok(snapshots)
}
/// Delete conversation snapshots created more than `retention_days` days ago.
///
/// Returns the number of deleted rows.
pub async fn cleanup_old_snapshots(
    pool: &PgPool,
    retention_days: i32,
) -> Result<u64, sqlx::Error> {
    const SQL: &str = "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1";

    let outcome = sqlx::query(SQL)
        .bind(retention_days)
        .execute(pool)
        .await?;
    Ok(outcome.rows_affected())
}
// ============================================================================
// History Events
// ============================================================================
/// Append an event to `history_events` and return the stored row.
///
/// `contract_id`, `task_id`, `event_subtype` and `phase` are optional and
/// stored as `NULL` when absent; `event_data` is bound as a JSON value.
#[allow(clippy::too_many_arguments)]
pub async fn record_history_event(
    pool: &PgPool,
    owner_id: Uuid,
    contract_id: Option<Uuid>,
    task_id: Option<Uuid>,
    event_type: &str,
    event_subtype: Option<&str>,
    phase: Option<&str>,
    event_data: serde_json::Value,
) -> Result<HistoryEvent, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#;

    let event = sqlx::query_as::<_, HistoryEvent>(SQL)
        .bind(owner_id)
        .bind(contract_id)
        .bind(task_id)
        .bind(event_type)
        .bind(event_subtype)
        .bind(phase)
        .bind(event_data)
        .fetch_one(pool)
        .await?;
    Ok(event)
}
/// Fetch the history timeline of a contract plus the total match count.
///
/// Events must match both `contract_id` and `owner_id`. The optional
/// `filters.phase`, `filters.from` (inclusive lower bound on `created_at`)
/// and `filters.to` (inclusive upper bound) narrow the result further.
///
/// Returns `(events, total)`: `events` is newest first and capped at
/// `filters.limit` (100 when unset), while `total` counts every event that
/// matches the same filters, ignoring the cap.
pub async fn get_contract_history(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
    let limit = filters.limit.unwrap_or(100);

    // Build the optional predicates once; both statements share them.
    // `$1` and `$2` are taken by contract and owner, so filters start at `$3`.
    let mut extra_conditions = String::new();
    let mut placeholder = 2;
    if filters.phase.is_some() {
        placeholder += 1;
        extra_conditions.push_str(&format!(" AND phase = ${}", placeholder));
    }
    if filters.from.is_some() {
        placeholder += 1;
        extra_conditions.push_str(&format!(" AND created_at >= ${}", placeholder));
    }
    if filters.to.is_some() {
        placeholder += 1;
        extra_conditions.push_str(&format!(" AND created_at <= ${}", placeholder));
    }

    let select_sql = format!(
        "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2{} ORDER BY created_at DESC LIMIT {}",
        extra_conditions, limit
    );
    let count_sql = format!(
        "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2{}",
        extra_conditions
    );

    // Page of events: bind in placeholder order.
    let mut select = sqlx::query_as::<_, HistoryEvent>(&select_sql)
        .bind(contract_id)
        .bind(owner_id);
    if let Some(phase) = &filters.phase {
        select = select.bind(phase);
    }
    if let Some(from) = &filters.from {
        select = select.bind(from);
    }
    if let Some(to) = &filters.to {
        select = select.bind(to);
    }
    let events = select.fetch_all(pool).await?;

    // Total count under the same filters.
    let mut counter = sqlx::query_scalar::<_, i64>(&count_sql)
        .bind(contract_id)
        .bind(owner_id);
    if let Some(phase) = &filters.phase {
        counter = counter.bind(phase);
    }
    if let Some(from) = &filters.from {
        counter = counter.bind(from);
    }
    if let Some(to) = &filters.to {
        counter = counter.bind(to);
    }
    let total = counter.fetch_one(pool).await?;

    Ok((events, total))
}
/// Fetch the history events of a task plus the total event count.
///
/// Events must match both `task_id` and `owner_id`. Only `filters.limit`
/// (default 100) is consulted here; the phase and date filters are not
/// applied by this function.
///
/// Returns `(events, total)` with `events` newest first and capped at the
/// limit, and `total` counting all matching events.
pub async fn get_task_history(
    pool: &PgPool,
    task_id: Uuid,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
    const SELECT_SQL: &str = r#"
SELECT * FROM history_events
WHERE task_id = $1 AND owner_id = $2
ORDER BY created_at DESC
LIMIT $3
"#;
    const COUNT_SQL: &str =
        "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2";

    let max_rows = filters.limit.unwrap_or(100);

    let events = sqlx::query_as::<_, HistoryEvent>(SELECT_SQL)
        .bind(task_id)
        .bind(owner_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;

    let total: i64 = sqlx::query_scalar(COUNT_SQL)
        .bind(task_id)
        .bind(owner_id)
        .fetch_one(pool)
        .await?;

    Ok((events, total))
}
/// Fetch the combined event timeline of an owner plus the total event count.
///
/// Every `history_events` row of the owner is eligible, whatever contract or
/// task it belongs to. Only `filters.limit` (default 100) is consulted here;
/// the phase and date filters are not applied by this function.
///
/// Returns `(events, total)` with `events` newest first and capped at the
/// limit, and `total` counting all of the owner's events.
pub async fn get_timeline(
    pool: &PgPool,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
    const SELECT_SQL: &str = r#"
SELECT * FROM history_events
WHERE owner_id = $1
ORDER BY created_at DESC
LIMIT $2
"#;
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM history_events WHERE owner_id = $1";

    let max_rows = filters.limit.unwrap_or(100);

    let events = sqlx::query_as::<_, HistoryEvent>(SELECT_SQL)
        .bind(owner_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await?;

    let total: i64 = sqlx::query_scalar(COUNT_SQL)
        .bind(owner_id)
        .fetch_one(pool)
        .await?;

    Ok((events, total))
}
// ============================================================================
// Task Conversation Retrieval
// ============================================================================
/// Helper struct for parsing the JSON payload of task output events.
///
/// Deserialized from the `event_data` of `task_events` rows in
/// `get_task_conversation`. Because of `rename_all = "camelCase"`, the JSON
/// keys are `messageType`, `content`, `toolName`, `toolInput`, `isError` and
/// `costUsd`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskOutputEvent {
/// Kind of message; `get_task_conversation` recognizes `"assistant"`,
/// `"tool_use"`, `"tool_result"`, `"system"` and `"error"`.
message_type: String,
/// Message text, if any.
content: Option<String>,
/// Tool name, if present in the payload.
tool_name: Option<String>,
/// Tool input as raw JSON, if present in the payload.
tool_input: Option<serde_json::Value>,
/// Error flag, if present in the payload.
is_error: Option<bool>,
/// Cost figure, if present in the payload (presumably US dollars, per the
/// field name).
cost_usd: Option<f32>,
}
/// Reconstruct the conversation of a task from its `output` task events.
///
/// Loads up to `limit` (default 1000) `task_events` rows with
/// `event_type = 'output'`, oldest first, and turns each parseable payload
/// into a `ConversationMessage`.
///
/// * `include_tool_calls` - keep events whose message type is `tool_use`.
/// * `include_tool_results` - keep events whose message type is `tool_result`.
///
/// Events with no payload, or a payload that does not deserialize as a
/// `TaskOutputEvent`, are skipped silently. The row limit is applied before
/// the tool filters, so fewer than `limit` messages may be returned even when
/// more events exist.
///
/// Role mapping: `assistant` and `tool_use` become `"assistant"`,
/// `tool_result` becomes `"tool"`, `system` and `error` become `"system"`,
/// and anything else becomes `"user"`.
pub async fn get_task_conversation(
    pool: &PgPool,
    task_id: Uuid,
    include_tool_calls: bool,
    include_tool_results: bool,
    limit: Option<i32>,
) -> Result<Vec<ConversationMessage>, sqlx::Error> {
    let limit = limit.unwrap_or(1000);

    // Get output events that represent conversation turns
    let events = sqlx::query_as::<_, TaskEvent>(
        r#"
SELECT * FROM task_events
WHERE task_id = $1 AND event_type = 'output'
ORDER BY created_at ASC
LIMIT $2
"#
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await?;

    let mut messages = Vec::with_capacity(events.len());
    for event in events {
        // The payload is consumed by value: it is not needed again, so no
        // clone of the JSON tree is made.
        let output = match event.event_data {
            Some(data) => match serde_json::from_value::<TaskOutputEvent>(data) {
                Ok(output) => output,
                Err(_) => continue,
            },
            None => continue,
        };

        // One pass over the message type decides both the role and whether
        // the caller asked for this kind of message.
        let (role, wanted) = match output.message_type.as_str() {
            "assistant" => ("assistant", true),
            "tool_use" => ("assistant", include_tool_calls),
            "tool_result" => ("tool", include_tool_results),
            "system" | "error" => ("system", true),
            _ => ("user", true),
        };
        if !wanted {
            continue;
        }

        messages.push(ConversationMessage {
            id: event.id.to_string(),
            role: role.to_string(),
            content: output.content.unwrap_or_default(),
            timestamp: event.created_at,
            tool_calls: None,
            tool_name: output.tool_name,
            tool_input: output.tool_input,
            tool_result: None,
            is_error: output.is_error,
            token_count: None,
            cost_usd: output.cost_usd.map(f64::from),
        });
    }

    Ok(messages)
}
/// Load the full supervisor state of a contract.
///
/// Thin wrapper that delegates to `get_supervisor_state` and returns its
/// result unchanged.
pub async fn get_supervisor_conversation_full(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    let state = get_supervisor_state(pool, contract_id).await?;
    Ok(state)
}
// =============================================================================
// Anonymous Task Cleanup Functions
// =============================================================================
/// Delete stale anonymous tasks.
///
/// A task is removed when all of the following hold:
/// - it has no contract (`contract_id IS NULL`),
/// - its status is terminal (`done`, `failed` or `merged`),
/// - it was created more than `max_age_days` days ago.
///
/// Returns the number of deleted tasks.
pub async fn cleanup_stale_anonymous_tasks(
    pool: &PgPool,
    max_age_days: i32,
) -> Result<i64, sqlx::Error> {
    const SQL: &str = r#"
DELETE FROM tasks
WHERE contract_id IS NULL
AND status IN ('done', 'failed', 'merged')
AND created_at < NOW() - INTERVAL '1 day' * $1
"#;

    let outcome = sqlx::query(SQL)
        .bind(max_age_days)
        .execute(pool)
        .await?;
    let deleted = outcome.rows_affected() as i64;
    Ok(deleted)
}
// ============================================================================
// Checkpoint Patches (for task recovery)
// ============================================================================
/// Create a checkpoint patch for task recovery.
pub async fn create_checkpoint_patch(
pool: &PgPool,
task_id: Uuid,
checkpoint_id: Option<Uuid>,
base_commit_sha: &str,
patch_data: &[u8],
files_count: i32,
ttl_hours: i64,
) -> Result<CheckpointPatch, sqlx::Error> {
sqlx::query_as::<_, CheckpointPatch>(
r#"
INSERT INTO checkpoint_patches (
task_id, checkpoint_id, base_commit_sha, patch_data,
patch_size_bytes, files_count, expires_at
)
VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7)
RETURNING *
"#,
)
.bind(task_id)
.bind(checkpoint_id)
.bind(base_commit_sha)
.bind(patch_data)
.bind(patch_data.len() as i32)
.bind(files_count)
.bind(ttl_hours)
.fetch_one(pool)
.await
}
/// Fetch the newest unexpired checkpoint patch of a task.
///
/// Patches whose `expires_at` has passed are ignored. Returns `Ok(None)` when
/// no live patch exists.
pub async fn get_latest_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    const SQL: &str = r#"
SELECT * FROM checkpoint_patches
WHERE task_id = $1 AND expires_at > NOW()
ORDER BY created_at DESC
LIMIT 1
"#;

    let patch = sqlx::query_as::<_, CheckpointPatch>(SQL)
        .bind(task_id)
        .fetch_optional(pool)
        .await?;
    Ok(patch)
}
/// Look up a checkpoint patch by its primary id.
///
/// Unlike `get_latest_checkpoint_patch`, no expiry check is applied.
pub async fn get_checkpoint_patch(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM checkpoint_patches WHERE id = $1";

    let patch = sqlx::query_as::<_, CheckpointPatch>(SQL)
        .bind(id)
        .fetch_optional(pool)
        .await?;
    Ok(patch)
}
/// List the checkpoint patches of a task, newest first.
///
/// Only metadata columns are selected; the `patch_data` blob is left out so
/// that listing stays cheap. Expired patches are included.
pub async fn list_checkpoint_patches(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> {
    const SQL: &str = r#"
SELECT id, task_id, checkpoint_id, base_commit_sha,
patch_size_bytes, files_count, created_at, expires_at
FROM checkpoint_patches
WHERE task_id = $1
ORDER BY created_at DESC
"#;

    let patches = sqlx::query_as::<_, CheckpointPatchInfo>(SQL)
        .bind(task_id)
        .fetch_all(pool)
        .await?;
    Ok(patches)
}
/// Delete every checkpoint patch whose `expires_at` lies in the past.
///
/// Returns the number of deleted patches.
pub async fn cleanup_expired_checkpoint_patches(
    pool: &PgPool,
) -> Result<i64, sqlx::Error> {
    const SQL: &str = "DELETE FROM checkpoint_patches WHERE expires_at < NOW()";

    let outcome = sqlx::query(SQL).execute(pool).await?;
    let deleted = outcome.rows_affected() as i64;
    Ok(deleted)
}
/// Delete all checkpoint patches of a task, expired or not.
///
/// Returns the number of deleted patches.
pub async fn delete_checkpoint_patches_for_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<i64, sqlx::Error> {
    const SQL: &str = "DELETE FROM checkpoint_patches WHERE task_id = $1";

    let outcome = sqlx::query(SQL)
        .bind(task_id)
        .execute(pool)
        .await?;
    let deleted = outcome.rows_affected() as i64;
    Ok(deleted)
}
// =============================================================================
// Red Team Notifications
// =============================================================================
// =============================================================================
// Supervisor Status API Helpers
// =============================================================================
/// Load the supervisor status of a contract for a given owner.
///
/// Joins `supervisor_states` with the supervisor's row in `tasks`. The task
/// must belong to `owner_id`; otherwise, or when the contract has no
/// supervisor state, `Ok(None)` is returned.
///
/// In the result, `supervisor_state` and `task_status` both come from the
/// task's `status`, `current_activity` is the task's `progress_summary`,
/// `last_heartbeat` is the state's `last_activity`, and `is_running` is true
/// when the task has a `daemon_id`.
pub async fn get_supervisor_status(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
ss.task_id,
COALESCE(t.status, 'unknown') as supervisor_state,
ss.phase,
t.progress_summary as current_activity,
ss.pending_task_ids,
ss.last_activity as last_heartbeat,
t.status as task_status,
t.daemon_id IS NOT NULL as is_running
FROM supervisor_states ss
JOIN tasks t ON t.id = ss.task_id
WHERE ss.contract_id = $1
AND t.owner_id = $2
"#;

    let status = sqlx::query_as::<_, SupervisorStatusInfo>(SQL)
        .bind(contract_id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
    Ok(status)
}
/// Row type for the query in `get_supervisor_status`.
///
/// Combines columns from `supervisor_states` (`ss`) and the supervisor's
/// `tasks` row (`t`); field names match the column aliases of that query.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct SupervisorStatusInfo {
/// `ss.task_id`: the supervisor task.
pub task_id: Uuid,
/// `COALESCE(t.status, 'unknown')`.
pub supervisor_state: String,
/// `ss.phase`.
pub phase: String,
/// `t.progress_summary`, if set.
pub current_activity: Option<String>,
/// `ss.pending_task_ids`.
#[sqlx(try_from = "Vec<Uuid>")]
pub pending_task_ids: Vec<Uuid>,
/// `ss.last_activity`.
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
/// `t.status`.
pub task_status: String,
/// `t.daemon_id IS NOT NULL`.
pub is_running: bool,
}
/// Page through the supervisor-related history of a contract.
///
/// Reads `history_events` rows of the contract whose `event_type` is
/// `supervisor`, `phase` or `task`, newest first, using `limit` / `offset`
/// for pagination. Each row is projected into a heartbeat-like entry:
/// `state` falls back to `'activity'`, `phase` falls back to `'unknown'`, and
/// `activity`, `progress` and `pending_task_ids` are extracted from the
/// `event_data` JSON (an empty array when `pending_task_ids` is absent).
pub async fn get_supervisor_activity_history(
    pool: &PgPool,
    contract_id: Uuid,
    limit: i32,
    offset: i32,
) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
created_at as timestamp,
COALESCE(event_subtype, 'activity') as state,
event_data->>'activity' as activity,
(event_data->>'progress')::INTEGER as progress,
COALESCE(phase, 'unknown') as phase,
CASE
WHEN event_data->'pending_task_ids' IS NOT NULL
THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
ELSE ARRAY[]::UUID[]
END as pending_task_ids
FROM history_events
WHERE contract_id = $1
AND event_type IN ('supervisor', 'phase', 'task')
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#;

    let entries = sqlx::query_as::<_, SupervisorActivityEntry>(SQL)
        .bind(contract_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(pool)
        .await?;
    Ok(entries)
}
/// Row type for the query in `get_supervisor_activity_history`.
///
/// Each value is one `history_events` row projected into a heartbeat-like
/// shape; field names match the column aliases of that query.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct SupervisorActivityEntry {
/// The event's `created_at`.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// The event's `event_subtype`, or `'activity'` when it is NULL.
pub state: String,
/// `event_data->>'activity'`, if present.
pub activity: Option<String>,
/// `event_data->>'progress'` cast to an integer, if present.
pub progress: Option<i32>,
/// The event's `phase`, or `'unknown'` when it is NULL.
pub phase: String,
/// IDs from `event_data->'pending_task_ids'`; empty when the key is absent.
#[sqlx(try_from = "Vec<Uuid>")]
pub pending_task_ids: Vec<Uuid>,
}
/// Count the supervisor-related history events of a contract.
///
/// Uses the same filter as `get_supervisor_activity_history` (`event_type`
/// in `supervisor`, `phase`, `task`), so the result is the total for
/// paginating that listing.
pub async fn count_supervisor_activity_history(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<i64, sqlx::Error> {
    const SQL: &str = r#"
SELECT COUNT(*)
FROM history_events
WHERE contract_id = $1
AND event_type IN ('supervisor', 'phase', 'task')
"#;

    sqlx::query_scalar::<_, i64>(SQL)
        .bind(contract_id)
        .fetch_one(pool)
        .await
}
/// Refresh the activity timestamps of a contract's supervisor state.
///
/// Sets `last_activity` and `updated_at` to `NOW()` and touches nothing
/// else, acting as a manual "sync" of the supervisor's heartbeat. Returns the
/// updated row, or `Ok(None)` when the contract has no supervisor state.
pub async fn sync_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<SupervisorState>, sqlx::Error> {
    const SQL: &str = r#"
UPDATE supervisor_states
SET last_activity = NOW(),
updated_at = NOW()
WHERE contract_id = $1
RETURNING *
"#;

    let refreshed = sqlx::query_as::<_, SupervisorState>(SQL)
        .bind(contract_id)
        .fetch_optional(pool)
        .await?;
    Ok(refreshed)
}
// =============================================================================
// Helper Functions
// =============================================================================
/// Truncate `s` to at most `max_len` bytes, marking the cut with `...`.
///
/// * A string that already fits is returned unchanged.
/// * Otherwise the result is a prefix of `s` followed by `...`, with a total
///   length of at most `max_len` bytes.
/// * When `max_len` is smaller than the ellipsis itself (fewer than 3 bytes),
///   the ellipsis is dropped and only the prefix that fits is returned.
///
/// The cut is always moved back to a UTF-8 character boundary, so the
/// function never panics on multi-byte text.
#[allow(dead_code)]
fn truncate_string(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    if s.len() <= max_len {
        return s.to_string();
    }

    // Reserve room for the ellipsis only when it fits; this also avoids the
    // `max_len - 3` underflow for very small limits.
    let (budget, suffix) = if max_len >= ELLIPSIS.len() {
        (max_len - ELLIPSIS.len(), ELLIPSIS)
    } else {
        (max_len, "")
    };

    // Step back to the nearest char boundary. Index 0 is always a boundary,
    // so the loop terminates.
    let mut cut = budget;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }

    format!("{}{}", &s[..cut], suffix)
}
// =============================================================================
// Directive CRUD
// =============================================================================
/// Insert a directive owned by `owner_id` and return the stored row.
///
/// Title, goal, repository URL, local path and base branch are taken from
/// `req` as-is; `reconcile_mode` defaults to `"auto"` when the request does
/// not specify one. All other columns take their database defaults.
pub async fn create_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateDirectiveRequest,
) -> Result<Directive, sqlx::Error> {
    const SQL: &str = r#"
INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch, reconcile_mode)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#;

    let reconcile_mode = req.reconcile_mode.as_deref().unwrap_or("auto");

    let directive = sqlx::query_as::<_, Directive>(SQL)
        .bind(owner_id)
        .bind(&req.title)
        .bind(&req.goal)
        .bind(&req.repository_url)
        .bind(&req.local_path)
        .bind(&req.base_branch)
        .bind(reconcile_mode)
        .fetch_one(pool)
        .await?;
    Ok(directive)
}
/// Look up one directive by id, scoped to its owner.
///
/// Returns `Ok(None)` when the directive does not exist or belongs to a
/// different owner.
pub async fn get_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
) -> Result<Option<Directive>, sqlx::Error> {
    const SQL: &str = r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#;

    let directive = sqlx::query_as::<_, Directive>(SQL)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
    Ok(directive)
}
/// Get a directive with all its steps.
pub async fn get_directive_with_steps_for_owner(
pool: &PgPool,
owner_id: Uuid,
id: Uuid,
) -> Result<Option<(Directive, Vec<DirectiveStep>)>, sqlx::Error> {
let directive = sqlx::query_as::<_, Directive>(
r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
)
.bind(id)
.bind(owner_id)
.fetch_optional(pool)
.await?;
match directive {
Some(d) => {
let steps = list_directive_steps(pool, d.id).await?;
Ok(Some((d, steps)))
}
None => Ok(None),
}
}
/// List an owner's directives, newest first, with per-directive step counts.
///
/// Each summary carries the total number of steps plus how many are
/// `completed`, `running` and `failed`. The counts come from a lateral
/// subquery over `directive_steps`; the `COALESCE(..., 0)` wrappers guarantee
/// a number for every column.
pub async fn list_directives_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
    const SQL: &str = r#"
SELECT
d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url,
d.orchestrator_task_id, d.pr_url, d.completion_task_id,
d.reconcile_mode,
d.version, d.created_at, d.updated_at,
COALESCE(s.total_steps, 0) as total_steps,
COALESCE(s.completed_steps, 0) as completed_steps,
COALESCE(s.running_steps, 0) as running_steps,
COALESCE(s.failed_steps, 0) as failed_steps
FROM directives d
LEFT JOIN LATERAL (
SELECT
COUNT(*) as total_steps,
COUNT(*) FILTER (WHERE status = 'completed') as completed_steps,
COUNT(*) FILTER (WHERE status = 'running') as running_steps,
COUNT(*) FILTER (WHERE status = 'failed') as failed_steps
FROM directive_steps
WHERE directive_id = d.id
) s ON true
WHERE d.owner_id = $1
ORDER BY d.created_at DESC
"#;

    let summaries = sqlx::query_as::<_, DirectiveSummary>(SQL)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;
    Ok(summaries)
}
/// Update a directive with optimistic locking.
///
/// Fields left as `None` in `req` keep the value currently stored. Because the
/// merge falls back to the stored value, a nullable column cannot be cleared
/// through this function.
///
/// When `req.version` is supplied it is enforced twice: once against the row
/// read up front, and again inside the `UPDATE` itself, so a writer that bumps
/// the version between the read and the write is detected instead of being
/// silently overwritten.
///
/// # Returns
/// * `Ok(Some(directive))` – the updated row (its `version` is incremented).
/// * `Ok(None)` – no directive with this `id` exists for `owner_id`.
///
/// # Errors
/// * [`RepositoryError::VersionConflict`] – `req.version` does not match the
///   stored version.
/// * [`RepositoryError::Database`] – any underlying sqlx failure.
pub async fn update_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
    req: UpdateDirectiveRequest,
) -> Result<Option<Directive>, RepositoryError> {
    let current = sqlx::query_as::<_, Directive>(
        r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await?;
    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    // Fast-path rejection using the snapshot we just read.
    if let Some(expected_version) = req.version {
        if expected_version != current.version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: current.version,
            });
        }
    }

    // Merge the request over the stored row.
    let title = req.title.as_deref().unwrap_or(&current.title);
    let goal = req.goal.as_deref().unwrap_or(&current.goal);
    let goal_changed = goal != current.goal;
    let status = req.status.as_deref().unwrap_or(&current.status);
    let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref());
    let local_path = req.local_path.as_deref().or(current.local_path.as_deref());
    let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref());
    let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id);
    let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref());
    let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref());
    let reconcile_mode = req
        .reconcile_mode
        .as_deref()
        .unwrap_or(&current.reconcile_mode);

    // `$14` repeats the version check inside the statement so the lock also
    // holds against writers that slipped in after the SELECT above. A NULL
    // `$14` (no version supplied) disables the guard.
    let updated = sqlx::query_as::<_, Directive>(
        r#"
UPDATE directives
SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7,
base_branch = $8, orchestrator_task_id = $9, pr_url = $10, pr_branch = $11,
reconcile_mode = $12,
goal_updated_at = CASE WHEN $13 THEN NOW() ELSE goal_updated_at END,
version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
AND ($14::int4 IS NULL OR version = $14)
RETURNING *
"#,
    )
    .bind(id)
    .bind(owner_id)
    .bind(title)
    .bind(goal)
    .bind(status)
    .bind(repository_url)
    .bind(local_path)
    .bind(base_branch)
    .bind(orchestrator_task_id)
    .bind(pr_url)
    .bind(pr_branch)
    .bind(reconcile_mode)
    .bind(goal_changed)
    .bind(req.version)
    .fetch_optional(pool)
    .await?;

    if updated.is_some() {
        return Ok(updated);
    }

    // Nothing was updated: either the row disappeared, or (when a version was
    // supplied) another writer changed it after our initial read.
    if let Some(expected) = req.version {
        let latest: Option<(i32,)> = sqlx::query_as(
            r#"SELECT version FROM directives WHERE id = $1 AND owner_id = $2"#,
        )
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
        if let Some((actual,)) = latest {
            return Err(RepositoryError::VersionConflict { expected, actual });
        }
    }
    Ok(None)
}
/// Delete a directive for an owner, together with the owner's tasks attached to it.
///
/// Both deletes run inside one transaction: if removing the directive fails,
/// the task deletion is rolled back instead of leaving the directive behind
/// without its tasks.
///
/// Returns `Ok(true)` when a directive row was deleted, `Ok(false)` when no
/// directive matched `id` and `owner_id`.
pub async fn delete_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
) -> Result<bool, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Delete all tasks associated with this directive first.
    sqlx::query(
        r#"DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    let result = sqlx::query(
        r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(result.rows_affected() > 0)
}
/// Remove finished tasks that belong to a directive.
///
/// A task is removed when its status is one of `completed`, `failed`,
/// `merged`, `done` or `interrupted`, it belongs to `owner_id`, and it is not
/// the directive's current `completion_task_id` or `orchestrator_task_id`.
/// Before deleting, `task_id` is set to NULL on any directive step that points
/// at one of those tasks.
///
/// Returns the number of task rows deleted.
pub async fn cleanup_directive_tasks(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    // Step 1: detach steps from the terminal tasks that are about to go away.
    let detach_steps_sql = r#"
UPDATE directive_steps
SET task_id = NULL
WHERE directive_id = $1
AND task_id IS NOT NULL
AND task_id IN (
SELECT t.id FROM tasks t
WHERE t.directive_id = $1
AND t.owner_id = $2
AND t.status IN ('completed', 'failed', 'merged', 'done', 'interrupted')
AND t.id NOT IN (
SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000')
FROM directives d WHERE d.id = $1
UNION
SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000')
FROM directives d WHERE d.id = $1
)
)
"#;
    sqlx::query(detach_steps_sql)
        .bind(directive_id)
        .bind(owner_id)
        .execute(pool)
        .await?;

    // Step 2: delete the terminal tasks the directive no longer references.
    let delete_tasks_sql = r#"
DELETE FROM tasks
WHERE directive_id = $1
AND owner_id = $2
AND status IN ('completed', 'failed', 'merged', 'done', 'interrupted')
AND id NOT IN (
SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000')
FROM directives d WHERE d.id = $1
UNION
SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000')
FROM directives d WHERE d.id = $1
)
"#;
    let removed = sqlx::query(delete_tasks_sql)
        .bind(directive_id)
        .bind(owner_id)
        .execute(pool)
        .await?
        .rows_affected();
    Ok(removed as i64)
}
// =============================================================================
// Directive Completion Helpers
// =============================================================================
/// Row type for completed step tasks.
///
/// Produced by `get_completed_step_tasks`, which joins `directive_steps`
/// with `tasks` on the step's `task_id`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct CompletedStepTask {
/// ID of the directive step (`directive_steps.id`).
pub step_id: Uuid,
/// Name of the directive step (`directive_steps.name`).
pub step_name: String,
/// ID of the task linked to the step (`directive_steps.task_id`).
pub task_id: Uuid,
/// Name of the linked task (`tasks.name`).
pub task_name: String,
}
/// Row type for directive completion task status check.
///
/// Produced by `get_completion_tasks_to_check`, which joins each directive
/// that has a `completion_task_id` with that task.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DirectiveCompletionCheck {
/// ID of the directive (`directives.id`).
pub directive_id: Uuid,
/// Owner of the directive.
pub owner_id: Uuid,
/// The directive's completion task (non-NULL for every returned row).
pub completion_task_id: Uuid,
/// Current status of the completion task (`tasks.status`).
pub task_status: String,
/// The directive's PR URL, if one is recorded.
pub pr_url: Option<String>,
/// Name of the completion task (`tasks.name`).
pub task_name: String,
}
/// Find idle directives for which a completion task should be spawned.
///
/// A directive qualifies when all of the following hold:
/// * `status = 'idle'`
/// * `completion_task_id` is NULL
/// * `pr_branch` is NULL
/// * `repository_url` is set
/// * at least one of its steps is `completed` and has a `task_id`
pub async fn get_idle_directives_needing_completion(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
SELECT d.*
FROM directives d
WHERE d.status = 'idle'
AND d.completion_task_id IS NULL
AND d.pr_branch IS NULL
AND d.repository_url IS NOT NULL
AND EXISTS (
SELECT 1 FROM directive_steps ds
WHERE ds.directive_id = d.id
AND ds.status = 'completed'
AND ds.task_id IS NOT NULL
)
"#;
    let directives = sqlx::query_as::<_, Directive>(sql).fetch_all(pool).await?;
    Ok(directives)
}
/// Find idle directives whose completion was attempted but produced no PR yet.
///
/// Selects directives with `pr_branch` set, `pr_url` NULL, no
/// `completion_task_id` and a `repository_url`. These need a verification
/// task spawned.
pub async fn get_directives_needing_verification(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
SELECT d.*
FROM directives d
WHERE d.status = 'idle'
AND d.pr_branch IS NOT NULL
AND d.pr_url IS NULL
AND d.completion_task_id IS NULL
AND d.repository_url IS NOT NULL
"#;
    let directives = sqlx::query_as::<_, Directive>(sql).fetch_all(pool).await?;
    Ok(directives)
}
/// List every directive that has a completion task, joined with that task's
/// current status and name.
pub async fn get_completion_tasks_to_check(
    pool: &PgPool,
) -> Result<Vec<DirectiveCompletionCheck>, sqlx::Error> {
    let sql = r#"
SELECT d.id as directive_id, d.owner_id, d.completion_task_id, t.status as task_status, d.pr_url, t.name as task_name
FROM directives d
JOIN tasks t ON t.id = d.completion_task_id
WHERE d.completion_task_id IS NOT NULL
"#;
    let checks = sqlx::query_as::<_, DirectiveCompletionCheck>(sql)
        .fetch_all(pool)
        .await?;
    Ok(checks)
}
/// Try to claim a directive for completion.
///
/// Sets `completion_task_id` to `task_id` only if it is currently NULL, in a
/// single conditional `UPDATE`. Returns `true` when this call won the claim
/// and `false` when the directive was already claimed (or does not exist).
pub async fn claim_directive_for_completion(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW()
WHERE id = $1 AND completion_task_id IS NULL"#;
    sqlx::query(sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Point a directive's `completion_task_id` at `task_id`, overwriting any
/// previous value. Succeeds even when no directive matches.
pub async fn assign_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql =
        r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1"#;
    sqlx::query(sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Reset a directive's `completion_task_id` to NULL.
/// Succeeds even when no directive matches.
pub async fn clear_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql =
        r#"UPDATE directives SET completion_task_id = NULL, updated_at = NOW() WHERE id = $1"#;
    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// List the completed steps of a directive together with their linked tasks.
///
/// Only steps in status `completed` that have a `task_id` are returned,
/// ordered by `order_index` and then `created_at`.
pub async fn get_completed_step_tasks(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<CompletedStepTask>, sqlx::Error> {
    let sql = r#"
SELECT ds.id as step_id, ds.name as step_name, ds.task_id, t.name as task_name
FROM directive_steps ds
JOIN tasks t ON t.id = ds.task_id
WHERE ds.directive_id = $1
AND ds.status = 'completed'
AND ds.task_id IS NOT NULL
ORDER BY ds.order_index, ds.created_at
"#;
    let rows = sqlx::query_as::<_, CompletedStepTask>(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(rows)
}
/// Return the task id of the directive's most recently updated completed step.
///
/// Used as a fallback `continue_from_task_id` when dispatching new-generation
/// steps that have no explicit dependencies and no PR branch to continue from.
/// Returns `Ok(None)` when the directive has no completed step with a task.
pub async fn get_last_completed_step_task_id(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let sql = r#"
SELECT ds.task_id
FROM directive_steps ds
WHERE ds.directive_id = $1
AND ds.status = 'completed'
AND ds.task_id IS NOT NULL
ORDER BY ds.updated_at DESC
LIMIT 1
"#;
    sqlx::query_as::<_, (Uuid,)>(sql)
        .bind(directive_id)
        .fetch_optional(pool)
        .await
        .map(|row| row.map(|(task_id,)| task_id))
}
// =============================================================================
// Directive Step CRUD
// =============================================================================
/// Look up a directive step by its id. Returns `Ok(None)` when it does not exist.
pub async fn get_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let sql = r#"SELECT * FROM directive_steps WHERE id = $1"#;
    let step = sqlx::query_as::<_, DirectiveStep>(sql)
        .bind(step_id)
        .fetch_optional(pool)
        .await?;
    Ok(step)
}
/// Return all steps of a directive, sorted by `order_index` and then `created_at`.
pub async fn list_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let sql = r#"
SELECT * FROM directive_steps
WHERE directive_id = $1
ORDER BY order_index, created_at
"#;
    let steps = sqlx::query_as::<_, DirectiveStep>(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(steps)
}
/// Create a single directive step.
///
/// `generation` defaults to 1 when the request does not specify one. If the
/// request carries an `order_id`, that order's `directive_step_id` is pointed
/// at the new step (the order is matched by id only).
///
/// The insert and the optional order link run in one transaction, so a failure
/// while linking the order does not leave a step behind that the caller was
/// told failed to create.
pub async fn create_directive_step(
    pool: &PgPool,
    directive_id: Uuid,
    req: CreateDirectiveStepRequest,
) -> Result<DirectiveStep, sqlx::Error> {
    let generation = req.generation.unwrap_or(1);

    let mut tx = pool.begin().await?;

    let step = sqlx::query_as::<_, DirectiveStep>(
        r#"
INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation, contract_type)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#,
    )
    .bind(directive_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&req.task_plan)
    .bind(&req.depends_on)
    .bind(req.order_index)
    .bind(generation)
    .bind(&req.contract_type)
    .fetch_one(&mut *tx)
    .await?;

    // If an order_id was provided, auto-link the order to this step.
    if let Some(oid) = req.order_id {
        sqlx::query(
            r#"UPDATE orders SET directive_step_id = $1, updated_at = NOW() WHERE id = $2"#,
        )
        .bind(step.id)
        .bind(oid)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;
    Ok(step)
}
/// Create several directive steps one after another, in the given order.
///
/// Stops at the first failure and returns that error; steps created before
/// the failure are not undone by this function.
pub async fn batch_create_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
    steps: Vec<CreateDirectiveStepRequest>,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let mut created = Vec::with_capacity(steps.len());
    for request in steps {
        created.push(create_directive_step(pool, directive_id, request).await?);
    }
    Ok(created)
}
/// Update a directive step.
///
/// Fields left as `None` in `req` keep the value currently stored (so a
/// nullable column cannot be cleared through this function).
///
/// Timestamps are maintained from the status transition:
/// * `started_at` is set to now when the status changes to `running`.
/// * `completed_at` is set to now when the status changes from a non-terminal
///   state to `completed`, `failed` or `skipped`.
///
/// Returns `Ok(None)` when no step with `step_id` exists.
pub async fn update_directive_step(
    pool: &PgPool,
    step_id: Uuid,
    req: UpdateDirectiveStepRequest,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let current = sqlx::query_as::<_, DirectiveStep>(
        r#"SELECT * FROM directive_steps WHERE id = $1"#,
    )
    .bind(step_id)
    .fetch_optional(pool)
    .await?;
    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    // Merge the request over the stored row.
    let name = req.name.as_deref().unwrap_or(&current.name);
    let description = req.description.as_deref().or(current.description.as_deref());
    let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref());
    let depends_on = req.depends_on.as_deref().unwrap_or(&current.depends_on);
    let status = req.status.as_deref().unwrap_or(&current.status);
    let task_id = req.task_id.or(current.task_id);
    let order_index = req.order_index.unwrap_or(current.order_index);

    // Set started_at when transitioning to running.
    let started_at = if status == "running" && current.status != "running" {
        Some(Utc::now())
    } else {
        current.started_at
    };

    // Set completed_at when transitioning to a terminal state.
    let completed_at = if matches!(status, "completed" | "failed" | "skipped")
        && !matches!(current.status.as_str(), "completed" | "failed" | "skipped")
    {
        Some(Utc::now())
    } else {
        current.completed_at
    };

    sqlx::query_as::<_, DirectiveStep>(
        r#"
UPDATE directive_steps
SET name = $2, description = $3, task_plan = $4, depends_on = $5,
status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10
WHERE id = $1
RETURNING *
"#,
    )
    .bind(step_id)
    .bind(name)
    .bind(description)
    .bind(task_plan)
    .bind(depends_on)
    .bind(status)
    .bind(task_id)
    .bind(order_index)
    .bind(started_at)
    .bind(completed_at)
    .fetch_optional(pool)
    .await
}
/// Remove a directive step by id. Returns `true` when a row was deleted.
pub async fn delete_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"DELETE FROM directive_steps WHERE id = $1"#;
    sqlx::query(sql)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Delete a directive's steps whose status is `pending`, `ready`, `failed`
/// or `skipped`.
///
/// Steps in any other status (e.g. `completed`, `running`) are left untouched.
/// Returns the number of deleted steps.
pub async fn clear_pending_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let sql = r#"DELETE FROM directive_steps
WHERE directive_id = $1
AND status IN ('pending', 'ready', 'failed', 'skipped')"#;
    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}
// =============================================================================
// Directive DAG Progression
// =============================================================================
/// Promote `pending` steps of a directive to `ready` and return the promoted rows.
///
/// A pending step is promoted only when both conditions hold:
/// * none of the steps listed in its `depends_on` has a status other than
///   `completed` or `skipped` (a `failed` dependency keeps the step pending);
/// * no step of the same directive with a lower `order_index` has a status
///   other than `completed`, `skipped` or `failed`.
pub async fn advance_directive_ready_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let sql = r#"
UPDATE directive_steps SET status = 'ready'
WHERE directive_id = $1 AND status = 'pending'
AND NOT EXISTS (
SELECT 1 FROM unnest(depends_on) AS dep_id
JOIN directive_steps ds ON ds.id = dep_id
WHERE ds.status NOT IN ('completed', 'skipped')
)
AND NOT EXISTS (
SELECT 1 FROM directive_steps prev
WHERE prev.directive_id = $1
AND prev.order_index < directive_steps.order_index
AND prev.status NOT IN ('completed', 'skipped', 'failed')
)
RETURNING *
"#;
    let promoted = sqlx::query_as::<_, DirectiveStep>(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(promoted)
}
/// Move an `active` directive to `idle` once all of its steps have finished.
///
/// The directive is updated only when it currently has status `active`, has
/// at least one step, and every step is `completed`, `failed` or `skipped`.
/// Directives are ongoing, so the target state is `idle` rather than completed.
///
/// Returns `true` if the directive was set to idle by this call.
pub async fn check_directive_idle(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
UPDATE directives SET status = 'idle', updated_at = NOW()
WHERE id = $1 AND status = 'active'
AND NOT EXISTS (
SELECT 1 FROM directive_steps
WHERE directive_id = $1
AND status NOT IN ('completed', 'failed', 'skipped')
)
AND EXISTS (
SELECT 1 FROM directive_steps WHERE directive_id = $1
)
"#;
    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Replace a directive's goal and stamp `goal_updated_at` with the current time.
///
/// In the same statement the directive is reactivated if it was `idle` or
/// `paused`, its `orchestrator_task_id` is cleared so that replanning triggers
/// on the next tick, and `version` is incremented.
///
/// Returns `Ok(None)` when no directive matches `directive_id` and `owner_id`.
pub async fn update_directive_goal(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    goal: &str,
) -> Result<Option<Directive>, sqlx::Error> {
    let sql = r#"
UPDATE directives
SET goal = $3,
goal_updated_at = NOW(),
status = CASE WHEN status IN ('idle', 'paused') THEN 'active' ELSE status END,
orchestrator_task_id = NULL,
updated_at = NOW(),
version = version + 1
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;
    let updated = sqlx::query_as::<_, Directive>(sql)
        .bind(directive_id)
        .bind(owner_id)
        .bind(goal)
        .fetch_optional(pool)
        .await?;
    Ok(updated)
}
/// Append a goal entry to `directive_goal_history` for the given directive.
pub async fn save_directive_goal_history(
    pool: &PgPool,
    directive_id: Uuid,
    goal: &str,
) -> Result<(), sqlx::Error> {
    let sql = r#"INSERT INTO directive_goal_history (directive_id, goal)
VALUES ($1, $2)"#;
    sqlx::query(sql)
        .bind(directive_id)
        .bind(goal)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Fetch up to `limit` goal-history entries for a directive, newest first.
pub async fn get_directive_goal_history(
    pool: &PgPool,
    directive_id: Uuid,
    limit: i64,
) -> Result<Vec<DirectiveGoalHistory>, sqlx::Error> {
    let sql = r#"SELECT id, directive_id, goal, created_at
FROM directive_goal_history
WHERE directive_id = $1
ORDER BY created_at DESC
LIMIT $2"#;
    let history = sqlx::query_as::<_, DirectiveGoalHistory>(sql)
        .bind(directive_id)
        .bind(limit)
        .fetch_all(pool)
        .await?;
    Ok(history)
}
/// Set a directive's status (used for start/pause/archive transitions).
///
/// Always bumps `version` and `updated_at`. When the new status is `active`,
/// `started_at` is also filled in if it has not been set before.
/// Returns `Ok(None)` when no directive matches `directive_id` and `owner_id`.
pub async fn set_directive_status(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    status: &str,
) -> Result<Option<Directive>, sqlx::Error> {
    // Only activation records a start time; other transitions add nothing.
    let started_clause = if status == "active" {
        ", started_at = COALESCE(started_at, NOW())"
    } else {
        ""
    };
    let sql = [
        r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#,
        started_clause,
        " WHERE id = $1 AND owner_id = $2 RETURNING *",
    ]
    .concat();

    let updated = sqlx::query_as::<_, Directive>(&sql)
        .bind(directive_id)
        .bind(owner_id)
        .bind(status)
        .fetch_optional(pool)
        .await?;
    Ok(updated)
}
// =============================================================================
// Directive Orchestrator Queries
// =============================================================================
/// Find active directives that still need an initial plan: they have no
/// steps at all and no orchestrator task assigned.
pub async fn get_directives_needing_planning(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
SELECT d.* FROM directives d
WHERE d.status = 'active'
AND d.orchestrator_task_id IS NULL
AND NOT EXISTS (
SELECT 1 FROM directive_steps WHERE directive_id = d.id
)
"#;
    let directives = sqlx::query_as::<_, Directive>(sql).fetch_all(pool).await?;
    Ok(directives)
}
/// A step joined with minimal directive info for dispatch.
///
/// Produced by `get_ready_steps_for_dispatch`, which joins `directive_steps`
/// with their parent `directives` row.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StepForDispatch {
// Step fields
/// ID of the step (`directive_steps.id`).
pub step_id: Uuid,
/// ID of the directive the step belongs to.
pub directive_id: Uuid,
/// Name of the step (`directive_steps.name`).
pub step_name: String,
/// Description of the step, if any (`directive_steps.description`).
pub step_description: Option<String>,
/// The step's task plan, if any.
pub task_plan: Option<String>,
/// Position of the step within its directive; dispatch results are sorted by it.
pub order_index: i32,
/// Generation number of the step.
pub generation: i32,
/// IDs of the steps this step depends on.
pub depends_on: Vec<Uuid>,
/// Optional contract type — when set, orchestrator creates a contract instead of a task.
pub contract_type: Option<String>,
// Directive fields
/// Owner of the parent directive.
pub owner_id: Uuid,
/// Title of the parent directive (`directives.title`).
pub directive_title: String,
/// Repository URL of the parent directive, if set.
pub repository_url: Option<String>,
/// Base branch of the parent directive, if set.
pub base_branch: Option<String>,
/// The directive's PR branch (if a PR has already been created from previous steps).
pub pr_branch: Option<String>,
/// The directive's reconcile mode: "auto", "semi-auto", or "manual".
pub reconcile_mode: String,
}
/// Collect the steps that are waiting to be dispatched.
///
/// A step is returned when it is `ready`, has neither a task nor a contract
/// linked yet, and its directive is `active`. Rows are sorted by `order_index`.
pub async fn get_ready_steps_for_dispatch(
    pool: &PgPool,
) -> Result<Vec<StepForDispatch>, sqlx::Error> {
    let sql = r#"
SELECT
ds.id AS step_id,
ds.directive_id,
ds.name AS step_name,
ds.description AS step_description,
ds.task_plan,
ds.order_index,
ds.generation,
ds.depends_on,
ds.contract_type,
d.owner_id,
d.title AS directive_title,
d.repository_url,
d.base_branch,
d.pr_branch,
d.reconcile_mode
FROM directive_steps ds
JOIN directives d ON d.id = ds.directive_id
WHERE ds.status = 'ready'
AND ds.task_id IS NULL
AND ds.contract_id IS NULL
AND d.status = 'active'
ORDER BY ds.order_index
"#;
    let ready = sqlx::query_as::<_, StepForDispatch>(sql)
        .fetch_all(pool)
        .await?;
    Ok(ready)
}
/// Task info for a dependency step (step → linked task).
///
/// Produced by `get_step_dependency_tasks`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DependencyTaskInfo {
/// ID of the dependency step (`directive_steps.id`).
pub step_id: Uuid,
/// ID of the task linked to that step (`tasks.id`).
pub task_id: Uuid,
/// Name of the linked task (`tasks.name`).
pub task_name: String,
}
/// Map dependency step ids to the tasks linked to those steps.
///
/// The result follows the order of `depends_on`. Steps that have no linked
/// task (or do not exist) are left out, so the result may be shorter than
/// the input. An empty input returns an empty vector without querying.
pub async fn get_step_dependency_tasks(
    pool: &PgPool,
    depends_on: &[Uuid],
) -> Result<Vec<DependencyTaskInfo>, sqlx::Error> {
    if depends_on.is_empty() {
        return Ok(Vec::new());
    }

    let sql = r#"
SELECT ds.id AS step_id, t.id AS task_id, t.name AS task_name
FROM directive_steps ds
JOIN tasks t ON t.id = ds.task_id
WHERE ds.id = ANY($1)
"#;
    let fetched = sqlx::query_as::<_, DependencyTaskInfo>(sql)
        .bind(depends_on)
        .fetch_all(pool)
        .await?;

    // The database returns rows in arbitrary order; walk the input to restore it.
    let ordered = depends_on
        .iter()
        .filter_map(|wanted| fetched.iter().find(|info| info.step_id == *wanted).cloned())
        .collect();
    Ok(ordered)
}
/// A running step joined with its task's current status.
///
/// Produced by `get_running_steps_with_tasks`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RunningStepWithTask {
/// ID of the running step (`directive_steps.id`).
pub step_id: Uuid,
/// ID of the directive the step belongs to.
pub directive_id: Uuid,
/// ID of the task linked to the step.
pub task_id: Uuid,
/// Current status of that task (`tasks.status`).
pub task_status: String,
}
/// Get running steps with their task status for monitoring.
///
/// Returns steps in status `running` that are linked to a task and not to a
/// contract, joined with the task's current status.
///
/// The `task_id` column is selected under its plain name: the `"task_id!"`
/// alias form is only understood by sqlx's compile-time `query!` macros. With
/// the runtime `query_as` function it would name the column literally
/// `task_id!`, and the derived `FromRow` lookup of `task_id` would fail.
/// Non-nullness is already guaranteed by the join and the `IS NOT NULL` filter.
pub async fn get_running_steps_with_tasks(
    pool: &PgPool,
) -> Result<Vec<RunningStepWithTask>, sqlx::Error> {
    sqlx::query_as::<_, RunningStepWithTask>(
        r#"
SELECT
ds.id AS step_id,
ds.directive_id,
ds.task_id,
t.status AS task_status
FROM directive_steps ds
JOIN tasks t ON t.id = ds.task_id
WHERE ds.status = 'running'
AND ds.task_id IS NOT NULL
AND ds.contract_id IS NULL
"#,
    )
    .fetch_all(pool)
    .await
}
/// A running step backed by a contract, joined with the contract's current status.
///
/// Produced by `get_running_steps_with_contracts`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RunningStepWithContract {
/// ID of the running step (`directive_steps.id`).
pub step_id: Uuid,
/// ID of the directive the step belongs to.
pub directive_id: Uuid,
/// ID of the contract linked to the step.
pub contract_id: Uuid,
/// Current status of that contract (`contracts.status`).
pub contract_status: String,
/// Current phase of that contract (`contracts.phase`).
pub contract_phase: String,
}
/// Get running steps that are backed by contracts (for contract-based monitoring).
///
/// Returns steps in status `running` that have a `contract_id`, joined with
/// the contract's current status and phase.
///
/// The `contract_id` column is selected under its plain name: the
/// `"contract_id!"` alias form is only understood by sqlx's compile-time
/// `query!` macros. With the runtime `query_as` function it would name the
/// column literally `contract_id!`, and the derived `FromRow` lookup of
/// `contract_id` would fail.
pub async fn get_running_steps_with_contracts(
    pool: &PgPool,
) -> Result<Vec<RunningStepWithContract>, sqlx::Error> {
    sqlx::query_as::<_, RunningStepWithContract>(
        r#"
SELECT
ds.id AS step_id,
ds.directive_id,
ds.contract_id,
c.status AS contract_status,
c.phase AS contract_phase
FROM directive_steps ds
JOIN contracts c ON c.id = ds.contract_id
WHERE ds.status = 'running'
AND ds.contract_id IS NOT NULL
"#,
    )
    .fetch_all(pool)
    .await
}
/// An orchestrator task to check (directive with pending planning task).
///
/// Produced by `get_orchestrator_tasks_to_check`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct OrchestratorTaskCheck {
/// ID of the directive (`directives.id`).
pub directive_id: Uuid,
/// The directive's orchestrator task.
pub orchestrator_task_id: Uuid,
/// Current status of that task (`tasks.status`).
pub task_status: String,
/// Owner of the directive.
pub owner_id: Uuid,
}
/// Get directives with orchestrator tasks to check completion.
///
/// Returns every `active` directive that has an `orchestrator_task_id`,
/// joined with that task's current status.
///
/// The `orchestrator_task_id` column is selected under its plain name: the
/// `"orchestrator_task_id!"` alias form is only understood by sqlx's
/// compile-time `query!` macros. With the runtime `query_as` function it would
/// name the column literally `orchestrator_task_id!`, and the derived
/// `FromRow` lookup of `orchestrator_task_id` would fail.
pub async fn get_orchestrator_tasks_to_check(
    pool: &PgPool,
) -> Result<Vec<OrchestratorTaskCheck>, sqlx::Error> {
    sqlx::query_as::<_, OrchestratorTaskCheck>(
        r#"
SELECT
d.id AS directive_id,
d.orchestrator_task_id,
t.status AS task_status,
d.owner_id
FROM directives d
JOIN tasks t ON t.id = d.orchestrator_task_id
WHERE d.orchestrator_task_id IS NOT NULL
AND d.status = 'active'
"#,
    )
    .fetch_all(pool)
    .await
}
/// Find active directives whose goal changed after their newest step was created.
///
/// Only directives that already have steps and have no orchestrator task
/// assigned are considered; `goal_updated_at` is compared against the latest
/// step `created_at`.
pub async fn get_directives_needing_replanning(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
SELECT d.* FROM directives d
WHERE d.status = 'active'
AND d.orchestrator_task_id IS NULL
AND EXISTS (
SELECT 1 FROM directive_steps WHERE directive_id = d.id
)
AND d.goal_updated_at > (
SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz)
FROM directive_steps ds WHERE ds.directive_id = d.id
)
"#;
    let directives = sqlx::query_as::<_, Directive>(sql).fetch_all(pool).await?;
    Ok(directives)
}
/// Link `task_id` to a step, mark the step `running` and stamp `started_at`
/// with the current time. Returns the updated step, or `None` if it does not exist.
pub async fn assign_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let sql = r#"
UPDATE directive_steps
SET task_id = $2, status = 'running', started_at = NOW()
WHERE id = $1
RETURNING *
"#;
    let step = sqlx::query_as::<_, DirectiveStep>(sql)
        .bind(step_id)
        .bind(task_id)
        .fetch_optional(pool)
        .await?;
    Ok(step)
}
/// Store `task_id` as the directive's `orchestrator_task_id`.
/// Succeeds even when no directive matches.
pub async fn assign_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"
UPDATE directives
SET orchestrator_task_id = $2, updated_at = NOW()
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Reset the directive's `orchestrator_task_id` to NULL.
/// Succeeds even when no directive matches.
pub async fn clear_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"
UPDATE directives
SET orchestrator_task_id = NULL, updated_at = NOW()
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Interrupt superseded planning tasks of a directive.
///
/// Every task of the directive other than `exclude_task_id` whose name starts
/// with "Plan: " or "Re-plan: " and whose status is not already terminal is
/// marked `interrupted`. Matching by name prefix keeps completion and
/// verification tasks untouched.
///
/// Returns the number of tasks that were interrupted.
pub async fn cancel_old_planning_tasks(
    pool: &PgPool,
    directive_id: Uuid,
    exclude_task_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let sql = r#"
UPDATE tasks
SET status = 'interrupted',
completed_at = NOW(),
updated_at = NOW()
WHERE directive_id = $1
AND id != $2
AND (name LIKE 'Plan: %' OR name LIKE 'Re-plan: %')
AND status NOT IN ('completed', 'failed', 'merged', 'done', 'interrupted')
"#;
    sqlx::query(sql)
        .bind(directive_id)
        .bind(exclude_task_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}
/// Record `task_id` on a step while leaving the step's status as it is.
pub async fn link_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"
UPDATE directive_steps
SET task_id = $2
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(step_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Record `contract_id` on a directive step.
pub async fn link_contract_to_step(
    pool: &PgPool,
    step_id: Uuid,
    contract_id: Uuid,
) -> Result<(), sqlx::Error> {
    // Note the placeholder order: $1 is the contract, $2 is the step.
    let sql = r#"
UPDATE directive_steps
SET contract_id = $1
WHERE id = $2
"#;
    sqlx::query(sql)
        .bind(contract_id)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Mark a step as `running` (after its task has been dispatched).
/// `started_at` is filled in only if it was not set before.
pub async fn set_step_running(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"
UPDATE directive_steps
SET status = 'running', started_at = COALESCE(started_at, NOW())
WHERE id = $1
"#;
    sqlx::query(sql)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// List tasks that belong to a directive, are still `pending` and have no
/// daemon assigned, oldest first.
pub async fn get_pending_directive_tasks(
    pool: &PgPool,
) -> Result<Vec<Task>, sqlx::Error> {
    let sql = r#"
SELECT * FROM tasks
WHERE directive_id IS NOT NULL
AND status = 'pending'
AND daemon_id IS NULL
ORDER BY created_at
"#;
    let tasks = sqlx::query_as::<_, Task>(sql).fetch_all(pool).await?;
    Ok(tasks)
}
/// Return the highest `generation` among a directive's steps,
/// or 0 when the directive has no steps.
pub async fn get_directive_max_generation(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i32, sqlx::Error> {
    let sql = r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#;
    let (max_generation,) = sqlx::query_as::<_, (Option<i32>,)>(sql)
        .bind(directive_id)
        .fetch_one(pool)
        .await?;
    Ok(max_generation.unwrap_or(0))
}
// =============================================================================
// Order CRUD
// =============================================================================
/// Insert a new order owned by `owner_id` and return the stored row.
///
/// Missing request fields fall back to defaults: priority `"medium"`,
/// status `"open"` and order type `"feature"`.
pub async fn create_order(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateOrderRequest,
) -> Result<Order, sqlx::Error> {
    let sql = r#"
INSERT INTO orders (owner_id, title, description, priority, status, order_type, labels, directive_id, repository_url, dog_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING *
"#;
    let created = sqlx::query_as::<_, Order>(sql)
        .bind(owner_id)
        .bind(&req.title)
        .bind(&req.description)
        .bind(req.priority.as_deref().unwrap_or("medium"))
        .bind(req.status.as_deref().unwrap_or("open"))
        .bind(req.order_type.as_deref().unwrap_or("feature"))
        .bind(&req.labels)
        .bind(req.directive_id)
        .bind(&req.repository_url)
        .bind(req.dog_id)
        .fetch_one(pool)
        .await?;
    Ok(created)
}
/// List an owner's orders, newest first, narrowed by any filters supplied.
///
/// Each `Some` filter adds an equality condition on its column. The search
/// filter is wrapped in `%…%` and matched case-insensitively against
/// `title`, `description` and `directive_name`.
pub async fn list_orders(
    pool: &PgPool,
    owner_id: Uuid,
    status_filter: Option<&str>,
    type_filter: Option<&str>,
    priority_filter: Option<&str>,
    directive_id_filter: Option<Uuid>,
    dog_id_filter: Option<Uuid>,
    search_filter: Option<&str>,
) -> Result<Vec<Order>, sqlx::Error> {
    // Phase 1: collect one WHERE fragment per active filter. `$1` is the
    // owner, so optional placeholders are numbered from 2 upwards.
    let mut next_param = 2u32;
    let mut clauses: Vec<String> = Vec::new();
    if status_filter.is_some() {
        clauses.push(format!(" AND status = ${}", next_param));
        next_param += 1;
    }
    if type_filter.is_some() {
        clauses.push(format!(" AND order_type = ${}", next_param));
        next_param += 1;
    }
    if priority_filter.is_some() {
        clauses.push(format!(" AND priority = ${}", next_param));
        next_param += 1;
    }
    if directive_id_filter.is_some() {
        clauses.push(format!(" AND directive_id = ${}", next_param));
        next_param += 1;
    }
    if dog_id_filter.is_some() {
        clauses.push(format!(" AND dog_id = ${}", next_param));
        next_param += 1;
    }
    if search_filter.is_some() {
        clauses.push(format!(
            " AND (title ILIKE ${p} OR description ILIKE ${p} OR directive_name ILIKE ${p})",
            p = next_param
        ));
    }

    let mut sql = String::from("SELECT * FROM orders WHERE owner_id = $1");
    sql.extend(clauses);
    sql.push_str(" ORDER BY created_at DESC");

    // Phase 2: bind values in exactly the order the placeholders were numbered.
    let mut q = sqlx::query_as::<_, Order>(&sql).bind(owner_id);
    q = match status_filter {
        Some(value) => q.bind(value),
        None => q,
    };
    q = match type_filter {
        Some(value) => q.bind(value),
        None => q,
    };
    q = match priority_filter {
        Some(value) => q.bind(value),
        None => q,
    };
    q = match directive_id_filter {
        Some(value) => q.bind(value),
        None => q,
    };
    q = match dog_id_filter {
        Some(value) => q.bind(value),
        None => q,
    };
    q = match search_filter {
        Some(needle) => q.bind(format!("%{}%", needle)),
        None => q,
    };

    let orders = q.fetch_all(pool).await?;
    Ok(orders)
}
/// Fetch one order by id, scoped to its owner.
/// Returns `Ok(None)` when no order matches both ids.
pub async fn get_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<Option<Order>, sqlx::Error> {
    let sql = r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#;
    let order = sqlx::query_as::<_, Order>(sql)
        .bind(order_id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;
    Ok(order)
}
/// Update an order (owner-scoped).
///
/// The current row is read first and merged with the request in Rust: fields
/// left as `None` in `req` keep their stored value, so a nullable column
/// cannot be cleared through this function. `updated_at` is set to now.
///
/// Returns `Ok(None)` when no order matches `order_id` and `owner_id`.
pub async fn update_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
    req: UpdateOrderRequest,
) -> Result<Option<Order>, sqlx::Error> {
    let current = sqlx::query_as::<_, Order>(
        r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(order_id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await?;
    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    // Merge the request over the stored row.
    let title = req.title.as_deref().unwrap_or(&current.title);
    let description = req.description.as_deref().or(current.description.as_deref());
    let priority = req.priority.as_deref().unwrap_or(&current.priority);
    let status = req.status.as_deref().unwrap_or(&current.status);
    let order_type = req.order_type.as_deref().unwrap_or(&current.order_type);
    let labels = req.labels.as_ref().unwrap_or(&current.labels);
    let directive_id = req.directive_id.or(current.directive_id);
    let directive_step_id = req.directive_step_id.or(current.directive_step_id);
    let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref());
    let dog_id = req.dog_id.or(current.dog_id);

    sqlx::query_as::<_, Order>(
        r#"
UPDATE orders
SET title = $3, description = $4, priority = $5, status = $6,
order_type = $7, labels = $8, directive_id = $9, directive_step_id = $10,
repository_url = $11, dog_id = $12, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#,
    )
    .bind(order_id)
    .bind(owner_id)
    .bind(title)
    .bind(description)
    .bind(priority)
    .bind(status)
    .bind(order_type)
    .bind(labels)
    .bind(directive_id)
    .bind(directive_step_id)
    .bind(repository_url)
    .bind(dog_id)
    .fetch_optional(pool)
    .await
}
/// Remove an owner's order by id. Returns `true` if a row was deleted.
pub async fn delete_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"DELETE FROM orders WHERE id = $1 AND owner_id = $2"#;
    sqlx::query(sql)
        .bind(order_id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Point an order at a directive by overwriting its `directive_id`.
///
/// Only an order owned by `owner_id` is touched; `updated_at` is refreshed in
/// the same statement. Returns the updated order, or `Ok(None)` when no order
/// matched.
///
/// NOTE(review): the statement does not check that `directive_id` refers to a
/// directive owned by `owner_id` — presumably callers validate that; confirm.
pub async fn link_order_to_directive(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
    directive_id: Uuid,
) -> Result<Option<Order>, sqlx::Error> {
    const LINK_SQL: &str = r#"
UPDATE orders
SET directive_id = $3, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#;

    let statement = sqlx::query_as::<_, Order>(LINK_SQL)
        .bind(order_id)
        .bind(owner_id)
        .bind(directive_id);

    statement.fetch_optional(pool).await
}
/// Convert an order into a directive step.
///
/// Creates a new [`DirectiveStep`] from the order's title and description,
/// places it after the directive's current highest `order_index`, links the
/// order to the new step and returns the created step.
///
/// The order's existing `directive_id` decides which directive receives the
/// step. Returns `Ok(None)` without writing anything when:
/// - no order with `order_id` belongs to `owner_id`,
/// - the order has no `directive_id` (possible for legacy rows), or
/// - the referenced directive does not belong to `owner_id`.
///
/// All statements run inside one transaction, so the step insert and the order
/// link are committed together or not at all; a failure after the insert can no
/// longer leave behind a step that no order points to.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] if any statement or the commit fails.
pub async fn convert_order_to_step(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    // Every early return below drops `tx` without committing, which rolls the
    // transaction back.
    let mut tx = pool.begin().await?;

    // Verify the order exists and belongs to this owner.
    let order = sqlx::query_as::<_, Order>(
        r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(order_id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await?;
    let Some(order) = order else {
        return Ok(None);
    };

    // Required for new orders, but legacy data may have NULL.
    let Some(directive_id) = order.directive_id else {
        return Ok(None);
    };

    // Verify the directive exists and belongs to this owner.
    let directive = sqlx::query_as::<_, Directive>(
        r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await?;
    if directive.is_none() {
        return Ok(None);
    }

    // Next free position: one past the current maximum, or 0 for the first step.
    // NOTE(review): two concurrent conversions for the same directive may still
    // read the same MAX — confirm whether `order_index` must be unique.
    let max_index: (Option<i32>,) = sqlx::query_as(
        r#"SELECT MAX(order_index) FROM directive_steps WHERE directive_id = $1"#,
    )
    .bind(directive_id)
    .fetch_one(&mut *tx)
    .await?;
    let next_index = max_index.0.unwrap_or(-1) + 1;

    // Create the directive step from the order data.
    let step = sqlx::query_as::<_, DirectiveStep>(
        r#"
        INSERT INTO directive_steps (directive_id, name, description, order_index)
        VALUES ($1, $2, $3, $4)
        RETURNING *
        "#,
    )
    .bind(directive_id)
    .bind(&order.title)
    .bind(&order.description)
    .bind(next_index)
    .fetch_one(&mut *tx)
    .await?;

    // Link the order to the new step.
    sqlx::query(
        r#"
        UPDATE orders
        SET directive_step_id = $3, updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        "#,
    )
    .bind(order_id)
    .bind(owner_id)
    .bind(step.id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(Some(step))
}
// =============================================================================
// Order Pickup
// =============================================================================
/// List the orders a directive may pick up.
///
/// An order qualifies when it belongs to `owner_id`, its status is `open` or
/// `in_progress`, and it is either not linked to any directive or already
/// linked to `directive_id`.
///
/// Results are sorted by priority (`critical`, `high`, `medium`, `low`, then
/// any other value) and, within the same priority, oldest `created_at` first.
pub async fn get_available_orders_for_pickup(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    const PICKUP_SQL: &str = r#"
SELECT *
FROM orders
WHERE owner_id = $1
AND status IN ('open', 'in_progress')
AND (directive_id IS NULL OR directive_id = $2)
ORDER BY CASE priority
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END ASC, created_at ASC
"#;

    let candidates = sqlx::query_as::<_, Order>(PICKUP_SQL)
        .bind(owner_id)
        .bind(directive_id);

    candidates.fetch_all(pool).await
}
/// Set `directive_id` on every order in `order_ids` that belongs to `owner_id`.
///
/// IDs that do not exist or belong to another owner are not updated.
/// Returns the number of rows the statement updated.
pub async fn bulk_link_orders_to_directive(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    const BULK_LINK_SQL: &str = r#"
UPDATE orders
SET directive_id = $1, updated_at = NOW()
WHERE id = ANY($2)
AND owner_id = $3
"#;

    sqlx::query(BULK_LINK_SQL)
        .bind(directive_id)
        .bind(order_ids)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() as i64)
}
/// Write `status` to every order in `order_ids` that belongs to `owner_id`.
///
/// The status string is stored as given; this function performs no validation
/// of its value. Returns the number of rows the statement updated.
pub async fn bulk_update_order_status(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    status: &str,
) -> Result<i64, sqlx::Error> {
    const BULK_STATUS_SQL: &str = r#"UPDATE orders SET status = $1, updated_at = NOW()
WHERE id = ANY($2) AND owner_id = $3"#;

    sqlx::query(BULK_STATUS_SQL)
        .bind(status)
        .bind(order_ids)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() as i64)
}
/// Fetch every order whose `directive_step_id` equals `step_id`.
///
/// Unlike most order queries in this section, the lookup is not restricted to
/// an owner, and the statement specifies no ordering for the returned rows.
pub async fn get_orders_by_step_id(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    const BY_STEP_SQL: &str = r#"SELECT * FROM orders WHERE directive_step_id = $1"#;

    let linked_orders = sqlx::query_as::<_, Order>(BY_STEP_SQL).bind(step_id);

    linked_orders.fetch_all(pool).await
}
/// Bring order statuses in line with the status of their linked directive step.
///
/// Considers orders of `directive_id` owned by `owner_id` that are linked to a
/// step and are not already `done` or `archived`:
/// - step `completed` → order becomes `done`
/// - step `running` or `ready` → order becomes `under_review`
///
/// Steps in any other status are excluded by the `WHERE` clause, so the
/// `ELSE o.status` arm of the `CASE` is only a fallback. An order that is
/// already `under_review` with a `running`/`ready` step is rewritten with the
/// same status and is included in the count.
///
/// Returns the number of rows the statement returned, i.e. orders it wrote.
pub async fn reconcile_directive_orders(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    const RECONCILE_SQL: &str = r#"
UPDATE orders o
SET status = CASE
WHEN ds.status = 'completed' THEN 'done'
WHEN ds.status IN ('running', 'ready') THEN 'under_review'
ELSE o.status
END,
updated_at = NOW()
FROM directive_steps ds
WHERE o.directive_step_id = ds.id
AND o.directive_id = $1
AND o.owner_id = $2
AND o.status NOT IN ('done', 'archived')
AND ds.status IN ('completed', 'running', 'ready')
RETURNING o.id
"#;

    sqlx::query_as::<_, (Uuid,)>(RECONCILE_SQL)
        .bind(directive_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await
        .map(|touched| touched.len() as i64)
}
// =============================================================================
// Directive Order Group (DOG) CRUD
// =============================================================================
/// Insert a Directive Order Group (DOG) under `directive_id` for `owner_id`.
///
/// The group's `name` and `description` come from `req`; the inserted row is
/// returned as stored by the database.
pub async fn create_directive_order_group(
    pool: &PgPool,
    directive_id: Uuid,
    owner_id: Uuid,
    req: CreateDirectiveOrderGroupRequest,
) -> Result<DirectiveOrderGroup, sqlx::Error> {
    const INSERT_SQL: &str = r#"
INSERT INTO directive_order_groups (directive_id, owner_id, name, description)
VALUES ($1, $2, $3, $4)
RETURNING *
"#;

    let insert = sqlx::query_as::<_, DirectiveOrderGroup>(INSERT_SQL)
        .bind(directive_id)
        .bind(owner_id)
        .bind(&req.name)
        .bind(&req.description);

    insert.fetch_one(pool).await
}
/// Return every DOG of `directive_id` that belongs to `owner_id`, newest first
/// (ordered by `created_at` descending).
pub async fn list_directive_order_groups(
    pool: &PgPool,
    directive_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<DirectiveOrderGroup>, sqlx::Error> {
    const LIST_SQL: &str = r#"
SELECT * FROM directive_order_groups
WHERE directive_id = $1 AND owner_id = $2
ORDER BY created_at DESC
"#;

    let listing = sqlx::query_as::<_, DirectiveOrderGroup>(LIST_SQL)
        .bind(directive_id)
        .bind(owner_id);

    listing.fetch_all(pool).await
}
/// Look up one DOG by `id`, restricted to rows owned by `owner_id`.
///
/// Returns `Ok(None)` when no such row exists for that owner.
pub async fn get_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<DirectiveOrderGroup>, sqlx::Error> {
    const GET_SQL: &str =
        r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#;

    let lookup = sqlx::query_as::<_, DirectiveOrderGroup>(GET_SQL)
        .bind(id)
        .bind(owner_id);

    lookup.fetch_optional(pool).await
}
/// Update a DOG (owner-scoped), changing only the fields provided in `req`.
///
/// Each field of `req` that is `None` is bound as SQL `NULL`, and the statement
/// assigns `COALESCE($n, column)`, so the stored value is kept for that field.
/// Doing this in a single statement means two concurrent partial updates that
/// touch different fields cannot overwrite each other with stale values, which
/// a separate read followed by a full-row write would allow.
///
/// Because `None` means "leave unchanged", `description` cannot be reset to
/// `NULL` through this function.
///
/// Returns `Ok(None)` when no group with `id` belongs to `owner_id`.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] if the statement fails.
pub async fn update_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateDirectiveOrderGroupRequest,
) -> Result<Option<DirectiveOrderGroup>, sqlx::Error> {
    sqlx::query_as::<_, DirectiveOrderGroup>(
        r#"
        UPDATE directive_order_groups
        SET name = COALESCE($3, name),
            description = COALESCE($4, description),
            status = COALESCE($5, status),
            updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(id)
    .bind(owner_id)
    .bind(req.name.as_deref())
    .bind(req.description.as_deref())
    .bind(req.status.as_deref())
    .fetch_optional(pool)
    .await
}
/// Remove a DOG by `id`, restricted to rows owned by `owner_id`.
///
/// Returns `Ok(true)` when a row was deleted and `Ok(false)` when nothing
/// matched.
pub async fn delete_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const DELETE_SQL: &str =
        r#"DELETE FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#;

    sqlx::query(DELETE_SQL)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}
/// Return the orders assigned to the DOG `dog_id` that belong to `owner_id`,
/// newest first (ordered by `created_at` descending).
pub async fn list_orders_by_dog(
    pool: &PgPool,
    dog_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    const BY_DOG_SQL: &str = r#"
SELECT * FROM orders
WHERE dog_id = $1 AND owner_id = $2
ORDER BY created_at DESC
"#;

    let members = sqlx::query_as::<_, Order>(BY_DOG_SQL)
        .bind(dog_id)
        .bind(owner_id);

    members.fetch_all(pool).await
}
/// List the orders of one DOG that a directive may pick up.
///
/// Applies the same rules as `get_available_orders_for_pickup` — owned by
/// `owner_id`, status `open` or `in_progress`, and either unlinked or already
/// linked to `directive_id` — and additionally requires the order to belong to
/// `dog_id`.
///
/// Results are sorted by priority (`critical`, `high`, `medium`, `low`, then
/// any other value) and, within the same priority, oldest `created_at` first.
pub async fn get_available_orders_for_dog_pickup(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    dog_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    const DOG_PICKUP_SQL: &str = r#"
SELECT *
FROM orders
WHERE owner_id = $1
AND dog_id = $3
AND status IN ('open', 'in_progress')
AND (directive_id IS NULL OR directive_id = $2)
ORDER BY CASE priority
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END ASC, created_at ASC
"#;

    let candidates = sqlx::query_as::<_, Order>(DOG_PICKUP_SQL)
        .bind(owner_id)
        .bind(directive_id)
        .bind(dog_id);

    candidates.fetch_all(pool).await
}
// ─── User Settings ───────────────────────────────────────────────────────────
/// Fetch every setting stored for `owner_id`, sorted by `key` ascending.
pub async fn get_user_settings(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<UserSetting>, sqlx::Error> {
    const ALL_SETTINGS_SQL: &str = r#"
SELECT id, owner_id, key, value, created_at, updated_at
FROM user_settings
WHERE owner_id = $1
ORDER BY key ASC
"#;

    let listing = sqlx::query_as::<_, UserSetting>(ALL_SETTINGS_SQL).bind(owner_id);

    listing.fetch_all(pool).await
}
/// Fetch the setting stored under `key` for `owner_id`.
///
/// Returns `Ok(None)` when the owner has no setting with that key.
pub async fn get_user_setting(
    pool: &PgPool,
    owner_id: Uuid,
    key: &str,
) -> Result<Option<UserSetting>, sqlx::Error> {
    const ONE_SETTING_SQL: &str = r#"
SELECT id, owner_id, key, value, created_at, updated_at
FROM user_settings
WHERE owner_id = $1 AND key = $2
"#;

    let lookup = sqlx::query_as::<_, UserSetting>(ONE_SETTING_SQL)
        .bind(owner_id)
        .bind(key);

    lookup.fetch_optional(pool).await
}
/// Store `value` under `key` for `owner_id`, creating or replacing the setting.
///
/// When a row for `(owner_id, key)` already exists, its `value` is overwritten
/// and `updated_at` is refreshed; otherwise a new row is inserted. The
/// resulting row is returned in both cases.
pub async fn upsert_user_setting(
    pool: &PgPool,
    owner_id: Uuid,
    key: &str,
    value: &serde_json::Value,
) -> Result<UserSetting, sqlx::Error> {
    const UPSERT_SQL: &str = r#"
INSERT INTO user_settings (owner_id, key, value)
VALUES ($1, $2, $3)
ON CONFLICT (owner_id, key) DO UPDATE
SET value = EXCLUDED.value, updated_at = now()
RETURNING id, owner_id, key, value, created_at, updated_at
"#;

    let upsert = sqlx::query_as::<_, UserSetting>(UPSERT_SQL)
        .bind(owner_id)
        .bind(key)
        .bind(value);

    upsert.fetch_one(pool).await
}
/// Remove the setting stored under `key` for `owner_id`.
///
/// Returns `Ok(true)` when a row was deleted and `Ok(false)` when the owner had
/// no setting with that key.
pub async fn delete_user_setting(
    pool: &PgPool,
    owner_id: Uuid,
    key: &str,
) -> Result<bool, sqlx::Error> {
    const DELETE_SQL: &str = r#"
DELETE FROM user_settings
WHERE owner_id = $1 AND key = $2
"#;

    sqlx::query(DELETE_SQL)
        .bind(owner_id)
        .bind(key)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}