//! Repository pattern for file database operations. use chrono::Utc; use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use super::models::{ CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, CreateDirectiveRequest, CreateDirectiveStepRequest, DirectiveGoalHistory, UpdateDirectiveRequest, UpdateDirectiveStepRequest, CreateOrderRequest, Order, UpdateOrderRequest, CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, }; /// Repository error types. #[derive(Debug)] pub enum RepositoryError { /// Database error Database(sqlx::Error), /// Version conflict (optimistic locking failure) VersionConflict { /// The version the client expected expected: i32, /// The actual current version in the database actual: i32, }, /// Caller-facing precondition failure (wrong status, etc.). 
Validation(String), } impl From for RepositoryError { fn from(e: sqlx::Error) -> Self { RepositoryError::Database(e) } } impl std::fmt::Display for RepositoryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RepositoryError::Database(e) => write!(f, "Database error: {}", e), RepositoryError::VersionConflict { expected, actual } => { write!( f, "Version conflict: expected {}, actual {}", expected, actual ) } RepositoryError::Validation(msg) => write!(f, "Validation error: {}", msg), } } } impl std::error::Error for RepositoryError {} /// Generate a default name based on current timestamp. fn generate_default_name() -> String { let now = Utc::now(); now.format("Recording - %b %d %Y %H:%M:%S").to_string() } /// Internal request for creating files without contract association (e.g., audio transcription). /// User-facing file creation should use CreateFileRequest which requires contract_id. pub struct InternalCreateFileRequest { pub name: Option, pub description: Option, pub transcript: Vec, pub location: Option, } /// Create a new file record (internal use, no contract required). /// For user-facing file creation, use create_file_for_owner which requires a contract. pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::>(vec![]).unwrap(); sqlx::query_as::<_, File>( r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .fetch_one(pool) .await } /// Get a file by ID. 
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all files, ordered by created_at DESC. pub async fn list_files(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, ) .fetch_all(pool) .await } /// Update a file by ID with optimistic locking. /// /// If `req.version` is provided, the update will only succeed if the current /// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch. /// /// If `req.version` is None (e.g., internal system updates), version checking is skipped. 
pub async fn update_file( pool: &PgPool, id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first let existing = get_file(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID. pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total files. pub async fn count_files(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped File Functions // ============================================================================= /// Create a new file record for a specific owner. /// Files must belong to a contract - the contract_id is required and the phase is looked up. 
pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateFileRequest, ) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); // Use body from request (may be empty or contain template elements) let body_json = serde_json::to_value(&req.body).unwrap_or_default(); // Use provided contract_phase, or look up from contract's current phase let contract_phase: Option = if req.contract_phase.is_some() { req.contract_phase } else { sqlx::query_scalar( "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", ) .bind(req.contract_id) .bind(owner_id) .fetch_optional(pool) .await? }; sqlx::query_as::<_, File>( r#" INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) .bind(req.contract_id) .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .bind(&req.repo_file_path) .fetch_one(pool) .await } /// Get a file by ID, scoped to owner. pub async fn get_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all files for an owner, ordered by created_at DESC. 
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Database row type for file summary with contract info #[derive(Debug, sqlx::FromRow)] struct FileSummaryRow { id: Uuid, contract_id: Option, contract_name: Option, contract_phase: Option, name: String, description: Option, #[sqlx(json)] transcript: Vec, version: i32, repo_file_path: Option, repo_sync_status: Option, created_at: chrono::DateTime, updated_at: chrono::DateTime, } /// List file summaries for an owner with contract info (joined). pub async fn list_file_summaries_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { let rows = sqlx::query_as::<_, FileSummaryRow>( r#" SELECT f.id, f.contract_id, c.name as contract_name, f.contract_phase, f.name, f.description, f.transcript, f.version, f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at FROM files f LEFT JOIN contracts c ON f.contract_id = c.id WHERE f.owner_id = $1 ORDER BY f.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await?; Ok(rows .into_iter() .map(|row| { let duration = row .transcript .iter() .map(|t| t.end) .fold(0.0_f32, f32::max); FileSummary { id: row.id, contract_id: row.contract_id, contract_name: row.contract_name, contract_phase: row.contract_phase, name: row.name, description: row.description, transcript_count: row.transcript.len(), duration: if duration > 0.0 { Some(duration) } else { None }, version: row.version, repo_file_path: row.repo_file_path, repo_sync_status: row.repo_sync_status, created_at: row.created_at, updated_at: row.updated_at, } }) .collect()) } /// Update a file by ID with optimistic locking, scoped to owner. 
pub async fn update_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first (scoped to owner) let existing = get_file_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID, scoped to owner. pub async fn delete_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================= // Version History Functions // ============================================================================= /// Set the version source for the current transaction. /// This is used by the trigger to record who made the change. pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> { sqlx::query(&format!("SET LOCAL app.version_source = '{}'", source)) .execute(pool) .await?; Ok(()) } /// Set the change description for the current transaction. 
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> { // Escape single quotes for SQL let escaped = description.replace('\'', "''"); sqlx::query(&format!("SET LOCAL app.change_description = '{}'", escaped)) .execute(pool) .await?; Ok(()) } /// List all versions of a file, ordered by version DESC. pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result, sqlx::Error> { // First get the current version from the files table let current = get_file(pool, file_id).await?; let mut versions = sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 ORDER BY version DESC "#, ) .bind(file_id) .fetch_all(pool) .await?; // Add the current version as the first entry if it exists if let Some(file) = current { let current_version = FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), // Current version source change_description: None, created_at: file.updated_at, }; versions.insert(0, current_version); } Ok(versions) } /// Get a specific version of a file. pub async fn get_file_version( pool: &PgPool, file_id: Uuid, version: i32, ) -> Result, sqlx::Error> { // First check if this is the current version if let Some(file) = get_file(pool, file_id).await? 
{ if file.version == version { return Ok(Some(FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), change_description: None, created_at: file.updated_at, })); } } // Otherwise, look in the versions table sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 AND version = $2 "#, ) .bind(file_id) .bind(version) .fetch_optional(pool) .await } /// Restore a file to a previous version. /// This creates a new version with the content from the target version. pub async fn restore_file_version( pool: &PgPool, file_id: Uuid, target_version: i32, current_version: i32, ) -> Result, RepositoryError> { // Get the target version content let target = get_file_version(pool, file_id, target_version).await?; let Some(target) = target else { return Ok(None); }; // Set version source and description for the trigger set_version_source(pool, "system").await?; set_change_description(pool, &format!("Restored from version {}", target_version)).await?; // Update the file with the target version's content // This will trigger the save_file_version trigger to save the current state first let update_req = UpdateFileRequest { name: Some(target.name), description: target.description, transcript: None, summary: target.summary, body: Some(target.body), version: Some(current_version), repo_file_path: None, }; update_file(pool, file_id, update_req).await } /// Count versions for a file. 
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version ) .bind(file_id) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Task Functions // ============================================================================= /// Create a new task. /// /// If creating a subtask (parent_task_id is set) and repository settings are not provided, /// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, /// and target_repo_path from the parent task. Depth is calculated from parent and limited /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless /// explicitly configured. The supervisor controls when completion steps happen. /// /// Task spawning is now controlled by supervisors at the application level. /// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? 
.ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) let contract_id = parent.contract_id.or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, branched_from_task_id, conversation_state, supervisor_worktree_task_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) RETURNING * "#, ) .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) 
.bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) .bind(&req.supervisor_worktree_task_id) .fetch_one(pool) .await } /// Get a task by ID. pub async fn get_task(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent), ordered by created_at DESC. /// Hidden tasks are excluded by default. pub async fn list_tasks(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .fetch_all(pool) .await } /// List subtasks of a parent task. pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent_id) .fetch_all(pool) .await } /// List all tasks in a contract (for supervisor tree view). 
pub async fn list_tasks_by_contract( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE contract_id = $1 AND owner_id = $2 ORDER BY is_supervisor DESC, depth ASC, created_at ASC "#, ) .bind(contract_id) .bind(owner_id) .fetch_all(pool) .await } /// Get pending tasks for a contract (non-supervisor tasks only). /// Includes tasks that were interrupted (retry candidates). /// Prioritizes interrupted tasks and excludes those that exceeded max_retries. pub async fn get_pending_tasks_for_contract( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT t.* FROM tasks t WHERE t.contract_id = $1 AND t.owner_id = $2 AND t.status = 'pending' AND t.retry_count < t.max_retries AND t.is_supervisor = false ORDER BY t.interrupted_at DESC NULLS LAST, t.priority DESC, t.created_at ASC "#, ) .bind(contract_id) .bind(owner_id) .fetch_all(pool) .await } /// Get all contracts that have pending tasks awaiting retry. /// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks. pub async fn get_all_pending_task_contracts( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, (Uuid, Uuid)>( r#" SELECT DISTINCT t.contract_id, t.owner_id FROM tasks t WHERE t.contract_id IS NOT NULL AND t.status = 'pending' AND t.retry_count < t.max_retries AND t.is_supervisor = false ORDER BY t.owner_id, t.contract_id "#, ) .fetch_all(pool) .await } /// Mark a task as pending for retry after daemon failure. /// Increments retry count and adds the failed daemon to exclusion list. 
pub async fn mark_task_for_retry( pool: &PgPool, task_id: Uuid, failed_daemon_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = 'pending', daemon_id = NULL, retry_count = retry_count + 1, failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), last_active_daemon_id = $2, interrupted_at = NOW(), error_message = 'Daemon disconnected, awaiting retry', updated_at = NOW() WHERE id = $1 AND retry_count < max_retries RETURNING * "#, ) .bind(task_id) .bind(failed_daemon_id) .fetch_optional(pool) .await } /// Mark a task as permanently failed (exceeded retry limit). pub async fn mark_task_permanently_failed( pool: &PgPool, task_id: Uuid, failed_daemon_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE tasks SET status = 'failed', daemon_id = NULL, retry_count = retry_count + 1, failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), last_active_daemon_id = $2, error_message = 'Task failed: exceeded maximum retry attempts', updated_at = NOW() WHERE id = $1 "#, ) .bind(task_id) .bind(failed_daemon_id) .execute(pool) .await?; Ok(()) } /// Update a task by ID with optimistic locking. 
pub async fn update_task( pool: &PgPool, id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 AND version = $15 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) 
.bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID. pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total tasks. pub async fn count_tasks(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", ) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped Task Functions // ============================================================================= /// Create a new task for a specific owner. 
pub async fn create_task_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateTaskRequest, ) -> Result { // Calculate depth and inherit settings from parent if applicable let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; // Validate max depth if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", new_depth ))); } // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) let contract_id = parent.contract_id.or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. // The orchestrator integrates subtask work from their worktrees. 
let completion_action = req.completion_action.clone(); (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); // Resolve the directive_document_id. Tasks plumbed through this builder // currently have no way to specify a document explicitly (we don't want // to widen `CreateTaskRequest` for this — every call site would have to // change). Instead, when the task is directive-driven (directive_id is // set) we attach it to that directive's most recently-updated active // document so the task lands under that document's tasks/ subfolder in // the sidebar. Resolution failures are non-fatal — the task still gets // created with directive_document_id = NULL, matching legacy behaviour. 
let directive_document_id = match req.directive_id { Some(directive_id) => resolve_active_document_for_directive(pool, directive_id) .await .unwrap_or(None), None => None, }; sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, branched_from_task_id, conversation_state, supervisor_worktree_task_id, directive_id, directive_step_id, directive_document_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23) RETURNING * "#, ) .bind(owner_id) .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) .bind(&req.supervisor_worktree_task_id) .bind(&req.directive_id) .bind(&req.directive_step_id) .bind(&directive_document_id) .fetch_one(pool) .await } /// Pick the directive's "current" document for tasks/steps to attach to. /// /// Selection rule, in order of preference: /// 1. The most recently `updated_at` document with `status = 'active'`. /// 2. If no active doc exists, the most recently `updated_at` document /// with `status = 'draft'` — covers the case where a fresh draft was /// auto-created post-ship and the orchestrator is now spawning tasks /// against it before the user has even touched it. /// 3. None — directive has no documents at all. /// /// Returning `Ok(None)` is fine and expected (e.g., directives that pre-date /// the document model on a fresh DB, or directives whose only doc is shipped /// + no fresh draft exists yet). 
The task/step is then stored with /// `directive_document_id = NULL`, which the sidebar already tolerates. async fn resolve_active_document_for_directive( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { let row: Option<(Uuid,)> = sqlx::query_as( r#" SELECT id FROM directive_documents WHERE directive_id = $1 AND status IN ('active', 'draft') ORDER BY CASE status WHEN 'active' THEN 0 WHEN 'draft' THEN 1 ELSE 2 END, updated_at DESC LIMIT 1 "#, ) .bind(directive_id) .fetch_optional(pool) .await?; Ok(row.map(|r| r.0)) } /// Get a task by ID, scoped to owner. pub async fn get_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. /// Hidden tasks are excluded by default. pub async fn list_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } // ============================================================================= // Tmp directive — per-owner scratchpad // ============================================================================= /// Get the owner's tmp directive, creating it on the fly if absent. 
Idempotent /// thanks to the partial unique index on (owner_id) WHERE is_tmp. /// /// We try an INSERT first with ON CONFLICT DO NOTHING; if a row was inserted /// it's returned, otherwise we fall back to a SELECT for the row some other /// request just created (or one that already existed). pub async fn get_or_create_tmp_directive( pool: &PgPool, owner_id: Uuid, ) -> Result { // Try insert first. RETURNING fires only if a row was actually written; // if the partial unique index trips (a tmp directive already exists) // we get None and fall through to the SELECT. let inserted = sqlx::query_as::<_, Directive>( r#" INSERT INTO directives (owner_id, title, goal, status, reconcile_mode, is_tmp) VALUES ($1, 'tmp', '', 'idle', 'auto', true) ON CONFLICT DO NOTHING RETURNING * "#, ) .bind(owner_id) .fetch_optional(pool) .await?; if let Some(d) = inserted { return Ok(d); } // Pre-existing or just-created-by-someone-else: fetch. sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE owner_id = $1 AND is_tmp = true LIMIT 1"#, ) .bind(owner_id) .fetch_one(pool) .await } /// Find every tmp directive (across owners). Used by the 30-day expiry /// sweep — we need to know which directives are scratchpads so we know /// which tasks to age out. pub async fn list_all_tmp_directives( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE is_tmp = true"#, ) .fetch_all(pool) .await } /// Delete tasks attached to a tmp directive that are older than 30 days. /// Returns the number of rows deleted (informational; we log it). /// /// We only sweep top-level tasks (parent_task_id IS NULL) — subtasks die /// when their parent dies via the FK cascade. 
pub async fn delete_expired_tmp_tasks(
    pool: &PgPool,
    tmp_directive_id: Uuid,
) -> Result<u64, sqlx::Error> {
    // The age cut-off is evaluated by the database clock (NOW()).
    let outcome = sqlx::query(
        r#"
        DELETE FROM tasks
        WHERE directive_id = $1
          AND parent_task_id IS NULL
          AND created_at < NOW() - INTERVAL '30 days'
        "#,
    )
    .bind(tmp_directive_id)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected())
}

/// List ephemeral tasks attached to a directive — tasks with `directive_id`
/// set but no `directive_step_id`. These are the "spinoff" tasks the user
/// created via the directive folder context menu, distinct from
/// step-spawned execution tasks. Hidden tasks excluded.
///
/// Only top-level tasks are returned, newest first.
pub async fn list_ephemeral_directive_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    sqlx::query_as::<_, TaskSummary>(
        r#"
        SELECT
            t.id, t.contract_id,
            c.name as contract_name, c.phase as contract_phase, c.status as contract_status,
            t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden,
            t.created_at, t.updated_at
        FROM tasks t
        LEFT JOIN contracts c ON t.contract_id = c.id
        WHERE t.owner_id = $1
          AND t.directive_id = $2
          AND t.directive_step_id IS NULL
          AND t.parent_task_id IS NULL
          AND COALESCE(t.hidden, false) = false
        ORDER BY t.created_at DESC
        "#,
    )
    .bind(owner_id)
    .bind(directive_id)
    .fetch_all(pool)
    .await
}

