//! Repository pattern for file database operations. use chrono::Utc; use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use super::models::{ CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest, UpdateDirectiveStepRequest, CreateOrderRequest, Order, UpdateOrderRequest, CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. #[derive(Debug)] pub enum RepositoryError { /// Database error Database(sqlx::Error), /// Version conflict (optimistic locking failure) VersionConflict { /// The version the client expected expected: i32, /// The actual current version in the database actual: i32, }, /// Caller-facing precondition failure (wrong status, etc.). Validation(String), } impl From for RepositoryError { fn from(e: sqlx::Error) -> Self { RepositoryError::Database(e) } } impl std::fmt::Display for RepositoryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RepositoryError::Database(e) => write!(f, "Database error: {}", e), RepositoryError::VersionConflict { expected, actual } => { write!( f, "Version conflict: expected {}, actual {}", expected, actual ) } RepositoryError::Validation(msg) => write!(f, "Validation error: {}", msg), } } } impl std::error::Error for RepositoryError {} /// Generate a default name based on current timestamp. fn generate_default_name() -> String { let now = Utc::now(); now.format("Recording - %b %d %Y %H:%M:%S").to_string() } /// Internal request for creating files without contract association (e.g., audio transcription). 
/// User-facing file creation should use CreateFileRequest which requires contract_id. pub struct InternalCreateFileRequest { pub name: Option, pub description: Option, pub transcript: Vec, pub location: Option, } /// Create a new file record (internal use, no contract required). /// For user-facing file creation, use create_file_for_owner which requires a contract. pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::>(vec![]).unwrap(); sqlx::query_as::<_, File>( r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) .fetch_one(pool) .await } /// Get a file by ID. pub async fn get_file(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all files, ordered by created_at DESC. pub async fn list_files(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, ) .fetch_all(pool) .await } /// Update a file by ID with optimistic locking. /// /// If `req.version` is provided, the update will only succeed if the current /// version matches. 
Returns `RepositoryError::VersionConflict` if there's a mismatch. /// /// If `req.version` is None (e.g., internal system updates), version checking is skipped. pub async fn update_file( pool: &PgPool, id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first let existing = get_file(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID. pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total files. pub async fn count_files(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped File Functions // ============================================================================= /// Create a new file record for a specific owner. 
///
/// A missing `req.name` falls back to a timestamp-based default. The row is
/// inserted with a `NULL` summary; `body` and `repo_file_path` come from the
/// request.
pub async fn create_file_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateFileRequest,
) -> Result<File, sqlx::Error> {
    let name = req.name.unwrap_or_else(generate_default_name);
    let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
    let body_json = serde_json::to_value(&req.body).unwrap_or_default();

    sqlx::query_as::<_, File>(
        r#"
        INSERT INTO files (owner_id, name, description, transcript, location, summary, body,
                           repo_file_path)
        VALUES ($1, $2, $3, $4, $5, NULL, $6, $7)
        RETURNING id, owner_id, name, description, transcript, location, summary, body, version,
                  repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        "#,
    )
    .bind(owner_id)
    .bind(&name)
    .bind(&req.description)
    .bind(&transcript_json)
    .bind(&req.location)
    .bind(&body_json)
    .bind(&req.repo_file_path)
    .fetch_one(pool)
    .await
}

