diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 2537 |
1 files changed, 71 insertions, 2466 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index ee4b561..d453f99 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,21 +6,17 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, - ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, - ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, - CreateContractRequest, CreateFileRequest, CreateTaskRequest, - CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, + CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot, + CreateFileRequest, CreateTaskRequest, + Daemon, DaemonTaskAssignment, DaemonWithCapacity, + Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest, UpdateDirectiveStepRequest, CreateOrderRequest, Order, UpdateOrderRequest, CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, - MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, - PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, - Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, - UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, + Task, TaskCheckpoint, TaskEvent, TaskSummary, + UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. @@ -89,7 +85,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) @@ -105,7 +101,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, @@ -119,7 +115,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, @@ -171,7 +167,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -190,7 +186,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -246,7 +242,6 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { // ============================================================================= /// Create a new file record for a specific owner. -/// Files must belong to a contract - the contract_id is required and the phase is looked up. pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, @@ -254,32 +249,16 @@ pub async fn create_file_for_owner( ) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); - // Use body from request (may be empty or contain template elements) let body_json = serde_json::to_value(&req.body).unwrap_or_default(); - // Use provided contract_phase, or look up from contract's current phase - let contract_phase: Option<String> = if req.contract_phase.is_some() { - req.contract_phase - } else { - sqlx::query_scalar( - "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", - ) - .bind(req.contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await? - }; - sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) - VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path) + VALUES ($1, $2, $3, $4, $5, NULL, $6, $7) + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) - .bind(req.contract_id) - .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) @@ -298,7 +277,7 @@ pub async fn get_file_for_owner( ) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, @@ -313,7 +292,7 @@ pub async fn get_file_for_owner( pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC @@ -324,13 +303,10 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F .await } -/// Database row type for file summary with contract info +/// Database row type for file summary #[derive(Debug, sqlx::FromRow)] struct FileSummaryRow { id: Uuid, - contract_id: Option<Uuid>, - contract_name: Option<String>, - contract_phase: Option<String>, name: String, description: Option<String>, #[sqlx(json)] @@ -342,7 +318,7 @@ struct FileSummaryRow { updated_at: chrono::DateTime<chrono::Utc>, } -/// List file summaries for an owner with contract info (joined). +/// List file summaries for an owner. pub async fn list_file_summaries_for_owner( pool: &PgPool, owner_id: Uuid, @@ -350,11 +326,9 @@ pub async fn list_file_summaries_for_owner( let rows = sqlx::query_as::<_, FileSummaryRow>( r#" SELECT - f.id, f.contract_id, c.name as contract_name, f.contract_phase, - f.name, f.description, f.transcript, f.version, + f.id, f.name, f.description, f.transcript, f.version, f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at FROM files f - LEFT JOIN contracts c ON f.contract_id = c.id WHERE f.owner_id = $1 ORDER BY f.created_at DESC "#, @@ -373,9 +347,6 @@ pub async fn list_file_summaries_for_owner( .fold(0.0_f32, f32::max); FileSummary { id: row.id, - contract_id: row.contract_id, - contract_name: row.contract_name, - contract_phase: row.contract_phase, name: row.name, description: row.description, transcript_count: row.transcript.len(), @@ -429,7 +400,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -449,7 +420,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -657,34 +628,24 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq /// Task spawning is now controlled by supervisors at the application level. /// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { - // Calculate depth and inherit settings from parent if applicable - let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + // Calculate depth + inherit settings from parent if applicable. + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - - // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) - let contract_id = parent.contract_id.or(req.contract_id); - - // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); - // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, - req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -699,23 +660,21 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - contract_id, parent_task_id, depth, name, description, plan, priority, - is_supervisor, repository_url, base_branch, target_branch, merge_mode, + parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state, supervisor_worktree_task_id + branched_from_task_id, conversation_state ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) - .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) - .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -726,7 +685,6 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) - .bind(&req.supervisor_worktree_task_id) .fetch_one(pool) .await } @@ -751,14 +709,12 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, @@ -772,14 +728,12 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -790,73 +744,6 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum } /// List all tasks in a contract (for supervisor tree view). -pub async fn list_tasks_by_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT * FROM tasks - WHERE contract_id = $1 AND owner_id = $2 - ORDER BY is_supervisor DESC, depth ASC, created_at ASC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get pending tasks for a contract (non-supervisor tasks only). -/// Includes tasks that were interrupted (retry candidates). -/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. -pub async fn get_pending_tasks_for_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT t.* FROM tasks t - WHERE t.contract_id = $1 AND t.owner_id = $2 - AND t.status = 'pending' - AND t.retry_count < t.max_retries - AND t.is_supervisor = false - ORDER BY - t.interrupted_at DESC NULLS LAST, - t.priority DESC, - t.created_at ASC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get all contracts that have pending tasks awaiting retry. -/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks. -pub async fn get_all_pending_task_contracts( - pool: &PgPool, -) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> { - sqlx::query_as::<_, (Uuid, Uuid)>( - r#" - SELECT DISTINCT t.contract_id, t.owner_id - FROM tasks t - WHERE t.contract_id IS NOT NULL - AND t.status = 'pending' - AND t.retry_count < t.max_retries - AND t.is_supervisor = false - ORDER BY t.owner_id, t.contract_id - "#, - ) - .fetch_all(pool) - .await -} - -/// Mark a task as pending for retry after daemon failure. -/// Increments retry count and adds the failed daemon to exclusion list. pub async fn mark_task_for_retry( pool: &PgPool, task_id: Uuid, @@ -1061,16 +948,13 @@ pub async fn create_task_for_owner( owner_id: Uuid, req: CreateTaskRequest, ) -> Result<Task, sqlx::Error> { - // Calculate depth and inherit settings from parent if applicable - let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + // Calculate depth + inherit settings from parent if applicable. + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - - // Validate max depth if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", @@ -1078,25 +962,17 @@ pub async fn create_task_for_owner( ))); } - // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) - let contract_id = parent.contract_id.or(req.contract_id); - - // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); - // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, - req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -1108,14 +984,11 @@ pub async fn create_task_for_owner( let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); - // Resolve the directive_document_id. Tasks plumbed through this builder - // currently have no way to specify a document explicitly (we don't want - // to widen `CreateTaskRequest` for this — every call site would have to - // change). Instead, when the task is directive-driven (directive_id is - // set) we attach it to that directive's most recently-updated active - // document so the task lands under that document's tasks/ subfolder in - // the sidebar. Resolution failures are non-fatal — the task still gets - // created with directive_document_id = NULL, matching legacy behaviour. + // Resolve directive_document_id from the directive's currently- + // active contract row (directive_documents table) so the task + // lands under the right tasks/ subfolder in the sidebar. Failures + // are non-fatal — the task is created with NULL document_id and + // the sidebar tolerates that. let directive_document_id = match req.directive_id { Some(directive_id) => resolve_active_document_for_directive(pool, directive_id) .await @@ -1126,25 +999,23 @@ pub async fn create_task_for_owner( sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, - is_supervisor, repository_url, base_branch, target_branch, merge_mode, + owner_id, parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state, supervisor_worktree_task_id, + branched_from_task_id, conversation_state, directive_id, directive_step_id, directive_document_id ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING * "#, ) .bind(owner_id) - .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) - .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -1155,7 +1026,6 @@ pub async fn create_task_for_owner( .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) - .bind(&req.supervisor_worktree_task_id) .bind(&req.directive_id) .bind(&req.directive_step_id) .bind(&directive_document_id) @@ -1226,14 +1096,12 @@ pub async fn list_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1335,14 +1203,12 @@ pub async fn list_ephemeral_directive_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.directive_step_id IS NULL @@ -1369,14 +1235,12 @@ pub async fn list_tmp_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.parent_task_id IS NULL @@ -1399,14 +1263,12 @@ pub async fn list_subtasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1920,14 +1782,12 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1942,14 +1802,12 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -2121,1287 +1979,28 @@ pub async fn complete_task( Ok(task) } -// ============================================================================= -// Mesh Chat History Functions -// ============================================================================= - -/// Get or create the active conversation for an owner. -pub async fn get_or_create_active_conversation( - pool: &PgPool, - owner_id: Uuid, -) -> Result<MeshChatConversation, sqlx::Error> { - // Try to get existing active conversation for this owner - let existing = sqlx::query_as::<_, MeshChatConversation>( - r#" - SELECT * - FROM mesh_chat_conversations - WHERE is_active = true AND owner_id = $1 - LIMIT 1 - "#, - ) - .bind(owner_id) - .fetch_optional(pool) - .await?; - - if let Some(conv) = existing { - return Ok(conv); - } - - // Create new conversation - sqlx::query_as::<_, MeshChatConversation>( - r#" - INSERT INTO mesh_chat_conversations (owner_id, is_active) - VALUES ($1, true) - RETURNING * - "#, - ) - .bind(owner_id) - .fetch_one(pool) - .await -} - -/// List messages for a conversation. -pub async fn list_chat_messages( - pool: &PgPool, - conversation_id: Uuid, - limit: Option<i32>, -) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> { - let limit = limit.unwrap_or(100); - sqlx::query_as::<_, MeshChatMessageRecord>( - r#" - SELECT * - FROM mesh_chat_messages - WHERE conversation_id = $1 - ORDER BY created_at ASC - LIMIT $2 - "#, - ) - .bind(conversation_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Add a message to a conversation. -#[allow(clippy::too_many_arguments)] -pub async fn add_chat_message( - pool: &PgPool, - conversation_id: Uuid, - role: &str, - content: &str, - context_type: &str, - context_task_id: Option<Uuid>, - tool_calls: Option<serde_json::Value>, - pending_questions: Option<serde_json::Value>, -) -> Result<MeshChatMessageRecord, sqlx::Error> { - sqlx::query_as::<_, MeshChatMessageRecord>( - r#" - INSERT INTO mesh_chat_messages - (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) - VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING * - "#, - ) - .bind(conversation_id) - .bind(role) - .bind(content) - .bind(context_type) - .bind(context_task_id) - .bind(tool_calls) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Clear conversation (archive existing and create new). -pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> { - // Mark existing as inactive for this owner - sqlx::query( - r#" - UPDATE mesh_chat_conversations - SET is_active = false, updated_at = NOW() - WHERE is_active = true AND owner_id = $1 - "#, - ) - .bind(owner_id) - .execute(pool) - .await?; - - // Create new active conversation - get_or_create_active_conversation(pool, owner_id).await -} - -// ============================================================================= -// Contract Chat History Functions -// ============================================================================= - -/// Get or create the active conversation for a contract. -pub async fn get_or_create_contract_conversation( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<ContractChatConversation, sqlx::Error> { - // Try to get existing active conversation for this contract - let existing = sqlx::query_as::<_, ContractChatConversation>( - r#" - SELECT * - FROM contract_chat_conversations - WHERE is_active = true AND contract_id = $1 AND owner_id = $2 - LIMIT 1 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await?; - - if let Some(conv) = existing { - return Ok(conv); - } - - // Create new conversation - sqlx::query_as::<_, ContractChatConversation>( - r#" - INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active) - VALUES ($1, $2, true) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_one(pool) - .await -} - -/// List messages for a contract conversation. -pub async fn list_contract_chat_messages( - pool: &PgPool, - conversation_id: Uuid, - limit: Option<i32>, -) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> { - let limit = limit.unwrap_or(100); - sqlx::query_as::<_, ContractChatMessageRecord>( - r#" - SELECT * - FROM contract_chat_messages - WHERE conversation_id = $1 - ORDER BY created_at ASC - LIMIT $2 - "#, - ) - .bind(conversation_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Add a message to a contract conversation. -pub async fn add_contract_chat_message( - pool: &PgPool, - conversation_id: Uuid, - role: &str, - content: &str, - tool_calls: Option<serde_json::Value>, - pending_questions: Option<serde_json::Value>, -) -> Result<ContractChatMessageRecord, sqlx::Error> { - sqlx::query_as::<_, ContractChatMessageRecord>( - r#" - INSERT INTO contract_chat_messages - (conversation_id, role, content, tool_calls, pending_questions) - VALUES ($1, $2, $3, $4, $5) - RETURNING * - "#, - ) - .bind(conversation_id) - .bind(role) - .bind(content) - .bind(tool_calls) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Clear contract conversation (archive existing and create new). -pub async fn clear_contract_conversation( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<ContractChatConversation, sqlx::Error> { - // Mark existing as inactive for this contract - sqlx::query( - r#" - UPDATE contract_chat_conversations - SET is_active = false, updated_at = NOW() - WHERE is_active = true AND contract_id = $1 AND owner_id = $2 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - // Create new active conversation - get_or_create_contract_conversation(pool, contract_id, owner_id).await -} - -// ============================================================================= -// Contract Type Template Functions (Owner-Scoped) -// ============================================================================= - -/// Create a new contract type template for a specific owner. -pub async fn create_template_for_owner( - pool: &PgPool, - owner_id: Uuid, - req: CreateTemplateRequest, -) -> Result<ContractTypeTemplateRecord, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables) - VALUES ($1, $2, $3, $4, $5, $6) - RETURNING * - "#, - ) - .bind(owner_id) - .bind(&req.name) - .bind(&req.description) - .bind(serde_json::to_value(&req.phases).unwrap_or_default()) - .bind(&req.default_phase) - .bind(match &req.deliverables { - Some(d) => serde_json::to_value(d).ok(), - None => None, - }) - .fetch_one(pool) - .await -} - -/// Get a contract type template by ID, scoped to owner. -pub async fn get_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Get a contract type template by ID (internal use, no owner scoping). -pub async fn get_template_by_id( - pool: &PgPool, - id: Uuid, -) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE id = $1 - "#, - ) - .bind(id) - .fetch_optional(pool) - .await -} - -/// List all contract type templates for an owner, ordered by name. -pub async fn list_templates_for_owner( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE owner_id = $1 - ORDER BY name ASC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Update a contract type template for an owner. -pub async fn update_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - req: UpdateTemplateRequest, -) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> { - // Build dynamic update query - let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()"); - let mut param_idx = 3; // $1 = id, $2 = owner_id - - if req.name.is_some() { - query.push_str(&format!(", name = ${}", param_idx)); - param_idx += 1; - } - if req.description.is_some() { - query.push_str(&format!(", description = ${}", param_idx)); - param_idx += 1; - } - if req.phases.is_some() { - query.push_str(&format!(", phases = ${}", param_idx)); - param_idx += 1; - } - if req.default_phase.is_some() { - query.push_str(&format!(", default_phase = ${}", param_idx)); - param_idx += 1; - } - if req.deliverables.is_some() { - query.push_str(&format!(", deliverables = ${}", param_idx)); - param_idx += 1; - } - - // Optimistic locking - if req.version.is_some() { - query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx)); - } else { - query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2"); - } - query.push_str(" RETURNING *"); - - let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query); - sql_query = sql_query.bind(id).bind(owner_id); - - if let Some(ref name) = req.name { - sql_query = sql_query.bind(name); - } - if let Some(ref description) = req.description { - sql_query = sql_query.bind(description); - } - if let Some(ref phases) = req.phases { - sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default()); - } - if let Some(ref default_phase) = req.default_phase { - sql_query = sql_query.bind(default_phase); - } - if let Some(ref deliverables) = req.deliverables { - sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default()); - } - if let Some(version) = req.version { - sql_query = sql_query.bind(version); - } - - match sql_query.fetch_optional(pool).await { - Ok(result) => { - if result.is_none() && req.version.is_some() { - // Check if it's a version conflict - if let Some(current) = get_template_for_owner(pool, id, owner_id).await? { - return Err(RepositoryError::VersionConflict { - expected: req.version.unwrap(), - actual: current.version, - }); - } - } - Ok(result) - } - Err(e) => Err(RepositoryError::Database(e)), - } -} - -/// Delete a contract type template for an owner. -pub async fn delete_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contract_type_templates - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Helper function to build PhaseConfig from a template. -pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig { - PhaseConfig { - phases: template.phases.clone(), - default_phase: template.default_phase.clone(), - deliverables: template.deliverables.clone().unwrap_or_default(), - } -} - -/// Helper function to build PhaseConfig for built-in contract types. -pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig { - match contract_type { - "simple" => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 }, - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 }, - ], - default_phase: "plan".to_string(), - deliverables: [ - ("plan".to_string(), vec![DeliverableDefinition { - id: "plan-document".to_string(), - name: "Plan".to_string(), - priority: "required".to_string(), - }]), - ("execute".to_string(), vec![DeliverableDefinition { - id: "pull-request".to_string(), - name: "Pull Request".to_string(), - priority: "required".to_string(), - }]), - ].into_iter().collect(), - }, - "specification" => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 }, - PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 }, - PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 }, - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 }, - PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 }, - ], - default_phase: "research".to_string(), - deliverables: [ - ("research".to_string(), vec![DeliverableDefinition { - id: "research-notes".to_string(), - name: "Research Notes".to_string(), - priority: "required".to_string(), - }]), - ("specify".to_string(), vec![DeliverableDefinition { - id: "requirements-document".to_string(), - name: "Requirements Document".to_string(), - priority: "required".to_string(), - }]), - ("plan".to_string(), vec![DeliverableDefinition { - id: "plan-document".to_string(), - name: "Plan".to_string(), - priority: "required".to_string(), - }]), - ("execute".to_string(), vec![DeliverableDefinition { - id: "pull-request".to_string(), - name: "Pull Request".to_string(), - priority: "required".to_string(), - }]), - ("review".to_string(), vec![DeliverableDefinition { - id: "release-notes".to_string(), - name: "Release Notes".to_string(), - priority: "required".to_string(), - }]), - ].into_iter().collect(), - }, - "execute" | _ => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 }, - ], - default_phase: "execute".to_string(), - deliverables: std::collections::HashMap::new(), - }, - } -} - -// ============================================================================= -// Contract Functions (Owner-Scoped) -// ============================================================================= - -/// Create a new contract for a specific owner. -/// Supports both built-in contract types (simple, specification, execute) and custom templates. -pub async fn create_contract_for_owner( - pool: &PgPool, - owner_id: Uuid, - req: CreateContractRequest, -) -> Result<Contract, sqlx::Error> { - // Determine phase configuration based on template_id or contract_type - let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) = - if let Some(template_id) = req.template_id { - // Look up the custom template - let template = get_template_by_id(pool, template_id) - .await? - .ok_or_else(|| { - sqlx::Error::Protocol(format!("Template not found: {}", template_id)) - })?; - - let config = build_phase_config_from_template(&template); - let default = config.default_phase.clone(); - // For custom templates, store the template name as the contract_type - (config, template.name.clone(), default) - } else { - // Use built-in contract type - let contract_type = req.contract_type.as_deref().unwrap_or("simple"); - - // Validate contract type - let valid_types = ["simple", "specification", "execute"]; - if !valid_types.contains(&contract_type) { - return Err(sqlx::Error::Protocol(format!( - "Invalid contract_type '{}'. Must be one of: {} or provide a template_id", - contract_type, - valid_types.join(", ") - ))); - } - - let config = build_phase_config_for_builtin(contract_type); - let default = config.default_phase.clone(); - (config, contract_type.to_string(), default) - }; - - // Get valid phase IDs from the configuration - let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect(); - - // Use provided initial_phase or default based on contract type/template - let phase = req.initial_phase.as_deref().unwrap_or(&default_phase); - - // Validate the phase is valid for this contract type/template - if !valid_phase_ids.contains(&phase.to_string()) { - return Err(sqlx::Error::Protocol(format!( - "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}", - phase, - contract_type_str, - valid_phase_ids.join(", ") - ))); - } - - let autonomous_loop = req.autonomous_loop.unwrap_or(false); - let phase_guard = req.phase_guard.unwrap_or(false); - let local_only = req.local_only.unwrap_or(false); - let auto_merge_local = req.auto_merge_local.unwrap_or(false); - - // Serialize phase_config to JSON - let phase_config_json = serde_json::to_value(&phase_config).ok(); - - sqlx::query_as::<_, Contract>( - r#" - INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - RETURNING * - "#, - ) - .bind(owner_id) - .bind(&req.name) - .bind(&req.description) - .bind(&contract_type_str) - .bind(phase) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .bind(phase_config_json) - .fetch_one(pool) - .await -} - -/// Get a contract by ID, scoped to owner. -pub async fn get_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<Contract>, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - SELECT * - FROM contracts - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// List all contracts for an owner, ordered by created_at DESC. -pub async fn list_contracts_for_owner( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<ContractSummary>, sqlx::Error> { - sqlx::query_as::<_, ContractSummary>( - r#" - SELECT - c.id, c.name, c.description, c.contract_type, c.phase, c.status, - c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, - (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, - (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, - (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count - FROM contracts c - WHERE c.owner_id = $1 - ORDER BY c.created_at DESC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get contract summary by ID. -pub async fn get_contract_summary_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<ContractSummary>, sqlx::Error> { - sqlx::query_as::<_, ContractSummary>( - r#" - SELECT - c.id, c.name, c.description, c.contract_type, c.phase, c.status, - c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, - (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, - (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, - (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count - FROM contracts c - WHERE c.id = $1 AND c.owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Update a contract by ID with optimistic locking, scoped to owner. -pub async fn update_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - req: UpdateContractRequest, -) -> Result<Option<Contract>, RepositoryError> { - let existing = get_contract_for_owner(pool, id, owner_id).await?; - let Some(existing) = existing else { - return Ok(None); - }; - - // Check version if provided (optimistic locking) - if let Some(expected_version) = req.version { - if existing.version != expected_version { - return Err(RepositoryError::VersionConflict { - expected: expected_version, - actual: existing.version, - }); - } - } - - // Apply updates - let name = req.name.unwrap_or(existing.name); - let description = req.description.or(existing.description); - let phase = req.phase.unwrap_or(existing.phase); - let status = req.status.unwrap_or(existing.status); - let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id); - let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop); - let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard); - let local_only = req.local_only.unwrap_or(existing.local_only); - let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local); - - let result = if req.version.is_some() { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET name = $3, description = $4, phase = $5, status = $6, - supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $12 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&name) - .bind(&description) - .bind(&phase) - .bind(&status) - .bind(supervisor_task_id) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .bind(req.version.unwrap()) - .fetch_optional(pool) - .await? - } else { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET name = $3, description = $4, phase = $5, status = $6, - supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&name) - .bind(&description) - .bind(&phase) - .bind(&status) - .bind(supervisor_task_id) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .fetch_optional(pool) - .await? - }; - - // If versioned update returned None, there was a race condition - if result.is_none() && req.version.is_some() { - if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? { - return Err(RepositoryError::VersionConflict { - expected: req.version.unwrap(), - actual: current.version, - }); - } - } - - Ok(result) -} - -/// Delete a contract by ID, scoped to owner. -pub async fn delete_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contracts - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Change contract phase and record event. -/// -/// This is the simple version without version checking. Use `change_contract_phase_with_version` -/// for explicit version conflict detection. -pub async fn change_contract_phase_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - new_phase: &str, -) -> Result<Option<Contract>, sqlx::Error> { - // Get current phase - let existing = get_contract_for_owner(pool, id, owner_id).await?; - let Some(existing) = existing else { - return Ok(None); - }; - - let previous_phase = existing.phase.clone(); - - // Update phase - let contract = sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET phase = $3, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(new_phase) - .fetch_optional(pool) - .await?; - - // Record event - if contract.is_some() { - sqlx::query( - r#" - INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) - VALUES ($1, 'phase_change', $2, $3) - "#, - ) - .bind(id) - .bind(&previous_phase) - .bind(new_phase) - .execute(pool) - .await?; - } - - Ok(contract) -} - -/// Change contract phase with explicit version checking for conflict detection. -/// -/// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions. -/// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match. -pub async fn change_contract_phase_with_version( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - new_phase: &str, - expected_version: Option<i32>, -) -> Result<PhaseChangeResult, sqlx::Error> { - // Start a transaction to ensure atomicity with row locking - let mut tx = pool.begin().await?; - - // Lock the row with SELECT FOR UPDATE and get current state - let existing: Option<Contract> = sqlx::query_as::<_, Contract>( - r#" - SELECT * - FROM contracts - WHERE id = $1 AND owner_id = $2 - FOR UPDATE - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(&mut *tx) - .await?; - - let Some(existing) = existing else { - tx.rollback().await?; - return Ok(PhaseChangeResult::NotFound); - }; - - // Check version if provided (optimistic locking) - if let Some(expected) = expected_version { - if existing.version != expected { - tx.rollback().await?; - return Ok(PhaseChangeResult::VersionConflict { - expected, - actual: existing.version, - current_phase: existing.phase, - }); - } - } - - // Validate the phase transition is allowed - let valid_phases = existing.valid_phase_ids(); - if !valid_phases.contains(&new_phase.to_string()) { - tx.rollback().await?; - return Ok(PhaseChangeResult::ValidationFailed { - reason: format!( - "Invalid phase '{}' for contract type '{}'", - new_phase, existing.contract_type - ), - missing_requirements: vec![format!( - "Phase must be one of: {}", - valid_phases.join(", ") - )], - }); - } - - let previous_phase = existing.phase.clone(); - - // Update phase with version increment - let contract = sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET phase = $3, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(new_phase) - .fetch_one(&mut *tx) - .await?; - - // Record event - sqlx::query( - r#" - INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) - VALUES ($1, 'phase_change', $2, $3) - "#, - ) - .bind(id) - .bind(&previous_phase) - .bind(new_phase) - .execute(&mut *tx) - .await?; - - // Commit the transaction - tx.commit().await?; - - Ok(PhaseChangeResult::Success(contract)) -} - -// ============================================================================= -// Contract Repository Functions -// ============================================================================= - -/// List repositories for a contract. -pub async fn list_contract_repositories( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<ContractRepository>, sqlx::Error> { - sqlx::query_as::<_, ContractRepository>( - r#" - SELECT * - FROM contract_repositories - WHERE contract_id = $1 - ORDER BY is_primary DESC, created_at ASC - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -/// Add a remote repository to a contract. -pub async fn add_remote_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - repository_url: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - } - sqlx::query_as::<_, ContractRepository>( - r#" - INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) - VALUES ($1, $2, $3, 'remote', 'ready', $4) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(repository_url) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Add a local repository to a contract. -pub async fn add_local_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - local_path: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - } - - sqlx::query_as::<_, ContractRepository>( +/// Get task tree from a specific root task. +pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( r#" - INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) - VALUES ($1, $2, $3, 'local', 'ready', $4) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(local_path) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Create a managed repository (daemon will create it). -pub async fn create_managed_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, + WITH RECURSIVE task_tree AS ( + -- Base case: the root task + SELECT * FROM tasks WHERE id = $1 + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id ) - .bind(contract_id) - .execute(pool) - .await?; - } - - sqlx::query_as::<_, ContractRepository>( - r#" - INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) - VALUES ($1, $2, 'managed', 'pending', $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Delete a repository from a contract. -pub async fn delete_contract_repository( - pool: &PgPool, - repo_id: Uuid, - contract_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contract_repositories - WHERE id = $1 AND contract_id = $2 - "#, - ) - .bind(repo_id) - .bind(contract_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Set a repository as primary (and clear others). -pub async fn set_repository_primary( - pool: &PgPool, - repo_id: Uuid, - contract_id: Uuid, -) -> Result<bool, sqlx::Error> { - // Clear other primaries - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - - // Set this one as primary - let result = sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = true, updated_at = NOW() - WHERE id = $1 AND contract_id = $2 - "#, - ) - .bind(repo_id) - .bind(contract_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Update managed repository status (used by daemon). -pub async fn update_managed_repository_status( - pool: &PgPool, - repo_id: Uuid, - status: &str, - repository_url: Option<&str>, -) -> Result<Option<ContractRepository>, sqlx::Error> { - sqlx::query_as::<_, ContractRepository>( - r#" - UPDATE contract_repositories - SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(repo_id) - .bind(status) - .bind(repository_url) - .fetch_optional(pool) - .await -} - -// ============================================================================= -// Contract Task Association Functions -// ============================================================================= - -/// Add a task to a contract. -pub async fn add_task_to_contract( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - UPDATE tasks - SET contract_id = $2, updated_at = NOW() - WHERE id = $1 AND owner_id = $3 - "#, - ) - .bind(task_id) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Remove a task from a contract. -pub async fn remove_task_from_contract( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - UPDATE tasks - SET contract_id = NULL, updated_at = NOW() - WHERE id = $1 AND contract_id = $2 AND owner_id = $3 - "#, - ) - .bind(task_id) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// List files in a contract. -pub async fn list_files_in_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<FileSummary>, sqlx::Error> { - // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields - let files = sqlx::query_as::<_, File>( - r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at - FROM files - WHERE contract_id = $1 AND owner_id = $2 - ORDER BY created_at DESC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await?; - - Ok(files.into_iter().map(FileSummary::from).collect()) -} - -/// List tasks in a contract. -pub async fn list_tasks_in_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<TaskSummary>, sqlx::Error> { - sqlx::query_as::<_, TaskSummary>( - r#" - SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, - t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at - FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id - WHERE t.contract_id = $1 AND t.owner_id = $2 - ORDER BY t.priority DESC, t.created_at DESC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Minimal task info for worktree cleanup operations. -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct TaskWorktreeInfo { - pub id: Uuid, - pub daemon_id: Option<Uuid>, - pub overlay_path: Option<String>, - /// If set, this task shares the worktree of the specified supervisor task. - /// Should NOT have its worktree deleted during cleanup. - pub supervisor_worktree_task_id: Option<Uuid>, -} - -/// List tasks in a contract with their daemon/worktree info. -/// Used for cleaning up worktrees when a contract is completed or deleted. -pub async fn list_contract_tasks_with_worktree_info( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> { - sqlx::query_as::<_, TaskWorktreeInfo>( - r#" - SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id - FROM tasks - WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL) - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -// ============================================================================= -// Contract Events -// ============================================================================= - -/// List events for a contract. -pub async fn list_contract_events( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<ContractEvent>, sqlx::Error> { - sqlx::query_as::<_, ContractEvent>( - r#" - SELECT * - FROM contract_events - WHERE contract_id = $1 - ORDER BY created_at DESC + SELECT * FROM task_tree + ORDER BY depth, created_at "#, ) - .bind(contract_id) + .bind(root_task_id) .fetch_all(pool) .await } -/// Record a contract event. -pub async fn record_contract_event( - pool: &PgPool, - contract_id: Uuid, - event_type: &str, - event_data: Option<serde_json::Value>, -) -> Result<ContractEvent, sqlx::Error> { - sqlx::query_as::<_, ContractEvent>( - r#" - INSERT INTO contract_events (contract_id, event_type, event_data) - VALUES ($1, $2, $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(event_type) - .bind(event_data) - .fetch_one(pool) - .await -} - // ============================================================================ // Task Checkpoints // ============================================================================ @@ -3501,713 +2100,6 @@ pub async fn list_task_checkpoints( } // ============================================================================ -// Supervisor State -// ============================================================================ - -/// Create or update supervisor state for a contract. -pub async fn upsert_supervisor_state( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - conversation_history: serde_json::Value, - pending_task_ids: &[Uuid], - phase: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) - VALUES ($1, $2, $3, $4, $5, NOW()) - ON CONFLICT (contract_id) DO UPDATE SET - task_id = EXCLUDED.task_id, - conversation_history = EXCLUDED.conversation_history, - pending_task_ids = EXCLUDED.pending_task_ids, - phase = EXCLUDED.phase, - last_activity = NOW(), - updated_at = NOW() - RETURNING * - "#, - ) - .bind(contract_id) - .bind(task_id) - .bind(conversation_history) - .bind(pending_task_ids) - .bind(phase) - .fetch_one(pool) - .await -} - -/// Get supervisor state for a contract. -pub async fn get_supervisor_state( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1") - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Get supervisor state by task ID. -pub async fn get_supervisor_state_by_task( - pool: &PgPool, - task_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1") - .bind(task_id) - .fetch_optional(pool) - .await -} - -/// Update supervisor conversation history. -pub async fn update_supervisor_conversation( - pool: &PgPool, - contract_id: Uuid, - conversation_history: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET conversation_history = $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(conversation_history) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor pending tasks. -pub async fn update_supervisor_pending_tasks( - pool: &PgPool, - contract_id: Uuid, - pending_task_ids: &[Uuid], -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_task_ids = $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(pending_task_ids) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor state with detailed activity tracking. -/// Called at key save points: LLM response, task spawn, question asked, phase change. -pub async fn update_supervisor_detailed_state( - pool: &PgPool, - contract_id: Uuid, - state: &str, - current_activity: Option<&str>, - progress: i32, - error_message: Option<&str>, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET state = $1, - current_activity = $2, - progress = $3, - error_message = $4, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $5 - RETURNING * - "#, - ) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(error_message) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Add a spawned task ID to the supervisor's list. -pub async fn add_supervisor_spawned_task( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET spawned_task_ids = array_append(spawned_task_ids, $1), - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(task_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Add a pending question to the supervisor state. -pub async fn add_supervisor_pending_question( - pool: &PgPool, - contract_id: Uuid, - question: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_questions = pending_questions || $1::jsonb, - state = 'waiting_for_user', - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(question) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Remove a pending question by ID. -pub async fn remove_supervisor_pending_question( - pool: &PgPool, - contract_id: Uuid, - question_id: Uuid, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_questions = ( - SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb) - FROM jsonb_array_elements(pending_questions) elem - WHERE (elem->>'id')::uuid != $1 - ), - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(question_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Comprehensive state save - used at major save points. -pub async fn save_supervisor_state_full( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - conversation_history: serde_json::Value, - pending_task_ids: &[Uuid], - phase: &str, - state: &str, - current_activity: Option<&str>, - progress: i32, - error_message: Option<&str>, - spawned_task_ids: &[Uuid], - pending_questions: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - INSERT INTO supervisor_states ( - contract_id, task_id, conversation_history, pending_task_ids, phase, - state, current_activity, progress, error_message, spawned_task_ids, - pending_questions, last_activity - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) - ON CONFLICT (contract_id) DO UPDATE SET - task_id = EXCLUDED.task_id, - conversation_history = EXCLUDED.conversation_history, - pending_task_ids = EXCLUDED.pending_task_ids, - phase = EXCLUDED.phase, - state = EXCLUDED.state, - current_activity = EXCLUDED.current_activity, - progress = EXCLUDED.progress, - error_message = EXCLUDED.error_message, - spawned_task_ids = EXCLUDED.spawned_task_ids, - pending_questions = EXCLUDED.pending_questions, - last_activity = NOW(), - updated_at = NOW() - RETURNING * - "#, - ) - .bind(contract_id) - .bind(task_id) - .bind(conversation_history) - .bind(pending_task_ids) - .bind(phase) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(error_message) - .bind(spawned_task_ids) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Mark supervisor as restored from a crash/interruption. -pub async fn mark_supervisor_restored( - pool: &PgPool, - contract_id: Uuid, - restoration_source: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET restoration_count = restoration_count + 1, - last_restored_at = NOW(), - restoration_source = $1, - state = 'initializing', - error_message = NULL, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(restoration_source) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Get supervisors with pending questions (for re-delivery after restoration). -pub async fn get_supervisors_with_pending_questions( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - SELECT ss.* - FROM supervisor_states ss - JOIN contracts c ON c.id = ss.contract_id - WHERE c.owner_id = $1 - AND ss.pending_questions != '[]'::jsonb - AND c.status = 'active' - ORDER BY ss.last_activity DESC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get supervisor state with full details for restoration. -/// Includes validation info. -pub async fn get_supervisor_state_for_restoration( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - SELECT * FROM supervisor_states WHERE contract_id = $1 - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Validate spawned tasks are in expected states. -/// Returns map of task_id -> (status, updated_at). -pub async fn validate_spawned_tasks( - pool: &PgPool, - task_ids: &[Uuid], -) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> { - use sqlx::Row; - - let rows = sqlx::query( - r#" - SELECT id, status, updated_at - FROM tasks - WHERE id = ANY($1) - "#, - ) - .bind(task_ids) - .fetch_all(pool) - .await?; - - let mut result = std::collections::HashMap::new(); - for row in rows { - let id: Uuid = row.get("id"); - let status: String = row.get("status"); - let updated_at: chrono::DateTime<Utc> = row.get("updated_at"); - result.insert(id, (status, updated_at)); - } - Ok(result) -} - -/// Update supervisor state when phase changes. -pub async fn update_supervisor_phase( - pool: &PgPool, - contract_id: Uuid, - new_phase: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET phase = $1, - state = 'working', - current_activity = 'Phase changed to ' || $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(new_phase) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor state on heartbeat (lightweight update). -pub async fn update_supervisor_heartbeat_state( - pool: &PgPool, - contract_id: Uuid, - state: &str, - current_activity: Option<&str>, - progress: i32, - pending_task_ids: &[Uuid], -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - UPDATE supervisor_states - SET state = $1, - current_activity = $2, - progress = $3, - pending_task_ids = $4, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $5 - "#, - ) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(pending_task_ids) - .bind(contract_id) - .execute(pool) - .await?; - Ok(()) -} - -// ============================================================================ -// Supervisor Heartbeats -// ============================================================================ - -/// Record a supervisor heartbeat. -/// This creates a historical record for monitoring and dead supervisor detection. -pub async fn create_supervisor_heartbeat( - pool: &PgPool, - supervisor_task_id: Uuid, - contract_id: Uuid, - state: &str, - phase: &str, - current_activity: Option<&str>, - progress: i32, - pending_task_ids: &[Uuid], -) -> Result<SupervisorHeartbeatRecord, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - INSERT INTO supervisor_heartbeats ( - supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) - RETURNING * - "#, - ) - .bind(supervisor_task_id) - .bind(contract_id) - .bind(state) - .bind(phase) - .bind(current_activity) - .bind(progress) - .bind(pending_task_ids) - .fetch_one(pool) - .await -} - -/// Get the latest heartbeat for a supervisor task. -pub async fn get_latest_supervisor_heartbeat( - pool: &PgPool, - supervisor_task_id: Uuid, -) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE supervisor_task_id = $1 - ORDER BY timestamp DESC - LIMIT 1 - "#, - ) - .bind(supervisor_task_id) - .fetch_optional(pool) - .await -} - -/// Get recent heartbeats for a supervisor task. -pub async fn get_supervisor_heartbeats( - pool: &PgPool, - supervisor_task_id: Uuid, - limit: i64, -) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE supervisor_task_id = $1 - ORDER BY timestamp DESC - LIMIT $2 - "#, - ) - .bind(supervisor_task_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Get recent heartbeats for a contract. -pub async fn get_contract_supervisor_heartbeats( - pool: &PgPool, - contract_id: Uuid, - limit: i64, -) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE contract_id = $1 - ORDER BY timestamp DESC - LIMIT $2 - "#, - ) - .bind(contract_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Delete old heartbeats beyond the TTL (24 hours by default). -/// Returns the number of deleted records. -pub async fn cleanup_old_heartbeats( - pool: &PgPool, - ttl_hours: i64, -) -> Result<u64, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM supervisor_heartbeats - WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL - "#, - ) - .bind(ttl_hours.to_string()) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} - -/// Find supervisors that have not sent a heartbeat within the timeout period. -/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp). -pub async fn find_stale_supervisors( - pool: &PgPool, - timeout_seconds: i64, -) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> { - let rows = sqlx::query( - r#" - WITH latest_heartbeats AS ( - SELECT DISTINCT ON (supervisor_task_id) - supervisor_task_id, - contract_id, - timestamp - FROM supervisor_heartbeats - ORDER BY supervisor_task_id, timestamp DESC - ) - SELECT - lh.supervisor_task_id, - lh.contract_id, - lh.timestamp - FROM latest_heartbeats lh - JOIN tasks t ON t.id = lh.supervisor_task_id - WHERE t.status = 'running' - AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL - "#, - ) - .bind(timeout_seconds.to_string()) - .fetch_all(pool) - .await?; - - let mut result = Vec::new(); - for row in rows { - use sqlx::Row; - let supervisor_task_id: Uuid = row.get("supervisor_task_id"); - let contract_id: Uuid = row.get("contract_id"); - let timestamp: chrono::DateTime<Utc> = row.get("timestamp"); - result.push((supervisor_task_id, contract_id, timestamp)); - } - Ok(result) -} - -// ============================================================================ -// Contract Supervisor -// ============================================================================ - -/// Update contract's supervisor task ID. -pub async fn update_contract_supervisor( - pool: &PgPool, - contract_id: Uuid, - supervisor_task_id: Uuid, -) -> Result<Contract, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET supervisor_task_id = $1, - updated_at = NOW() - WHERE id = $2 - RETURNING * - "#, - ) - .bind(supervisor_task_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Mark a deliverable as complete for a specific phase. -/// Uses JSONB operations to append the deliverable_id to the phase's array. -pub async fn mark_deliverable_complete( - pool: &PgPool, - contract_id: Uuid, - phase: &str, - deliverable_id: &str, -) -> Result<Contract, sqlx::Error> { - // Use jsonb_set to add the deliverable to the phase's array - // If the phase key doesn't exist, create an empty array first - // COALESCE handles the case where the phase array doesn't exist yet - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET completed_deliverables = jsonb_set( - completed_deliverables, - ARRAY[$2::text], - COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text), - true - ), - updated_at = NOW() - WHERE id = $1 - AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(phase) - .bind(deliverable_id) - .fetch_one(pool) - .await -} - -/// Clear all completed deliverables for a specific phase. -/// Used when phase changes or deliverables need to be reset. -pub async fn clear_phase_deliverables( - pool: &PgPool, - contract_id: Uuid, - phase: &str, -) -> Result<Contract, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET completed_deliverables = completed_deliverables - $2, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(contract_id) - .bind(phase) - .fetch_one(pool) - .await -} - -/// Get the supervisor task for a contract. -pub async fn get_contract_supervisor_task( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT t.* FROM tasks t - JOIN contracts c ON c.supervisor_task_id = t.id - WHERE c.id = $1 - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -// ============================================================================ -// Task Tree Queries -// ============================================================================ - -/// Get full task tree for a contract. -pub async fn get_contract_task_tree( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - WITH RECURSIVE task_tree AS ( - -- Base case: root tasks (no parent) - SELECT * FROM tasks - WHERE contract_id = $1 AND parent_task_id IS NULL - UNION ALL - -- Recursive case: children of current level - SELECT t.* FROM tasks t - JOIN task_tree tt ON t.parent_task_id = tt.id - ) - SELECT * FROM task_tree - ORDER BY depth, created_at - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -/// Get task tree from a specific root task. -pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - WITH RECURSIVE task_tree AS ( - -- Base case: the root task - SELECT * FROM tasks WHERE id = $1 - UNION ALL - -- Recursive case: children of current level - SELECT t.* FROM tasks t - JOIN task_tree tt ON t.parent_task_id = tt.id - ) - SELECT * FROM task_tree - ORDER BY depth, created_at - "#, - ) - .bind(root_task_id) - .fetch_all(pool) - .await -} - -// ============================================================================ // Daemon Selection // ============================================================================ @@ -4578,107 +2470,27 @@ pub async fn cleanup_old_snapshots( pub async fn record_history_event( pool: &PgPool, owner_id: Uuid, - contract_id: Option<Uuid>, task_id: Option<Uuid>, event_type: &str, event_subtype: Option<&str>, - phase: Option<&str>, event_data: serde_json::Value, ) -> Result<HistoryEvent, sqlx::Error> { sqlx::query_as::<_, HistoryEvent>( r#" - INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data) + VALUES ($1, $2, $3, $4, $5) RETURNING * "# ) .bind(owner_id) - .bind(contract_id) .bind(task_id) .bind(event_type) .bind(event_subtype) - .bind(phase) .bind(event_data) .fetch_one(pool) .await } -/// Get contract history timeline -pub async fn get_contract_history( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, - filters: &HistoryQueryFilters, -) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> { - let limit = filters.limit.unwrap_or(100); - - let mut query = String::from( - "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" - ); - let mut count_query = String::from( - "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" - ); - - let mut param_count = 2; - - if filters.phase.is_some() { - param_count += 1; - query.push_str(&format!(" AND phase = ${}" , param_count)); - count_query.push_str(&format!(" AND phase = ${}", param_count)); - } - - if filters.from.is_some() { - param_count += 1; - query.push_str(&format!(" AND created_at >= ${}", param_count)); - count_query.push_str(&format!(" AND created_at >= ${}", param_count)); - } - - if filters.to.is_some() { - param_count += 1; - query.push_str(&format!(" AND created_at <= ${}", param_count)); - count_query.push_str(&format!(" AND created_at <= ${}", param_count)); - } - - query.push_str(" ORDER BY created_at DESC"); - query.push_str(&format!(" LIMIT {}", limit)); - - // Build and execute the query dynamically - let mut q = sqlx::query_as::<_, HistoryEvent>(&query) - .bind(contract_id) - .bind(owner_id); - - if let Some(ref phase) = filters.phase { - q = q.bind(phase); - } - if let Some(ref from) = filters.from { - q = q.bind(from); - } - if let Some(ref to) = filters.to { - q = q.bind(to); - } - - let events = q.fetch_all(pool).await?; - - // Get total count - let mut cq = sqlx::query_scalar::<_, i64>(&count_query) - .bind(contract_id) - .bind(owner_id); - - if let Some(ref phase) = filters.phase { - cq = cq.bind(phase); - } - if let Some(ref from) = filters.from { - cq = cq.bind(from); - } - if let Some(ref to) = filters.to { - cq = cq.bind(to); - } - - let count = cq.fetch_one(pool).await?; - - Ok((events, count)) -} - /// Get task history pub async fn get_task_history( pool: &PgPool, @@ -4825,13 +2637,6 @@ pub async fn get_task_conversation( Ok(messages) } -/// Get supervisor conversation (from supervisor_states) -pub async fn get_supervisor_conversation_full( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - get_supervisor_state(pool, contract_id).await -} // ============================================================================= // Anonymous Task Cleanup Functions @@ -4969,156 +2774,6 @@ pub async fn delete_checkpoint_patches_for_task( Ok(result.rows_affected() as i64) } -// ============================================================================= -// Red Team Notifications -// ============================================================================= -// ============================================================================= -// Supervisor Status API Helpers -// ============================================================================= - -/// Get supervisor status for a contract. -/// Returns combined information from supervisor_states and tasks tables. -pub async fn get_supervisor_status( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> { - // Query to get supervisor status by joining supervisor_states with tasks - sqlx::query_as::<_, SupervisorStatusInfo>( - r#" - SELECT - ss.task_id, - COALESCE(t.status, 'unknown') as supervisor_state, - ss.phase, - t.progress_summary as current_activity, - ss.pending_task_ids, - ss.last_activity as last_heartbeat, - t.status as task_status, - t.daemon_id IS NOT NULL as is_running - FROM supervisor_states ss - JOIN tasks t ON t.id = ss.task_id - WHERE ss.contract_id = $1 - AND t.owner_id = $2 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Internal struct to hold supervisor status query result -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct SupervisorStatusInfo { - pub task_id: Uuid, - pub supervisor_state: String, - pub phase: String, - pub current_activity: Option<String>, - #[sqlx(try_from = "Vec<Uuid>")] - pub pending_task_ids: Vec<Uuid>, - pub last_heartbeat: chrono::DateTime<chrono::Utc>, - pub task_status: String, - pub is_running: bool, -} - -/// Get supervisor activity history from history_events table. -/// This provides a timeline of supervisor activities that can serve as "heartbeats". -pub async fn get_supervisor_activity_history( - pool: &PgPool, - contract_id: Uuid, - limit: i32, - offset: i32, -) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> { - sqlx::query_as::<_, SupervisorActivityEntry>( - r#" - SELECT - created_at as timestamp, - COALESCE(event_subtype, 'activity') as state, - event_data->>'activity' as activity, - (event_data->>'progress')::INTEGER as progress, - COALESCE(phase, 'unknown') as phase, - CASE - WHEN event_data->'pending_task_ids' IS NOT NULL - THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] - ELSE ARRAY[]::UUID[] - END as pending_task_ids - FROM history_events - WHERE contract_id = $1 - AND event_type IN ('supervisor', 'phase', 'task') - ORDER BY created_at DESC - LIMIT $2 OFFSET $3 - "#, - ) - .bind(contract_id) - .bind(limit) - .bind(offset) - .fetch_all(pool) - .await -} - -/// Internal struct to hold supervisor activity entry -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct SupervisorActivityEntry { - pub timestamp: chrono::DateTime<chrono::Utc>, - pub state: String, - pub activity: Option<String>, - pub progress: Option<i32>, - pub phase: String, - #[sqlx(try_from = "Vec<Uuid>")] - pub pending_task_ids: Vec<Uuid>, -} - -/// Count total supervisor activity history entries for a contract. -pub async fn count_supervisor_activity_history( - pool: &PgPool, - contract_id: Uuid, -) -> Result<i64, sqlx::Error> { - let result: (i64,) = sqlx::query_as( - r#" - SELECT COUNT(*) - FROM history_events - WHERE contract_id = $1 - AND event_type IN ('supervisor', 'phase', 'task') - "#, - ) - .bind(contract_id) - .fetch_one(pool) - .await?; - Ok(result.0) -} - -/// Update supervisor state last_activity timestamp. -/// This acts as a "sync" operation to refresh the supervisor's heartbeat. -pub async fn sync_supervisor_state( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $1 - RETURNING * - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -// ============================================================================= -// Helper Functions -// ============================================================================= - -/// Helper to truncate string to max length -fn truncate_string(s: &str, max_len: usize) -> String { - if s.len() <= max_len { - s.to_string() - } else { - format!("{}...", &s[..max_len - 3]) - } -} // ============================================================================= // Directive CRUD @@ -7031,37 +4686,6 @@ pub async fn get_running_steps_with_tasks( .await } -/// A running step backed by a contract, joined with the contract's current status. -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct RunningStepWithContract { - pub step_id: Uuid, - pub directive_id: Uuid, - pub contract_id: Uuid, - pub contract_status: String, - pub contract_phase: String, -} - -/// Get running steps that are backed by contracts (for contract-based monitoring). -pub async fn get_running_steps_with_contracts( - pool: &PgPool, -) -> Result<Vec<RunningStepWithContract>, sqlx::Error> { - sqlx::query_as::<_, RunningStepWithContract>( - r#" - SELECT - ds.id AS step_id, - ds.directive_id, - ds.contract_id AS "contract_id!", - c.status AS contract_status, - c.phase AS contract_phase - FROM directive_steps ds - JOIN contracts c ON c.id = ds.contract_id - WHERE ds.status = 'running' - AND ds.contract_id IS NOT NULL - "#, - ) - .fetch_all(pool) - .await -} /// An orchestrator task to check (directive with pending planning task). #[derive(Debug, Clone, sqlx::FromRow)] @@ -7221,25 +4845,6 @@ pub async fn link_task_to_step( Ok(()) } -/// Link a contract to a directive step. -pub async fn link_contract_to_step( - pool: &PgPool, - step_id: Uuid, - contract_id: Uuid, -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - UPDATE directive_steps - SET contract_id = $1 - WHERE id = $2 - "#, - ) - .bind(contract_id) - .bind(step_id) - .execute(pool) - .await?; - Ok(()) -} /// Set a step to 'running' status (after its task has been dispatched). pub async fn set_step_running( |