/// List top-level tasks attached to the owner's tmp directive. These are
/// the scratchpad / orphan tasks surfaced under the sidebar's `tmp/`
/// folder. Auto-creates the tmp directive if it doesn't exist yet so the
/// caller never has to handle "no tmp directive".
pub async fn list_tmp_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { let tmp = get_or_create_tmp_directive(pool, owner_id).await?; sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(tmp.id) .fetch_all(pool) .await } /// List subtasks of a parent task, scoped to owner. pub async fn list_subtasks_for_owner( pool: &PgPool, parent_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(parent_id) .fetch_all(pool) .await } /// Update a task by ID with optimistic locking, scoped to owner. 
pub async fn update_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first (scoped to owner) let existing = get_task_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let repository_url = req.repository_url.or(existing.repository_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let hidden = req.hidden.unwrap_or(existing.hidden); let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, hidden = $17, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $18 RETURNING 
* "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .bind(hidden) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, hidden = $17, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .bind(hidden) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID, scoped to owner. pub async fn delete_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update task status and record event. 
pub async fn update_task_status( pool: &PgPool, id: Uuid, new_status: &str, event_data: Option, ) -> Result, sqlx::Error> { // Get existing status let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_status = existing.status.clone(); // Update task status let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, updated_at = NOW(), started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(new_status) .fetch_optional(pool) .await?; // Record event if task.is_some() { let _ = create_task_event( pool, id, "status_change", Some(&previous_status), Some(new_status), event_data, ) .await; } Ok(task) } // ============================================================================= // Task Event Functions // ============================================================================= /// Create a task event. pub async fn create_task_event( pool: &PgPool, task_id: Uuid, event_type: &str, previous_status: Option<&str>, new_status: Option<&str>, event_data: Option, ) -> Result { sqlx::query_as::<_, TaskEvent>( r#" INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING * "#, ) .bind(task_id) .bind(event_type) .bind(previous_status) .bind(new_status) .bind(event_data) .fetch_one(pool) .await } /// List events for a task. 
pub async fn list_task_events( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2 "#, ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } // ============================================================================= // Daemon Functions // ============================================================================= /// Register a new daemon connection. pub async fn register_daemon( pool: &PgPool, owner_id: Uuid, connection_id: &str, hostname: Option<&str>, machine_id: Option<&str>, max_concurrent_tasks: i32, ) -> Result { sqlx::query_as::<_, Daemon>( r#" INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks) VALUES ($1, $2, $3, $4, $5) RETURNING * "#, ) .bind(owner_id) .bind(connection_id) .bind(hostname) .bind(machine_id) .bind(max_concurrent_tasks) .fetch_one(pool) .await } /// Get a daemon by ID. pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// Get a daemon by connection ID. pub async fn get_daemon_by_connection( pool: &PgPool, connection_id: &str, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE connection_id = $1 "#, ) .bind(connection_id) .fetch_optional(pool) .await } /// List all daemons. pub async fn list_daemons(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons ORDER BY connected_at DESC "#, ) .fetch_all(pool) .await } /// List daemons for a specific owner. 
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE owner_id = $1 ORDER BY connected_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get a daemon by ID for a specific owner. pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update daemon heartbeat. pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" UPDATE daemons SET last_heartbeat_at = NOW(), status = 'connected' WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon status. pub async fn update_daemon_status( pool: &PgPool, id: Uuid, status: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = $2, disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END WHERE id = $1 "#, ) .bind(id) .bind(status) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Mark daemon as disconnected by connection_id. pub async fn disconnect_daemon_by_connection( pool: &PgPool, connection_id: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = 'disconnected', disconnected_at = NOW() WHERE connection_id = $1 "#, ) .bind(connection_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon task count. pub async fn update_daemon_task_count( pool: &PgPool, id: Uuid, delta: i32, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET current_task_count = GREATEST(0, current_task_count + $2) WHERE id = $1 "#, ) .bind(id) .bind(delta) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete a daemon by ID. 
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE id = $1
        "#,
    )
    .bind(id)
    .execute(pool)
    .await?;

    // `true` when a row was actually removed.
    Ok(outcome.rows_affected() > 0)
}

/// Delete a daemon by connection ID.
///
/// Returns `true` when a row was deleted.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE connection_id = $1
        "#,
    )
    .bind(connection_id)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected() > 0)
}

/// Count connected daemons.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (connected,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM daemons WHERE status = 'connected'")
            .fetch_one(pool)
            .await?;

    Ok(connected)
}

/// Delete stale daemons that haven't sent a heartbeat within the timeout.
/// Returns the number of deleted daemons.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    // The cut-off is computed by the database: NOW() minus `timeout_seconds`.
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
        "#,
    )
    .bind(timeout_seconds)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected())
}

// =============================================================================
// Sibling Awareness Functions
// =============================================================================

/// List sibling tasks (tasks with the same parent, excluding the given task).
pub async fn list_sibling_tasks( pool: &PgPool, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { // Top-level tasks (no parent) - siblings are other top-level tasks sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(task_id) .fetch_all(pool) .await } } } /// Get running sibling tasks (for context injection). 
pub async fn get_running_siblings( pool: &PgPool, owner_id: Uuid, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id = $2 AND t.id != $3 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND t.id != $2 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(task_id) .fetch_all(pool) .await } } } /// Get task with its siblings for context awareness. pub async fn get_task_with_siblings( pool: &PgPool, id: Uuid, ) -> Result)>, sqlx::Error> { let task = get_task(pool, id).await?; let Some(task) = task else { return Ok(None); }; let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?; Ok(Some((task, siblings))) } // ============================================================================= // Task Output Persistence Functions // ============================================================================= /// Save task output to the database. /// This stores output in the task_events table with event_type='output'. pub async fn save_task_output( pool: &PgPool, task_id: Uuid, message_type: &str, content: &str, tool_name: Option<&str>, tool_input: Option, is_error: Option, cost_usd: Option, duration_ms: Option, ) -> Result { let event_data = serde_json::json!({ "messageType": message_type, "content": content, "toolName": tool_name, "toolInput": tool_input, "isError": is_error, "costUsd": cost_usd, "durationMs": duration_ms, }); create_task_event(pool, task_id, "output", None, None, Some(event_data)).await } /// Get task output from the database. /// Retrieves all output events for a task, ordered by creation time. 
pub async fn get_task_output( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(1000); sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "#, ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } /// Update task completion status with error message. /// Sets the task status to 'done' or 'failed' and records completion time. pub async fn complete_task( pool: &PgPool, task_id: Uuid, success: bool, error_message: Option<&str>, ) -> Result, sqlx::Error> { let status = if success { "done" } else { "failed" }; let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, error_message = COALESCE($3, error_message), completed_at = NOW(), updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(task_id) .bind(status) .bind(error_message) .fetch_optional(pool) .await?; // Record completion event if task.is_some() { let event_data = serde_json::json!({ "success": success, "errorMessage": error_message, }); let _ = create_task_event( pool, task_id, "complete", Some("running"), Some(status), Some(event_data), ) .await; } Ok(task) } // ============================================================================= // Mesh Chat History Functions // ============================================================================= /// Get or create the active conversation for an owner. 
pub async fn get_or_create_active_conversation( pool: &PgPool, owner_id: Uuid, ) -> Result { // Try to get existing active conversation for this owner let existing = sqlx::query_as::<_, MeshChatConversation>( r#" SELECT * FROM mesh_chat_conversations WHERE is_active = true AND owner_id = $1 LIMIT 1 "#, ) .bind(owner_id) .fetch_optional(pool) .await?; if let Some(conv) = existing { return Ok(conv); } // Create new conversation sqlx::query_as::<_, MeshChatConversation>( r#" INSERT INTO mesh_chat_conversations (owner_id, is_active) VALUES ($1, true) RETURNING * "#, ) .bind(owner_id) .fetch_one(pool) .await } /// List messages for a conversation. pub async fn list_chat_messages( pool: &PgPool, conversation_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, MeshChatMessageRecord>( r#" SELECT * FROM mesh_chat_messages WHERE conversation_id = $1 ORDER BY created_at ASC LIMIT $2 "#, ) .bind(conversation_id) .bind(limit) .fetch_all(pool) .await } /// Add a message to a conversation. #[allow(clippy::too_many_arguments)] pub async fn add_chat_message( pool: &PgPool, conversation_id: Uuid, role: &str, content: &str, context_type: &str, context_task_id: Option, tool_calls: Option, pending_questions: Option, ) -> Result { sqlx::query_as::<_, MeshChatMessageRecord>( r#" INSERT INTO mesh_chat_messages (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#, ) .bind(conversation_id) .bind(role) .bind(content) .bind(context_type) .bind(context_task_id) .bind(tool_calls) .bind(pending_questions) .fetch_one(pool) .await } /// Clear conversation (archive existing and create new). 
pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result { // Mark existing as inactive for this owner sqlx::query( r#" UPDATE mesh_chat_conversations SET is_active = false, updated_at = NOW() WHERE is_active = true AND owner_id = $1 "#, ) .bind(owner_id) .execute(pool) .await?; // Create new active conversation get_or_create_active_conversation(pool, owner_id).await } // ============================================================================= // Contract Chat History Functions // ============================================================================= /// Get or create the active conversation for a contract. pub async fn get_or_create_contract_conversation( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result { // Try to get existing active conversation for this contract let existing = sqlx::query_as::<_, ContractChatConversation>( r#" SELECT * FROM contract_chat_conversations WHERE is_active = true AND contract_id = $1 AND owner_id = $2 LIMIT 1 "#, ) .bind(contract_id) .bind(owner_id) .fetch_optional(pool) .await?; if let Some(conv) = existing { return Ok(conv); } // Create new conversation sqlx::query_as::<_, ContractChatConversation>( r#" INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active) VALUES ($1, $2, true) RETURNING * "#, ) .bind(contract_id) .bind(owner_id) .fetch_one(pool) .await } /// List messages for a contract conversation. pub async fn list_contract_chat_messages( pool: &PgPool, conversation_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, ContractChatMessageRecord>( r#" SELECT * FROM contract_chat_messages WHERE conversation_id = $1 ORDER BY created_at ASC LIMIT $2 "#, ) .bind(conversation_id) .bind(limit) .fetch_all(pool) .await } /// Add a message to a contract conversation. 
///
/// Inserts one `contract_chat_messages` row and returns it.
pub async fn add_contract_chat_message(
    pool: &PgPool,
    conversation_id: Uuid,
    role: &str,
    content: &str,
    tool_calls: Option<serde_json::Value>,
    pending_questions: Option<serde_json::Value>,
) -> Result<ContractChatMessageRecord, sqlx::Error> {
    sqlx::query_as::<_, ContractChatMessageRecord>(
        r#"
        INSERT INTO contract_chat_messages
            (conversation_id, role, content, tool_calls, pending_questions)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#,
    )
    .bind(conversation_id)
    .bind(role)
    .bind(content)
    .bind(tool_calls)
    .bind(pending_questions)
    .fetch_one(pool)
    .await
}

/// Clear contract conversation (archive existing and create new).
pub async fn clear_contract_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<ContractChatConversation, sqlx::Error> {
    // Mark the existing conversation(s) as inactive for this contract.
    sqlx::query(
        r#"
        UPDATE contract_chat_conversations
        SET is_active = false, updated_at = NOW()
        WHERE is_active = true AND contract_id = $1 AND owner_id = $2
        "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .execute(pool)
    .await?;

    // With nothing active left, this creates the fresh conversation.
    get_or_create_contract_conversation(pool, contract_id, owner_id).await
}

// =============================================================================
// Contract Type Template Functions (Owner-Scoped)
// =============================================================================

/// Create a new contract type template for a specific owner.
///
/// `phases` and `deliverables` are stored as JSON. If serialization fails,
/// `phases` is written as JSON null and `deliverables` as SQL NULL.
pub async fn create_template_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateTemplateRequest,
) -> Result<ContractTypeTemplateRecord, sqlx::Error> {
    let phases_json = serde_json::to_value(&req.phases).unwrap_or_default();
    let deliverables_json = req
        .deliverables
        .as_ref()
        .and_then(|d| serde_json::to_value(d).ok());

    sqlx::query_as::<_, ContractTypeTemplateRecord>(
        r#"
        INSERT INTO contract_type_templates
            (owner_id, name, description, phases, default_phase, deliverables)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING *
        "#,
    )
    .bind(owner_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(phases_json)
    .bind(&req.default_phase)
    .bind(deliverables_json)
    .fetch_one(pool)
    .await
}

/// Get a contract type template by ID, scoped to owner.
pub async fn get_template_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractTypeTemplateRecord>( r#" SELECT * FROM contract_type_templates WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Get a contract type template by ID (internal use, no owner scoping). pub async fn get_template_by_id( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractTypeTemplateRecord>( r#" SELECT * FROM contract_type_templates WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all contract type templates for an owner, ordered by name. pub async fn list_templates_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractTypeTemplateRecord>( r#" SELECT * FROM contract_type_templates WHERE owner_id = $1 ORDER BY name ASC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Update a contract type template for an owner. pub async fn update_template_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateTemplateRequest, ) -> Result, RepositoryError> { // Build dynamic update query let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()"); let mut param_idx = 3; // $1 = id, $2 = owner_id if req.name.is_some() { query.push_str(&format!(", name = ${}", param_idx)); param_idx += 1; } if req.description.is_some() { query.push_str(&format!(", description = ${}", param_idx)); param_idx += 1; } if req.phases.is_some() { query.push_str(&format!(", phases = ${}", param_idx)); param_idx += 1; } if req.default_phase.is_some() { query.push_str(&format!(", default_phase = ${}", param_idx)); param_idx += 1; } if req.deliverables.is_some() { query.push_str(&format!(", deliverables = ${}", param_idx)); param_idx += 1; } // Optimistic locking if req.version.is_some() { query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx)); } else 
{ query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2"); } query.push_str(" RETURNING *"); let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query); sql_query = sql_query.bind(id).bind(owner_id); if let Some(ref name) = req.name { sql_query = sql_query.bind(name); } if let Some(ref description) = req.description { sql_query = sql_query.bind(description); } if let Some(ref phases) = req.phases { sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default()); } if let Some(ref default_phase) = req.default_phase { sql_query = sql_query.bind(default_phase); } if let Some(ref deliverables) = req.deliverables { sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default()); } if let Some(version) = req.version { sql_query = sql_query.bind(version); } match sql_query.fetch_optional(pool).await { Ok(result) => { if result.is_none() && req.version.is_some() { // Check if it's a version conflict if let Some(current) = get_template_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } Err(e) => Err(RepositoryError::Database(e)), } } /// Delete a contract type template for an owner. pub async fn delete_template_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM contract_type_templates WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Helper function to build PhaseConfig from a template. pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig { PhaseConfig { phases: template.phases.clone(), default_phase: template.default_phase.clone(), deliverables: template.deliverables.clone().unwrap_or_default(), } } /// Helper function to build PhaseConfig for built-in contract types. 
///
/// Recognized names are `"simple"` and `"specification"`; `"execute"` and every other
/// string produce the single-phase `execute` configuration with no deliverables.
pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig {
    match contract_type {
        // Two phases: plan, then execute; one required deliverable per phase.
        "simple" => PhaseConfig {
            phases: vec![
                PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 },
                PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 },
            ],
            default_phase: "plan".to_string(),
            deliverables: [
                ("plan".to_string(), vec![DeliverableDefinition {
                    id: "plan-document".to_string(),
                    name: "Plan".to_string(),
                    priority: "required".to_string(),
                }]),
                ("execute".to_string(), vec![DeliverableDefinition {
                    id: "pull-request".to_string(),
                    name: "Pull Request".to_string(),
                    priority: "required".to_string(),
                }]),
            ].into_iter().collect(),
        },
        // Five phases from research through review; one required deliverable per phase.
        "specification" => PhaseConfig {
            phases: vec![
                PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 },
                PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 },
                PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 },
                PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 },
                PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 },
            ],
            default_phase: "research".to_string(),
            deliverables: [
                ("research".to_string(), vec![DeliverableDefinition {
                    id: "research-notes".to_string(),
                    name: "Research Notes".to_string(),
                    priority: "required".to_string(),
                }]),
                ("specify".to_string(), vec![DeliverableDefinition {
                    id: "requirements-document".to_string(),
                    name: "Requirements Document".to_string(),
                    priority: "required".to_string(),
                }]),
                ("plan".to_string(), vec![DeliverableDefinition {
                    id: "plan-document".to_string(),
                    name: "Plan".to_string(),
                    priority: "required".to_string(),
                }]),
                ("execute".to_string(), vec![DeliverableDefinition {
                    id: "pull-request".to_string(),
                    name: "Pull Request".to_string(),
                    priority: "required".to_string(),
                }]),
                ("review".to_string(), vec![DeliverableDefinition {
                    id: "release-notes".to_string(),
                    name: "Release Notes".to_string(),
                    priority: "required".to_string(),
                }]),
            ].into_iter().collect(),
        },
        // The wildcard makes the "execute" literal redundant: any unrecognized name lands
        // here too.
        "execute" | _ => PhaseConfig {
            phases: vec![
                PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 },
            ],
            default_phase: "execute".to_string(),
            deliverables: std::collections::HashMap::new(),
        },
    }
}

// =============================================================================
// Contract Functions (Owner-Scoped)
// =============================================================================

/// Create a new contract for a specific owner.
/// Supports both built-in contract types (simple, specification, execute) and custom templates.
///
/// When `req.template_id` is set the template decides the phase configuration and its name
/// is stored as the contract type; otherwise `req.contract_type` (default `"simple"`) must
/// be one of the built-in names. Validation failures are reported as
/// `sqlx::Error::Protocol`. The boolean flags default to `false` when absent from `req`.
pub async fn create_contract_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateContractRequest,
) -> Result {
    // Determine phase configuration based on template_id or contract_type
    let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) =
        if let Some(template_id) = req.template_id {
            // Look up the custom template
            // NOTE(review): this lookup is not filtered by `owner_id`, so a template owned
            // by a different owner is accepted — confirm that cross-owner use is intended.
            let template = get_template_by_id(pool, template_id)
                .await?
                .ok_or_else(|| {
                    sqlx::Error::Protocol(format!("Template not found: {}", template_id))
                })?;

            let config = build_phase_config_from_template(&template);
            let default = config.default_phase.clone();
            // For custom templates, store the template name as the contract_type
            (config, template.name.clone(), default)
        } else {
            // Use built-in contract type
            let contract_type = req.contract_type.as_deref().unwrap_or("simple");

            // Validate contract type
            let valid_types = ["simple", "specification", "execute"];
            if !valid_types.contains(&contract_type) {
                return Err(sqlx::Error::Protocol(format!(
                    "Invalid contract_type '{}'. Must be one of: {} or provide a template_id",
                    contract_type,
                    valid_types.join(", ")
                )));
            }

            let config = build_phase_config_for_builtin(contract_type);
            let default = config.default_phase.clone();
            (config, contract_type.to_string(), default)
        };

    // Get valid phase IDs from the configuration
    let valid_phase_ids: Vec = phase_config.phases.iter().map(|p| p.id.clone()).collect();

    // Use provided initial_phase or default based on contract type/template
    let phase = req.initial_phase.as_deref().unwrap_or(&default_phase);

    // Validate the phase is valid for this contract type/template
    if !valid_phase_ids.contains(&phase.to_string()) {
        return Err(sqlx::Error::Protocol(format!(
            "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
            phase,
            contract_type_str,
            valid_phase_ids.join(", ")
        )));
    }

    let autonomous_loop = req.autonomous_loop.unwrap_or(false);
    let phase_guard = req.phase_guard.unwrap_or(false);
    let local_only = req.local_only.unwrap_or(false);
    let auto_merge_local = req.auto_merge_local.unwrap_or(false);

    // Serialize phase_config to JSON
    // `.ok()` turns a serialization failure into `None`, which is then bound as $10.
    let phase_config_json = serde_json::to_value(&phase_config).ok();

    sqlx::query_as::<_, Contract>(
        r#" INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING * "#,
    )
    .bind(owner_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&contract_type_str)
    .bind(phase)
    .bind(autonomous_loop)
    .bind(phase_guard)
    .bind(local_only)
    .bind(auto_merge_local)
    .bind(phase_config_json)
    .fetch_one(pool)
    .await
}