/// Get a file by ID, scoped to owner.
///
/// Returns `Ok(None)` both when the file does not exist and when it belongs
/// to a different owner.
pub async fn get_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<File>, sqlx::Error> {
    sqlx::query_as::<_, File>(
        r#"
        SELECT id, owner_id, name, description, transcript, location, summary, body, version,
               repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        WHERE id = $1 AND owner_id = $2
        "#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await
}

/// List all files for an owner, ordered by created_at DESC.
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Database row type for file summary #[derive(Debug, sqlx::FromRow)] struct FileSummaryRow { id: Uuid, name: String, description: Option, #[sqlx(json)] transcript: Vec, version: i32, repo_file_path: Option, repo_sync_status: Option, created_at: chrono::DateTime, updated_at: chrono::DateTime, } /// List file summaries for an owner. pub async fn list_file_summaries_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { let rows = sqlx::query_as::<_, FileSummaryRow>( r#" SELECT f.id, f.name, f.description, f.transcript, f.version, f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at FROM files f WHERE f.owner_id = $1 ORDER BY f.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await?; Ok(rows .into_iter() .map(|row| { let duration = row .transcript .iter() .map(|t| t.end) .fold(0.0_f32, f32::max); FileSummary { id: row.id, name: row.name, description: row.description, transcript_count: row.transcript.len(), duration: if duration > 0.0 { Some(duration) } else { None }, version: row.version, repo_file_path: row.repo_file_path, repo_sync_status: row.repo_sync_status, created_at: row.created_at, updated_at: row.updated_at, } }) .collect()) } /// Update a file by ID with optimistic locking, scoped to owner. 
pub async fn update_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateFileRequest, ) -> Result, RepositoryError> { // Get the existing file first (scoped to owner) let existing = get_file_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let transcript = req.transcript.unwrap_or(existing.transcript); let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); let summary = req.summary.or(existing.summary); let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .bind(req.version.unwrap()) .fetch_optional(pool) .await? 
} else { // No version check for internal updates sqlx::query_as::<_, File>( r#" UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&transcript_json) .bind(&summary) .bind(&body_json) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { // Re-fetch to get the actual version if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a file by ID, scoped to owner. pub async fn delete_file_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM files WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } // ============================================================================= // Version History Functions // ============================================================================= /// Set the version source for the current transaction. /// This is used by the trigger to record who made the change. pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> { sqlx::query(&format!("SET LOCAL app.version_source = '{}'", source)) .execute(pool) .await?; Ok(()) } /// Set the change description for the current transaction. 
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> { // Escape single quotes for SQL let escaped = description.replace('\'', "''"); sqlx::query(&format!("SET LOCAL app.change_description = '{}'", escaped)) .execute(pool) .await?; Ok(()) } /// List all versions of a file, ordered by version DESC. pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result, sqlx::Error> { // First get the current version from the files table let current = get_file(pool, file_id).await?; let mut versions = sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 ORDER BY version DESC "#, ) .bind(file_id) .fetch_all(pool) .await?; // Add the current version as the first entry if it exists if let Some(file) = current { let current_version = FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), // Current version source change_description: None, created_at: file.updated_at, }; versions.insert(0, current_version); } Ok(versions) } /// Get a specific version of a file. pub async fn get_file_version( pool: &PgPool, file_id: Uuid, version: i32, ) -> Result, sqlx::Error> { // First check if this is the current version if let Some(file) = get_file(pool, file_id).await? 
{ if file.version == version { return Ok(Some(FileVersion { id: file.id, file_id: file.id, version: file.version, name: file.name, description: file.description, summary: file.summary, body: file.body, source: "user".to_string(), change_description: None, created_at: file.updated_at, })); } } // Otherwise, look in the versions table sqlx::query_as::<_, FileVersion>( r#" SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at FROM file_versions WHERE file_id = $1 AND version = $2 "#, ) .bind(file_id) .bind(version) .fetch_optional(pool) .await } /// Restore a file to a previous version. /// This creates a new version with the content from the target version. pub async fn restore_file_version( pool: &PgPool, file_id: Uuid, target_version: i32, current_version: i32, ) -> Result, RepositoryError> { // Get the target version content let target = get_file_version(pool, file_id, target_version).await?; let Some(target) = target else { return Ok(None); }; // Set version source and description for the trigger set_version_source(pool, "system").await?; set_change_description(pool, &format!("Restored from version {}", target_version)).await?; // Update the file with the target version's content // This will trigger the save_file_version trigger to save the current state first let update_req = UpdateFileRequest { name: Some(target.name), description: target.description, transcript: None, summary: target.summary, body: Some(target.body), version: Some(current_version), repo_file_path: None, }; update_file(pool, file_id, update_req).await } /// Count versions for a file. 
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version ) .bind(file_id) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Task Functions // ============================================================================= /// Create a new task. /// /// If creating a subtask (parent_task_id is set) and repository settings are not provided, /// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, /// and target_repo_path from the parent task. Depth is calculated from parent and limited /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless /// explicitly configured. The supervisor controls when completion steps happen. /// /// Task spawning is now controlled by supervisors at the application level. /// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result { // Calculate depth + inherit settings from parent if applicable. let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { let parent = get_task(pool, parent_id).await? 
.ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); let completion_action = req.completion_action.clone(); (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { ( 0, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( parent_task_id, depth, name, description, plan, priority, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, branched_from_task_id, conversation_state ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) .fetch_one(pool) .await } /// Get a task by ID. pub async fn get_task(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 "#, ) .bind(id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent), ordered by created_at DESC. /// Hidden tasks are excluded by default. 
pub async fn list_tasks(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .fetch_all(pool) .await } /// List subtasks of a parent task. pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent_id) .fetch_all(pool) .await } /// List all tasks in a contract (for supervisor tree view). pub async fn mark_task_for_retry( pool: &PgPool, task_id: Uuid, failed_daemon_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = 'pending', daemon_id = NULL, retry_count = retry_count + 1, failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), last_active_daemon_id = $2, interrupted_at = NOW(), error_message = 'Daemon disconnected, awaiting retry', updated_at = NOW() WHERE id = $1 AND retry_count < max_retries RETURNING * "#, ) .bind(task_id) .bind(failed_daemon_id) .fetch_optional(pool) .await } /// Mark a task as permanently failed (exceeded retry limit). 
pub async fn mark_task_permanently_failed( pool: &PgPool, task_id: Uuid, failed_daemon_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE tasks SET status = 'failed', daemon_id = NULL, retry_count = retry_count + 1, failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), last_active_daemon_id = $2, error_message = 'Task failed: exceeded maximum retry attempts', updated_at = NOW() WHERE id = $1 "#, ) .bind(task_id) .bind(failed_daemon_id) .execute(pool) .await?; Ok(()) } /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in 
WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 AND version = $15 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $2, description = $3, plan = $4, status = $5, priority = $6, progress_summary = $7, last_output = $8, error_message = $9, merge_mode = $10, pr_url = $11, daemon_id = $12, target_repo_path = $13, completion_action = $14, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task(pool, id).await? { return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID. pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Count total tasks. 
pub async fn count_tasks(pool: &PgPool) -> Result { let result: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", ) .fetch_one(pool) .await?; Ok(result.0) } // ============================================================================= // Owner-Scoped Task Functions // ============================================================================= /// Create a new task for a specific owner. pub async fn create_task_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateTaskRequest, ) -> Result { // Calculate depth + inherit settings from parent if applicable. let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). 
Subtasks cannot have children.", new_depth ))); } let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); let completion_action = req.completion_action.clone(); (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { ( 0, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), req.merge_mode.clone(), req.target_repo_path.clone(), req.completion_action.clone(), ) }; let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); // Resolve directive_document_id from the directive's currently- // active contract row (directive_documents table) so the task // lands under the right tasks/ subfolder in the sidebar. Failures // are non-fatal — the task is created with NULL document_id and // the sidebar tolerates that. 
let directive_document_id = match req.directive_id { Some(directive_id) => resolve_active_document_for_directive(pool, directive_id) .await .unwrap_or(None), None => None, }; sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( owner_id, parent_task_id, depth, name, description, plan, priority, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, branched_from_task_id, conversation_state, directive_id, directive_step_id, directive_document_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING * "#, ) .bind(owner_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) .bind(&merge_mode) .bind(&target_repo_path) .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) .bind(&req.directive_id) .bind(&req.directive_step_id) .bind(&directive_document_id) .fetch_one(pool) .await } /// Pick the directive's "current" document for tasks/steps to attach to. /// /// Selection rule, in order of preference: /// 1. The most recently `updated_at` document with `status = 'active'`. /// 2. If no active doc exists, the most recently `updated_at` document /// with `status = 'draft'` — covers the case where a fresh draft was /// auto-created post-ship and the orchestrator is now spawning tasks /// against it before the user has even touched it. /// 3. None — directive has no documents at all. /// /// Returning `Ok(None)` is fine and expected (e.g., directives that pre-date /// the document model on a fresh DB, or directives whose only doc is shipped /// + no fresh draft exists yet). The task/step is then stored with /// `directive_document_id = NULL`, which the sidebar already tolerates. 
async fn resolve_active_document_for_directive( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { let row: Option<(Uuid,)> = sqlx::query_as( r#" SELECT id FROM directive_documents WHERE directive_id = $1 AND status IN ('active', 'draft') ORDER BY CASE status WHEN 'active' THEN 0 WHEN 'draft' THEN 1 ELSE 2 END, updated_at DESC LIMIT 1 "#, ) .bind(directive_id) .fetch_optional(pool) .await?; Ok(row.map(|r| r.0)) } /// Get a task by ID, scoped to owner. pub async fn get_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. /// Hidden tasks are excluded by default. pub async fn list_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } // ============================================================================= // Tmp directive — per-owner scratchpad // ============================================================================= /// Get the owner's tmp directive, creating it on the fly if absent. Idempotent /// thanks to the partial unique index on (owner_id) WHERE is_tmp. /// /// We try an INSERT first with ON CONFLICT DO NOTHING; if a row was inserted /// it's returned, otherwise we fall back to a SELECT for the row some other /// request just created (or one that already existed). 
pub async fn get_or_create_tmp_directive( pool: &PgPool, owner_id: Uuid, ) -> Result { // Try insert first. RETURNING fires only if a row was actually written; // if the partial unique index trips (a tmp directive already exists) // we get None and fall through to the SELECT. let inserted = sqlx::query_as::<_, Directive>( r#" INSERT INTO directives (owner_id, title, goal, status, reconcile_mode, is_tmp) VALUES ($1, 'tmp', '', 'idle', 'auto', true) ON CONFLICT DO NOTHING RETURNING * "#, ) .bind(owner_id) .fetch_optional(pool) .await?; if let Some(d) = inserted { return Ok(d); } // Pre-existing or just-created-by-someone-else: fetch. sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE owner_id = $1 AND is_tmp = true LIMIT 1"#, ) .bind(owner_id) .fetch_one(pool) .await } /// Find every tmp directive (across owners). Used by the 30-day expiry /// sweep — we need to know which directives are scratchpads so we know /// which tasks to age out. pub async fn list_all_tmp_directives( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE is_tmp = true"#, ) .fetch_all(pool) .await } /// Delete tasks attached to a tmp directive that are older than 30 days. /// Returns the number of rows deleted (informational; we log it). /// /// We only sweep top-level tasks (parent_task_id IS NULL) — subtasks die /// when their parent dies via the FK cascade. pub async fn delete_expired_tmp_tasks( pool: &PgPool, tmp_directive_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE directive_id = $1 AND parent_task_id IS NULL AND created_at < NOW() - INTERVAL '30 days' "#, ) .bind(tmp_directive_id) .execute(pool) .await?; Ok(result.rows_affected()) } /// List ephemeral tasks attached to a directive — tasks with `directive_id` /// set but no `directive_step_id`. These are the "spinoff" tasks the user /// created via the directive folder context menu, distinct from /// step-spawned execution tasks. 
Hidden tasks excluded. pub async fn list_ephemeral_directive_tasks_for_owner( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.directive_step_id IS NULL AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.created_at DESC "#, ) .bind(owner_id) .bind(directive_id) .fetch_all(pool) .await } /// List top-level tasks attached to the owner's tmp directive. These are /// the scratchpad / orphan tasks surfaced under the sidebar's `tmp/` /// folder. Auto-creates the tmp directive if it doesn't exist yet so the /// caller never has to handle "no tmp directive". pub async fn list_tmp_tasks_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { let tmp = get_or_create_tmp_directive(pool, owner_id).await?; sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(tmp.id) .fetch_all(pool) .await } /// List subtasks of a parent task, scoped to owner. 
pub async fn list_subtasks_for_owner( pool: &PgPool, parent_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(owner_id) .bind(parent_id) .fetch_all(pool) .await } /// Update a task by ID with optimistic locking, scoped to owner. pub async fn update_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateTaskRequest, ) -> Result, RepositoryError> { // Get the existing task first (scoped to owner) let existing = get_task_for_owner(pool, id, owner_id).await?; let Some(existing) = existing else { return Ok(None); }; // Check version if provided (optimistic locking) if let Some(expected_version) = req.version { if existing.version != expected_version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: existing.version, }); } } // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); let plan = req.plan.unwrap_or(existing.plan); let status = req.status.unwrap_or(existing.status); let priority = req.priority.unwrap_or(existing.priority); let progress_summary = req.progress_summary.or(existing.progress_summary); let last_output = req.last_output.or(existing.last_output); let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); let repository_url = req.repository_url.or(existing.repository_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let hidden = 
req.hidden.unwrap_or(existing.hidden); let daemon_id = if req.clear_daemon_id { None } else { req.daemon_id.or(existing.daemon_id) }; // Update with version check in WHERE clause for race condition safety let result = if req.version.is_some() { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, hidden = $17, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $18 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .bind(hidden) .bind(req.version.unwrap()) .fetch_optional(pool) .await? } else { sqlx::query_as::<_, Task>( r#" UPDATE tasks SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, target_repo_path = $14, completion_action = $15, repository_url = $16, hidden = $17, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(&name) .bind(&description) .bind(&plan) .bind(&status) .bind(priority) .bind(&progress_summary) .bind(&last_output) .bind(&error_message) .bind(&merge_mode) .bind(&pr_url) .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) .bind(&repository_url) .bind(hidden) .fetch_optional(pool) .await? }; // If versioned update returned None, there was a race condition if result.is_none() && req.version.is_some() { if let Some(current) = get_task_for_owner(pool, id, owner_id).await? 
{ return Err(RepositoryError::VersionConflict { expected: req.version.unwrap(), actual: current.version, }); } } Ok(result) } /// Delete a task by ID, scoped to owner. pub async fn delete_task_for_owner( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update task status and record event. pub async fn update_task_status( pool: &PgPool, id: Uuid, new_status: &str, event_data: Option, ) -> Result, sqlx::Error> { // Get existing status let existing = get_task(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; let previous_status = existing.status.clone(); // Update task status let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, updated_at = NOW(), started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(new_status) .fetch_optional(pool) .await?; // Record event if task.is_some() { let _ = create_task_event( pool, id, "status_change", Some(&previous_status), Some(new_status), event_data, ) .await; } Ok(task) } // ============================================================================= // Task Event Functions // ============================================================================= /// Create a task event. 
pub async fn create_task_event(
    pool: &PgPool,
    task_id: Uuid,
    event_type: &str,
    previous_status: Option<&str>,
    new_status: Option<&str>,
    event_data: Option,
) -> Result {
    // NOTE(review): several signatures in this section read `Option,` or
    // `Result {` with no generic arguments — they look stripped in this view;
    // confirm the real types against the original file.
    // Inserts one row into task_events and returns it via RETURNING *.
    sqlx::query_as::<_, TaskEvent>(
        r#" INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING * "#,
    )
    .bind(task_id)
    .bind(event_type)
    .bind(previous_status)
    .bind(new_status)
    .bind(event_data)
    .fetch_one(pool)
    .await
}

/// List events for a task, newest first.
///
/// `limit` caps the number of rows returned and defaults to 100 when `None`.
pub async fn list_task_events(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(100);
    sqlx::query_as::<_, TaskEvent>(
        r#" SELECT * FROM task_events WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2 "#,
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

// =============================================================================
// Daemon Functions
// =============================================================================

/// Register a new daemon connection.
///
/// Inserts a row into `daemons` and returns it; columns not listed in the
/// INSERT take whatever defaults the table defines.
pub async fn register_daemon(
    pool: &PgPool,
    owner_id: Uuid,
    connection_id: &str,
    hostname: Option<&str>,
    machine_id: Option<&str>,
    max_concurrent_tasks: i32,
) -> Result {
    sqlx::query_as::<_, Daemon>(
        r#" INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks) VALUES ($1, $2, $3, $4, $5) RETURNING * "#,
    )
    .bind(owner_id)
    .bind(connection_id)
    .bind(hostname)
    .bind(machine_id)
    .bind(max_concurrent_tasks)
    .fetch_one(pool)
    .await
}

/// Get a daemon by ID.
///
/// Not scoped to an owner; yields no row when the ID is unknown.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Daemon>(
        r#" SELECT * FROM daemons WHERE id = $1 "#,
    )
    .bind(id)
    .fetch_optional(pool)
    .await
}

/// Get a daemon by connection ID.
///
/// Yields no row when no daemon has this `connection_id`.
pub async fn get_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, Daemon>(
        r#" SELECT * FROM daemons WHERE connection_id = $1 "#,
    )
    .bind(connection_id)
    .fetch_optional(pool)
    .await
}

/// List all daemons.
pub async fn list_daemons(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons ORDER BY connected_at DESC "#, ) .fetch_all(pool) .await } /// List daemons for a specific owner. pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE owner_id = $1 ORDER BY connected_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get a daemon by ID for a specific owner. pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Daemon>( r#" SELECT * FROM daemons WHERE id = $1 AND owner_id = $2 "#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update daemon heartbeat. pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result { let result = sqlx::query( r#" UPDATE daemons SET last_heartbeat_at = NOW(), status = 'connected' WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon status. pub async fn update_daemon_status( pool: &PgPool, id: Uuid, status: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = $2, disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END WHERE id = $1 "#, ) .bind(id) .bind(status) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Mark daemon as disconnected by connection_id. pub async fn disconnect_daemon_by_connection( pool: &PgPool, connection_id: &str, ) -> Result { let result = sqlx::query( r#" UPDATE daemons SET status = 'disconnected', disconnected_at = NOW() WHERE connection_id = $1 "#, ) .bind(connection_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update daemon task count. 
pub async fn update_daemon_task_count(
    pool: &PgPool,
    id: Uuid,
    delta: i32,
) -> Result<bool, sqlx::Error> {
    // `delta` may be negative; GREATEST clamps the stored count at zero.
    let outcome = sqlx::query(
        r#"
        UPDATE daemons
        SET current_task_count = GREATEST(0, current_task_count + $2)
        WHERE id = $1
        "#,
    )
    .bind(id)
    .bind(delta)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected() > 0)
}

/// Delete a daemon by ID.
///
/// Returns `true` when a row was deleted.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE id = $1
        "#,
    )
    .bind(id)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected() > 0)
}

/// Delete a daemon by connection ID.
///
/// Returns `true` when a row was deleted.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE connection_id = $1
        "#,
    )
    .bind(connection_id)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected() > 0)
}

/// Count connected daemons (rows whose status is `connected`).
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar("SELECT COUNT(*) FROM daemons WHERE status = 'connected'")
        .fetch_one(pool)
        .await
}

/// Delete stale daemons that haven't sent a heartbeat within the timeout.
/// Returns the number of deleted daemons.
///
/// Rows whose `last_heartbeat_at` is NULL never satisfy the comparison and are
/// therefore left in place.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        DELETE FROM daemons
        WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
        "#,
    )
    .bind(timeout_seconds)
    .execute(pool)
    .await?;

    Ok(outcome.rows_affected())
}

// =============================================================================
// Sibling Awareness Functions
// =============================================================================