/// Get a contract by ID, scoped to owner.
///
/// Returns `None` when no contract matches both `id` and `owner_id`.
pub async fn get_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Contract>(
        r#" SELECT * FROM contracts WHERE id = $1 AND owner_id = $2 "#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await
}

/// List all contracts for an owner, ordered by created_at DESC.
///
/// Each summary row carries three correlated sub-select counts: files, tasks and
/// repositories attached to the contract.
pub async fn list_contracts_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, ContractSummary>(
        r#" SELECT c.id, c.name, c.description, c.contract_type, c.phase, c.status, c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count FROM contracts c WHERE c.owner_id = $1 ORDER BY c.created_at DESC "#,
    )
    .bind(owner_id)
    .fetch_all(pool)
    .await
}

/// Get contract summary by ID.
///
/// Selects the same columns and counts as `list_contracts_for_owner`, restricted to one
/// contract that must also belong to `owner_id`; returns `None` when there is no match.
pub async fn get_contract_summary_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, ContractSummary>(
        r#" SELECT c.id, c.name, c.description, c.contract_type, c.phase, c.status, c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count FROM contracts c WHERE c.id = $1 AND c.owner_id = $2 "#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await
}

/// Update a contract by ID with optimistic locking, scoped to owner.
///
/// The current row is loaded first and merged with `req`: every field missing from the
/// request keeps its stored value. `version` is always incremented. When `req.version` is
/// set it is checked twice — against the loaded row, and again in the `WHERE` clause of
/// the `UPDATE` to catch a concurrent writer.
///
/// Returns `Ok(None)` when the contract does not exist for this owner and
/// `RepositoryError::VersionConflict` on a version mismatch.
pub async fn update_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateContractRequest,
) -> Result, RepositoryError> {
    let existing = get_contract_for_owner(pool, id, owner_id).await?;
    let Some(existing) = existing else {
        return Ok(None);
    };

    // Check version if provided (optimistic locking)
    if let Some(expected_version) = req.version {
        if existing.version != expected_version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: existing.version,
            });
        }
    }

    // Apply updates
    let name = req.name.unwrap_or(existing.name);
    // NOTE(review): `Option::or` keeps the stored value whenever the request holds `None`,
    // so `description` and `supervisor_task_id` cannot be cleared through this function.
    let description = req.description.or(existing.description);
    let phase = req.phase.unwrap_or(existing.phase);
    let status = req.status.unwrap_or(existing.status);
    let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
    let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop);
    let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard);
    let local_only = req.local_only.unwrap_or(existing.local_only);
    let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local);

    // The two branches differ only in the extra `AND version = $12` predicate.
    let result = if req.version.is_some() {
        sqlx::query_as::<_, Contract>(
            r#" UPDATE contracts SET name = $3, description = $4, phase = $5, status = $6, supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $12 RETURNING * "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&phase)
        .bind(&status)
        .bind(supervisor_task_id)
        .bind(autonomous_loop)
        .bind(phase_guard)
        .bind(local_only)
        .bind(auto_merge_local)
        .bind(req.version.unwrap())
        .fetch_optional(pool)
        .await?
    } else {
        sqlx::query_as::<_, Contract>(
            r#" UPDATE contracts SET name = $3, description = $4, phase = $5, status = $6, supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&phase)
        .bind(&status)
        .bind(supervisor_task_id)
        .bind(autonomous_loop)
        .bind(phase_guard)
        .bind(local_only)
        .bind(auto_merge_local)
        .fetch_optional(pool)
        .await?
    };

    // If versioned update returned None, there was a race condition
    if result.is_none() && req.version.is_some() {
        // Re-read to report the version that won; if the row is gone, fall through to
        // `Ok(None)`.
        if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? {
            return Err(RepositoryError::VersionConflict {
                expected: req.version.unwrap(),
                actual: current.version,
            });
        }
    }

    Ok(result)
}

/// Delete a contract by ID, scoped to owner.
///
/// Returns `true` when at least one row was deleted.
pub async fn delete_contract_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result {
    let result = sqlx::query(
        r#" DELETE FROM contracts WHERE id = $1 AND owner_id = $2 "#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(pool)
    .await?;

    Ok(result.rows_affected() > 0)
}

/// Change contract phase and record event.
///
/// This is the simple version without version checking. Use `change_contract_phase_with_version`
/// for explicit version conflict detection.
pub async fn change_contract_phase_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, new_phase: &str, ) -> Result, sqlx::Error> { // Get current phase let existing = get_contract_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_phase = existing.phase.clone(); // Update phase let contract = sqlx::query_as::<_, Contract>( r#" UPDATE contracts SET phase = $3, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(new_phase) .fetch_optional(pool) .await?; // Record event if contract.is_some() { sqlx::query( r#" INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) VALUES ($1, 'phase_change', $2, $3) "#, ) .bind(id) .bind(&previous_phase) .bind(new_phase) .execute(pool) .await?; } Ok(contract) } /// Change contract phase with explicit version checking for conflict detection. /// /// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions. /// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match. 
pub async fn change_contract_phase_with_version( pool: &PgPool, id: Uuid, owner_id: Uuid, new_phase: &str, expected_version: Option, ) -> Result { // Start a transaction to ensure atomicity with row locking let mut tx = pool.begin().await?; // Lock the row with SELECT FOR UPDATE and get current state let existing: Option = sqlx::query_as::<_, Contract>( r#" SELECT * FROM contracts WHERE id = $1 AND owner_id = $2 FOR UPDATE "#, ) .bind(id) .bind(owner_id) .fetch_optional(&mut *tx) .await?; let Some(existing) = existing else { tx.rollback().await?; return Ok(PhaseChangeResult::NotFound); }; // Check version if provided (optimistic locking) if let Some(expected) = expected_version { if existing.version != expected { tx.rollback().await?; return Ok(PhaseChangeResult::VersionConflict { expected, actual: existing.version, current_phase: existing.phase, }); } } // Validate the phase transition is allowed let valid_phases = existing.valid_phase_ids(); if !valid_phases.contains(&new_phase.to_string()) { tx.rollback().await?; return Ok(PhaseChangeResult::ValidationFailed { reason: format!( "Invalid phase '{}' for contract type '{}'", new_phase, existing.contract_type ), missing_requirements: vec![format!( "Phase must be one of: {}", valid_phases.join(", ") )], }); } let previous_phase = existing.phase.clone(); // Update phase with version increment let contract = sqlx::query_as::<_, Contract>( r#" UPDATE contracts SET phase = $3, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(new_phase) .fetch_one(&mut *tx) .await?; // Record event sqlx::query( r#" INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) VALUES ($1, 'phase_change', $2, $3) "#, ) .bind(id) .bind(&previous_phase) .bind(new_phase) .execute(&mut *tx) .await?; // Commit the transaction tx.commit().await?; Ok(PhaseChangeResult::Success(contract)) } // 
============================================================================= // Contract Repository Functions // ============================================================================= /// List repositories for a contract. pub async fn list_contract_repositories( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractRepository>( r#" SELECT * FROM contract_repositories WHERE contract_id = $1 ORDER BY is_primary DESC, created_at ASC "#, ) .bind(contract_id) .fetch_all(pool) .await } /// Add a remote repository to a contract. pub async fn add_remote_repository( pool: &PgPool, contract_id: Uuid, name: &str, repository_url: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) VALUES ($1, $2, $3, 'remote', 'ready', $4) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(repository_url) .bind(is_primary) .fetch_one(pool) .await } /// Add a local repository to a contract. 
pub async fn add_local_repository( pool: &PgPool, contract_id: Uuid, name: &str, local_path: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) VALUES ($1, $2, $3, 'local', 'ready', $4) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(local_path) .bind(is_primary) .fetch_one(pool) .await } /// Create a managed repository (daemon will create it). pub async fn create_managed_repository( pool: &PgPool, contract_id: Uuid, name: &str, is_primary: bool, ) -> Result { // If is_primary, clear other primaries first if is_primary { sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; } sqlx::query_as::<_, ContractRepository>( r#" INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) VALUES ($1, $2, 'managed', 'pending', $3) RETURNING * "#, ) .bind(contract_id) .bind(name) .bind(is_primary) .fetch_one(pool) .await } /// Delete a repository from a contract. pub async fn delete_contract_repository( pool: &PgPool, repo_id: Uuid, contract_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM contract_repositories WHERE id = $1 AND contract_id = $2 "#, ) .bind(repo_id) .bind(contract_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Set a repository as primary (and clear others). 
pub async fn set_repository_primary( pool: &PgPool, repo_id: Uuid, contract_id: Uuid, ) -> Result { // Clear other primaries sqlx::query( r#" UPDATE contract_repositories SET is_primary = false, updated_at = NOW() WHERE contract_id = $1 AND is_primary = true "#, ) .bind(contract_id) .execute(pool) .await?; // Set this one as primary let result = sqlx::query( r#" UPDATE contract_repositories SET is_primary = true, updated_at = NOW() WHERE id = $1 AND contract_id = $2 "#, ) .bind(repo_id) .bind(contract_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update managed repository status (used by daemon). pub async fn update_managed_repository_status( pool: &PgPool, repo_id: Uuid, status: &str, repository_url: Option<&str>, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ContractRepository>( r#" UPDATE contract_repositories SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(repo_id) .bind(status) .bind(repository_url) .fetch_optional(pool) .await } // ============================================================================= // Contract Task Association Functions // ============================================================================= /// Add a task to a contract. pub async fn add_task_to_contract( pool: &PgPool, contract_id: Uuid, task_id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE tasks SET contract_id = $2, updated_at = NOW() WHERE id = $1 AND owner_id = $3 "#, ) .bind(task_id) .bind(contract_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Remove a task from a contract. 
///
/// Clears `contract_id` only when the task currently belongs to this contract and to
/// `owner_id`. Returns `true` when a task row was updated.
pub async fn remove_task_from_contract(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    owner_id: Uuid,
) -> Result {
    let result = sqlx::query(
        r#" UPDATE tasks SET contract_id = NULL, updated_at = NOW() WHERE id = $1 AND contract_id = $2 AND owner_id = $3 "#,
    )
    .bind(task_id)
    .bind(contract_id)
    .bind(owner_id)
    .execute(pool)
    .await?;

    Ok(result.rows_affected() > 0)
}

/// List files in a contract.
///
/// Full `File` rows are fetched, newest first, and converted to `FileSummary` in memory.
pub async fn list_files_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields
    let files = sqlx::query_as::<_, File>(
        r#" SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE contract_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .fetch_all(pool)
    .await?;

    Ok(files.into_iter().map(FileSummary::from).collect())
}

/// List tasks in a contract.
///
/// Rows are ordered by descending priority, then newest first. Each row also carries the
/// contract's name, phase and status (via the join) and a count of direct subtasks.
pub async fn list_tasks_in_contract(
    pool: &PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskSummary>(
        r#" SELECT t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, c.status as contract_status, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.contract_id = $1 AND t.owner_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#,
    )
    .bind(contract_id)
    .bind(owner_id)
    .fetch_all(pool)
    .await
}

/// Minimal task info for worktree cleanup operations.
///
/// Populated by `list_contract_tasks_with_worktree_info`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TaskWorktreeInfo {
    pub id: Uuid,
    pub daemon_id: Option,
    pub overlay_path: Option,
    /// If set, this task shares the worktree of the specified supervisor task.
    /// Should NOT have its worktree deleted during cleanup.
    pub supervisor_worktree_task_id: Option,
}

/// List tasks in a contract with their daemon/worktree info.
/// Used for cleaning up worktrees when a contract is completed or deleted.
///
/// Only tasks that have a `daemon_id` or an `overlay_path` are returned. The query is not
/// owner-scoped.
pub async fn list_contract_tasks_with_worktree_info(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskWorktreeInfo>(
        r#" SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id FROM tasks WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL) "#,
    )
    .bind(contract_id)
    .fetch_all(pool)
    .await
}

// =============================================================================
// Contract Events
// =============================================================================

/// List events for a contract.
///
/// Newest events first.
pub async fn list_contract_events(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, ContractEvent>(
        r#" SELECT * FROM contract_events WHERE contract_id = $1 ORDER BY created_at DESC "#,
    )
    .bind(contract_id)
    .fetch_all(pool)
    .await
}

/// Record a contract event.
///
/// Inserts one row with the given type and optional payload and returns the stored row.
pub async fn record_contract_event(
    pool: &PgPool,
    contract_id: Uuid,
    event_type: &str,
    event_data: Option,
) -> Result {
    sqlx::query_as::<_, ContractEvent>(
        r#" INSERT INTO contract_events (contract_id, event_type, event_data) VALUES ($1, $2, $3) RETURNING * "#,
    )
    .bind(contract_id)
    .bind(event_type)
    .bind(event_data)
    .fetch_one(pool)
    .await
}

// ============================================================================
// Task Checkpoints
// ============================================================================

/// Create a checkpoint for a task.
///
/// Three statements run in sequence on the pool: compute the next checkpoint number,
/// update the task's checkpoint tracking columns, then insert the checkpoint row, which
/// is returned.
///
/// NOTE(review): the statements are not wrapped in a transaction. Two concurrent calls for
/// the same task can compute the same `checkpoint_number`, and a failed insert leaves the
/// `tasks` row already updated.
pub async fn create_task_checkpoint(
    pool: &PgPool,
    task_id: Uuid,
    commit_sha: &str,
    branch_name: &str,
    message: &str,
    files_changed: Option,
    lines_added: Option,
    lines_removed: Option,
) -> Result {
    // Get current checkpoint count and increment
    // A task without checkpoints yields 1 (COALESCE(MAX, 0) + 1).
    let checkpoint_number: i32 = sqlx::query_scalar(
        "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_one(pool)
    .await?;

    // Update task's checkpoint tracking
    sqlx::query(
        r#" UPDATE tasks SET last_checkpoint_sha = $1, checkpoint_count = $2, checkpoint_message = $3, updated_at = NOW() WHERE id = $4 "#,
    )
    .bind(commit_sha)
    .bind(checkpoint_number)
    .bind(message)
    .bind(task_id)
    .execute(pool)
    .await?;

    sqlx::query_as::<_, TaskCheckpoint>(
        r#" INSERT INTO task_checkpoints ( task_id, checkpoint_number, commit_sha, branch_name, message, files_changed, lines_added, lines_removed ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING * "#,
    )
    .bind(task_id)
    .bind(checkpoint_number)
    .bind(commit_sha)
    .bind(branch_name)
    .bind(message)
    .bind(files_changed)
    .bind(lines_added)
    .bind(lines_removed)
    .fetch_one(pool)
    .await
}

/// Get a checkpoint by ID.
pub async fn get_task_checkpoint(
    pool: &PgPool,
    id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// Get a checkpoint by commit SHA.
///
/// The lookup is not scoped to a task; when several rows share the SHA, `fetch_optional`
/// returns whichever the database yields first.
pub async fn get_task_checkpoint_by_sha(
    pool: &PgPool,
    commit_sha: &str,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1")
        .bind(commit_sha)
        .fetch_optional(pool)
        .await
}

/// List checkpoints for a task.
///
/// Highest checkpoint number first.
pub async fn list_task_checkpoints(
    pool: &PgPool,
    task_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskCheckpoint>(
        "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC",
    )
    .bind(task_id)
    .fetch_all(pool)
    .await
}

// ============================================================================
// Supervisor State
// ============================================================================

/// Create or update supervisor state for a contract.
///
/// Upserts on `contract_id`: an existing row has its task, conversation history, pending
/// task ids and phase overwritten with the supplied values and its activity timestamps
/// refreshed. Returns the stored row.
pub async fn upsert_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    conversation_history: serde_json::Value,
    pending_task_ids: &[Uuid],
    phase: &str,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) VALUES ($1, $2, $3, $4, $5, NOW()) ON CONFLICT (contract_id) DO UPDATE SET task_id = EXCLUDED.task_id, conversation_history = EXCLUDED.conversation_history, pending_task_ids = EXCLUDED.pending_task_ids, phase = EXCLUDED.phase, last_activity = NOW(), updated_at = NOW() RETURNING * "#,
    )
    .bind(contract_id)
    .bind(task_id)
    .bind(conversation_history)
    .bind(pending_task_ids)
    .bind(phase)
    .fetch_one(pool)
    .await
}

/// Get supervisor state for a contract.
pub async fn get_supervisor_state(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1")
        .bind(contract_id)
        .fetch_optional(pool)
        .await
}

/// Get supervisor state by task ID.
pub async fn get_supervisor_state_by_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1")
        .bind(task_id)
        .fetch_optional(pool)
        .await
}

/// Update supervisor conversation history.
///
/// Replaces the stored history and refreshes the activity timestamps. Uses `fetch_one`,
/// so a contract without a supervisor state row yields `sqlx::Error::RowNotFound`.
pub async fn update_supervisor_conversation(
    pool: &PgPool,
    contract_id: Uuid,
    conversation_history: serde_json::Value,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET conversation_history = $1, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(conversation_history)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Update supervisor pending tasks.
///
/// Replaces the whole `pending_task_ids` array. Errors with `RowNotFound` when the
/// contract has no supervisor state row.
pub async fn update_supervisor_pending_tasks(
    pool: &PgPool,
    contract_id: Uuid,
    pending_task_ids: &[Uuid],
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET pending_task_ids = $1, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(pending_task_ids)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Update supervisor state with detailed activity tracking.
/// Called at key save points: LLM response, task spawn, question asked, phase change.
///
/// `current_activity` and `error_message` are written as given, so passing `None` stores
/// NULL for that column.
pub async fn update_supervisor_detailed_state(
    pool: &PgPool,
    contract_id: Uuid,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    error_message: Option<&str>,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET state = $1, current_activity = $2, progress = $3, error_message = $4, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $5 RETURNING * "#,
    )
    .bind(state)
    .bind(current_activity)
    .bind(progress)
    .bind(error_message)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Add a spawned task ID to the supervisor's list.
///
/// Appends with `array_append`; no de-duplication is performed.
pub async fn add_supervisor_spawned_task(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET spawned_task_ids = array_append(spawned_task_ids, $1), last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(task_id)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Add a pending question to the supervisor state.
///
/// Concatenates `question` onto `pending_questions` with the jsonb `||` operator and
/// switches `state` to `'waiting_for_user'`. Errors with `RowNotFound` when the contract
/// has no supervisor state row.
pub async fn add_supervisor_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question: serde_json::Value,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET pending_questions = pending_questions || $1::jsonb, state = 'waiting_for_user', last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(question)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Remove a pending question by ID.
///
/// Rebuilds `pending_questions` from the elements whose `id` differs from `question_id`;
/// an empty result becomes `[]`. `state` is left unchanged.
///
/// NOTE(review): the filter uses `!=`, so an element without an `id` key compares as NULL
/// and is dropped as well; an `id` that is not a valid UUID makes the cast fail — confirm
/// every stored question carries a UUID `id`.
pub async fn remove_supervisor_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question_id: Uuid,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET pending_questions = ( SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb) FROM jsonb_array_elements(pending_questions) elem WHERE (elem->>'id')::uuid != $1 ), last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(question_id)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Comprehensive state save - used at major save points.
///
/// Upserts on `contract_id` and overwrites every tracked column with the supplied values;
/// `last_activity` and `updated_at` are refreshed on update. Returns the stored row.
pub async fn save_supervisor_state_full(
    pool: &PgPool,
    contract_id: Uuid,
    task_id: Uuid,
    conversation_history: serde_json::Value,
    pending_task_ids: &[Uuid],
    phase: &str,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    error_message: Option<&str>,
    spawned_task_ids: &[Uuid],
    pending_questions: serde_json::Value,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" INSERT INTO supervisor_states ( contract_id, task_id, conversation_history, pending_task_ids, phase, state, current_activity, progress, error_message, spawned_task_ids, pending_questions, last_activity ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (contract_id) DO UPDATE SET task_id = EXCLUDED.task_id, conversation_history = EXCLUDED.conversation_history, pending_task_ids = EXCLUDED.pending_task_ids, phase = EXCLUDED.phase, state = EXCLUDED.state, current_activity = EXCLUDED.current_activity, progress = EXCLUDED.progress, error_message = EXCLUDED.error_message, spawned_task_ids = EXCLUDED.spawned_task_ids, pending_questions = EXCLUDED.pending_questions, last_activity = NOW(), updated_at = NOW() RETURNING * "#,
    )
    .bind(contract_id)
    .bind(task_id)
    .bind(conversation_history)
    .bind(pending_task_ids)
    .bind(phase)
    .bind(state)
    .bind(current_activity)
    .bind(progress)
    .bind(error_message)
    .bind(spawned_task_ids)
    .bind(pending_questions)
    .fetch_one(pool)
    .await
}

/// Mark supervisor as restored from a crash/interruption.
///
/// Increments `restoration_count`, stamps `last_restored_at`, records
/// `restoration_source`, resets `state` to `'initializing'` and clears `error_message`.
pub async fn mark_supervisor_restored(
    pool: &PgPool,
    contract_id: Uuid,
    restoration_source: &str,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET restoration_count = restoration_count + 1, last_restored_at = NOW(), restoration_source = $1, state = 'initializing', error_message = NULL, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(restoration_source)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Get supervisors with pending questions (for re-delivery after restoration).
///
/// Limited to the owner's contracts whose status is `'active'` and whose
/// `pending_questions` is not the empty array; most recently active first.
pub async fn get_supervisors_with_pending_questions(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, SupervisorState>(
        r#" SELECT ss.* FROM supervisor_states ss JOIN contracts c ON c.id = ss.contract_id WHERE c.owner_id = $1 AND ss.pending_questions != '[]'::jsonb AND c.status = 'active' ORDER BY ss.last_activity DESC "#,
    )
    .bind(owner_id)
    .fetch_all(pool)
    .await
}

/// Get supervisor state with full details for restoration.
/// Includes validation info.
///
/// NOTE(review): the query is the same `SELECT *` by `contract_id` that
/// `get_supervisor_state` runs; no extra validation data is selected here.
pub async fn get_supervisor_state_for_restoration(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, SupervisorState>(
        r#" SELECT * FROM supervisor_states WHERE contract_id = $1 "#,
    )
    .bind(contract_id)
    .fetch_optional(pool)
    .await
}

/// Validate spawned tasks are in expected states.
/// Returns map of task_id -> (status, updated_at).
///
/// Task ids with no matching row are simply absent from the map; the function itself does
/// not judge whether a status is "expected".
pub async fn validate_spawned_tasks(
    pool: &PgPool,
    task_ids: &[Uuid],
) -> Result)>, sqlx::Error> {
    use sqlx::Row;

    let rows = sqlx::query(
        r#" SELECT id, status, updated_at FROM tasks WHERE id = ANY($1) "#,
    )
    .bind(task_ids)
    .fetch_all(pool)
    .await?;

    // `Row::get` panics if a column is missing or has an unexpected type.
    let mut result = std::collections::HashMap::new();
    for row in rows {
        let id: Uuid = row.get("id");
        let status: String = row.get("status");
        let updated_at: chrono::DateTime = row.get("updated_at");
        result.insert(id, (status, updated_at));
    }

    Ok(result)
}

/// Update supervisor state when phase changes.
///
/// Stores the new phase, sets `state` to `'working'` and writes a
/// "Phase changed to <phase>" activity message. Errors with `RowNotFound` when the
/// contract has no supervisor state row.
pub async fn update_supervisor_phase(
    pool: &PgPool,
    contract_id: Uuid,
    new_phase: &str,
) -> Result {
    sqlx::query_as::<_, SupervisorState>(
        r#" UPDATE supervisor_states SET phase = $1, state = 'working', current_activity = 'Phase changed to ' || $1, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $2 RETURNING * "#,
    )
    .bind(new_phase)
    .bind(contract_id)
    .fetch_one(pool)
    .await
}

/// Update supervisor state on heartbeat (lightweight update).
///
/// Uses `execute` and ignores the affected-row count, so a contract without a supervisor
/// state row is a silent no-op.
pub async fn update_supervisor_heartbeat_state(
    pool: &PgPool,
    contract_id: Uuid,
    state: &str,
    current_activity: Option<&str>,
    progress: i32,
    pending_task_ids: &[Uuid],
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#" UPDATE supervisor_states SET state = $1, current_activity = $2, progress = $3, pending_task_ids = $4, last_activity = NOW(), updated_at = NOW() WHERE contract_id = $5 "#,
    )
    .bind(state)
    .bind(current_activity)
    .bind(progress)
    .bind(pending_task_ids)
    .bind(contract_id)
    .execute(pool)
    .await?;

    Ok(())
}

// ============================================================================
// Supervisor Heartbeats
// ============================================================================

/// Record a supervisor heartbeat.
/// This creates a historical record for monitoring and dead supervisor detection.
///
/// The row's `timestamp` is assigned by the database (`NOW()`), and the
/// inserted record is returned.
// One parameter per inserted column; bundling them would change the public signature.
#[allow(clippy::too_many_arguments)]
pub async fn create_supervisor_heartbeat(
    pool: &PgPool,
    supervisor_task_id: Uuid,
    contract_id: Uuid,
    state: &str,
    phase: &str,
    current_activity: Option<&str>,
    progress: i32,
    pending_task_ids: &[Uuid],
) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
    const SQL: &str = r#"
        INSERT INTO supervisor_heartbeats (
            supervisor_task_id, contract_id, state, phase,
            current_activity, progress, pending_task_ids, timestamp
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
        RETURNING *
        "#;

    sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .bind(state)
        .bind(phase)
        .bind(current_activity)
        .bind(progress)
        .bind(pending_task_ids)
        .fetch_one(pool)
        .await
}

/// Fetch the most recent heartbeat of a supervisor task.
///
/// Returns `None` when the task has no heartbeat rows.
pub async fn get_latest_supervisor_heartbeat(
    pool: &PgPool,
    supervisor_task_id: Uuid,
) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT *
        FROM supervisor_heartbeats
        WHERE supervisor_task_id = $1
        ORDER BY timestamp DESC
        LIMIT 1
        "#;

    sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .fetch_optional(pool)
        .await
}

/// Fetch up to `limit` heartbeats of a supervisor task, newest first.
pub async fn get_supervisor_heartbeats(
    pool: &PgPool,
    supervisor_task_id: Uuid,
    limit: i64,
) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT *
        FROM supervisor_heartbeats
        WHERE supervisor_task_id = $1
        ORDER BY timestamp DESC
        LIMIT $2
        "#;

    sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(supervisor_task_id)
        .bind(limit)
        .fetch_all(pool)
        .await
}

/// Fetch up to `limit` heartbeats recorded for a contract, newest first.
pub async fn get_contract_supervisor_heartbeats(
    pool: &PgPool,
    contract_id: Uuid,
    limit: i64,
) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT *
        FROM supervisor_heartbeats
        WHERE contract_id = $1
        ORDER BY timestamp DESC
        LIMIT $2
        "#;

    sqlx::query_as::<_, SupervisorHeartbeatRecord>(SQL)
        .bind(contract_id)
        .bind(limit)
        .fetch_all(pool)
        .await
}

/// Delete old heartbeats beyond the TTL (24 hours by default).
/// Returns the number of deleted records.
pub async fn cleanup_old_heartbeats(pool: &PgPool, ttl_hours: i64) -> Result<u64, sqlx::Error> {
    // The TTL travels as text and is concatenated into an interval server-side.
    let outcome = sqlx::query(
        r#"
        DELETE FROM supervisor_heartbeats
        WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
        "#,
    )
    .bind(ttl_hours.to_string())
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected())
}

/// Find supervisors that have not sent a heartbeat within the timeout period.
/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
///
/// Only the latest heartbeat per supervisor task is considered, and only
/// supervisors whose task row has `status = 'running'` are reported.
///
/// # Errors
/// Returns `sqlx::Error` if the query fails or a result column cannot be
/// decoded.
pub async fn find_stale_supervisors(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<chrono::Utc>)>, sqlx::Error> {
    use sqlx::Row;

    let rows = sqlx::query(
        r#"
        WITH latest_heartbeats AS (
            SELECT DISTINCT ON (supervisor_task_id)
                supervisor_task_id, contract_id, timestamp
            FROM supervisor_heartbeats
            ORDER BY supervisor_task_id, timestamp DESC
        )
        SELECT lh.supervisor_task_id, lh.contract_id, lh.timestamp
        FROM latest_heartbeats lh
        JOIN tasks t ON t.id = lh.supervisor_task_id
        WHERE t.status = 'running'
          AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
        "#,
    )
    .bind(timeout_seconds.to_string())
    .fetch_all(pool)
    .await?;

    // `try_get` turns a decode failure into an `Err` where `get` would panic.
    rows.iter()
        .map(|row| {
            let supervisor_task_id: Uuid = row.try_get("supervisor_task_id")?;
            let contract_id: Uuid = row.try_get("contract_id")?;
            let timestamp: chrono::DateTime<chrono::Utc> = row.try_get("timestamp")?;
            Ok((supervisor_task_id, contract_id, timestamp))
        })
        .collect()
}

// ============================================================================
// Contract Supervisor
// ============================================================================

/// Point a contract at its supervisor task and return the updated contract.
///
/// # Errors
/// Uses `fetch_one`, so an unknown `contract_id` yields
/// `sqlx::Error::RowNotFound`.
pub async fn update_contract_supervisor(
    pool: &PgPool,
    contract_id: Uuid,
    supervisor_task_id: Uuid,
) -> Result<Contract, sqlx::Error> {
    const SQL: &str = r#"
        UPDATE contracts
        SET supervisor_task_id = $1,
            updated_at = NOW()
        WHERE id = $2
        RETURNING *
        "#;

    sqlx::query_as::<_, Contract>(SQL)
        .bind(supervisor_task_id)
        .bind(contract_id)
        .fetch_one(pool)
        .await
}

/// Mark a deliverable as complete for a specific phase.
/// Uses JSONB operations to append the deliverable_id to the phase's array.
///
/// # Errors
/// The statement only matches when the deliverable is not yet listed for the
/// phase and the row is read with `fetch_one`; an unknown contract *or* an
/// already-recorded deliverable therefore surfaces as
/// `sqlx::Error::RowNotFound`.
pub async fn mark_deliverable_complete(
    pool: &PgPool,
    contract_id: Uuid,
    phase: &str,
    deliverable_id: &str,
) -> Result<Contract, sqlx::Error> {
    // `jsonb_set(..., true)` creates the phase key when it is missing, and the
    // COALESCE supplies an empty array to append to in that case.
    const SQL: &str = r#"
        UPDATE contracts
        SET completed_deliverables = jsonb_set(
                completed_deliverables,
                ARRAY[$2::text],
                COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text),
                true
            ),
            updated_at = NOW()
        WHERE id = $1
          AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3)
        RETURNING *
        "#;

    sqlx::query_as::<_, Contract>(SQL)
        .bind(contract_id)
        .bind(phase)
        .bind(deliverable_id)
        .fetch_one(pool)
        .await
}

/// Remove a phase's entry from `completed_deliverables`.
///
/// Drops the `phase` key from the JSONB object and returns the updated
/// contract; an unknown `contract_id` yields `sqlx::Error::RowNotFound`.
pub async fn clear_phase_deliverables(
    pool: &PgPool,
    contract_id: Uuid,
    phase: &str,
) -> Result<Contract, sqlx::Error> {
    const SQL: &str = r#"
        UPDATE contracts
        SET completed_deliverables = completed_deliverables - $2,
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#;

    sqlx::query_as::<_, Contract>(SQL)
        .bind(contract_id)
        .bind(phase)
        .fetch_one(pool)
        .await
}

/// Look up the task referenced by a contract's `supervisor_task_id`.
///
/// Returns `None` when the join produces no row.
pub async fn get_contract_supervisor_task(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT t.*
        FROM tasks t
        JOIN contracts c ON c.supervisor_task_id = t.id
        WHERE c.id = $1
        "#;

    sqlx::query_as::<_, Task>(SQL)
        .bind(contract_id)
        .fetch_optional(pool)
        .await
}

// ============================================================================
// Task Tree Queries
// ============================================================================