/// List sibling tasks (tasks with the same parent, excluding the given task).
pub async fn list_sibling_tasks( pool: &PgPool, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { // Top-level tasks (no parent) - siblings are other top-level tasks sqlx::query_as::<_, TaskSummary>( r#" SELECT t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, ) .bind(task_id) .fetch_all(pool) .await } } } /// Get running sibling tasks (for context injection). pub async fn get_running_siblings( pool: &PgPool, owner_id: Uuid, task_id: Uuid, parent_id: Option, ) -> Result, sqlx::Error> { match parent_id { Some(parent) => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id = $2 AND t.id != $3 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(parent) .bind(task_id) .fetch_all(pool) .await } None => { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks t WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND t.id != $2 AND t.status = 'running' ORDER BY t.priority DESC "#, ) .bind(owner_id) .bind(task_id) .fetch_all(pool) .await } } } /// Get task with its siblings for context awareness. 
pub async fn get_task_with_siblings(
    pool: &PgPool,
    id: Uuid,
) -> Result)>, sqlx::Error> {
    // NOTE(review): the return type above and several `Option,` parameters in
    // this section have no generic arguments — they look stripped in this
    // view; confirm the real types against the original file.
    let task = get_task(pool, id).await?;
    // Unknown task: nothing to pair siblings with.
    let Some(task) = task else {
        return Ok(None);
    };
    let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
    Ok(Some((task, siblings)))
}

// =============================================================================
// Task Output Persistence Functions
// =============================================================================

/// Save task output to the database.
/// This stores output in the task_events table with event_type='output'.
///
/// The arguments are packed into a JSON object with camelCase keys
/// (`messageType`, `content`, `toolName`, `toolInput`, `isError`, `costUsd`,
/// `durationMs`) and stored as the event's `event_data`; both status columns
/// of the event are left empty.
pub async fn save_task_output(
    pool: &PgPool,
    task_id: Uuid,
    message_type: &str,
    content: &str,
    tool_name: Option<&str>,
    tool_input: Option,
    is_error: Option,
    cost_usd: Option,
    duration_ms: Option,
) -> Result {
    let event_data = serde_json::json!({
        "messageType": message_type,
        "content": content,
        "toolName": tool_name,
        "toolInput": tool_input,
        "isError": is_error,
        "costUsd": cost_usd,
        "durationMs": duration_ms,
    });
    create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
}

/// Get task output from the database.
/// Retrieves output events for a task, ordered by creation time (oldest first).
///
/// `limit` caps the number of rows returned and defaults to 1000 when `None`.
pub async fn get_task_output(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(1000);
    sqlx::query_as::<_, TaskEvent>(
        r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "#,
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

/// Update task completion status with error message.
/// Sets the task status to 'done' or 'failed' and records completion time.
pub async fn complete_task( pool: &PgPool, task_id: Uuid, success: bool, error_message: Option<&str>, ) -> Result, sqlx::Error> { let status = if success { "done" } else { "failed" }; let task = sqlx::query_as::<_, Task>( r#" UPDATE tasks SET status = $2, error_message = COALESCE($3, error_message), completed_at = NOW(), updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(task_id) .bind(status) .bind(error_message) .fetch_optional(pool) .await?; // Record completion event if task.is_some() { let event_data = serde_json::json!({ "success": success, "errorMessage": error_message, }); let _ = create_task_event( pool, task_id, "complete", Some("running"), Some(status), Some(event_data), ) .await; } Ok(task) } /// Get task tree from a specific root task. pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" WITH RECURSIVE task_tree AS ( -- Base case: the root task SELECT * FROM tasks WHERE id = $1 UNION ALL -- Recursive case: children of current level SELECT t.* FROM tasks t JOIN task_tree tt ON t.parent_task_id = tt.id ) SELECT * FROM task_tree ORDER BY depth, created_at "#, ) .bind(root_task_id) .fetch_all(pool) .await } // ============================================================================ // Task Checkpoints // ============================================================================ /// Create a checkpoint for a task. 
pub async fn create_task_checkpoint(
    pool: &PgPool,
    task_id: Uuid,
    commit_sha: &str,
    branch_name: &str,
    message: &str,
    files_changed: Option,
    lines_added: Option,
    lines_removed: Option,
) -> Result {
    // NOTE(review): the `Option,` parameters and `Result {` above have no
    // generic arguments — they look stripped in this view; confirm the real
    // types against the original file.
    // NOTE(review): the three statements below run directly on the pool with
    // no transaction visible here. Concurrent calls could compute the same
    // checkpoint number, and a failed INSERT would leave the tasks row already
    // updated — consider wrapping them in one transaction.
    // Get current checkpoint count and increment
    let checkpoint_number: i32 = sqlx::query_scalar(
        "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_one(pool)
    .await?;

    // Update task's checkpoint tracking
    sqlx::query(
        r#" UPDATE tasks SET last_checkpoint_sha = $1, checkpoint_count = $2, checkpoint_message = $3, updated_at = NOW() WHERE id = $4 "#,
    )
    .bind(commit_sha)
    .bind(checkpoint_number)
    .bind(message)
    .bind(task_id)
    .execute(pool)
    .await?;

    // Insert the checkpoint row itself and return it.
    sqlx::query_as::<_, TaskCheckpoint>(
        r#" INSERT INTO task_checkpoints ( task_id, checkpoint_number, commit_sha, branch_name, message, files_changed, lines_added, lines_removed ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING * "#,
    )
    .bind(task_id)
    .bind(checkpoint_number)
    .bind(commit_sha)
    .bind(branch_name)
    .bind(message)
    .bind(files_changed)
    .bind(lines_added)
    .bind(lines_removed)
    .fetch_one(pool)
    .await
}

/// Get a checkpoint by ID.
///
/// Yields no row when the ID is unknown.
pub async fn get_task_checkpoint(
    pool: &PgPool,
    id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// Get a checkpoint by commit SHA.
///
/// The lookup is not scoped to a task; it fetches at most one row matching
/// `commit_sha`.
pub async fn get_task_checkpoint_by_sha(
    pool: &PgPool,
    commit_sha: &str,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1")
        .bind(commit_sha)
        .fetch_optional(pool)
        .await
}

/// List checkpoints for a task.
pub async fn list_task_checkpoints( pool: &PgPool, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, TaskCheckpoint>( "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC", ) .bind(task_id) .fetch_all(pool) .await } // ============================================================================ // Daemon Selection // ============================================================================ /// Get daemons with capacity info for selection. pub async fn get_available_daemons( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Get daemons with capacity info for selection, excluding specified daemon IDs. /// Used for task retry to avoid reassigning to daemons that have already failed. pub async fn get_available_daemons_excluding( pool: &PgPool, owner_id: Uuid, exclude_daemon_ids: &[Uuid], ) -> Result, sqlx::Error> { sqlx::query_as::<_, DaemonWithCapacity>( r#" SELECT id, owner_id, connection_id, hostname, machine_id, max_concurrent_tasks, current_task_count, capacity_score, task_queue_length, supports_migration, status, last_heartbeat_at, connected_at FROM daemons WHERE owner_id = $1 AND status = 'connected' AND id != ALL($2) ORDER BY COALESCE(capacity_score, 100) DESC, (max_concurrent_tasks - current_task_count) DESC, COALESCE(task_queue_length, 0) ASC "#, ) .bind(owner_id) .bind(exclude_daemon_ids) .fetch_all(pool) .await } /// Create a daemon task assignment. 
pub async fn create_daemon_task_assignment(
    pool: &PgPool,
    daemon_id: Uuid,
    task_id: Uuid,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    // Columns not listed in the INSERT take whatever defaults the table defines.
    sqlx::query_as::<_, DaemonTaskAssignment>(
        r#"
        INSERT INTO daemon_task_assignments (daemon_id, task_id)
        VALUES ($1, $2)
        RETURNING *
        "#,
    )
    .bind(daemon_id)
    .bind(task_id)
    .fetch_one(pool)
    .await
}

/// Update daemon task assignment status.
///
/// # Errors
///
/// Because the row is fetched with `fetch_one`, a task without an assignment
/// yields `sqlx::Error::RowNotFound` rather than an empty result.
pub async fn update_daemon_task_assignment_status(
    pool: &PgPool,
    task_id: Uuid,
    status: &str,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    sqlx::query_as::<_, DaemonTaskAssignment>(
        r#"
        UPDATE daemon_task_assignments
        SET status = $1
        WHERE task_id = $2
        RETURNING *
        "#,
    )
    .bind(status)
    .bind(task_id)
    .fetch_one(pool)
    .await
}

/// Get daemon task assignment for a task.
///
/// Returns `Ok(None)` when the task has no assignment row.
pub async fn get_daemon_task_assignment(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> {
    sqlx::query_as::<_, DaemonTaskAssignment>(
        "SELECT * FROM daemon_task_assignments WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_optional(pool)
    .await
}

// ============================================================================
// Repository History Functions
// ============================================================================

use super::models::RepositoryHistoryEntry;

/// List all repository history entries for an owner, ordered by use_count DESC, last_used_at DESC.
pub async fn list_repository_history_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<RepositoryHistoryEntry>, sqlx::Error> {
    sqlx::query_as::<_, RepositoryHistoryEntry>(
        r#"
        SELECT id, owner_id, name, repository_url, local_path, source_type,
               use_count, last_used_at, created_at
        FROM repository_history
        WHERE owner_id = $1
        ORDER BY use_count DESC, last_used_at DESC
        "#,
    )
    .bind(owner_id)
    .fetch_all(pool)
    .await
}

/// Get repository suggestions for an owner, optionally filtered by source type and query.
pub async fn get_repository_suggestions( pool: &PgPool, owner_id: Uuid, source_type: Option<&str>, query: Option<&str>, limit: i32, ) -> Result, sqlx::Error> { // Build query dynamically based on filters let mut sql = String::from( r#" SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at FROM repository_history WHERE owner_id = $1 "#, ); let mut param_idx = 2; if source_type.is_some() { sql.push_str(&format!(" AND source_type = ${}", param_idx)); param_idx += 1; } if query.is_some() { sql.push_str(&format!( " AND (LOWER(name) LIKE ${} OR LOWER(COALESCE(repository_url, '')) LIKE ${} OR LOWER(COALESCE(local_path, '')) LIKE ${})", param_idx, param_idx, param_idx )); param_idx += 1; } sql.push_str(&format!( " ORDER BY use_count DESC, last_used_at DESC LIMIT ${}", param_idx )); // Build and execute query with the appropriate bindings let mut query_builder = sqlx::query_as::<_, RepositoryHistoryEntry>(&sql).bind(owner_id); if let Some(st) = source_type { query_builder = query_builder.bind(st); } if let Some(q) = query { let search_pattern = format!("%{}%", q.to_lowercase()); query_builder = query_builder.bind(search_pattern); } query_builder = query_builder.bind(limit); query_builder.fetch_all(pool).await } /// Add or update a repository history entry. /// If an entry with the same URL (for remote) or path (for local) already exists, /// increment use_count and update last_used_at and name. /// Otherwise, create a new entry. pub async fn add_or_update_repository_history( pool: &PgPool, owner_id: Uuid, name: &str, repository_url: Option<&str>, local_path: Option<&str>, source_type: &str, ) -> Result { // Use UPSERT (INSERT ... 
ON CONFLICT) if source_type == "remote" { let url = repository_url.ok_or_else(|| { sqlx::Error::Protocol("repository_url required for remote type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, $3, NULL, $4, 1, NOW()) ON CONFLICT (owner_id, repository_url) WHERE source_type = 'remote' AND repository_url IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(url) .bind(source_type) .fetch_one(pool) .await } else if source_type == "local" { let path = local_path.ok_or_else(|| { sqlx::Error::Protocol("local_path required for local type".to_string()) })?; sqlx::query_as::<_, RepositoryHistoryEntry>( r#" INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at) VALUES ($1, $2, NULL, $3, $4, 1, NOW()) ON CONFLICT (owner_id, local_path) WHERE source_type = 'local' AND local_path IS NOT NULL DO UPDATE SET name = EXCLUDED.name, use_count = repository_history.use_count + 1, last_used_at = NOW() RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at "#, ) .bind(owner_id) .bind(name) .bind(path) .bind(source_type) .fetch_one(pool) .await } else { Err(sqlx::Error::Protocol(format!( "Invalid source_type: {}", source_type ))) } } /// Delete a repository history entry. /// Returns true if an entry was deleted, false if not found. 
pub async fn delete_repository_history(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result {
    // NOTE(review): many signatures in this section read `Option,`, `Result {`
    // or `Result, sqlx::Error>` with no generic arguments — they look stripped
    // in this view; confirm the real types against the original file.
    // The delete is scoped to the owner, so another owner's entry is never removed.
    let result = sqlx::query(
        r#" DELETE FROM repository_history WHERE id = $1 AND owner_id = $2 "#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(pool)
    .await?;
    Ok(result.rows_affected() > 0)
}

// ============================================================================
// Conversation Snapshots
// ============================================================================

/// Create a new conversation snapshot
///
/// Inserts one row into `conversation_snapshots` and returns it.
pub async fn create_conversation_snapshot(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option,
    snapshot_type: &str,
    message_count: i32,
    conversation_state: serde_json::Value,
    metadata: Option,
) -> Result {
    sqlx::query_as::<_, ConversationSnapshot>(
        r#" INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING * "#
    )
    .bind(task_id)
    .bind(checkpoint_id)
    .bind(snapshot_type)
    .bind(message_count)
    .bind(conversation_state)
    .bind(metadata)
    .fetch_one(pool)
    .await
}

/// Get a conversation snapshot by ID
pub async fn get_conversation_snapshot(
    pool: &PgPool,
    id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, ConversationSnapshot>(
        "SELECT * FROM conversation_snapshots WHERE id = $1"
    )
    .bind(id)
    .fetch_optional(pool)
    .await
}

/// Get conversation snapshot at a specific checkpoint
///
/// When several snapshots reference the checkpoint, the most recently created
/// one is returned.
pub async fn get_conversation_at_checkpoint(
    pool: &PgPool,
    checkpoint_id: Uuid,
) -> Result, sqlx::Error> {
    sqlx::query_as::<_, ConversationSnapshot>(
        "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1"
    )
    .bind(checkpoint_id)
    .fetch_optional(pool)
    .await
}

/// List conversation snapshots for a task
///
/// Newest first; `limit` defaults to 100 when `None`.
pub async fn list_conversation_snapshots(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(100);
    sqlx::query_as::<_, ConversationSnapshot>(
        "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2"
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await
}

/// Delete conversation snapshots older than retention period
///
/// `retention_days` is multiplied by a one-day interval in SQL; the number of
/// deleted rows is returned.
pub async fn cleanup_old_snapshots(
    pool: &PgPool,
    retention_days: i32,
) -> Result {
    let result = sqlx::query(
        "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1"
    )
    .bind(retention_days)
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}

// ============================================================================
// History Events
// ============================================================================

/// Record a new history event
///
/// Inserts one row into `history_events` and returns it.
#[allow(clippy::too_many_arguments)]
pub async fn record_history_event(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Option,
    event_type: &str,
    event_subtype: Option<&str>,
    event_data: serde_json::Value,
) -> Result {
    sqlx::query_as::<_, HistoryEvent>(
        r#" INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING * "#
    )
    .bind(owner_id)
    .bind(task_id)
    .bind(event_type)
    .bind(event_subtype)
    .bind(event_data)
    .fetch_one(pool)
    .await
}

/// Get task history
///
/// Returns the newest events for the task (scoped to owner) together with the
/// total number of matching events. Only `filters.limit` is consulted here
/// (default 100); the count ignores the limit.
pub async fn get_task_history(
    pool: &PgPool,
    task_id: Uuid,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec, i64), sqlx::Error> {
    let limit = filters.limit.unwrap_or(100);
    let events = sqlx::query_as::<_, HistoryEvent>(
        r#" SELECT * FROM history_events WHERE task_id = $1 AND owner_id = $2 ORDER BY created_at DESC LIMIT $3 "#
    )
    .bind(task_id)
    .bind(owner_id)
    .bind(limit)
    .fetch_all(pool)
    .await?;

    // Separate query for the un-limited total.
    let count: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2"
    )
    .bind(task_id)
    .bind(owner_id)
    .fetch_one(pool)
    .await?;
    Ok((events, count))
}

/// Get unified timeline for an owner
///
/// Returns the owner's newest events together with the total number of events
/// for that owner. Only `filters.limit` is consulted here (default 100); the
/// count ignores the limit.
pub async fn get_timeline(
    pool: &PgPool,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec, i64), sqlx::Error> {
    let limit = filters.limit.unwrap_or(100);
    let events = sqlx::query_as::<_, HistoryEvent>(
        r#" SELECT * FROM history_events WHERE owner_id = $1 ORDER BY created_at DESC LIMIT $2 "#
    )
    .bind(owner_id)
    .bind(limit)
    .fetch_all(pool)
    .await?;

    // Separate query for the un-limited total.
    let count: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM history_events WHERE owner_id = $1"
    )
    .bind(owner_id)
    .fetch_one(pool)
    .await?;
    Ok((events, count))
}

// ============================================================================
// Task Conversation Retrieval
// ============================================================================

// Helper struct for parsing task output events
// (camelCase keys, matching the JSON written by `save_task_output`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskOutputEvent {
    message_type: String,
    content: Option,
    tool_name: Option,
    tool_input: Option,
    is_error: Option,
    cost_usd: Option,
}

/// Get task conversation messages (reconstructed from task_events)
pub async fn get_task_conversation(
    pool: &PgPool,
    task_id: Uuid,
    include_tool_calls: bool,
    include_tool_results: bool,
    limit: Option,
) -> Result, sqlx::Error> {
    let limit = limit.unwrap_or(1000);
    // Get output events that represent conversation turns
    let events = sqlx::query_as::<_, TaskEvent>(
        r#" SELECT * FROM task_events WHERE task_id = $1 AND event_type = 'output' ORDER BY created_at ASC LIMIT $2 "#
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await?;

    // Convert task events to conversation messages
    let mut messages = Vec::new();
    for event in events {
        if let Some(data) = event.event_data {
            // Parse the event data to extract message info
            if let Ok(output) = serde_json::from_value::(data.clone()) {
                let should_include = match output.message_type.as_str() {
                    "tool_use" => include_tool_calls,
                    "tool_result" => include_tool_results,
                    _ => true,
                };
                if should_include {
                    messages.push(ConversationMessage {
                        id: event.id.to_string(),
                        role: match output.message_type.as_str() {
                            "assistant" => "assistant".to_string(),
                            "tool_use" => "assistant".to_string(),
                            "tool_result" => "tool".to_string(),
                            "system" => "system".to_string(),
                            "error" =>
"system".to_string(), _ => "user".to_string(), }, content: output.content.unwrap_or_default(), timestamp: event.created_at, tool_calls: None, tool_name: output.tool_name, tool_input: output.tool_input, tool_result: None, is_error: output.is_error, token_count: None, cost_usd: output.cost_usd.map(|c| c as f64), }); } } } } Ok(messages) } // ============================================================================= // Anonymous Task Cleanup Functions // ============================================================================= /// Delete stale anonymous tasks (tasks with contract_id = NULL) that: /// - Are in a terminal state (done, failed, merged) /// - Are older than the specified number of days /// /// Returns the number of deleted tasks. pub async fn cleanup_stale_anonymous_tasks( pool: &PgPool, max_age_days: i32, ) -> Result { let result = sqlx::query( r#" DELETE FROM tasks WHERE contract_id IS NULL AND status IN ('done', 'failed', 'merged') AND created_at < NOW() - INTERVAL '1 day' * $1 "#, ) .bind(max_age_days) .execute(pool) .await?; Ok(result.rows_affected() as i64) } // ============================================================================ // Checkpoint Patches (for task recovery) // ============================================================================ /// Create a checkpoint patch for task recovery. 
///
/// Stores `patch_data` together with its byte size and an expiry of
/// `ttl_hours` hours from now, and returns the stored row.
///
/// # Errors
///
/// Returns `sqlx::Error::Protocol` when `patch_data` is larger than
/// `i32::MAX` bytes (the size is stored as a 32-bit integer), and any
/// database error raised by the insert.
pub async fn create_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option<Uuid>,
    base_commit_sha: &str,
    patch_data: &[u8],
    files_count: i32,
    ttl_hours: i64,
) -> Result<CheckpointPatch, sqlx::Error> {
    // A plain `as i32` would silently wrap for oversized patches and record a
    // bogus (possibly negative) size, so reject them up front.
    if patch_data.len() > i32::MAX as usize {
        return Err(sqlx::Error::Protocol(format!(
            "patch_data too large: {} bytes",
            patch_data.len()
        )));
    }
    let patch_size_bytes = patch_data.len() as i32;

    sqlx::query_as::<_, CheckpointPatch>(
        r#"
        INSERT INTO checkpoint_patches (
            task_id, checkpoint_id, base_commit_sha, patch_data,
            patch_size_bytes, files_count, expires_at
        )
        VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7)
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(checkpoint_id)
    .bind(base_commit_sha)
    .bind(patch_data)
    .bind(patch_size_bytes)
    .bind(files_count)
    .bind(ttl_hours)
    .fetch_one(pool)
    .await
}

/// Get the latest checkpoint patch for a task.
///
/// Patches whose `expires_at` has passed are ignored; among the remaining
/// ones the newest by `created_at` is returned.
pub async fn get_latest_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    sqlx::query_as::<_, CheckpointPatch>(
        r#"
        SELECT * FROM checkpoint_patches
        WHERE task_id = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        LIMIT 1
        "#,
    )
    .bind(task_id)
    .fetch_optional(pool)
    .await
}

/// Get a checkpoint patch by ID.
///
/// Unlike `get_latest_checkpoint_patch`, this lookup does not filter on
/// `expires_at`.
pub async fn get_checkpoint_patch(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    sqlx::query_as::<_, CheckpointPatch>(
        "SELECT * FROM checkpoint_patches WHERE id = $1",
    )
    .bind(id)
    .fetch_optional(pool)
    .await
}

/// List all checkpoint patches for a task (without patch data for efficiency).
///
/// Results are ordered newest first and include expired patches.
pub async fn list_checkpoint_patches(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> {
    sqlx::query_as::<_, CheckpointPatchInfo>(
        r#"
        SELECT id, task_id, checkpoint_id, base_commit_sha, patch_size_bytes,
               files_count, created_at, expires_at
        FROM checkpoint_patches
        WHERE task_id = $1
        ORDER BY created_at DESC
        "#,
    )
    .bind(task_id)
    .fetch_all(pool)
    .await
}

/// Delete expired checkpoint patches.
/// Returns the number of deleted patches.
pub async fn cleanup_expired_checkpoint_patches( pool: &PgPool, ) -> Result { let result = sqlx::query("DELETE FROM checkpoint_patches WHERE expires_at < NOW()") .execute(pool) .await?; Ok(result.rows_affected() as i64) } /// Delete all checkpoint patches for a task. pub async fn delete_checkpoint_patches_for_task( pool: &PgPool, task_id: Uuid, ) -> Result { let result = sqlx::query("DELETE FROM checkpoint_patches WHERE task_id = $1") .bind(task_id) .execute(pool) .await?; Ok(result.rows_affected() as i64) } // ============================================================================= // Directive CRUD // ============================================================================= /// Create a new directive for an owner. /// /// If `req.contract_body` is set, also auto-creates a first contract /// with that body so the directive is immediately ready to start. Both /// inserts run in the same transaction. pub async fn create_directive_for_owner( pool: &PgPool, owner_id: Uuid, req: CreateDirectiveRequest, ) -> Result { let mut tx = pool.begin().await?; let directive = sqlx::query_as::<_, Directive>( r#" INSERT INTO directives (owner_id, title, repository_url, local_path, base_branch, reconcile_mode) VALUES ($1, $2, $3, $4, $5, $6) RETURNING * "#, ) .bind(owner_id) .bind(&req.title) .bind(&req.repository_url) .bind(&req.local_path) .bind(&req.base_branch) .bind(req.reconcile_mode.as_deref().unwrap_or("auto")) .fetch_one(&mut *tx) .await?; if let Some(body) = &req.contract_body { sqlx::query( r#" INSERT INTO directive_documents (directive_id, title, body, status, position) VALUES ($1, '', $2, 'draft', 0) "#, ) .bind(directive.id) .bind(body) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(directive) } /// Resolve the body of the directive's "current spec" — the active /// contract's body, falling back to the most-recently-updated draft if /// none is active. 
Returns empty string when the directive has no /// usable contracts (orchestrator should refuse to spawn in that case). pub async fn get_active_contract_body( pool: &PgPool, directive_id: Uuid, ) -> Result { let row: Option<(String,)> = sqlx::query_as( r#" SELECT body FROM directive_documents WHERE directive_id = $1 AND status IN ('active', 'queued', 'draft') ORDER BY CASE status WHEN 'active' THEN 0 WHEN 'queued' THEN 1 WHEN 'draft' THEN 2 ELSE 3 END, updated_at DESC LIMIT 1 "#, ) .bind(directive_id) .fetch_optional(pool) .await?; Ok(row.map(|r| r.0).unwrap_or_default()) } /// Get a single directive for an owner. pub async fn get_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Get a directive without an owner scope check. /// /// Used by background orchestration code that has already established the /// directive identity through other means (e.g. it just received the /// directive_id from a different already-authorized query). HTTP handlers /// must continue to use `get_directive_for_owner` to enforce isolation. pub async fn get_directive( pool: &PgPool, id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>(r#"SELECT * FROM directives WHERE id = $1"#) .bind(id) .fetch_optional(pool) .await } /// Get a directive with all its steps. pub async fn get_directive_with_steps_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result)>, sqlx::Error> { let directive = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await?; match directive { Some(d) => { let steps = list_directive_steps(pool, d.id).await?; Ok(Some((d, steps))) } None => Ok(None), } } /// List all directives for an owner with step counts. 
Excludes the per-owner /// tmp directive (the scratchpad surface; surfaced via the sidebar's /// dedicated `tmp/` folder, not the regular directive list). pub async fn list_directives_for_owner( pool: &PgPool, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveSummary>( r#" SELECT d.id, d.owner_id, d.title, d.status, d.repository_url, d.orchestrator_task_id, d.pr_url, d.completion_task_id, d.reconcile_mode, d.version, d.created_at, d.updated_at, COALESCE(s.total_steps, 0) as total_steps, COALESCE(s.completed_steps, 0) as completed_steps, COALESCE(s.running_steps, 0) as running_steps, COALESCE(s.failed_steps, 0) as failed_steps FROM directives d LEFT JOIN LATERAL ( SELECT COUNT(*) as total_steps, COUNT(*) FILTER (WHERE status = 'completed') as completed_steps, COUNT(*) FILTER (WHERE status = 'running') as running_steps, COUNT(*) FILTER (WHERE status = 'failed') as failed_steps FROM directive_steps WHERE directive_id = d.id ) s ON true WHERE d.owner_id = $1 AND d.is_tmp = false ORDER BY d.created_at DESC "#, ) .bind(owner_id) .fetch_all(pool) .await } /// Update a directive with optimistic locking. 
pub async fn update_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, req: UpdateDirectiveRequest, ) -> Result, RepositoryError> { let current = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await .map_err(RepositoryError::Database)?; let current = match current { Some(c) => c, None => return Ok(None), }; if let Some(expected_version) = req.version { if expected_version != current.version { return Err(RepositoryError::VersionConflict { expected: expected_version, actual: current.version, }); } } let title = req.title.as_deref().unwrap_or(¤t.title); let status = req.status.as_deref().unwrap_or(¤t.status); let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); let local_path = req.local_path.as_deref().or(current.local_path.as_deref()); let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref()); let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id); let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref()); let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref()); let reconcile_mode = req.reconcile_mode.clone().unwrap_or_else(|| current.reconcile_mode.clone()); let result = sqlx::query_as::<_, Directive>( r#" UPDATE directives SET title = $3, status = $4, repository_url = $5, local_path = $6, base_branch = $7, orchestrator_task_id = $8, pr_url = $9, pr_branch = $10, reconcile_mode = $11, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(title) .bind(status) .bind(repository_url) .bind(local_path) .bind(base_branch) .bind(orchestrator_task_id) .bind(pr_url) .bind(pr_branch) .bind(reconcile_mode) .fetch_optional(pool) .await .map_err(RepositoryError::Database)?; Ok(result) } /// Delete a directive for an owner. 
pub async fn delete_directive_for_owner( pool: &PgPool, owner_id: Uuid, id: Uuid, ) -> Result { // Delete all tasks associated with this directive sqlx::query( r#"DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; let result = sqlx::query( r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Clean up terminal tasks associated with a directive. /// /// Deletes tasks in terminal states (completed, failed, merged, done, interrupted) /// that belong to this directive, excluding tasks currently referenced by /// `completion_task_id` or `orchestrator_task_id` on the directive. /// NULLs out `task_id` on directive_steps for deleted tasks. pub async fn cleanup_directive_tasks( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result { // NULL out task_id on steps that reference terminal tasks we're about to delete sqlx::query( r#" UPDATE directive_steps SET task_id = NULL WHERE directive_id = $1 AND task_id IS NOT NULL AND task_id IN ( SELECT t.id FROM tasks t WHERE t.directive_id = $1 AND t.owner_id = $2 AND t.status IN ('completed', 'failed', 'merged', 'done', 'interrupted') AND t.id NOT IN ( SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 UNION SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 ) ) "#, ) .bind(directive_id) .bind(owner_id) .execute(pool) .await?; // Delete terminal tasks not currently referenced by the directive let result = sqlx::query( r#" DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2 AND status IN ('completed', 'failed', 'merged', 'done', 'interrupted') AND id NOT IN ( SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 UNION SELECT COALESCE(d.orchestrator_task_id, 
'00000000-0000-0000-0000-000000000000') FROM directives d WHERE d.id = $1 ) "#, ) .bind(directive_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() as i64) } // ============================================================================= // Directive Completion Helpers // ============================================================================= /// Row type for completed step tasks. #[derive(Debug, Clone, sqlx::FromRow)] pub struct CompletedStepTask { pub step_id: Uuid, pub step_name: String, pub task_id: Uuid, pub task_name: String, } /// Row type for directive completion task status check. #[derive(Debug, Clone, sqlx::FromRow)] pub struct DirectiveCompletionCheck { pub directive_id: Uuid, pub owner_id: Uuid, pub completion_task_id: Uuid, pub task_status: String, pub pr_url: Option, pub task_name: String, } /// Get idle directives that need a completion task spawned. /// Conditions: status = 'idle', no completion_task_id, has repository_url, /// and has at least one completed step with a task_id. pub async fn get_idle_directives_needing_completion( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'idle' AND d.completion_task_id IS NULL AND d.pr_branch IS NULL AND d.repository_url IS NOT NULL AND EXISTS ( SELECT 1 FROM directive_steps ds WHERE ds.directive_id = d.id AND ds.status = 'completed' AND ds.task_id IS NOT NULL ) "#, ) .fetch_all(pool) .await } /// Get directives that attempted completion (pr_branch set) but have no PR URL yet /// and no active completion task. These need a verification task spawned. 
pub async fn get_directives_needing_verification( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'idle' AND d.pr_branch IS NOT NULL AND d.pr_url IS NULL AND d.completion_task_id IS NULL AND d.repository_url IS NOT NULL "#, ) .fetch_all(pool) .await } /// Get directives with active completion tasks, joined with task status. pub async fn get_completion_tasks_to_check( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveCompletionCheck>( r#" SELECT d.id as directive_id, d.owner_id, d.completion_task_id, t.status as task_status, d.pr_url, t.name as task_name FROM directives d JOIN tasks t ON t.id = d.completion_task_id WHERE d.completion_task_id IS NOT NULL "#, ) .fetch_all(pool) .await } /// Atomically claim a directive for completion by setting a placeholder completion_task_id. /// Returns true if the claim was successful (no other task already claimed it). pub async fn claim_directive_for_completion( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result { let result = sqlx::query( r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1 AND completion_task_id IS NULL"#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Assign a completion task to a directive (unconditional update). pub async fn assign_completion_task( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1"#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(()) } /// Clear the completion task from a directive. 
pub async fn clear_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"UPDATE directives SET completion_task_id = NULL, updated_at = NOW() WHERE id = $1"#,
    )
    .bind(directive_id)
    .execute(pool)
    .await?;

    Ok(())
}

/// Get completed step tasks for a directive (steps that have completed with an assigned task).
///
/// Rows are ordered by the step's `order_index`, then `created_at`. Steps
/// whose task row no longer exists are dropped by the inner join.
pub async fn get_completed_step_tasks(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<CompletedStepTask>, sqlx::Error> {
    sqlx::query_as::<_, CompletedStepTask>(
        r#"
        SELECT
            ds.id as step_id,
            ds.name as step_name,
            ds.task_id,
            t.name as task_name
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.order_index, ds.created_at
        "#,
    )
    .bind(directive_id)
    .fetch_all(pool)
    .await
}

/// Get the task ID of the most recently completed step for a directive.
/// Used as a fallback `continue_from_task_id` when dispatching new-generation steps
/// that have no explicit dependencies and no PR branch to continue from.
///
/// "Most recently" is decided by the step's `updated_at`. Returns `Ok(None)`
/// when the directive has no completed step with a task.
pub async fn get_last_completed_step_task_id(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let row: Option<(Uuid,)> = sqlx::query_as(
        r#"
        SELECT ds.task_id
        FROM directive_steps ds
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.updated_at DESC
        LIMIT 1
        "#,
    )
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    Ok(row.map(|r| r.0))
}

// =============================================================================
// Directive Step CRUD
// =============================================================================

/// Get a single directive step by ID.
///
/// The lookup is not scoped to an owner or directive.
pub async fn get_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    sqlx::query_as::<_, DirectiveStep>(
        r#"SELECT * FROM directive_steps WHERE id = $1"#,
    )
    .bind(step_id)
    .fetch_optional(pool)
    .await
}

/// List all steps for a directive, ordered by order_index.
pub async fn list_directive_steps( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" SELECT * FROM directive_steps WHERE directive_id = $1 ORDER BY order_index, created_at "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Create a single directive step. pub async fn create_directive_step( pool: &PgPool, directive_id: Uuid, req: CreateDirectiveStepRequest, ) -> Result { let generation = req.generation.unwrap_or(1); let order_id = req.order_id; let contract_type = req.contract_type.clone(); // Resolve the document this step belongs to. If the caller supplied one, // honour it; otherwise pick the directive's most recently-updated // active (or draft) document. Steps that can't be matched to any // document fall through with NULL — the sidebar treats those as // directive-level orphans. let directive_document_id = match req.directive_document_id { Some(id) => Some(id), None => resolve_active_document_for_directive(pool, directive_id) .await .unwrap_or(None), }; let step = sqlx::query_as::<_, DirectiveStep>( r#" INSERT INTO directive_steps ( directive_id, name, description, task_plan, depends_on, order_index, generation, contract_type, directive_document_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING * "#, ) .bind(directive_id) .bind(&req.name) .bind(&req.description) .bind(&req.task_plan) .bind(&req.depends_on) .bind(req.order_index) .bind(generation) .bind(&contract_type) .bind(&directive_document_id) .fetch_one(pool) .await?; // If an order_id was provided, auto-link the order to this step if let Some(oid) = order_id { sqlx::query( r#"UPDATE orders SET directive_step_id = $1, updated_at = NOW() WHERE id = $2"#, ) .bind(step.id) .bind(oid) .execute(pool) .await?; } Ok(step) } /// Batch create multiple directive steps. 
pub async fn batch_create_directive_steps( pool: &PgPool, directive_id: Uuid, steps: Vec, ) -> Result, sqlx::Error> { let mut results = Vec::with_capacity(steps.len()); for req in steps { let step = create_directive_step(pool, directive_id, req).await?; results.push(step); } Ok(results) } /// Update a directive step. pub async fn update_directive_step( pool: &PgPool, step_id: Uuid, req: UpdateDirectiveStepRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveStep>( r#"SELECT * FROM directive_steps WHERE id = $1"#, ) .bind(step_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let name = req.name.as_deref().unwrap_or(¤t.name); let description = req.description.as_deref().or(current.description.as_deref()); let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref()); let depends_on = req.depends_on.as_deref().unwrap_or(¤t.depends_on); let status = req.status.as_deref().unwrap_or(¤t.status); let task_id = req.task_id.or(current.task_id); let order_index = req.order_index.unwrap_or(current.order_index); // Set started_at when transitioning to running let started_at = if status == "running" && current.status != "running" { Some(Utc::now()) } else { current.started_at }; // Set completed_at when transitioning to terminal state let completed_at = if matches!(status, "completed" | "failed" | "skipped") && !matches!(current.status.as_str(), "completed" | "failed" | "skipped") { Some(Utc::now()) } else { current.completed_at }; sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET name = $2, description = $3, task_plan = $4, depends_on = $5, status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10 WHERE id = $1 RETURNING * "#, ) .bind(step_id) .bind(name) .bind(description) .bind(task_plan) .bind(depends_on) .bind(status) .bind(task_id) .bind(order_index) .bind(started_at) .bind(completed_at) .fetch_optional(pool) .await } /// Delete a 
directive step. pub async fn delete_directive_step( pool: &PgPool, step_id: Uuid, ) -> Result { let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#) .bind(step_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Delete all directive steps that have not started execution (pending, ready, failed, skipped). /// Completed and running steps are preserved. /// Returns the number of deleted steps. pub async fn clear_pending_directive_steps( pool: &PgPool, directive_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM directive_steps WHERE directive_id = $1 AND status IN ('pending', 'ready', 'failed', 'skipped')"#, ) .bind(directive_id) .execute(pool) .await?; Ok(result.rows_affected()) } // ============================================================================= // Directive Document CRUD // ============================================================================= /// List all contracts under a directive in queue order. /// /// Ordered by `position` (lower = earlier), with `created_at` as a stable /// tie-break. Position is the queue order in the unified directive UI; /// only one contract is active at a time, and the next-up contract is /// the lowest-position non-shipped row. pub async fn list_directive_documents( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#" SELECT * FROM directive_documents WHERE directive_id = $1 ORDER BY position ASC, created_at ASC "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Get a single directive document by ID. pub async fn get_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(pool) .await } /// Create a new directive document (contract). Status defaults to 'draft'. 
/// /// The new row's `position` is computed server-side as /// `MAX(position) + 1` over the directive's existing contracts, so it /// lands at the bottom of the queue. Callers that want to insert in the /// middle should call `reorder_directive_document_position` afterwards. /// `merge_mode` defaults to 'shared' on creation; flip later via /// `update_directive_document`. pub async fn create_directive_document( pool: &PgPool, directive_id: Uuid, title: &str, body: &str, ) -> Result { sqlx::query_as::<_, DirectiveDocument>( r#" INSERT INTO directive_documents (directive_id, title, body, status, position) VALUES ( $1, $2, $3, 'draft', COALESCE( (SELECT MAX(position) + 1 FROM directive_documents WHERE directive_id = $1), 0 ) ) RETURNING * "#, ) .bind(directive_id) .bind(title) .bind(body) .fetch_one(pool) .await } /// Update a directive document's title and/or body. /// /// Bumps `version` and `updated_at`. If the document was previously in the /// `shipped` state and the body actually changed, the status flips back to /// `active` and `shipped_at` is cleared — this implements the "editing a /// shipped contract reactivates it" behaviour. The wiring from the API / /// handlers will be added in a later step. pub async fn update_directive_document( pool: &PgPool, document_id: Uuid, title: Option<&str>, body: Option<&str>, merge_mode: Option<&str>, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let new_title = title.unwrap_or(¤t.title); let new_body = body.unwrap_or(¤t.body); let new_merge_mode = merge_mode.unwrap_or(¤t.merge_mode); let body_changed = new_body != current.body; // Reactivation rule: editing the body of a shipped doc flips it back // to 'active' and clears shipped_at. 
Other status transitions remain // untouched here and are handled by the dedicated mark/archive helpers. let reactivate_from_shipped = current.status == "shipped" && body_changed; let new_status = if reactivate_from_shipped { "active" } else { current.status.as_str() }; let result = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET title = $2, body = $3, status = $4, shipped_at = CASE WHEN $5 THEN NULL ELSE shipped_at END, merge_mode = $6, version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(new_title) .bind(new_body) .bind(new_status) .bind(reactivate_from_shipped) .bind(new_merge_mode) .fetch_optional(pool) .await?; Ok(result) } /// Move a contract to a new queue position within its directive. /// /// Implementation: a single SQL CTE that bumps siblings out of the way /// based on whether we're moving forward (later) or backward (earlier). /// Returns the updated contract row. pub async fn reorder_directive_document_position( pool: &PgPool, document_id: Uuid, new_position: i32, ) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(document_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.position == new_position { tx.commit().await?; return Ok(Some(current)); } // Shift siblings to make room. Moving forward (new > old) drags the // intermediate range back by one; moving backward pushes it forward. 
if new_position > current.position { sqlx::query( r#" UPDATE directive_documents SET position = position - 1 WHERE directive_id = $1 AND id <> $2 AND position > $3 AND position <= $4 "#, ) .bind(current.directive_id) .bind(document_id) .bind(current.position) .bind(new_position) .execute(&mut *tx) .await?; } else { sqlx::query( r#" UPDATE directive_documents SET position = position + 1 WHERE directive_id = $1 AND id <> $2 AND position >= $3 AND position < $4 "#, ) .bind(current.directive_id) .bind(document_id) .bind(new_position) .bind(current.position) .execute(&mut *tx) .await?; } let result = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET position = $2, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(new_position) .fetch_optional(&mut *tx) .await?; tx.commit().await?; Ok(result) } /// Mark a directive document as shipped (PR raised). Sets pr_url, optional /// pr_branch, status = 'shipped', shipped_at = NOW(), and bumps version. pub async fn mark_directive_document_shipped( pool: &PgPool, document_id: Uuid, pr_url: &str, pr_branch: Option<&str>, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'shipped', pr_url = $2, pr_branch = COALESCE($3, pr_branch), shipped_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .bind(pr_url) .bind(pr_branch) .fetch_optional(pool) .await } /// Archive a directive document. Sets status = 'archived' and stamps /// archived_at = NOW(). Idempotent — archiving an already-archived doc /// re-stamps archived_at and bumps version. /// /// If the archived contract was `active`, the next-up `queued` contract /// in the same directive auto-promotes to `active` (sequential queue). 
pub async fn archive_directive_document( pool: &PgPool, document_id: Uuid, ) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; let archived = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'archived', archived_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(document_id) .fetch_optional(&mut *tx) .await?; if let Some(ref doc) = archived { promote_next_queued_contract(&mut tx, doc.directive_id).await?; } tx.commit().await?; Ok(archived) } // ============================================================================ // Lifecycle transitions: start / pause / complete / unlock // // The lifecycle is `draft → queued → active → shipped → archived`. At most // one contract per directive sits in `active` at a time — the queue is // serialised because a directive owns a single shared worktree. Helpers // below enforce that invariant in SQL transactions. // ============================================================================ /// Lock a draft contract and either activate it (if no sibling is active) /// or queue it. Returns the updated row, or `Ok(None)` if the contract /// doesn't exist. Errors with `RepositoryError::Validation` if the /// contract is in any state other than `draft`. /// /// Side effect: if the contract enters `active`, the parent directive /// is flipped to `active` (from `draft|paused|idle|inactive`). This is /// what makes the orchestrator reconciler pick the directive up — its /// gate is `directive.status = 'active' AND orchestrator_task_id IS NULL`. 
pub async fn start_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "draft" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'draft' contracts can be started", current.status ))); } // If any sibling is already active, this one queues. Otherwise it // claims the active slot directly. let active_count: (i64,) = sqlx::query_as( r#"SELECT COUNT(*)::BIGINT FROM directive_documents WHERE directive_id = $1 AND status = 'active'"#, ) .bind(current.directive_id) .fetch_one(&mut *tx) .await?; let new_status = if active_count.0 > 0 { "queued" } else { "active" }; let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = $2, version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .bind(new_status) .fetch_optional(&mut *tx) .await?; // Flip the parent directive to active so the reconciler picks it up. // Only when this contract is actually entering the active slot — a // queued contract doesn't drive planning by itself. if new_status == "active" { activate_parent_directive(&mut tx, current.directive_id).await?; } tx.commit().await?; Ok(updated) } /// Pause an active contract — moves it back to `queued` so the next /// queued sibling can pick up the active slot. The orchestrator-daemon /// stop is the caller's responsibility. 
pub async fn pause_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "active" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'active' contracts can be paused", current.status ))); } let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'queued', version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; // The slot is free — promote the next queued contract (lowest // position, excluding the one we just paused). promote_next_queued_contract(&mut tx, current.directive_id).await?; // If no contract is active after the pause+promote, pause the // directive too — stops the reconciler from spawning new planners // on what is now an idle directive. deactivate_parent_directive_if_no_active( &mut tx, current.directive_id, "paused", ) .await?; tx.commit().await?; Ok(updated) } /// Mark an active contract as `shipped` — the work is done. Optional /// pr_url / pr_branch are recorded if supplied. Promotes the next /// queued sibling to `active`. 
pub async fn complete_contract( pool: &PgPool, contract_id: Uuid, pr_url: Option<&str>, pr_branch: Option<&str>, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "active" && current.status != "queued" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'active' or 'queued' contracts can be completed", current.status ))); } let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'shipped', pr_url = COALESCE($2, pr_url), pr_branch = COALESCE($3, pr_branch), shipped_at = NOW(), version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .bind(pr_url) .bind(pr_branch) .fetch_optional(&mut *tx) .await?; promote_next_queued_contract(&mut tx, current.directive_id).await?; // If the ship freed the active slot AND no queued contract was // available to promote, the directive itself goes inactive — its // iteration is shipped; the next cycle starts via reopen or a new // contract. deactivate_parent_directive_if_no_active( &mut tx, current.directive_id, "inactive", ) .await?; tx.commit().await?; Ok(updated) } /// Unlock a queued or active contract back to `draft` so the spec is /// editable again. If the contract was active, the slot frees and the /// next queued sibling auto-promotes. 
pub async fn unlock_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "queued" && current.status != "active" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'queued' or 'active' contracts can be unlocked", current.status ))); } let was_active = current.status == "active"; let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'draft', version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; if was_active { promote_next_queued_contract(&mut tx, current.directive_id).await?; // If unlocking the active contract leaves no other active under // the directive, pause the directive too. deactivate_parent_directive_if_no_active( &mut tx, current.directive_id, "paused", ) .await?; } tx.commit().await?; Ok(updated) } /// Reopen a shipped contract for amendment. Flips the contract back to /// `active`, re-activates the parent directive, and clears the /// directive's PR linkage + orchestrator task so the reconciler spawns a /// fresh planner. The planner uses `get_latest_merged_revision` to /// detect the previously-shipped PR and frame the new plan as a delta. 
pub async fn reopen_contract( pool: &PgPool, contract_id: Uuid, ) -> Result, RepositoryError> { let mut tx = pool.begin().await?; let current = sqlx::query_as::<_, DirectiveDocument>( r#"SELECT * FROM directive_documents WHERE id = $1"#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; if current.status != "shipped" { return Err(RepositoryError::Validation(format!( "contract is in status '{}'; only 'shipped' contracts can be reopened", current.status ))); } let updated = sqlx::query_as::<_, DirectiveDocument>( r#" UPDATE directive_documents SET status = 'active', version = version + 1, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(contract_id) .fetch_optional(&mut *tx) .await?; // Re-activate the directive and clear the prior PR + orchestrator // linkage. Status is forced to `active` regardless of prior value // (except archived — guard against re-opening under an archived // directive). sqlx::query( r#" UPDATE directives SET status = 'active', orchestrator_task_id = NULL, pr_url = NULL, pr_branch = NULL, updated_at = NOW(), version = version + 1 WHERE id = $1 AND status <> 'archived' "#, ) .bind(current.directive_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(updated) } /// Resolve the directive's currently-active contract id. Returns /// `Ok(None)` when no active contract exists. Used by the /// auto-complete-on-PR path so the contract row can be shipped at the /// same moment the directive registers its PR url. pub async fn get_active_contract_id_for_directive( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { let row: Option<(Uuid,)> = sqlx::query_as( r#" SELECT id FROM directive_documents WHERE directive_id = $1 AND status = 'active' ORDER BY position ASC, created_at ASC LIMIT 1 "#, ) .bind(directive_id) .fetch_optional(pool) .await?; Ok(row.map(|r| r.0)) } /// Flip the parent directive to `active` when a child contract just /// became active. 
Only promotes from `draft|paused|idle|inactive` — /// leaves `archived` directives untouched. async fn activate_parent_directive( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, directive_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET status = 'active', updated_at = NOW(), version = version + 1 WHERE id = $1 AND status IN ('draft', 'paused', 'idle', 'inactive') "#, ) .bind(directive_id) .execute(&mut **tx) .await?; Ok(()) } /// After a contract lifecycle change that may have left no active /// contract under the directive, transition the directive to the /// supplied `new_status` (typically `'paused'` for unlock/pause flows, /// `'inactive'` for ship). No-op if the directive still has an active /// contract or is already past the destination state. async fn deactivate_parent_directive_if_no_active( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, directive_id: Uuid, new_status: &str, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET status = $2, updated_at = NOW(), version = version + 1 WHERE id = $1 AND status = 'active' AND NOT EXISTS ( SELECT 1 FROM directive_documents WHERE directive_id = $1 AND status = 'active' ) "#, ) .bind(directive_id) .bind(new_status) .execute(&mut **tx) .await?; Ok(()) } /// Find the lowest-position `queued` contract under a directive and /// flip it to `active`. No-op when no queued contract exists. /// /// Caller must hold the parent transaction so the count → promote /// sequence stays atomic w.r.t. other lifecycle transitions. 
async fn promote_next_queued_contract(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    // The sub-select picks the head of the queue (position, then
    // created_at). When it yields no row the outer predicate matches
    // nothing and the UPDATE changes no rows.
    let promote_sql = r#"
        UPDATE directive_documents
        SET status = 'active',
            version = version + 1,
            updated_at = NOW()
        WHERE id = (
            SELECT id FROM directive_documents
            WHERE directive_id = $1 AND status = 'queued'
            ORDER BY position ASC, created_at ASC
            LIMIT 1
        )
        "#;
    sqlx::query(promote_sql)
        .bind(directive_id)
        .execute(&mut **tx)
        .await
        .map(|_| ())
}

/// Count the number of currently-active documents under a directive.
/// "Active" here means status = 'active' (not draft, shipped, or archived).
pub async fn count_active_directive_documents(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let (active_total,): (i64,) = sqlx::query_as(
        r#"
        SELECT COUNT(*)::BIGINT
        FROM directive_documents
        WHERE directive_id = $1 AND status = 'active'
        "#,
    )
    .bind(directive_id)
    .fetch_one(pool)
    .await?;
    Ok(active_total)
}

/// List all tasks attached to a specific directive document, newest first.
///
/// This powers the per-document `tasks/` subfolder in the sidebar — when a
/// document ships, its tasks visually move with it under shipped/. Includes
/// both step-execution tasks (those with directive_step_id set) and
/// "ephemeral" / orchestrator-style tasks (those without a step).
///
/// Hidden tasks are filtered out so dismissed tasks don't reappear in the
/// document's task list; a NULL `hidden` value counts as not hidden.
pub async fn list_directive_document_tasks(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    let tasks_sql = r#"
        SELECT *
        FROM tasks
        WHERE directive_document_id = $1
          AND COALESCE(hidden, false) = false
        ORDER BY created_at DESC
        "#;
    sqlx::query_as::<_, Task>(tasks_sql)
        .bind(document_id)
        .fetch_all(pool)
        .await
}

/// List directive_steps attached to a specific document, ordered the same
/// way the directive page orders them (by order_index, then created_at).
pub async fn list_directive_document_steps( pool: &PgPool, document_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" SELECT * FROM directive_steps WHERE directive_document_id = $1 ORDER BY order_index, created_at "#, ) .bind(document_id) .fetch_all(pool) .await } // ============================================================================= // Directive DAG Progression // ============================================================================= /// Advance pending steps to ready if all their dependencies are in terminal states. /// Returns the newly-ready steps. pub async fn advance_directive_ready_steps( pool: &PgPool, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET status = 'ready' WHERE directive_id = $1 AND status = 'pending' AND NOT EXISTS ( SELECT 1 FROM unnest(depends_on) AS dep_id JOIN directive_steps ds ON ds.id = dep_id WHERE ds.status NOT IN ('completed', 'skipped') ) AND NOT EXISTS ( SELECT 1 FROM directive_steps prev WHERE prev.directive_id = $1 AND prev.order_index < directive_steps.order_index AND prev.status NOT IN ('completed', 'skipped', 'failed') ) RETURNING * "#, ) .bind(directive_id) .fetch_all(pool) .await } /// Check if all steps in a directive are in terminal states. /// If so, set the directive to 'idle' (not completed — directives are ongoing). /// Returns true if the directive was set to idle. pub async fn check_directive_idle( pool: &PgPool, directive_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE directives SET status = 'idle', updated_at = NOW() WHERE id = $1 AND status = 'active' AND NOT EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = $1 AND status NOT IN ('completed', 'failed', 'skipped') ) AND EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = $1 ) "#, ) .bind(directive_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Mark a directive 'inactive'. 
Used at the moment a PR is raised — at that /// point the contract's current iteration is "shipped" and editing the goal /// (Stage 4) starts an amendment cycle. Idempotent: no-op if status is /// already inactive or already past it (e.g. archived). pub async fn set_directive_inactive( pool: &PgPool, directive_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET status = 'inactive', updated_at = NOW(), version = version + 1 WHERE id = $1 AND status IN ('active', 'idle', 'paused') "#, ) .bind(directive_id) .execute(pool) .await?; Ok(()) } /// Reset a directive for a "new draft" cycle: flip status to 'draft' and /// detach the current pr_url / pr_branch / orchestrator linkage so the /// next contract activation starts fresh. Prior revisions remain in /// `directive_revisions` as the historical record. pub async fn reset_directive_for_new_draft( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" UPDATE directives SET status = 'draft', pr_url = NULL, pr_branch = NULL, orchestrator_task_id = NULL, completion_task_id = NULL, updated_at = NOW(), version = version + 1 WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .fetch_optional(pool) .await } // ============================================================================= // Directive Revisions — per-PR snapshots of the contract content. // ============================================================================= /// Snapshot the directive's current goal as a revision attached to the given /// PR URL. The version is auto-assigned as MAX(existing) + 1 per directive. /// Idempotent on (directive_id, pr_url): if a revision already exists for /// this directive+pr_url combo, returns the existing row instead of creating /// a duplicate. 
pub async fn create_directive_revision( pool: &PgPool, directive_id: Uuid, content: &str, pr_url: &str, pr_branch: Option<&str>, ) -> Result { // Idempotency: don't double-snapshot if the orchestrator's completion task // re-runs and re-sets the same pr_url. if let Some(existing) = sqlx::query_as::<_, crate::db::models::DirectiveRevision>( r#" SELECT * FROM directive_revisions WHERE directive_id = $1 AND pr_url = $2 ORDER BY frozen_at DESC LIMIT 1 "#, ) .bind(directive_id) .bind(pr_url) .fetch_optional(pool) .await? { return Ok(existing); } sqlx::query_as::<_, crate::db::models::DirectiveRevision>( r#" INSERT INTO directive_revisions (directive_id, content, pr_url, pr_branch, pr_state, version, frozen_at) SELECT $1, $2, $3, $4, 'open', COALESCE(MAX(version), 0) + 1, NOW() FROM directive_revisions WHERE directive_id = $1 RETURNING * "#, ) .bind(directive_id) .bind(content) .bind(pr_url) .bind(pr_branch) .fetch_one(pool) .await } /// List all revisions for a directive, newest first. Scoped by owner via the /// directive join so callers don't accidentally surface other users' history. pub async fn list_directive_revisions_for_owner( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, crate::db::models::DirectiveRevision>( r#" SELECT r.* FROM directive_revisions r JOIN directives d ON d.id = r.directive_id WHERE r.directive_id = $1 AND d.owner_id = $2 ORDER BY r.frozen_at DESC "#, ) .bind(directive_id) .bind(owner_id) .fetch_all(pool) .await } /// Update the pr_state on a revision (called by the reconciler when it /// detects a PR transitioned to merged/closed). New state must be one of /// 'open' | 'merged' | 'closed' to satisfy the table's CHECK constraint. 
pub async fn update_directive_revision_pr_state(
    pool: &PgPool,
    revision_id: Uuid,
    new_state: &str,
) -> Result<(), sqlx::Error> {
    // `new_state` is passed through as-is; no validation happens here.
    let state_sql = r#"UPDATE directive_revisions SET pr_state = $2 WHERE id = $1"#;
    sqlx::query(state_sql)
        .bind(revision_id)
        .bind(new_state)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Find the most recent merged revision for a directive — used when planning
/// an amendment to know what the previous "frozen" content was so the diff
/// can be passed to the orchestrator. "Most recent" is by `frozen_at`.
pub async fn get_latest_merged_revision(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<crate::db::models::DirectiveRevision>, sqlx::Error> {
    let latest_sql = r#"
        SELECT * FROM directive_revisions
        WHERE directive_id = $1 AND pr_state = 'merged'
        ORDER BY frozen_at DESC
        LIMIT 1
        "#;
    sqlx::query_as::<_, crate::db::models::DirectiveRevision>(latest_sql)
        .bind(directive_id)
        .fetch_optional(pool)
        .await
}

/// Set a directive's status (used for start/pause/archive transitions).
///
/// Owner-scoped; returns `Ok(None)` when no directive matches. Moving to
/// `active` also stamps `started_at` the first time (an existing value is
/// kept).
pub async fn set_directive_status(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    status: &str,
) -> Result<Option<Directive>, sqlx::Error> {
    // Optional extra SET clause, spliced in only for the `active` status.
    let started_clause = if status == "active" {
        ", started_at = COALESCE(started_at, NOW())"
    } else {
        ""
    };
    let sql = format!(
        "UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1{} WHERE id = $1 AND owner_id = $2 RETURNING *",
        started_clause
    );

    sqlx::query_as::<_, Directive>(&sql)
        .bind(directive_id)
        .bind(owner_id)
        .bind(status)
        .fetch_optional(pool)
        .await
}

// =============================================================================
// Directive Orchestrator Queries
// =============================================================================

/// Get active directives that need planning (no steps, no orchestrator task).
pub async fn get_directives_needing_planning( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'active' AND d.orchestrator_task_id IS NULL AND NOT EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = d.id ) "#, ) .fetch_all(pool) .await } /// A step joined with minimal directive info for dispatch. #[derive(Debug, Clone, sqlx::FromRow)] pub struct StepForDispatch { // Step fields pub step_id: Uuid, pub directive_id: Uuid, pub step_name: String, pub step_description: Option, pub task_plan: Option, pub order_index: i32, pub generation: i32, pub depends_on: Vec, /// Optional contract type — when set, orchestrator creates a contract instead of a task. pub contract_type: Option, // Directive fields pub owner_id: Uuid, pub directive_title: String, pub repository_url: Option, pub base_branch: Option, /// The directive's PR branch (if a PR has already been created from previous steps). pub pr_branch: Option, /// The directive's reconcile mode: "auto", "semi-auto", or "manual". pub reconcile_mode: String, } /// Get ready steps that need task dispatch. pub async fn get_ready_steps_for_dispatch( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, StepForDispatch>( r#" SELECT ds.id AS step_id, ds.directive_id, ds.name AS step_name, ds.description AS step_description, ds.task_plan, ds.order_index, ds.generation, ds.depends_on, ds.contract_type, d.owner_id, d.title AS directive_title, d.repository_url, d.base_branch, d.pr_branch, d.reconcile_mode FROM directive_steps ds JOIN directives d ON d.id = ds.directive_id WHERE ds.status = 'ready' AND ds.task_id IS NULL AND ds.contract_id IS NULL AND d.status = 'active' ORDER BY ds.order_index "#, ) .fetch_all(pool) .await } /// Task info for a dependency step (step → linked task). 
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DependencyTaskInfo {
    pub step_id: Uuid,
    pub task_id: Uuid,
    pub task_name: String,
}

/// Resolve dependency step UUIDs to their linked task IDs and names.
/// Returns results in the same order as the input `depends_on` slice.
///
/// Steps with no linked task (or unknown ids) are omitted, so the result
/// may be shorter than the input. An empty input returns an empty vector
/// without querying the database.
pub async fn get_step_dependency_tasks(
    pool: &PgPool,
    depends_on: &[Uuid],
) -> Result<Vec<DependencyTaskInfo>, sqlx::Error> {
    if depends_on.is_empty() {
        return Ok(vec![]);
    }

    let rows = sqlx::query_as::<_, DependencyTaskInfo>(
        r#"
        SELECT ds.id AS step_id, t.id AS task_id, t.name AS task_name
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.id = ANY($1)
        "#,
    )
    .bind(depends_on)
    .fetch_all(pool)
    .await?;

    // Re-order to match input ordering; ids without a row are skipped.
    let ordered: Vec<DependencyTaskInfo> = depends_on
        .iter()
        .filter_map(|dep_id| rows.iter().find(|row| row.step_id == *dep_id).cloned())
        .collect();

    Ok(ordered)
}

/// A running step joined with its task's current status.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RunningStepWithTask {
    pub step_id: Uuid,
    pub directive_id: Uuid,
    pub task_id: Uuid,
    pub task_status: String,
}

/// Get running steps with their task status for monitoring.
///
/// Only steps in `running` status that have a task linked and no contract
/// are returned.
pub async fn get_running_steps_with_tasks(
    pool: &PgPool,
) -> Result<Vec<RunningStepWithTask>, sqlx::Error> {
    // `task_id` is selected under its plain name. The `AS "task_id!"`
    // override syntax is only interpreted by sqlx's compile-time `query!`
    // macros; with the runtime `query_as` function it would literally name
    // the column `task_id!`, which the derived `FromRow` (looking up
    // `task_id`) cannot find. The inner JOIN plus `IS NOT NULL` already
    // guarantee a non-null value for the `Uuid` field.
    sqlx::query_as::<_, RunningStepWithTask>(
        r#"
        SELECT
            ds.id AS step_id,
            ds.directive_id,
            ds.task_id,
            t.status AS task_status
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.status = 'running'
          AND ds.task_id IS NOT NULL
          AND ds.contract_id IS NULL
        "#,
    )
    .fetch_all(pool)
    .await
}

/// An orchestrator task to check (directive with pending planning task).
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct OrchestratorTaskCheck {
    pub directive_id: Uuid,
    pub orchestrator_task_id: Uuid,
    pub task_status: String,
    pub owner_id: Uuid,
}

/// Get directives with orchestrator tasks to check completion.
pub async fn get_orchestrator_tasks_to_check( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, OrchestratorTaskCheck>( r#" SELECT d.id AS directive_id, d.orchestrator_task_id AS "orchestrator_task_id!", t.status AS task_status, d.owner_id FROM directives d JOIN tasks t ON t.id = d.orchestrator_task_id WHERE d.orchestrator_task_id IS NOT NULL AND d.status = 'active' "#, ) .fetch_all(pool) .await } /// Get active directives that need re-planning (goal updated after latest step). pub async fn get_directives_needing_replanning( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Directive>( r#" SELECT d.* FROM directives d WHERE d.status = 'active' AND d.orchestrator_task_id IS NULL AND EXISTS ( SELECT 1 FROM directive_steps WHERE directive_id = d.id ) AND d.goal_updated_at > ( SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz) FROM directive_steps ds WHERE ds.directive_id = d.id ) "#, ) .fetch_all(pool) .await } /// Assign a task to a step and set status to running. pub async fn assign_task_to_step( pool: &PgPool, step_id: Uuid, task_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveStep>( r#" UPDATE directive_steps SET task_id = $2, status = 'running', started_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(step_id) .bind(task_id) .fetch_optional(pool) .await } /// Set the orchestrator_task_id on a directive. pub async fn assign_orchestrator_task( pool: &PgPool, directive_id: Uuid, task_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE directives SET orchestrator_task_id = $2, updated_at = NOW() WHERE id = $1 "#, ) .bind(directive_id) .bind(task_id) .execute(pool) .await?; Ok(()) } /// Clear the orchestrator_task_id on a directive. 
pub async fn clear_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let clear_sql = r#"
        UPDATE directives
        SET orchestrator_task_id = NULL,
            updated_at = NOW()
        WHERE id = $1
        "#;
    sqlx::query(clear_sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Cancel old planning tasks for a directive.
///
/// Marks any non-terminal planning/re-planning tasks as interrupted,
/// excluding the given new task. Planning tasks are identified by name
/// prefix ("Plan: " or "Re-plan: ") to avoid cancelling
/// completion/verification tasks. Returns the number of tasks updated.
pub async fn cancel_old_planning_tasks(
    pool: &PgPool,
    directive_id: Uuid,
    exclude_task_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let cancel_sql = r#"
        UPDATE tasks
        SET status = 'interrupted',
            completed_at = NOW(),
            updated_at = NOW()
        WHERE directive_id = $1
          AND id != $2
          AND (name LIKE 'Plan: %' OR name LIKE 'Re-plan: %')
          AND status NOT IN ('completed', 'failed', 'merged', 'done', 'interrupted')
        "#;
    sqlx::query(cancel_sql)
        .bind(directive_id)
        .bind(exclude_task_id)
        .execute(pool)
        .await
        .map(|done| done.rows_affected())
}

/// Link a task to a step without changing step status.
pub async fn link_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let link_sql = r#"
        UPDATE directive_steps
        SET task_id = $2
        WHERE id = $1
        "#;
    sqlx::query(link_sql)
        .bind(step_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Set a step to 'running' status (after its task has been dispatched).
///
/// `started_at` is only stamped when it is still NULL.
pub async fn set_step_running(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<(), sqlx::Error> {
    let running_sql = r#"
        UPDATE directive_steps
        SET status = 'running',
            started_at = COALESCE(started_at, NOW())
        WHERE id = $1
        "#;
    sqlx::query(running_sql)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Get pending directive tasks (tasks with directive_id that are still pending).
pub async fn get_pending_directive_tasks( pool: &PgPool, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Task>( r#" SELECT * FROM tasks WHERE directive_id IS NOT NULL AND status = 'pending' AND daemon_id IS NULL ORDER BY created_at "#, ) .fetch_all(pool) .await } /// Get the max generation number for steps in a directive. pub async fn get_directive_max_generation( pool: &PgPool, directive_id: Uuid, ) -> Result { let row: (Option,) = sqlx::query_as( r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#, ) .bind(directive_id) .fetch_one(pool) .await?; Ok(row.0.unwrap_or(0)) } // ============================================================================= // Order CRUD // ============================================================================= /// Create a new order for the given owner. pub async fn create_order( pool: &PgPool, owner_id: Uuid, req: CreateOrderRequest, ) -> Result { let priority = req.priority.as_deref().unwrap_or("medium"); let status = req.status.as_deref().unwrap_or("open"); let order_type = req.order_type.as_deref().unwrap_or("feature"); sqlx::query_as::<_, Order>( r#" INSERT INTO orders (owner_id, title, description, priority, status, order_type, labels, directive_id, repository_url, dog_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING * "#, ) .bind(owner_id) .bind(&req.title) .bind(&req.description) .bind(priority) .bind(status) .bind(order_type) .bind(&req.labels) .bind(req.directive_id) .bind(&req.repository_url) .bind(req.dog_id) .fetch_one(pool) .await } /// List orders for the given owner with optional filters. 
pub async fn list_orders( pool: &PgPool, owner_id: Uuid, status_filter: Option<&str>, type_filter: Option<&str>, priority_filter: Option<&str>, directive_id_filter: Option, dog_id_filter: Option, search_filter: Option<&str>, ) -> Result, sqlx::Error> { // Build dynamic query with optional filters let mut query = String::from("SELECT * FROM orders WHERE owner_id = $1"); let mut param_idx = 2u32; if status_filter.is_some() { query.push_str(&format!(" AND status = ${}", param_idx)); param_idx += 1; } if type_filter.is_some() { query.push_str(&format!(" AND order_type = ${}", param_idx)); param_idx += 1; } if priority_filter.is_some() { query.push_str(&format!(" AND priority = ${}", param_idx)); param_idx += 1; } if directive_id_filter.is_some() { query.push_str(&format!(" AND directive_id = ${}", param_idx)); param_idx += 1; } if dog_id_filter.is_some() { query.push_str(&format!(" AND dog_id = ${}", param_idx)); param_idx += 1; } if search_filter.is_some() { query.push_str(&format!( " AND (title ILIKE ${p} OR description ILIKE ${p} OR directive_name ILIKE ${p})", p = param_idx )); let _ = param_idx; // suppress unused warning } query.push_str(" ORDER BY created_at DESC"); let mut q = sqlx::query_as::<_, Order>(&query).bind(owner_id); if let Some(s) = status_filter { q = q.bind(s); } if let Some(t) = type_filter { q = q.bind(t); } if let Some(p) = priority_filter { q = q.bind(p); } if let Some(d) = directive_id_filter { q = q.bind(d); } if let Some(d) = dog_id_filter { q = q.bind(d); } if let Some(s) = search_filter { q = q.bind(format!("%{}%", s)); } q.fetch_all(pool).await } /// Get a single order by ID (owner-scoped). pub async fn get_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await } /// Update an order (owner-scoped). Uses COALESCE pattern to only update provided fields. 
pub async fn update_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, req: UpdateOrderRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let title = req.title.as_deref().unwrap_or(¤t.title); let description = req.description.as_deref().or(current.description.as_deref()); let priority = req.priority.as_deref().unwrap_or(¤t.priority); let status = req.status.as_deref().unwrap_or(¤t.status); let order_type = req.order_type.as_deref().unwrap_or(¤t.order_type); let labels = req.labels.as_ref().unwrap_or(¤t.labels); let directive_id = req.directive_id.or(current.directive_id); let directive_step_id = req.directive_step_id.or(current.directive_step_id); let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); let dog_id = req.dog_id.or(current.dog_id); sqlx::query_as::<_, Order>( r#" UPDATE orders SET title = $3, description = $4, priority = $5, status = $6, order_type = $7, labels = $8, directive_id = $9, directive_step_id = $10, repository_url = $11, dog_id = $12, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(order_id) .bind(owner_id) .bind(title) .bind(description) .bind(priority) .bind(status) .bind(order_type) .bind(labels) .bind(directive_id) .bind(directive_step_id) .bind(repository_url) .bind(dog_id) .fetch_optional(pool) .await } /// Delete an order (owner-scoped). Returns true if a row was deleted. pub async fn delete_order( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Link an order to a directive. 
pub async fn link_order_to_directive( pool: &PgPool, owner_id: Uuid, order_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" UPDATE orders SET directive_id = $3, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(order_id) .bind(owner_id) .bind(directive_id) .fetch_optional(pool) .await } /// Convert an order to a directive step. Creates a new DirectiveStep from the order's /// title and description, links the order to the new step, and returns the created step. /// Uses the order's existing directive_id (which is required for new orders). pub async fn convert_order_to_step( pool: &PgPool, owner_id: Uuid, order_id: Uuid, ) -> Result, sqlx::Error> { // Verify the order exists and belongs to this owner let order = sqlx::query_as::<_, Order>( r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#, ) .bind(order_id) .bind(owner_id) .fetch_optional(pool) .await?; let order = match order { Some(o) => o, None => return Ok(None), }; // Get the directive_id from the order (required for new orders, but legacy data may have NULL) let directive_id = match order.directive_id { Some(id) => id, None => return Ok(None), }; // Verify the directive exists and belongs to this owner let directive = sqlx::query_as::<_, Directive>( r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, ) .bind(directive_id) .bind(owner_id) .fetch_optional(pool) .await?; if directive.is_none() { return Ok(None); } // Get the next order_index for this directive let max_index: (Option,) = sqlx::query_as( r#"SELECT MAX(order_index) FROM directive_steps WHERE directive_id = $1"#, ) .bind(directive_id) .fetch_one(pool) .await?; let next_index = max_index.0.unwrap_or(-1) + 1; // Create the directive step from order data let step = sqlx::query_as::<_, DirectiveStep>( r#" INSERT INTO directive_steps (directive_id, name, description, order_index) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(directive_id) .bind(&order.title) 
.bind(&order.description) .bind(next_index) .fetch_one(pool) .await?; // Link the order to the new step sqlx::query( r#" UPDATE orders SET directive_step_id = $3, updated_at = NOW() WHERE id = $1 AND owner_id = $2 "#, ) .bind(order_id) .bind(owner_id) .bind(step.id) .execute(pool) .await?; Ok(Some(step)) } // ============================================================================= // Order Pickup // ============================================================================= /// Get available orders for pickup: open orders with no directive assigned /// OR orders already linked to this specific directive that are not yet done, /// sorted by priority (critical first) then creation date. pub async fn get_available_orders_for_pickup( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE owner_id = $1 AND status IN ('open', 'in_progress') AND (directive_id IS NULL OR directive_id = $2) ORDER BY CASE priority WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END ASC, created_at ASC "#, ) .bind(owner_id) .bind(directive_id) .fetch_all(pool) .await } /// Bulk-link orders to a directive by setting directive_id on matching orders. /// Returns the count of updated rows. pub async fn bulk_link_orders_to_directive( pool: &PgPool, owner_id: Uuid, order_ids: &[Uuid], directive_id: Uuid, ) -> Result { let result = sqlx::query( r#" UPDATE orders SET directive_id = $1, updated_at = NOW() WHERE id = ANY($2) AND owner_id = $3 "#, ) .bind(directive_id) .bind(order_ids) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() as i64) } /// Bulk update order status for a set of order IDs. /// Returns the count of updated rows. 
///
/// Only orders owned by `owner_id` are touched; IDs that do not match are
/// silently skipped.
pub async fn bulk_update_order_status(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    status: &str,
) -> Result<i64, sqlx::Error> {
    let result = sqlx::query(
        r#"UPDATE orders SET status = $1, updated_at = NOW() WHERE id = ANY($2) AND owner_id = $3"#,
    )
    .bind(status)
    .bind(order_ids)
    .bind(owner_id)
    .execute(pool)
    .await?;

    // The count is bounded by `order_ids.len()`, so the cast cannot wrap.
    Ok(result.rows_affected() as i64)
}

/// Get orders linked to a specific directive step.
///
/// Note: unlike most order queries in this section, this one is not
/// owner-scoped.
pub async fn get_orders_by_step_id(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    sqlx::query_as::<_, Order>(
        r#"SELECT * FROM orders WHERE directive_step_id = $1"#,
    )
    .bind(step_id)
    .fetch_all(pool)
    .await
}

/// Reconcile directive orders by checking linked step statuses.
/// - Orders linked to completed steps are marked "done"
/// - Orders linked to running/ready steps are marked "under_review"
///
/// Orders already in status `done` or `archived` are left alone.
/// Returns the count of orders updated. Every matching row is rewritten, so
/// an order that is already `under_review` is counted again and has its
/// `updated_at` refreshed.
pub async fn reconcile_directive_orders(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    // The WHERE clause restricts `ds.status` to the three values handled by
    // the CASE, so its ELSE branch is only a safety net.
    let updated: Vec<(Uuid,)> = sqlx::query_as(
        r#"
        UPDATE orders o
        SET status = CASE
                WHEN ds.status = 'completed' THEN 'done'
                WHEN ds.status IN ('running', 'ready') THEN 'under_review'
                ELSE o.status
            END,
            updated_at = NOW()
        FROM directive_steps ds
        WHERE o.directive_step_id = ds.id
          AND o.directive_id = $1
          AND o.owner_id = $2
          AND o.status NOT IN ('done', 'archived')
          AND ds.status IN ('completed', 'running', 'ready')
        RETURNING o.id
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .fetch_all(pool)
    .await?;

    Ok(updated.len() as i64)
}

// =============================================================================
// Directive Order Group (DOG) CRUD
// =============================================================================

/// Create a new Directive Order Group (DOG) for the given owner and directive.
pub async fn create_directive_order_group( pool: &PgPool, directive_id: Uuid, owner_id: Uuid, req: CreateDirectiveOrderGroupRequest, ) -> Result { sqlx::query_as::<_, DirectiveOrderGroup>( r#" INSERT INTO directive_order_groups (directive_id, owner_id, name, description) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(directive_id) .bind(owner_id) .bind(&req.name) .bind(&req.description) .fetch_one(pool) .await } /// List all DOGs for a given directive (owner-scoped). pub async fn list_directive_order_groups( pool: &PgPool, directive_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveOrderGroup>( r#" SELECT * FROM directive_order_groups WHERE directive_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#, ) .bind(directive_id) .bind(owner_id) .fetch_all(pool) .await } /// Get a single DOG by ID (owner-scoped). pub async fn get_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, DirectiveOrderGroup>( r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await } /// Update a DOG (owner-scoped). Uses fetch-then-update pattern for partial updates. 
pub async fn update_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, req: UpdateDirectiveOrderGroupRequest, ) -> Result, sqlx::Error> { let current = sqlx::query_as::<_, DirectiveOrderGroup>( r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .fetch_optional(pool) .await?; let current = match current { Some(c) => c, None => return Ok(None), }; let name = req.name.as_deref().unwrap_or(¤t.name); let description = req.description.as_deref().or(current.description.as_deref()); let status = req.status.as_deref().unwrap_or(¤t.status); sqlx::query_as::<_, DirectiveOrderGroup>( r#" UPDATE directive_order_groups SET name = $3, description = $4, status = $5, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, ) .bind(id) .bind(owner_id) .bind(name) .bind(description) .bind(status) .fetch_optional(pool) .await } /// Delete a DOG (owner-scoped). Returns true if a row was deleted. pub async fn delete_directive_order_group( pool: &PgPool, id: Uuid, owner_id: Uuid, ) -> Result { let result = sqlx::query( r#"DELETE FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#, ) .bind(id) .bind(owner_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// List orders belonging to a specific DOG (owner-scoped). pub async fn list_orders_by_dog( pool: &PgPool, dog_id: Uuid, owner_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE dog_id = $1 AND owner_id = $2 ORDER BY created_at DESC "#, ) .bind(dog_id) .bind(owner_id) .fetch_all(pool) .await } /// Get available orders for pickup filtered to a specific DOG. /// Like `get_available_orders_for_pickup` but only returns orders belonging to the given DOG. 
pub async fn get_available_orders_for_dog_pickup( pool: &PgPool, owner_id: Uuid, directive_id: Uuid, dog_id: Uuid, ) -> Result, sqlx::Error> { sqlx::query_as::<_, Order>( r#" SELECT * FROM orders WHERE owner_id = $1 AND dog_id = $3 AND status IN ('open', 'in_progress') AND (directive_id IS NULL OR directive_id = $2) ORDER BY CASE priority WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END ASC, created_at ASC "#, ) .bind(owner_id) .bind(directive_id) .bind(dog_id) .fetch_all(pool) .await }