/// Get full task tree for a contract.
pub async fn get_contract_task_tree( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" WITH RECURSIVE task_tree AS ( -- Base case: root tasks (no parent) SELECT * FROM tasks WHERE contract_id = $1 AND parent_task_id IS NULL UNION ALL -- Recursive case: children of current level SELECT t.* FROM tasks t JOIN task_tree tt ON t.parent_task_id = tt.id ) SELECT * FROM task_tree ORDER BY depth, created_at "#, ) .bind(contract_id) .fetch_all(pool) .await } /// Get task tree from a specific root task. pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" WITH RECURSIVE task_tree AS ( -- Base case: the root task SELECT * FROM tasks WHERE id = $1 UNION ALL -- Recursive case: children of current level SELECT t.* FROM tasks t JOIN task_tree tt ON t.parent_task_id = tt.id ) SELECT * FROM task_tree ORDER BY depth, created_at "#, ) .bind(root_task_id) .fetch_all(pool) .await } // ============================================================================ // Daemon Selection // ============================================================================ /// Get daemons with capacity info for selection. pub async fn get_available_daemons( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get daemons with capacity info for selection, excluding specified daemon IDs. /// Used for task retry to avoid reassigning to daemons that have already failed. 
pub async fn get_available_daemons_excluding( pool: &PgPool, owner_id: Uuid, exclude_daemon_ids: &[Uuid], ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' AND id != ALL($2) ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .bind(exclude_daemon_ids) .fetch_all(pool) .await } /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, daemon_id: Uuid, task_id: Uuid, ) -> Result { sqlx::query_as::<_, DaemonTaskAssignment>( r#" INSERT INTO daemon_task_assignments (daemon_id, task_id) VALUES ($1, $2) RETURNING * "#, ) .bind(daemon_id) .bind(task_id) .fetch_one(pool) .await } /// Update daemon task assignment status. pub async fn update_daemon_task_assignment_status( pool: &PgPool, task_id: Uuid, status: &str, ) -> Result { sqlx::query_as::<_, DaemonTaskAssignment>( r#" UPDATE daemon_task_assignments SET status = $1 WHERE task_id = $2 RETURNING * "#, ) .bind(status) .bind(task_id) .fetch_one(pool) .await } /// Get daemon task assignment for a task. pub async fn get_daemon_task_assignment( pool: &PgPool, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonTaskAssignment>( "SELECT * FROM daemon_task_assignments WHERE task_id = $1", ) .bind(task_id) .fetch_optional(pool) .await } // ============================================================================ // Repository History Functions // ============================================================================ use super::models::RepositoryHistoryEntry; /// List all repository history entries for an owner, ordered by use_count DESC, last_used_at DESC. 
pub async fn list_repository_history_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, RepositoryHistoryEntry>( r#" SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at FROM repository_history WHERE owner_id = $1 ORDER BY use_count DESC, last_used_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get repository suggestions for an owner, optionally filtered by source type and query. pub async fn get_repository_suggestions( pool: &PgPool, owner_id: Uuid, source_type: Option<&str>, query: Option<&str>, limit: i32, ) -> Result, sqlx::Error> { // Build query dynamically based on filters let mut sql = String::from( r#" SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at FROM repository_history WHERE owner_id = $1 "#, ); let mut param_idx = 2; if source_type.is_some() { sql.push_str(&format!(" AND source_type = ${}", param_idx)); param_idx += 1; } if query.is_some() { sql.push_str(&format!( " AND (LOWER(name) LIKE ${} OR LOWER(COALESCE(repository_url, '')) LIKE ${} OR LOWER(COALESCE(local_path, '')) LIKE ${})", param_idx, param_idx, param_idx )); param_idx += 1; } sql.push_str(&format!( " ORDER BY use_count DESC, last_used_at DESC LIMIT ${}", param_idx )); // Build and execute query with the appropriate bindings let mut query_builder = sqlx::query_as::<_, RepositoryHistoryEntry>(&sql).bind(owner_id); if let Some(st) = source_type { query_builder = query_builder.bind(st); } if let Some(q) = query { let search_pattern = format!("%{}%", q.to_lowercase()); query_builder = query_builder.bind(search_pattern); } query_builder = query_builder.bind(limit); query_builder.fetch_all(pool).await } /// Add or update a repository history entry. /// If an entry with the same URL (for remote) or path (for local) already exists, /// increment use_count and update last_used_at and name. /// Otherwise, create a new entry. 
pub async fn add_or_update_repository_history( pool: &PgPool, owner_id: Uuid, name: &str, repository_url: Option<&str>, local_path: Option<&str>, source_type: &str, ) -> Result { // Use UPSERT (INSERT ... ON CONFLICT) if source_type == "remote" { let url = repository_url.ok_or_else(|| { sqlx::Error::Protocol("repository_url required for remote type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, $3, NULL, $4, 1, NOW()) ON CONFLICT (owner_id, repository_url) WHERE source_type = 'remote' AND repository_url IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(url) .bind(source_type) .fetch_one(pool) .await } else if source_type == "local" { let path = local_path.ok_or_else(|| { sqlx::Error::Protocol("local_path required for local type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, NULL, $3, $4, 1, NOW()) ON CONFLICT (owner_id, local_path) WHERE source_type = 'local' AND local_path IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(path) .bind(source_type) .fetch_one(pool) .await } else { Err(sqlx::Error::Protocol(format!( "Invalid source_type: {}", source_type ))) } } /// Delete a repository history entry. /// Returns true if an entry was deleted, false if not found. 
pub async fn delete_repository_history( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM repository_history WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================ // Conversation Snapshots // ============================================================================ /// Create a new conversation snapshot pub async fn create_conversation_snapshot( pool: &PgPool, task_id: Uuid, checkpoint_id: Option, snapshot_type: &str, message_count: i32, conversation_state: serde_json::Value, metadata: Option, ) -> Result { sqlx::query_as::<_, ConversationSnapshot>( r#" INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING * "# ) .bind(task_id) .bind(checkpoint_id) .bind(snapshot_type) .bind(message_count) .bind(conversation_state) .bind(metadata) .fetch_one(pool) .await } /// Get a conversation snapshot by ID pub async fn get_conversation_snapshot( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE id = $1" ) .bind(id) .fetch_optional(pool) .await } /// Get conversation snapshot at a specific checkpoint pub async fn get_conversation_at_checkpoint( pool: &PgPool, checkpoint_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1" ) .bind(checkpoint_id) .fetch_optional(pool) .await } /// List conversation snapshots for a task pub async fn list_conversation_snapshots( pool: &PgPool, task_id: Uuid, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(100); sqlx::query_as::<_, ConversationSnapshot>( "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER 
BY created_at DESC LIMIT $2" ) .bind(task_id) .bind(limit) .fetch_all(pool) .await } /// Delete conversation snapshots older than retention period pub async fn cleanup_old_snapshots( pool: &PgPool, retention_days: i32, ) -> Result { let result = sqlx::query( "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1" ) .bind(retention_days) .execute(pool) .await?; Ok(result.rows_affected()) } // ============================================================================ // History Events // ============================================================================ /// Record a new history event #[allow(clippy::too_many_arguments)] pub async fn record_history_event( pool: &PgPool, owner_id: Uuid, contract_id: Option, task_id: Option, event_type: &str, event_subtype: Option<&str>, phase: Option<&str>, event_data: serde_json::Value, ) -> Result { sqlx::query_as::<_, HistoryEvent>( r#" INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "# ) .bind(owner_id) .bind(contract_id) .bind(task_id) .bind(event_type) .bind(event_subtype) .bind(phase) .bind(event_data) .fetch_one(pool) .await } /// Get contract history timeline pub async fn get_contract_history( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let mut query = String::from( "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" ); let mut count_query = String::from( "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" ); let mut param_count = 2; if filters.phase.is_some() { param_count += 1; query.push_str(&format!(" AND phase = ${}" , param_count)); count_query.push_str(&format!(" AND phase = ${}", param_count)); } if filters.from.is_some() { param_count += 1; query.push_str(&format!(" AND created_at >= ${}", param_count)); 
count_query.push_str(&format!(" AND created_at >= ${}", param_count)); } if filters.to.is_some() { param_count += 1; query.push_str(&format!(" AND created_at <= ${}", param_count)); count_query.push_str(&format!(" AND created_at <= ${}", param_count)); } query.push_str(" ORDER BY created_at DESC"); query.push_str(&format!(" LIMIT {}", limit)); // Build and execute the query dynamically let mut q = sqlx::query_as::<_, HistoryEvent>(&query) .bind(contract_id) .bind(owner_id); if let Some(ref phase) = filters.phase { q = q.bind(phase); } if let Some(ref from) = filters.from { q = q.bind(from); } if let Some(ref to) = filters.to { q = q.bind(to); } let events = q.fetch_all(pool).await?; // Get total count let mut cq = sqlx::query_scalar::<_, i64>(&count_query) .bind(contract_id) .bind(owner_id); if let Some(ref phase) = filters.phase { cq = cq.bind(phase); } if let Some(ref from) = filters.from { cq = cq.bind(from); } if let Some(ref to) = filters.to { cq = cq.bind(to); } let count = cq.fetch_one(pool).await?; Ok((events, count)) } /// Get task history pub async fn get_task_history( pool: &PgPool, task_id: Uuid, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let events = sqlx::query_as::<_, HistoryEvent>( r#" SELECT * FROM history_events WHERE task_id = $1 AND owner_id = $2 ORDER BY created_at DESC LIMIT $3 "# ) .bind(task_id) .bind(owner_id) .bind(limit) .fetch_all(pool) .await?; let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2" ) .bind(task_id) .bind(owner_id) .fetch_one(pool) .await?; Ok((events, count)) } /// Get unified timeline for an owner pub async fn get_timeline( pool: &PgPool, owner_id: Uuid, filters: &HistoryQueryFilters, ) -> Result<(Vec, i64), sqlx::Error> { let limit = filters.limit.unwrap_or(100); let events = sqlx::query_as::<_, HistoryEvent>( r#" SELECT * FROM history_events WHERE owner_id = $1 ORDER BY 
created_at DESC LIMIT $2 "# ) .bind(owner_id) .bind(limit) .fetch_all(pool) .await?; let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM history_events WHERE owner_id = $1" ) .bind(owner_id) .fetch_one(pool) .await?; Ok((events, count)) } // ============================================================================ // Task Conversation Retrieval // ============================================================================ // Helper struct for parsing task output events #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct TaskOutputEvent { message_type: String, content: Option, tool_name: Option, tool_input: Option, is_error: Option, cost_usd: Option, } /// Get task conversation messages (reconstructed from task_events) pub async fn get_task_conversation( pool: &PgPool, task_id: Uuid, include_tool_calls: bool, include_tool_results: bool, limit: Option, ) -> Result, sqlx::Error> { let limit = limit.unwrap_or(1000); // Get output events that represent conversation turns let events = sqlx::query_as::<_, TaskEvent>( r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "# ) .bind(task_id) .bind(limit) .fetch_all(pool) .await?; // Convert task events to conversation messages let mut messages = Vec::new(); for event in events { if let Some(data) = event.event_data { // Parse the event data to extract message info if let Ok(output) = serde_json::from_value::(data.clone()) { let should_include = match output.message_type.as_str() { "tool_use" => include_tool_calls, "tool_result" => include_tool_results, _ => true, }; if should_include { messages.push(ConversationMessage { id: event.id.to_string(), role: match output.message_type.as_str() { "assistant" => "assistant".to_string(), "tool_use" => "assistant".to_string(), "tool_result" => "tool".to_string(), "system" => "system".to_string(), "error" => "system".to_string(), _ => "user".to_string(), }, content: output.content.unwrap_or_default(), 
timestamp: event.created_at, tool_calls: None, tool_name: output.tool_name, tool_input: output.tool_input, tool_result: None, is_error: output.is_error, token_count: None, cost_usd: output.cost_usd.map(|c| c as f64), }); } } } } Ok(messages) } /// Get supervisor conversation (from supervisor_states) pub async fn get_supervisor_conversation_full( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { get_supervisor_state(pool, contract_id).await } // ============================================================================= // Anonymous Task Cleanup Functions // ============================================================================= /// Delete stale anonymous tasks (tasks with contract_id = NULL) that: /// - Are in a terminal state (done, failed, merged) /// - Are older than the specified number of days /// /// Returns the number of deleted tasks. pub async fn cleanup_stale_anonymous_tasks( pool: &PgPool, max_age_days: i32, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE contract_id IS NULL AND status IN ('done', 'failed', 'merged') AND created_at < NOW() - INTERVAL '1 day' * $1 "#, ) .bind(max_age_days) .execute(pool) .await?; Ok(result.rows_affected() as i64) } // ============================================================================ // Checkpoint Patches (for task recovery) // ============================================================================ /// Create a checkpoint patch for task recovery. 
///
/// `patch_size_bytes` is derived from `patch_data.len()` and `expires_at` is
/// set to `NOW()` plus `ttl_hours` hours. Returns the inserted row.
///
/// # Errors
/// Returns `sqlx::Error::Protocol` if the patch length does not fit in an
/// `i32`, or any database error from the insert.
pub async fn create_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option<Uuid>,
    base_commit_sha: &str,
    patch_data: &[u8],
    files_count: i32,
    ttl_hours: i64,
) -> Result<CheckpointPatch, sqlx::Error> {
    // Checked conversion: an `as i32` cast would silently wrap for oversized patches.
    let patch_size_bytes: i32 = std::convert::TryFrom::try_from(patch_data.len())
        .map_err(|_| {
            sqlx::Error::Protocol(format!(
                "patch_data too large: {} bytes",
                patch_data.len()
            ))
        })?;

    sqlx::query_as::<_, CheckpointPatch>(
        r#"
        INSERT INTO checkpoint_patches (
            task_id, checkpoint_id, base_commit_sha, patch_data,
            patch_size_bytes, files_count, expires_at
        )
        VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7)
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(checkpoint_id)
    .bind(base_commit_sha)
    .bind(patch_data)
    .bind(patch_size_bytes)
    .bind(files_count)
    .bind(ttl_hours)
    .fetch_one(pool)
    .await
}

/// Fetch the newest checkpoint patch of a task that has not yet expired.
///
/// Returns `None` when the task has no patch with `expires_at > NOW()`.
pub async fn get_latest_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT *
        FROM checkpoint_patches
        WHERE task_id = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        LIMIT 1
        "#;

    sqlx::query_as::<_, CheckpointPatch>(SQL)
        .bind(task_id)
        .fetch_optional(pool)
        .await
}

/// Fetch a checkpoint patch by primary key; expiry is not checked here.
pub async fn get_checkpoint_patch(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    const SQL: &str = "SELECT * FROM checkpoint_patches WHERE id = $1";

    sqlx::query_as::<_, CheckpointPatch>(SQL)
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// List metadata for every checkpoint patch of a task, newest first.
///
/// The `patch_data` column is deliberately not selected, so listing stays
/// cheap even when patches are large.
pub async fn list_checkpoint_patches(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT id, task_id, checkpoint_id, base_commit_sha, patch_size_bytes,
               files_count, created_at, expires_at
        FROM checkpoint_patches
        WHERE task_id = $1
        ORDER BY created_at DESC
        "#;

    sqlx::query_as::<_, CheckpointPatchInfo>(SQL)
        .bind(task_id)
        .fetch_all(pool)
        .await
}

/// Delete expired checkpoint patches.
/// Returns the number of deleted patches.
pub async fn cleanup_expired_checkpoint_patches(pool: &PgPool) -> Result<i64, sqlx::Error> {
    const SQL: &str = "DELETE FROM checkpoint_patches WHERE expires_at < NOW()";

    let outcome = sqlx::query(SQL).execute(pool).await?;
    let deleted = outcome.rows_affected() as i64;
    Ok(deleted)
}

/// Remove every checkpoint patch stored for `task_id`.
///
/// Returns how many rows were deleted.
pub async fn delete_checkpoint_patches_for_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<i64, sqlx::Error> {
    const SQL: &str = "DELETE FROM checkpoint_patches WHERE task_id = $1";

    let outcome = sqlx::query(SQL).bind(task_id).execute(pool).await?;
    let deleted = outcome.rows_affected() as i64;
    Ok(deleted)
}

// =============================================================================
// Red Team Notifications
// =============================================================================

// =============================================================================
// Supervisor Status API Helpers
// =============================================================================

/// Get supervisor status for a contract.
/// Returns combined information from supervisor_states and tasks tables.
pub async fn get_supervisor_status( pool: &PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { // Query to get supervisor status by joining supervisor_states with tasks sqlx::query_as::<_, SupervisorStatusInfo>( r#" SELECT ss.task_id, COALESCE(t.status, 'unknown') as supervisor_state, ss.phase, t.progress_summary as current_activity, ss.pending_task_ids, ss.last_activity as last_heartbeat, t.status as task_status, t.daemon_id IS NOT NULL as is_running FROM supervisor_states ss JOIN tasks t ON t.id = ss.task_id WHERE ss.contract_id = $1 AND t.owner_id = $2 "#, ) .bind(contract_id) .bind(owner_id) .fetch_optional(pool) .await } /// Internal struct to hold supervisor status query result #[derive(Debug, Clone, sqlx::FromRow)] pub struct SupervisorStatusInfo { pub task_id: Uuid, pub supervisor_state: String, pub phase: String, pub current_activity: Option, #[sqlx(try_from = "Vec")] pub pending_task_ids: Vec, pub last_heartbeat: chrono::DateTime, pub task_status: String, pub is_running: bool, } /// Get supervisor activity history from history_events table. /// This provides a timeline of supervisor activities that can serve as "heartbeats". 
pub async fn get_supervisor_activity_history( pool: &PgPool, contract_id: Uuid, limit: i32, offset: i32, ) -> Result, sqlx::Error> { sqlx::query_as::<_, SupervisorActivityEntry>( r#" SELECT created_at as timestamp, COALESCE(event_subtype, 'activity') as state, event_data->>'activity' as activity, (event_data->>'progress')::INTEGER as progress, COALESCE(phase, 'unknown') as phase, CASE WHEN event_data->'pending_task_ids' IS NOT NULL THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] ELSE ARRAY[]::UUID[] END as pending_task_ids FROM history_events WHERE contract_id = $1 AND event_type IN ('supervisor', 'phase', 'task') ORDER BY created_at DESC LIMIT $2 OFFSET $3 "#, ) .bind(contract_id) .bind(limit) .bind(offset) .fetch_all(pool) .await } /// Internal struct to hold supervisor activity entry #[derive(Debug, Clone, sqlx::FromRow)] pub struct SupervisorActivityEntry { pub timestamp: chrono::DateTime, pub state: String, pub activity: Option, pub progress: Option, pub phase: String, #[sqlx(try_from = "Vec")] pub pending_task_ids: Vec, } /// Count total supervisor activity history entries for a contract. pub async fn count_supervisor_activity_history( pool: &PgPool, contract_id: Uuid, ) -> Result { let result: (i64,) = sqlx::query_as( r#" SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND event_type IN ('supervisor', 'phase', 'task') "#, ) .bind(contract_id) .fetch_one(pool) .await?; Ok(result.0) } /// Update supervisor state last_activity timestamp. /// This acts as a "sync" operation to refresh the supervisor's heartbeat. 
pub async fn sync_supervisor_state( pool: &PgPool, contract_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, SupervisorState>( r#" UPDATE supervisor_states SET last_activity = NOW(), updated_at = NOW() WHERE contract_id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(pool) .await } // ============================================================================= // Helper Functions // ============================================================================= /// Helper to truncate string to max length fn truncate_string(s: &str, max_len: usize) -> String { if s.len() <= max_len { s.to_string() } else { format!("{}...", &s[..max_len - 3]) } } // ============================================================================= // Directive CRUD // ============================================================================= /// Create a new directive for an owner. pub async fn create_directive_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateDirectiveRequest, ) -> Result { sqlx::query_as::<_, Directive>( r#" INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch, reconcile_mode) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#, ) .bind(owner_id) .bind(&req.title) .bind(&req.goal) .bind(&req.repository_url) .bind(&req.local_path) .bind(&req.base_branch) .bind(req.reconcile_mode.as_deref().unwrap_or("auto")) .fetch_one(pool) .await } /// Get a single directive for an owner. pub async fn get_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Get a directive without an owner scope check. /// /// Used by background orchestration code that has already established the /// directive identity through other means (e.g. it just received the /// directive_id from a different already-authorized query). 
HTTP handlers /// must continue to use `get_directive_for_owner` to enforce isolation. pub async fn get_directive( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>(r#"SELECT * FROM directives WHERE id = $1"#) .bind(id) .fetch_optional(pool) .await } /// Get a directive with all its steps. pub async fn get_directive_with_steps_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result)>, sqlx::Error> { let directive = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await?; match directive { Some(d) => { let steps = list_directive_steps(pool, d.id).await?; Ok(Some((d, steps))) } None => Ok(None), } } /// List all directives for an owner with step counts. Excludes the per-owner /// tmp directive (the scratchpad surface; surfaced via the sidebar's /// dedicated `tmp/` folder, not the regular directive list). pub async fn list_directives_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveSummary>( r#" SELECT d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url, d.orchestrator_task_id, d.pr_url, d.completion_task_id, d.reconcile_mode, d.version, d.created_at, d.updated_at, COALESCE(s.total_steps, 0) as total_steps, COALESCE(s.completed_steps, 0) as completed_steps, COALESCE(s.running_steps, 0) as running_steps, COALESCE(s.failed_steps, 0) as failed_steps FROM directives d LEFT JOIN LATERAL ( SELECT COUNT(*) as total_steps, COUNT(*) FILTER (WHERE status = 'completed') as completed_steps, COUNT(*) FILTER (WHERE status = 'running') as running_steps, COUNT(*) FILTER (WHERE status = 'failed') as failed_steps FROM directive_steps WHERE directive_id = d.id ) s ON true WHERE d.owner_id = $1 AND d.is_tmp = false ORDER BY d.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Update a directive with optimistic locking. 
pub async fn update_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, req: UpdateDirectiveRequest, ) -> Result, RepositoryError> { let current = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await .map_err(RepositoryError::Database)?; let current = match current { Some(c) => c, None => return Ok(None), }; if let Some(expected_version) = req.version { if expected_version != current.version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: current.version, }); } } let title = req.title.as_deref().unwrap_or(¤t.title); let goal = req.goal.as_deref().unwrap_or(¤t.goal); let goal_changed = goal != current.goal; let status = req.status.as_deref().unwrap_or(¤t.status); let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); let local_path = req.local_path.as_deref().or(current.local_path.as_deref()); let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref()); let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id); let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref()); let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref()); let reconcile_mode = req.reconcile_mode.clone().unwrap_or_else(|| current.reconcile_mode.clone()); let result = sqlx::query_as::<_, Directive>( r#" UPDATE directives SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7, base_branch = $8, orchestrator_task_id = $9, pr_url = $10, pr_branch = $11, reconcile_mode = $12, goal_updated_at = CASE WHEN $13 THEN NOW() ELSE goal_updated_at END, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(title) .bind(goal) .bind(status) .bind(repository_url) .bind(local_path) .bind(base_branch) .bind(orchestrator_task_id) .bind(pr_url) .bind(pr_branch) .bind(reconcile_mode) 
.bind(goal_changed) .fetch_optional(pool) .await .map_err(RepositoryError::Database)?; Ok(result) } /// Delete a directive for an owner. pub async fn delete_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result { // Delete all tasks associated with this directive sqlx::query( r#"DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; let result = sqlx::query( r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Clean up terminal tasks associated with a directive. /// /// Deletes tasks in terminal states (completed, failed, merged, done, interrupted) /// that belong to this directive, excluding tasks currently referenced by /// `completion_task_id` or `orchestrator_task_id` on the directive. /// NULLs out `task_id` on directive_steps for deleted tasks. pub async fn cleanup_directive_tasks( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result { // NULL out task_id on steps that reference terminal tasks we're about to delete sqlx::query( r#" UPDATE directive_steps SET task_id = NULL WHERE directive_id = $1 AND task_id IS NOT NULL AND task_id IN ( SELECT t.id FROM tasks t WHERE t.directive_id = $1 AND t.owner_id = $2 AND t.status IN ('completed', 'failed', 'merged', 'done', 'interrupted') AND t.id NOT IN ( SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 UNION SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 ) ) "#, ) .bind(directive_id) .bind(owner_id) .execute(pool) .await?; // Delete terminal tasks not currently referenced by the directive let result = sqlx::query( r#" DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2 AND status IN ('completed', 'failed', 'merged', 'done', 'interrupted') AND id NOT IN ( SELECT COALESCE(d.completion_task_id, 
'00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 UNION SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 ) "#, ) .bind(directive_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() as i64) } // ============================================================================= // Directive Completion Helpers // ============================================================================= /// Row type for completed step tasks. #[derive(Debug, Clone, sqlx::FromRow)] pub struct CompletedStepTask { pub step_id: Uuid, pub step_name: String, pub task_id: Uuid, pub task_name: String, } /// Row type for directive completion task status check. #[derive(Debug, Clone, sqlx::FromRow)] pub struct DirectiveCompletionCheck { pub directive_id: Uuid, pub owner_id: Uuid, pub completion_task_id: Uuid, pub task_status: String, pub pr_url: Option, pub task_name: String, } /// Get idle directives that need a completion task spawned. /// Conditions: status = 'idle', no completion_task_id, has repository_url, /// and has at least one completed step with a task_id. pub async fn get_idle_directives_needing_completion( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'idle' AND d.completion_task_id IS NULL AND d.pr_branch IS NULL AND d.repository_url IS NOT NULL AND EXISTS ( SELECT 1 FROM directive_steps ds WHERE ds.directive_id = d.id AND ds.status = 'completed' AND ds.task_id IS NOT NULL ) "#, ) .fetch_all(pool) .await } /// Get directives that attempted completion (pr_branch set) but have no PR URL yet /// and no active completion task. These need a verification task spawned. 
pub async fn get_directives_needing_verification( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'idle' AND d.pr_branch IS NOT NULL AND d.pr_url IS NULL AND d.completion_task_id IS NULL AND d.repository_url IS NOT NULL "#, ) .fetch_all(pool) .await } /// Get directives with active completion tasks, joined with task status. pub async fn get_completion_tasks_to_check( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveCompletionCheck>( r#" SELECT d.id as directive_id, d.owner_id, d.completion_task_id, t.status as task_status, d.pr_url, t.name as task_name FROM directives d JOIN tasks t ON t.id = d.completion_task_id WHERE d.completion_task_id IS NOT NULL "#, ) .fetch_all(pool) .await } /// Atomically claim a directive for completion by setting a placeholder completion_task_id. /// Returns true if the claim was successful (no other task already claimed it). pub async fn claim_directive_for_completion( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result { let result = sqlx::query( r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1 AND completion_task_id IS NULL"#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Assign a completion task to a directive (unconditional update). pub async fn assign_completion_task( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1"#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(()) } /// Clear the completion task from a directive. 
pub async fn clear_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let reset = sqlx::query(
        r#"UPDATE directives SET completion_task_id = NULL, updated_at = NOW() WHERE id = $1"#,
    )
    .bind(directive_id);

    reset.execute(pool).await.map(|_| ())
}

/// Collect the (step, task) pairs for every completed step of a directive
/// that has a task attached, in step order (`order_index`, then `created_at`).
pub async fn get_completed_step_tasks(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<CompletedStepTask>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT ds.id as step_id, ds.name as step_name, ds.task_id, t.name as task_name
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.order_index, ds.created_at
        "#;

    sqlx::query_as::<_, CompletedStepTask>(SQL)
        .bind(directive_id)
        .fetch_all(pool)
        .await
}

/// Look up the task ID attached to the directive's completed step with the
/// latest `updated_at`.
/// Used as a fallback `continue_from_task_id` when dispatching new-generation steps
/// that have no explicit dependencies and no PR branch to continue from.
///
/// Resolves to `None` when the directive has no completed step with a task.
pub async fn get_last_completed_step_task_id(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let latest: Option<(Uuid,)> = sqlx::query_as(
        r#"
        SELECT ds.task_id
        FROM directive_steps ds
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.updated_at DESC
        LIMIT 1
        "#,
    )
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    Ok(latest.map(|(task_id,)| task_id))
}

// =============================================================================
// Directive Step CRUD
// =============================================================================

/// Fetch one directive step by its primary key; `None` when it does not exist.
pub async fn get_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let lookup = sqlx::query_as::<_, DirectiveStep>(
        r#"SELECT * FROM directive_steps WHERE id = $1"#,
    )
    .bind(step_id);

    lookup.fetch_optional(pool).await
}

/// List all steps for a directive, ordered by order_index.
pub async fn list_directive_steps( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" SELECT * FROM directive_steps WHERE directive_id = $1 ORDER BY order_index, created_at "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Create a single directive step. pub async fn create_directive_step( pool: &PgPool, directive_id: Uuid, req: CreateDirectiveStepRequest, ) -> Result { let generation = req.generation.unwrap_or(1); let order_id = req.order_id; let contract_type = req.contract_type.clone(); // Resolve the document this step belongs to. If the caller supplied one, // honour it; otherwise pick the directive's most recently-updated // active (or draft) document. Steps that can't be matched to any // document fall through with NULL — the sidebar treats those as // directive-level orphans. let directive_document_id = match req.directive_document_id { Some(id) => Some(id), None => resolve_active_document_for_directive(pool, directive_id) .await .unwrap_or(None), }; let step = sqlx::query_as::<_, DirectiveStep>( r#" INSERT INTO directive_steps ( directive_id, name, description, task_plan, depends_on, order_index, generation, contract_type, directive_document_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING * "#, ) .bind(directive_id) .bind(&req.name) .bind(&req.description) .bind(&req.task_plan) .bind(&req.depends_on) .bind(req.order_index) .bind(generation) .bind(&contract_type) .bind(&directive_document_id) .fetch_one(pool) .await?; // If an order_id was provided, auto-link the order to this step if let Some(oid) = order_id { sqlx::query( r#"UPDATE orders SET directive_step_id = $1, updated_at = NOW() WHERE id = $2"#, ) .bind(step.id) .bind(oid) .execute(pool) .await?; } Ok(step) } /// Batch create multiple directive steps. 
pub async fn batch_create_directive_steps( pool: &PgPool, directive_id: Uuid, steps: Vec, ) -> Result, sqlx::Error> { let mut results = Vec::with_capacity(steps.len()); for req in steps { let step = create_directive_step(pool, directive_id, req).await?; results.push(step); } Ok(results) } /// Update a directive step. pub async fn update_directive_step( pool: &PgPool, step_id: Uuid, req: UpdateDirectiveStepRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveStep>( r#"SELECT * FROM directive_steps WHERE id = $1"#, ) .bind(step_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let name = req.name.as_deref().unwrap_or(¤t.name); let description = req.description.as_deref().or(current.description.as_deref()); let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref()); let depends_on = req.depends_on.as_deref().unwrap_or(¤t.depends_on); let status = req.status.as_deref().unwrap_or(¤t.status); let task_id = req.task_id.or(current.task_id); let order_index = req.order_index.unwrap_or(current.order_index); // Set started_at when transitioning to running let started_at = if status == "running" && current.status != "running" { Some(Utc::now()) } else { current.started_at }; // Set completed_at when transitioning to terminal state let completed_at = if matches!(status, "completed" | "failed" | "skipped") && !matches!(current.status.as_str(), "completed" | "failed" | "skipped") { Some(Utc::now()) } else { current.completed_at }; sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET name = $2, description = $3, task_plan = $4, depends_on = $5, status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10 WHERE id = $1 RETURNING * "#, ) .bind(step_id) .bind(name) .bind(description) .bind(task_plan) .bind(depends_on) .bind(status) .bind(task_id) .bind(order_index) .bind(started_at) .bind(completed_at) .fetch_optional(pool) .await } /// Delete a 
directive step. pub async fn delete_directive_step( pool: &PgPool, step_id: Uuid, ) -> Result { let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#) .bind(step_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete all directive steps that have not started execution (pending, ready, failed, skipped). /// Completed and running steps are preserved. /// Returns the number of deleted steps. pub async fn clear_pending_directive_steps( pool: &PgPool, directive_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM directive_steps WHERE directive_id = $1 AND status IN ('pending', 'ready', 'failed', 'skipped')"#, ) .bind(directive_id) .execute(pool) .await?; Ok(result.rows_affected()) } // ============================================================================= // Directive Document CRUD // ============================================================================= /// List all contracts under a directive in queue order. /// /// Ordered by `position` (lower = earlier), with `created_at` as a stable /// tie-break. Position is the queue order in the unified directive UI; /// only one contract is active at a time, and the next-up contract is /// the lowest-position non-shipped row. pub async fn list_directive_documents( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#" SELECT * FROM directive_documents WHERE directive_id = $1 ORDER BY position ASC, created_at ASC "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Get a single directive document by ID. pub async fn get_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(pool) .await } /// Create a new directive document (contract). Status defaults to 'draft'. 
/// /// The new row's `position` is computed server-side as /// `MAX(position) + 1` over the directive's existing contracts, so it /// lands at the bottom of the queue. Callers that want to insert in the /// middle should call `reorder_directive_document_position` afterwards. /// `merge_mode` defaults to 'shared' on creation; flip later via /// `update_directive_document`. pub async fn create_directive_document( pool: &PgPool, directive_id: Uuid, title: &str, body: &str, ) -> Result { sqlx::query_as::<_, DirectiveDocument>( r#" INSERT INTO directive_documents (directive_id, title, body, status, position) VALUES ( $1, $2, $3, 'draft', COALESCE( (SELECT MAX(position) + 1 FROM directive_documents WHERE directive_id = $1), 0 ) ) RETURNING * "#, ) .bind(directive_id) .bind(title) .bind(body) .fetch_one(pool) .await } /// Update a directive document's title and/or body. /// /// Bumps `version` and `updated_at`. If the document was previously in the /// `shipped` state and the body actually changed, the status flips back to /// `active` and `shipped_at` is cleared — this implements the "editing a /// shipped contract reactivates it" behaviour. The wiring from the API / /// handlers will be added in a later step. pub async fn update_directive_document( pool: &PgPool, document_id: Uuid, title: Option<&str>, body: Option<&str>, merge_mode: Option<&str>, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let new_title = title.unwrap_or(¤t.title); let new_body = body.unwrap_or(¤t.body); let new_merge_mode = merge_mode.unwrap_or(¤t.merge_mode); let body_changed = new_body != current.body; // Reactivation rule: editing the body of a shipped doc flips it back // to 'active' and clears shipped_at. 
Other status transitions remain // untouched here and are handled by the dedicated mark/archive helpers. let reactivate_from_shipped = current.status == "shipped" && body_changed; let new_status = if reactivate_from_shipped { "active" } else { current.status.as_str() }; let result = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET title = $2, body = $3, status = $4, shipped_at = CASE WHEN $5 THEN NULL ELSE shipped_at END, merge_mode = $6, version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(new_title) .bind(new_body) .bind(new_status) .bind(reactivate_from_shipped) .bind(new_merge_mode) .fetch_optional(pool) .await?; Ok(result) } /// Move a contract to a new queue position within its directive. /// /// Implementation: a single SQL CTE that bumps siblings out of the way /// based on whether we're moving forward (later) or backward (earlier). /// Returns the updated contract row. pub async fn reorder_directive_document_position( pool: &PgPool, document_id: Uuid, new_position: i32, ) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.position == new_position { tx.commit().await?; return Ok(Some(current)); } // Shift siblings to make room. Moving forward (new > old) drags the // intermediate range back by one; moving backward pushes it forward. 
if new_position > current.position { sqlx::query( r#" UPDATE directive_documents SET position = position - 1 WHERE directive_id = $1 AND id <> $2 AND position > $3 AND position <= $4 "#, ) .bind(current.directive_id) .bind(document_id) .bind(current.position) .bind(new_position) .execute(&mut *tx) .await?; } else { sqlx::query( r#" UPDATE directive_documents SET position = position + 1 WHERE directive_id = $1 AND id <> $2 AND position >= $3 AND position < $4 "#, ) .bind(current.directive_id) .bind(document_id) .bind(new_position) .bind(current.position) .execute(&mut *tx) .await?; } let result = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET position = $2, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(new_position) .fetch_optional(&mut *tx) .await?; tx.commit().await?; Ok(result) } /// Mark a directive document as shipped (PR raised). Sets pr_url, optional /// pr_branch, status = 'shipped', shipped_at = NOW(), and bumps version. pub async fn mark_directive_document_shipped( pool: &PgPool, document_id: Uuid, pr_url: &str, pr_branch: Option<&str>, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'shipped', pr_url = $2, pr_branch = COALESCE($3, pr_branch), shipped_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(pr_url) .bind(pr_branch) .fetch_optional(pool) .await } /// Archive a directive document. Sets status = 'archived' and stamps /// archived_at = NOW(). Idempotent — archiving an already-archived doc /// re-stamps archived_at and bumps version. /// /// If the archived contract was `active`, the next-up `queued` contract /// in the same directive auto-promotes to `active` (sequential queue). 
pub async fn archive_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; let archived = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'archived', archived_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .fetch_optional(&mut *tx) .await?; if let Some(ref doc) = archived { promote_next_queued_contract(&mut tx, doc.directive_id).await?; } tx.commit().await?; Ok(archived) } // ============================================================================ // Lifecycle transitions: start / pause / complete / unlock // // The lifecycle is `draft → queued → active → shipped → archived`. At most // one contract per directive sits in `active` at a time — the queue is // serialised because a directive owns a single shared worktree. Helpers // below enforce that invariant in SQL transactions. // ============================================================================ /// Lock a draft contract and either activate it (if no sibling is active) /// or queue it. Returns the updated row, or `Ok(None)` if the contract /// doesn't exist. Errors with `RepositoryError::Validation` if the /// contract is in any state other than `draft`. pub async fn start_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "draft" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'draft' contracts can be started", current.status ))); } // If any sibling is already active, this one queues. Otherwise it // claims the active slot directly. 
let active_count: (i64,) = sqlx::query_as( r#"SELECT COUNT(*)::BIGINT FROM directive_documents WHERE directive_id = $1 AND status = 'active'"#, ) .bind(current.directive_id) .fetch_one(&mut *tx) .await?; let new_status = if active_count.0 > 0 { "queued" } else { "active" }; let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = $2, version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .bind(new_status) .fetch_optional(&mut *tx) .await?; tx.commit().await?; Ok(updated) } /// Pause an active contract — moves it back to `queued` so the next /// queued sibling can pick up the active slot. The orchestrator-daemon /// stop is the caller's responsibility. pub async fn pause_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "active" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'active' contracts can be paused", current.status ))); } let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'queued', version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; // The slot is free — promote the next queued contract (lowest // position, excluding the one we just paused). promote_next_queued_contract(&mut tx, current.directive_id).await?; tx.commit().await?; Ok(updated) } /// Mark an active contract as `shipped` — the work is done. Optional /// pr_url / pr_branch are recorded if supplied. Promotes the next /// queued sibling to `active`. 
pub async fn complete_contract( pool: &PgPool, contract_id: Uuid, pr_url: Option<&str>, pr_branch: Option<&str>, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "active" && current.status != "queued" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'active' or 'queued' contracts can be completed", current.status ))); } let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'shipped', pr_url = COALESCE($2, pr_url), pr_branch = COALESCE($3, pr_branch), shipped_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .bind(pr_url) .bind(pr_branch) .fetch_optional(&mut *tx) .await?; promote_next_queued_contract(&mut tx, current.directive_id).await?; tx.commit().await?; Ok(updated) } /// Unlock a queued or active contract back to `draft` so the spec is /// editable again. If the contract was active, the slot frees and the /// next queued sibling auto-promotes. 
pub async fn unlock_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "queued" && current.status != "active" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'queued' or 'active' contracts can be unlocked", current.status ))); } let was_active = current.status == "active"; let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'draft', version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; if was_active { promote_next_queued_contract(&mut tx, current.directive_id).await?; } tx.commit().await?; Ok(updated) } /// Find the lowest-position `queued` contract under a directive and /// flip it to `active`. No-op when no queued contract exists. /// /// Caller must hold the parent transaction so the count → promote /// sequence stays atomic w.r.t. other lifecycle transitions. async fn promote_next_queued_contract( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, directive_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directive_documents SET status = 'active', version = version + 1, updated_at = NOW() WHERE id = ( SELECT id FROM directive_documents WHERE directive_id = $1 AND status = 'queued' ORDER BY position ASC, created_at ASC LIMIT 1 ) "#, ) .bind(directive_id) .execute(&mut **tx) .await?; Ok(()) } /// Count the number of currently-active documents under a directive. /// "Active" here means status = 'active' (not draft, shipped, or archived). 
pub async fn count_active_directive_documents(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let (active,): (i64,) = sqlx::query_as(
        r#"
        SELECT COUNT(*)::BIGINT
        FROM directive_documents
        WHERE directive_id = $1 AND status = 'active'
        "#,
    )
    .bind(directive_id)
    .fetch_one(pool)
    .await?;

    Ok(active)
}

/// List all tasks attached to a specific directive document.
///
/// This powers the per-document `tasks/` subfolder in the sidebar — when a
/// document ships, its tasks visually move with it under shipped/. Includes
/// both step-execution tasks (those with directive_step_id set) and
/// "ephemeral" / orchestrator-style tasks (those without a step).
///
/// Hidden tasks are filtered out so dismissed tasks don't reappear in the
/// document's task list. Results are ordered newest first.
pub async fn list_directive_document_tasks(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    // A NULL `hidden` flag counts as "not hidden".
    const SQL: &str = r#"
        SELECT * FROM tasks
        WHERE directive_document_id = $1
          AND COALESCE(hidden, false) = false
        ORDER BY created_at DESC
        "#;

    sqlx::query_as::<_, Task>(SQL)
        .bind(document_id)
        .fetch_all(pool)
        .await
}

/// List directive_steps attached to a specific document, ordered the same
/// way the directive page orders them (by order_index, then created_at).
pub async fn list_directive_document_steps(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT * FROM directive_steps
        WHERE directive_document_id = $1
        ORDER BY order_index, created_at
        "#;

    sqlx::query_as::<_, DirectiveStep>(SQL)
        .bind(document_id)
        .fetch_all(pool)
        .await
}

// =============================================================================
// Directive DAG Progression
// =============================================================================

/// Advance pending steps to ready if all their dependencies are in terminal states.
/// Returns the newly-ready steps.
pub async fn advance_directive_ready_steps( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET status = 'ready' WHERE directive_id = $1 AND status = 'pending' AND NOT EXISTS ( SELECT 1 FROM unnest(depends_on) AS dep_id JOIN directive_steps ds ON ds.id = dep_id WHERE ds.status NOT IN ('completed', 'skipped') ) AND NOT EXISTS ( SELECT 1 FROM directive_steps prev WHERE prev.directive_id = $1 AND prev.order_index < directive_steps.order_index AND prev.status NOT IN ('completed', 'skipped', 'failed') ) RETURNING * "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Check if all steps in a directive are in terminal states. /// If so, set the directive to 'idle' (not completed — directives are ongoing). /// Returns true if the directive was set to idle. pub async fn check_directive_idle( pool: &PgPool, directive_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE directives SET status = 'idle', updated_at = NOW() WHERE id = $1 AND status = 'active' AND NOT EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = $1 AND status NOT IN ('completed', 'failed', 'skipped') ) AND EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = $1 ) "#, ) .bind(directive_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update a directive's goal and bump goal_updated_at. /// Reactivates draft/idle/paused/inactive directives and clears any stale /// orchestrator task so that planning/replanning triggers on the next /// reconciler tick. /// /// `draft` flips because the document-mode UI treats the first goal save as /// the implicit "start". `inactive` flips because editing a contract whose /// last revision was already shipped is the way the user kicks off an /// amendment — the planner picks it up via phase_planning/replanning and /// uses get_latest_merged_revision to learn the BEFORE→AFTER diff. 
pub async fn update_directive_goal( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, goal: &str, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" UPDATE directives SET goal = $3, goal_updated_at = NOW(), status = CASE WHEN status IN ('draft', 'idle', 'paused', 'inactive') THEN 'active' ELSE status END, orchestrator_task_id = NULL, updated_at = NOW(), version = version + 1 WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .bind(goal) .fetch_optional(pool) .await } /// Mark a directive 'inactive'. Used at the moment a PR is raised — at that /// point the contract's current iteration is "shipped" and editing the goal /// (Stage 4) starts an amendment cycle. Idempotent: no-op if status is /// already inactive or already past it (e.g. archived). pub async fn set_directive_inactive( pool: &PgPool, directive_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET status = 'inactive', updated_at = NOW(), version = version + 1 WHERE id = $1 AND status IN ('active', 'idle', 'paused') "#, ) .bind(directive_id) .execute(pool) .await?; Ok(()) } /// Reset a directive for a "new draft" cycle: clear the goal back to empty, /// flip status to 'draft', and detach the current pr_url / pr_branch / /// orchestrator linkage so the next goal save starts fresh. Prior revisions /// remain in `directive_revisions` as the historical record. Used by the /// sidebar's "New draft" right-click on inactive contracts. 
pub async fn reset_directive_for_new_draft( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" UPDATE directives SET goal = '', goal_updated_at = NOW(), status = 'draft', pr_url = NULL, pr_branch = NULL, orchestrator_task_id = NULL, completion_task_id = NULL, updated_at = NOW(), version = version + 1 WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .fetch_optional(pool) .await } /// Update a directive's goal WITHOUT clearing the orchestrator task id. /// /// This is the path used by the goal-edit interrupt cycle: when a small goal /// edit arrives while a planner is already running, we want to keep the /// planner attached so a `SendMessage` can summarise the change in-flight /// instead of cancelling and respawning. We still bump `goal_updated_at` so /// the timestamp reflects the edit, but we do NOT trigger replanning by /// clearing the orchestrator task. We also do not flip status from /// idle/paused → active here, since by definition a planner is already /// running. pub async fn update_directive_goal_keep_orchestrator( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, goal: &str, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" UPDATE directives SET goal = $3, goal_updated_at = NOW(), updated_at = NOW(), version = version + 1 WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .bind(goal) .fetch_optional(pool) .await } // ============================================================================= // Directive Revisions — per-PR snapshots of the contract content. // ============================================================================= /// Snapshot the directive's current goal as a revision attached to the given /// PR URL. The version is auto-assigned as MAX(existing) + 1 per directive. 
/// Idempotent on (directive_id, pr_url): if a revision already exists for
/// this directive+pr_url combo, returns the existing row instead of creating
/// a duplicate.
pub async fn create_directive_revision(
    pool: &PgPool,
    directive_id: Uuid,
    content: &str,
    pr_url: &str,
    pr_branch: Option<&str>,
) -> Result<crate::db::models::DirectiveRevision, sqlx::Error> {
    use crate::db::models::DirectiveRevision;

    // Step 1: look for a snapshot already taken for this PR, so a re-run of
    // the orchestrator's completion task does not produce a second one.
    // NOTE(review): the lookup and the insert below are separate statements;
    // two concurrent callers could both miss here and both insert unless the
    // table has a uniqueness constraint — confirm against the schema.
    let prior = sqlx::query_as::<_, DirectiveRevision>(
        r#" SELECT * FROM directive_revisions WHERE directive_id = $1 AND pr_url = $2 ORDER BY frozen_at DESC LIMIT 1 "#,
    )
    .bind(directive_id)
    .bind(pr_url)
    .fetch_optional(pool)
    .await?;

    match prior {
        Some(revision) => Ok(revision),
        // Step 2: nothing recorded yet — insert a fresh 'open' revision whose
        // version is one past the directive's current maximum (1 if none).
        None => {
            sqlx::query_as::<_, DirectiveRevision>(
                r#" INSERT INTO directive_revisions (directive_id, content, pr_url, pr_branch, pr_state, version, frozen_at) SELECT $1, $2, $3, $4, 'open', COALESCE(MAX(version), 0) + 1, NOW() FROM directive_revisions WHERE directive_id = $1 RETURNING * "#,
            )
            .bind(directive_id)
            .bind(content)
            .bind(pr_url)
            .bind(pr_branch)
            .fetch_one(pool)
            .await
        }
    }
}

/// Fetch every revision of a directive, most recently frozen first.
///
/// The query joins through `directives` and filters on `owner_id`, so a
/// caller only ever sees revisions of directives it owns; a directive that
/// belongs to someone else yields an empty list.
pub async fn list_directive_revisions_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<crate::db::models::DirectiveRevision>, sqlx::Error> {
    let sql = r#" SELECT r.* FROM directive_revisions r JOIN directives d ON d.id = r.directive_id WHERE r.directive_id = $1 AND d.owner_id = $2 ORDER BY r.frozen_at DESC "#;

    let revisions = sqlx::query_as::<_, crate::db::models::DirectiveRevision>(sql)
        .bind(directive_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;

    Ok(revisions)
}

/// Update the pr_state on a revision (called by the reconciler when it
/// detects a PR transitioned to merged/closed). New state must be one of
/// 'open' | 'merged' | 'closed' to satisfy the table's CHECK constraint.
pub async fn update_directive_revision_pr_state(
    pool: &PgPool,
    revision_id: Uuid,
    new_state: &str,
) -> Result<(), sqlx::Error> {
    // `new_state` is passed through unvalidated; an unknown revision id
    // updates zero rows and is still reported as success.
    let sql = r#"UPDATE directive_revisions SET pr_state = $2 WHERE id = $1"#;

    sqlx::query(sql)
        .bind(revision_id)
        .bind(new_state)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Look up the newest revision of a directive whose PR state is 'merged'.
///
/// Amendment planning uses this to learn what the previously "frozen"
/// content was, so a diff can be handed to the orchestrator. Returns `None`
/// when the directive has no merged revision.
pub async fn get_latest_merged_revision(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<crate::db::models::DirectiveRevision>, sqlx::Error> {
    let sql = r#" SELECT * FROM directive_revisions WHERE directive_id = $1 AND pr_state = 'merged' ORDER BY frozen_at DESC LIMIT 1 "#;

    let latest = sqlx::query_as::<_, crate::db::models::DirectiveRevision>(sql)
        .bind(directive_id)
        .fetch_optional(pool)
        .await?;

    Ok(latest)
}

/// Append one goal text to a directive's goal history.
pub async fn save_directive_goal_history(
    pool: &PgPool,
    directive_id: Uuid,
    goal: &str,
) -> Result<(), sqlx::Error> {
    let sql = r#"INSERT INTO directive_goal_history (directive_id, goal) VALUES ($1, $2)"#;

    sqlx::query(sql)
        .bind(directive_id)
        .bind(goal)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Read back a directive's goal history, newest entry first.
///
/// At most `limit` rows are returned.
pub async fn get_directive_goal_history(
    pool: &PgPool,
    directive_id: Uuid,
    limit: i64,
) -> Result<Vec<DirectiveGoalHistory>, sqlx::Error> {
    let sql = r#"SELECT id, directive_id, goal, created_at FROM directive_goal_history WHERE directive_id = $1 ORDER BY created_at DESC LIMIT $2"#;

    let history = sqlx::query_as::<_, DirectiveGoalHistory>(sql)
        .bind(directive_id)
        .bind(limit)
        .fetch_all(pool)
        .await?;

    Ok(history)
}

/// Set a directive's status (used for start/pause/archive transitions).
pub async fn set_directive_status( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, status: &str, ) -> Result, sqlx::Error> { let mut query = String::from( r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#, ); if status == "active" { query.push_str(", started_at = COALESCE(started_at, NOW())"); } query.push_str(" WHERE id = $1 AND owner_id = $2 RETURNING *"); sqlx::query_as::<_, Directive>(&query) .bind(directive_id) .bind(owner_id) .bind(status) .fetch_optional(pool) .await } // ============================================================================= // Directive Orchestrator Queries // ============================================================================= /// Get active directives that need planning (no steps, no orchestrator task). pub async fn get_directives_needing_planning( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'active' AND d.orchestrator_task_id IS NULL AND NOT EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = d.id ) "#, ) .fetch_all(pool) .await } /// A step joined with minimal directive info for dispatch. #[derive(Debug, Clone, sqlx::FromRow)] pub struct StepForDispatch { // Step fields pub step_id: Uuid, pub directive_id: Uuid, pub step_name: String, pub step_description: Option, pub task_plan: Option, pub order_index: i32, pub generation: i32, pub depends_on: Vec, /// Optional contract type — when set, orchestrator creates a contract instead of a task. pub contract_type: Option, // Directive fields pub owner_id: Uuid, pub directive_title: String, pub repository_url: Option, pub base_branch: Option, /// The directive's PR branch (if a PR has already been created from previous steps). pub pr_branch: Option, /// The directive's reconcile mode: "auto", "semi-auto", or "manual". pub reconcile_mode: String, } /// Get ready steps that need task dispatch. 
pub async fn get_ready_steps_for_dispatch( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, StepForDispatch>( r#" SELECT ds.id AS step_id, ds.directive_id, ds.name AS step_name, ds.description AS step_description, ds.task_plan, ds.order_index, ds.generation, ds.depends_on, ds.contract_type, d.owner_id, d.title AS directive_title, d.repository_url, d.base_branch, d.pr_branch, d.reconcile_mode FROM directive_steps ds JOIN directives d ON d.id = ds.directive_id WHERE ds.status = 'ready' AND ds.task_id IS NULL AND ds.contract_id IS NULL AND d.status = 'active' ORDER BY ds.order_index "#, ) .fetch_all(pool) .await } /// Task info for a dependency step (step → linked task). #[derive(Debug, Clone, sqlx::FromRow)] pub struct DependencyTaskInfo { pub step_id: Uuid, pub task_id: Uuid, pub task_name: String, } /// Resolve dependency step UUIDs to their linked task IDs and names. /// Returns results in the same order as the input `depends_on` slice. pub async fn get_step_dependency_tasks( pool: &PgPool, depends_on: &[Uuid], ) -> Result, sqlx::Error> { if depends_on.is_empty() { return Ok(vec![]); } let rows = sqlx::query_as::<_, DependencyTaskInfo>( r#" SELECT ds.id AS step_id, t.id AS task_id, t.name AS task_name FROM directive_steps ds JOIN tasks t ON t.id = ds.task_id WHERE ds.id = ANY($1) "#, ) .bind(depends_on) .fetch_all(pool) .await?; // Re-order to match input ordering let mut ordered = Vec::with_capacity(depends_on.len()); for dep_id in depends_on { if let Some(row) = rows.iter().find(|r| r.step_id == *dep_id) { ordered.push(row.clone()); } } Ok(ordered) } /// A running step joined with its task's current status. #[derive(Debug, Clone, sqlx::FromRow)] pub struct RunningStepWithTask { pub step_id: Uuid, pub directive_id: Uuid, pub task_id: Uuid, pub task_status: String, } /// Get running steps with their task status for monitoring. 
pub async fn get_running_steps_with_tasks( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, RunningStepWithTask>( r#" SELECT ds.id AS step_id, ds.directive_id, ds.task_id AS "task_id!", t.status AS task_status FROM directive_steps ds JOIN tasks t ON t.id = ds.task_id WHERE ds.status = 'running' AND ds.task_id IS NOT NULL AND ds.contract_id IS NULL "#, ) .fetch_all(pool) .await } /// A running step backed by a contract, joined with the contract's current status. #[derive(Debug, Clone, sqlx::FromRow)] pub struct RunningStepWithContract { pub step_id: Uuid, pub directive_id: Uuid, pub contract_id: Uuid, pub contract_status: String, pub contract_phase: String, } /// Get running steps that are backed by contracts (for contract-based monitoring). pub async fn get_running_steps_with_contracts( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, RunningStepWithContract>( r#" SELECT ds.id AS step_id, ds.directive_id, ds.contract_id AS "contract_id!", c.status AS contract_status, c.phase AS contract_phase FROM directive_steps ds JOIN contracts c ON c.id = ds.contract_id WHERE ds.status = 'running' AND ds.contract_id IS NOT NULL "#, ) .fetch_all(pool) .await } /// An orchestrator task to check (directive with pending planning task). #[derive(Debug, Clone, sqlx::FromRow)] pub struct OrchestratorTaskCheck { pub directive_id: Uuid, pub orchestrator_task_id: Uuid, pub task_status: String, pub owner_id: Uuid, } /// Get directives with orchestrator tasks to check completion. 
pub async fn get_orchestrator_tasks_to_check( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, OrchestratorTaskCheck>( r#" SELECT d.id AS directive_id, d.orchestrator_task_id AS "orchestrator_task_id!", t.status AS task_status, d.owner_id FROM directives d JOIN tasks t ON t.id = d.orchestrator_task_id WHERE d.orchestrator_task_id IS NOT NULL AND d.status = 'active' "#, ) .fetch_all(pool) .await } /// Get active directives that need re-planning (goal updated after latest step). pub async fn get_directives_needing_replanning( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'active' AND d.orchestrator_task_id IS NULL AND EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = d.id ) AND d.goal_updated_at > ( SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz) FROM directive_steps ds WHERE ds.directive_id = d.id ) "#, ) .fetch_all(pool) .await } /// Assign a task to a step and set status to running. pub async fn assign_task_to_step( pool: &PgPool, step_id: Uuid, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET task_id = $2, status = 'running', started_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(step_id) .bind(task_id) .fetch_optional(pool) .await } /// Set the orchestrator_task_id on a directive. pub async fn assign_orchestrator_task( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET orchestrator_task_id = $2, updated_at = NOW() WHERE id = $1 "#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(()) } /// Clear the orchestrator_task_id on a directive. 
pub async fn clear_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    // Also refreshes `updated_at`. An unknown directive id updates zero rows
    // and is still reported as success.
    let sql = r#" UPDATE directives SET orchestrator_task_id = NULL, updated_at = NOW() WHERE id = $1 "#;

    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Interrupt superseded planning tasks of a directive.
///
/// Every task of the directive other than `exclude_task_id` whose name
/// starts with "Plan: " or "Re-plan: " and whose status is not already one
/// of completed / failed / merged / done / interrupted is set to
/// 'interrupted' with `completed_at` stamped. Matching on the name prefix
/// keeps completion/verification tasks out of the blast radius.
///
/// Returns the number of tasks that were interrupted.
pub async fn cancel_old_planning_tasks(
    pool: &PgPool,
    directive_id: Uuid,
    exclude_task_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let sql = r#" UPDATE tasks SET status = 'interrupted', completed_at = NOW(), updated_at = NOW() WHERE directive_id = $1 AND id != $2 AND (name LIKE 'Plan: %' OR name LIKE 'Re-plan: %') AND status NOT IN ('completed', 'failed', 'merged', 'done', 'interrupted') "#;

    sqlx::query(sql)
        .bind(directive_id)
        .bind(exclude_task_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}

/// Record which task backs a step, leaving the step's status untouched.
pub async fn link_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#" UPDATE directive_steps SET task_id = $2 WHERE id = $1 "#;

    sqlx::query(sql)
        .bind(step_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Record which contract backs a directive step.
pub async fn link_contract_to_step(
    pool: &PgPool,
    step_id: Uuid,
    contract_id: Uuid,
) -> Result<(), sqlx::Error> {
    // Placeholder order differs from `link_task_to_step`: here $1 is the
    // contract id and $2 the step id, so the binds follow that order.
    let sql = r#" UPDATE directive_steps SET contract_id = $1 WHERE id = $2 "#;

    sqlx::query(sql)
        .bind(contract_id)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Set a step to 'running' status (after its task has been dispatched).
pub async fn set_step_running(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<(), sqlx::Error> {
    // `started_at` is only filled in when it is still NULL, so re-marking a
    // step as running keeps its original start time.
    let sql = r#" UPDATE directive_steps SET status = 'running', started_at = COALESCE(started_at, NOW()) WHERE id = $1 "#;

    sqlx::query(sql)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// List directive tasks that are still waiting to be picked up.
///
/// These are tasks with a `directive_id`, status 'pending', and no daemon
/// assigned, oldest first.
pub async fn get_pending_directive_tasks(
    pool: &PgPool,
) -> Result<Vec<Task>, sqlx::Error> {
    let sql = r#" SELECT * FROM tasks WHERE directive_id IS NOT NULL AND status = 'pending' AND daemon_id IS NULL ORDER BY created_at "#;

    let pending = sqlx::query_as::<_, Task>(sql).fetch_all(pool).await?;

    Ok(pending)
}

/// Highest `generation` among a directive's steps, or 0 when it has none.
pub async fn get_directive_max_generation(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i32, sqlx::Error> {
    // MAX over zero rows yields a single row containing NULL, hence the
    // Option in the decoded tuple.
    let (max_generation,): (Option<i32>,) = sqlx::query_as(
        r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#,
    )
    .bind(directive_id)
    .fetch_one(pool)
    .await?;

    Ok(max_generation.unwrap_or(0))
}

// =============================================================================
// Order CRUD
// =============================================================================

/// Insert a new order owned by `owner_id` and return the stored row.
///
/// Fields the request leaves unset fall back to these defaults:
/// priority "medium", status "open", order type "feature".
pub async fn create_order(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateOrderRequest,
) -> Result<Order, sqlx::Error> {
    let sql = r#" INSERT INTO orders (owner_id, title, description, priority, status, order_type, labels, directive_id, repository_url, dog_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING * "#;

    let created = sqlx::query_as::<_, Order>(sql)
        .bind(owner_id)
        .bind(&req.title)
        .bind(&req.description)
        .bind(req.priority.as_deref().unwrap_or("medium"))
        .bind(req.status.as_deref().unwrap_or("open"))
        .bind(req.order_type.as_deref().unwrap_or("feature"))
        .bind(&req.labels)
        .bind(req.directive_id)
        .bind(&req.repository_url)
        .bind(req.dog_id)
        .fetch_one(pool)
        .await?;

    Ok(created)
}

/// List orders for the given owner with optional filters.
pub async fn list_orders( pool: &PgPool, owner_id: Uuid, status_filter: Option<&str>, type_filter: Option<&str>, priority_filter: Option<&str>, directive_id_filter: Option, dog_id_filter: Option, search_filter: Option<&str>, ) -> Result, sqlx::Error> { // Build dynamic query with optional filters let mut query = String::from("SELECT * FROM orders WHERE owner_id = $1"); let mut param_idx = 2u32; if status_filter.is_some() { query.push_str(&format!(" AND status = ${}", param_idx)); param_idx += 1; } if type_filter.is_some() { query.push_str(&format!(" AND order_type = ${}", param_idx)); param_idx += 1; } if priority_filter.is_some() { query.push_str(&format!(" AND priority = ${}", param_idx)); param_idx += 1; } if directive_id_filter.is_some() { query.push_str(&format!(" AND directive_id = ${}", param_idx)); param_idx += 1; } if dog_id_filter.is_some() { query.push_str(&format!(" AND dog_id = ${}", param_idx)); param_idx += 1; } if search_filter.is_some() { query.push_str(&format!( " AND (title ILIKE ${p} OR description ILIKE ${p} OR directive_name ILIKE ${p})", p = param_idx )); let _ = param_idx; // suppress unused warning } query.push_str(" ORDER BY created_at DESC"); let mut q = sqlx::query_as::<_, Order>(&query).bind(owner_id); if let Some(s) = status_filter { q = q.bind(s); } if let Some(t) = type_filter { q = q.bind(t); } if let Some(p) = priority_filter { q = q.bind(p); } if let Some(d) = directive_id_filter { q = q.bind(d); } if let Some(d) = dog_id_filter { q = q.bind(d); } if let Some(s) = search_filter { q = q.bind(format!("%{}%", s)); } q.fetch_all(pool).await } /// Get a single order by ID (owner-scoped). pub async fn get_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await } /// Update an order (owner-scoped). Uses COALESCE pattern to only update provided fields. 
pub async fn update_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, req: UpdateOrderRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let title = req.title.as_deref().unwrap_or(¤t.title); let description = req.description.as_deref().or(current.description.as_deref()); let priority = req.priority.as_deref().unwrap_or(¤t.priority); let status = req.status.as_deref().unwrap_or(¤t.status); let order_type = req.order_type.as_deref().unwrap_or(¤t.order_type); let labels = req.labels.as_ref().unwrap_or(¤t.labels); let directive_id = req.directive_id.or(current.directive_id); let directive_step_id = req.directive_step_id.or(current.directive_step_id); let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); let dog_id = req.dog_id.or(current.dog_id); sqlx::query_as::<_, Order>( r#" UPDATE orders SET title = $3, description = $4, priority = $5, status = $6, order_type = $7, labels = $8, directive_id = $9, directive_step_id = $10, repository_url = $11, dog_id = $12, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(order_id) .bind(owner_id) .bind(title) .bind(description) .bind(priority) .bind(status) .bind(order_type) .bind(labels) .bind(directive_id) .bind(directive_step_id) .bind(repository_url) .bind(dog_id) .fetch_optional(pool) .await } /// Delete an order (owner-scoped). Returns true if a row was deleted. pub async fn delete_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Link an order to a directive. 
pub async fn link_order_to_directive( pool: &PgPool, owner_id: Uuid, order_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" UPDATE orders SET directive_id = $3, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(order_id) .bind(owner_id) .bind(directive_id) .fetch_optional(pool) .await } /// Convert an order to a directive step. Creates a new DirectiveStep from the order's /// title and description, links the order to the new step, and returns the created step. /// Uses the order's existing directive_id (which is required for new orders). pub async fn convert_order_to_step( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result, sqlx::Error> { // Verify the order exists and belongs to this owner let order = sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await?; let order = match order { Some(o) => o, None => return Ok(None), }; // Get the directive_id from the order (required for new orders, but legacy data may have NULL) let directive_id = match order.directive_id { Some(id) => id, None => return Ok(None), }; // Verify the directive exists and belongs to this owner let directive = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(directive_id) .bind(owner_id) .fetch_optional(pool) .await?; if directive.is_none() { return Ok(None); } // Get the next order_index for this directive let max_index: (Option,) = sqlx::query_as( r#"SELECT MAX(order_index) FROM directive_steps WHERE directive_id = $1"#, ) .bind(directive_id) .fetch_one(pool) .await?; let next_index = max_index.0.unwrap_or(-1) + 1; // Create the directive step from order data let step = sqlx::query_as::<_, DirectiveStep>( r#" INSERT INTO directive_steps (directive_id, name, description, order_index) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(directive_id) .bind(&order.title) 
.bind(&order.description) .bind(next_index) .fetch_one(pool) .await?; // Link the order to the new step sqlx::query( r#" UPDATE orders SET directive_step_id = $3, updated_at = NOW() WHERE id = $1 AND owner_id = $2 "#, ) .bind(order_id) .bind(owner_id) .bind(step.id) .execute(pool) .await?; Ok(Some(step)) } // ============================================================================= // Order Pickup // ============================================================================= /// Get available orders for pickup: open orders with no directive assigned /// OR orders already linked to this specific directive that are not yet done, /// sorted by priority (critical first) then creation date. pub async fn get_available_orders_for_pickup( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE owner_id = $1 AND status IN ('open', 'in_progress') AND (directive_id IS NULL OR directive_id = $2) ORDER BY CASE priority WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END ASC, created_at ASC "#, ) .bind(owner_id) .bind(directive_id) .fetch_all(pool) .await } /// Bulk-link orders to a directive by setting directive_id on matching orders. /// Returns the count of updated rows. pub async fn bulk_link_orders_to_directive( pool: &PgPool, owner_id: Uuid, order_ids: &[Uuid], directive_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE orders SET directive_id = $1, updated_at = NOW() WHERE id = ANY($2) AND owner_id = $3 "#, ) .bind(directive_id) .bind(order_ids) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() as i64) } /// Bulk update order status for a set of order IDs. /// Returns the count of updated rows. 
pub async fn bulk_update_order_status(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    status: &str,
) -> Result<i64, sqlx::Error> {
    // Ids that do not belong to `owner_id` are skipped by the WHERE clause
    // and therefore not counted.
    let sql = r#"UPDATE orders SET status = $1, updated_at = NOW() WHERE id = ANY($2) AND owner_id = $3"#;

    sqlx::query(sql)
        .bind(status)
        .bind(order_ids)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() as i64)
}

/// Fetch every order linked to the given directive step.
pub async fn get_orders_by_step_id(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    let sql = r#"SELECT * FROM orders WHERE directive_step_id = $1"#;

    let orders = sqlx::query_as::<_, Order>(sql)
        .bind(step_id)
        .fetch_all(pool)
        .await?;

    Ok(orders)
}

/// Bring order statuses in line with the status of their linked steps.
///
/// For the given owner and directive, each order that is linked to a step
/// and is not already "done" or "archived" is updated as follows:
/// - step is completed → order becomes "done"
/// - step is running or ready → order becomes "under_review"
///
/// Orders whose step is in any other status are left alone.
/// Returns how many orders were updated.
pub async fn reconcile_directive_orders(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let sql = r#" UPDATE orders o SET status = CASE WHEN ds.status = 'completed' THEN 'done' WHEN ds.status IN ('running', 'ready') THEN 'under_review' ELSE o.status END, updated_at = NOW() FROM directive_steps ds WHERE o.directive_step_id = ds.id AND o.directive_id = $1 AND o.owner_id = $2 AND o.status NOT IN ('done', 'archived') AND ds.status IN ('completed', 'running', 'ready') RETURNING o.id "#;

    // The statement returns the id of every order it touched; the count of
    // those ids is the result.
    let touched: Vec<(Uuid,)> = sqlx::query_as(sql)
        .bind(directive_id)
        .bind(owner_id)
        .fetch_all(pool)
        .await?;

    Ok(touched.len() as i64)
}

// =============================================================================
// Directive Order Group (DOG) CRUD
// =============================================================================

/// Create a new Directive Order Group (DOG) for the given owner and directive.
pub async fn create_directive_order_group( pool: &PgPool, directive_id: Uuid, owner_id: Uuid, req: CreateDirectiveOrderGroupRequest, ) -> Result { sqlx::query_as::<_, DirectiveOrderGroup>( r#" INSERT INTO directive_order_groups (directive_id, owner_id, name, description) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .bind(&req.name) .bind(&req.description) .fetch_one(pool) .await } /// List all DOGs for a given directive (owner-scoped). pub async fn list_directive_order_groups( pool: &PgPool, directive_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveOrderGroup>( r#" SELECT * FROM directive_order_groups WHERE directive_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#, ) .bind(directive_id) .bind(owner_id) .fetch_all(pool) .await } /// Get a single DOG by ID (owner-scoped). pub async fn get_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveOrderGroup>( r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update a DOG (owner-scoped). Uses fetch-then-update pattern for partial updates. 
pub async fn update_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateDirectiveOrderGroupRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveOrderGroup>( r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let name = req.name.as_deref().unwrap_or(¤t.name); let description = req.description.as_deref().or(current.description.as_deref()); let status = req.status.as_deref().unwrap_or(¤t.status); sqlx::query_as::<_, DirectiveOrderGroup>( r#" UPDATE directive_order_groups SET name = $3, description = $4, status = $5, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(name) .bind(description) .bind(status) .fetch_optional(pool) .await } /// Delete a DOG (owner-scoped). Returns true if a row was deleted. pub async fn delete_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// List orders belonging to a specific DOG (owner-scoped). pub async fn list_orders_by_dog( pool: &PgPool, dog_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE dog_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#, ) .bind(dog_id) .bind(owner_id) .fetch_all(pool) .await } /// Get available orders for pickup filtered to a specific DOG. /// Like `get_available_orders_for_pickup` but only returns orders belonging to the given DOG. 
pub async fn get_available_orders_for_dog_pickup( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, dog_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE owner_id = $1 AND dog_id = $3 AND status IN ('open', 'in_progress') AND (directive_id IS NULL OR directive_id = $2) ORDER BY CASE priority WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END ASC, created_at ASC "#, ) .bind(owner_id) .bind(directive_id) .bind(dog_id) .fetch_all(pool) .await }