summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs2537
1 files changed, 71 insertions, 2466 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index ee4b561..d453f99 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,21 +6,17 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
- ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
- ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
- CreateContractRequest, CreateFileRequest, CreateTaskRequest,
- CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
+ CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot,
+ CreateFileRequest, CreateTaskRequest,
+ Daemon, DaemonTaskAssignment, DaemonWithCapacity,
+ Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
CreateDirectiveRequest, CreateDirectiveStepRequest,
UpdateDirectiveRequest, UpdateDirectiveStepRequest,
CreateOrderRequest, Order, UpdateOrderRequest,
CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
- MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
- PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
- Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
- UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary,
+ UpdateFileRequest, UpdateTaskRequest,
};
/// Repository error types.
@@ -89,7 +85,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
@@ -105,7 +101,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#,
@@ -119,7 +115,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#,
@@ -171,7 +167,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -190,7 +186,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -246,7 +242,6 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
// =============================================================================
/// Create a new file record for a specific owner.
-/// Files must belong to a contract - the contract_id is required and the phase is looked up.
pub async fn create_file_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -254,32 +249,16 @@ pub async fn create_file_for_owner(
) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
- // Use body from request (may be empty or contain template elements)
let body_json = serde_json::to_value(&req.body).unwrap_or_default();
- // Use provided contract_phase, or look up from contract's current phase
- let contract_phase: Option<String> = if req.contract_phase.is_some() {
- req.contract_phase
- } else {
- sqlx::query_scalar(
- "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2",
- )
- .bind(req.contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?
- };
-
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
- VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path)
+ VALUES ($1, $2, $3, $4, $5, NULL, $6, $7)
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(owner_id)
- .bind(req.contract_id)
- .bind(&contract_phase)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
@@ -298,7 +277,7 @@ pub async fn get_file_for_owner(
) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
@@ -313,7 +292,7 @@ pub async fn get_file_for_owner(
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
@@ -324,13 +303,10 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F
.await
}
-/// Database row type for file summary with contract info
+/// Database row type for file summary
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
id: Uuid,
- contract_id: Option<Uuid>,
- contract_name: Option<String>,
- contract_phase: Option<String>,
name: String,
description: Option<String>,
#[sqlx(json)]
@@ -342,7 +318,7 @@ struct FileSummaryRow {
updated_at: chrono::DateTime<chrono::Utc>,
}
-/// List file summaries for an owner with contract info (joined).
+/// List file summaries for an owner.
pub async fn list_file_summaries_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -350,11 +326,9 @@ pub async fn list_file_summaries_for_owner(
let rows = sqlx::query_as::<_, FileSummaryRow>(
r#"
SELECT
- f.id, f.contract_id, c.name as contract_name, f.contract_phase,
- f.name, f.description, f.transcript, f.version,
+ f.id, f.name, f.description, f.transcript, f.version,
f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
FROM files f
- LEFT JOIN contracts c ON f.contract_id = c.id
WHERE f.owner_id = $1
ORDER BY f.created_at DESC
"#,
@@ -373,9 +347,6 @@ pub async fn list_file_summaries_for_owner(
.fold(0.0_f32, f32::max);
FileSummary {
id: row.id,
- contract_id: row.contract_id,
- contract_name: row.contract_name,
- contract_phase: row.contract_phase,
name: row.name,
description: row.description,
transcript_count: row.transcript.len(),
@@ -429,7 +400,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -449,7 +420,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -657,34 +628,24 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -699,23 +660,21 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id
+ branched_from_task_id, conversation_state
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -726,7 +685,6 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.fetch_one(pool)
.await
}
@@ -751,14 +709,12 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error>
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -772,14 +728,12 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -790,73 +744,6 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
}
/// List all tasks in a contract (for supervisor tree view).
-pub async fn list_tasks_by_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT * FROM tasks
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY is_supervisor DESC, depth ASC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get pending tasks for a contract (non-supervisor tasks only).
-/// Includes tasks that were interrupted (retry candidates).
-/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
-pub async fn get_pending_tasks_for_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- WHERE t.contract_id = $1 AND t.owner_id = $2
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY
- t.interrupted_at DESC NULLS LAST,
- t.priority DESC,
- t.created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get all contracts that have pending tasks awaiting retry.
-/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks.
-pub async fn get_all_pending_task_contracts(
- pool: &PgPool,
-) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
- sqlx::query_as::<_, (Uuid, Uuid)>(
- r#"
- SELECT DISTINCT t.contract_id, t.owner_id
- FROM tasks t
- WHERE t.contract_id IS NOT NULL
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY t.owner_id, t.contract_id
- "#,
- )
- .fetch_all(pool)
- .await
-}
-
-/// Mark a task as pending for retry after daemon failure.
-/// Increments retry count and adds the failed daemon to exclusion list.
pub async fn mark_task_for_retry(
pool: &PgPool,
task_id: Uuid,
@@ -1061,16 +948,13 @@ pub async fn create_task_for_owner(
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
@@ -1078,25 +962,17 @@ pub async fn create_task_for_owner(
)));
}
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -1108,14 +984,11 @@ pub async fn create_task_for_owner(
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
- // Resolve the directive_document_id. Tasks plumbed through this builder
- // currently have no way to specify a document explicitly (we don't want
- // to widen `CreateTaskRequest` for this — every call site would have to
- // change). Instead, when the task is directive-driven (directive_id is
- // set) we attach it to that directive's most recently-updated active
- // document so the task lands under that document's tasks/ subfolder in
- // the sidebar. Resolution failures are non-fatal — the task still gets
- // created with directive_document_id = NULL, matching legacy behaviour.
+ // Resolve directive_document_id from the directive's currently-
+ // active contract row (directive_documents table) so the task
+ // lands under the right tasks/ subfolder in the sidebar. Failures
+ // are non-fatal — the task is created with NULL document_id and
+ // the sidebar tolerates that.
let directive_document_id = match req.directive_id {
Some(directive_id) => resolve_active_document_for_directive(pool, directive_id)
.await
@@ -1126,25 +999,23 @@ pub async fn create_task_for_owner(
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ owner_id, parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id,
+ branched_from_task_id, conversation_state,
directive_id, directive_step_id, directive_document_id
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
RETURNING *
"#,
)
.bind(owner_id)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -1155,7 +1026,6 @@ pub async fn create_task_for_owner(
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.bind(&req.directive_id)
.bind(&req.directive_step_id)
.bind(&directive_document_id)
@@ -1226,14 +1096,12 @@ pub async fn list_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1335,14 +1203,12 @@ pub async fn list_ephemeral_directive_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.directive_step_id IS NULL
@@ -1369,14 +1235,12 @@ pub async fn list_tmp_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.parent_task_id IS NULL
@@ -1399,14 +1263,12 @@ pub async fn list_subtasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1920,14 +1782,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1942,14 +1802,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -2121,1287 +1979,28 @@ pub async fn complete_task(
Ok(task)
}
-// =============================================================================
-// Mesh Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for an owner.
-pub async fn get_or_create_active_conversation(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<MeshChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this owner
- let existing = sqlx::query_as::<_, MeshChatConversation>(
- r#"
- SELECT *
- FROM mesh_chat_conversations
- WHERE is_active = true AND owner_id = $1
- LIMIT 1
- "#,
- )
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, MeshChatConversation>(
- r#"
- INSERT INTO mesh_chat_conversations (owner_id, is_active)
- VALUES ($1, true)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a conversation.
-pub async fn list_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- SELECT *
- FROM mesh_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a conversation.
-#[allow(clippy::too_many_arguments)]
-pub async fn add_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- context_type: &str,
- context_task_id: Option<Uuid>,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<MeshChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- INSERT INTO mesh_chat_messages
- (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(context_type)
- .bind(context_task_id)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear conversation (archive existing and create new).
-pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
- // Mark existing as inactive for this owner
- sqlx::query(
- r#"
- UPDATE mesh_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND owner_id = $1
- "#,
- )
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_active_conversation(pool, owner_id).await
-}
-
-// =============================================================================
-// Contract Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for a contract.
-pub async fn get_or_create_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this contract
- let existing = sqlx::query_as::<_, ContractChatConversation>(
- r#"
- SELECT *
- FROM contract_chat_conversations
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- LIMIT 1
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, ContractChatConversation>(
- r#"
- INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
- VALUES ($1, $2, true)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a contract conversation.
-pub async fn list_contract_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- SELECT *
- FROM contract_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a contract conversation.
-pub async fn add_contract_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<ContractChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- INSERT INTO contract_chat_messages
- (conversation_id, role, content, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear contract conversation (archive existing and create new).
-pub async fn clear_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Mark existing as inactive for this contract
- sqlx::query(
- r#"
- UPDATE contract_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_contract_conversation(pool, contract_id, owner_id).await
-}
-
-// =============================================================================
-// Contract Type Template Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract type template for a specific owner.
-pub async fn create_template_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateTemplateRequest,
-) -> Result<ContractTypeTemplateRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables)
- VALUES ($1, $2, $3, $4, $5, $6)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(serde_json::to_value(&req.phases).unwrap_or_default())
- .bind(&req.default_phase)
- .bind(match &req.deliverables {
- Some(d) => serde_json::to_value(d).ok(),
- None => None,
- })
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract type template by ID, scoped to owner.
-pub async fn get_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get a contract type template by ID (internal use, no owner scoping).
-pub async fn get_template_by_id(
- pool: &PgPool,
- id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1
- "#,
- )
- .bind(id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contract type templates for an owner, ordered by name.
-pub async fn list_templates_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE owner_id = $1
- ORDER BY name ASC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Update a contract type template for an owner.
-pub async fn update_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateTemplateRequest,
-) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> {
- // Build dynamic update query
- let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()");
- let mut param_idx = 3; // $1 = id, $2 = owner_id
-
- if req.name.is_some() {
- query.push_str(&format!(", name = ${}", param_idx));
- param_idx += 1;
- }
- if req.description.is_some() {
- query.push_str(&format!(", description = ${}", param_idx));
- param_idx += 1;
- }
- if req.phases.is_some() {
- query.push_str(&format!(", phases = ${}", param_idx));
- param_idx += 1;
- }
- if req.default_phase.is_some() {
- query.push_str(&format!(", default_phase = ${}", param_idx));
- param_idx += 1;
- }
- if req.deliverables.is_some() {
- query.push_str(&format!(", deliverables = ${}", param_idx));
- param_idx += 1;
- }
-
- // Optimistic locking
- if req.version.is_some() {
- query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx));
- } else {
- query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2");
- }
- query.push_str(" RETURNING *");
-
- let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query);
- sql_query = sql_query.bind(id).bind(owner_id);
-
- if let Some(ref name) = req.name {
- sql_query = sql_query.bind(name);
- }
- if let Some(ref description) = req.description {
- sql_query = sql_query.bind(description);
- }
- if let Some(ref phases) = req.phases {
- sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default());
- }
- if let Some(ref default_phase) = req.default_phase {
- sql_query = sql_query.bind(default_phase);
- }
- if let Some(ref deliverables) = req.deliverables {
- sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default());
- }
- if let Some(version) = req.version {
- sql_query = sql_query.bind(version);
- }
-
- match sql_query.fetch_optional(pool).await {
- Ok(result) => {
- if result.is_none() && req.version.is_some() {
- // Check if it's a version conflict
- if let Some(current) = get_template_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
- Ok(result)
- }
- Err(e) => Err(RepositoryError::Database(e)),
- }
-}
-
-/// Delete a contract type template for an owner.
-pub async fn delete_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Helper function to build PhaseConfig from a template.
-pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig {
- PhaseConfig {
- phases: template.phases.clone(),
- default_phase: template.default_phase.clone(),
- deliverables: template.deliverables.clone().unwrap_or_default(),
- }
-}
-
-/// Helper function to build PhaseConfig for built-in contract types.
-pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig {
- match contract_type {
- "simple" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 },
- ],
- default_phase: "plan".to_string(),
- deliverables: [
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "specification" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 },
- PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 },
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 },
- PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 },
- ],
- default_phase: "research".to_string(),
- deliverables: [
- ("research".to_string(), vec![DeliverableDefinition {
- id: "research-notes".to_string(),
- name: "Research Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ("specify".to_string(), vec![DeliverableDefinition {
- id: "requirements-document".to_string(),
- name: "Requirements Document".to_string(),
- priority: "required".to_string(),
- }]),
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ("review".to_string(), vec![DeliverableDefinition {
- id: "release-notes".to_string(),
- name: "Release Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "execute" | _ => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 },
- ],
- default_phase: "execute".to_string(),
- deliverables: std::collections::HashMap::new(),
- },
- }
-}
-
-// =============================================================================
-// Contract Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract for a specific owner.
-/// Supports both built-in contract types (simple, specification, execute) and custom templates.
-pub async fn create_contract_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateContractRequest,
-) -> Result<Contract, sqlx::Error> {
- // Determine phase configuration based on template_id or contract_type
- let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) =
- if let Some(template_id) = req.template_id {
- // Look up the custom template
- let template = get_template_by_id(pool, template_id)
- .await?
- .ok_or_else(|| {
- sqlx::Error::Protocol(format!("Template not found: {}", template_id))
- })?;
-
- let config = build_phase_config_from_template(&template);
- let default = config.default_phase.clone();
- // For custom templates, store the template name as the contract_type
- (config, template.name.clone(), default)
- } else {
- // Use built-in contract type
- let contract_type = req.contract_type.as_deref().unwrap_or("simple");
-
- // Validate contract type
- let valid_types = ["simple", "specification", "execute"];
- if !valid_types.contains(&contract_type) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid contract_type '{}'. Must be one of: {} or provide a template_id",
- contract_type,
- valid_types.join(", ")
- )));
- }
-
- let config = build_phase_config_for_builtin(contract_type);
- let default = config.default_phase.clone();
- (config, contract_type.to_string(), default)
- };
-
- // Get valid phase IDs from the configuration
- let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect();
-
- // Use provided initial_phase or default based on contract type/template
- let phase = req.initial_phase.as_deref().unwrap_or(&default_phase);
-
- // Validate the phase is valid for this contract type/template
- if !valid_phase_ids.contains(&phase.to_string()) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
- phase,
- contract_type_str,
- valid_phase_ids.join(", ")
- )));
- }
-
- let autonomous_loop = req.autonomous_loop.unwrap_or(false);
- let phase_guard = req.phase_guard.unwrap_or(false);
- let local_only = req.local_only.unwrap_or(false);
- let auto_merge_local = req.auto_merge_local.unwrap_or(false);
-
- // Serialize phase_config to JSON
- let phase_config_json = serde_json::to_value(&phase_config).ok();
-
- sqlx::query_as::<_, Contract>(
- r#"
- INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(&contract_type_str)
- .bind(phase)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(phase_config_json)
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract by ID, scoped to owner.
-pub async fn get_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<Contract>, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contracts for an owner, ordered by created_at DESC.
-pub async fn list_contracts_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.owner_id = $1
- ORDER BY c.created_at DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get contract summary by ID.
-pub async fn get_contract_summary_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.id = $1 AND c.owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update a contract by ID with optimistic locking, scoped to owner.
-pub async fn update_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateContractRequest,
-) -> Result<Option<Contract>, RepositoryError> {
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected_version) = req.version {
- if existing.version != expected_version {
- return Err(RepositoryError::VersionConflict {
- expected: expected_version,
- actual: existing.version,
- });
- }
- }
-
- // Apply updates
- let name = req.name.unwrap_or(existing.name);
- let description = req.description.or(existing.description);
- let phase = req.phase.unwrap_or(existing.phase);
- let status = req.status.unwrap_or(existing.status);
- let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
- let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop);
- let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard);
- let local_only = req.local_only.unwrap_or(existing.local_only);
- let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local);
-
- let result = if req.version.is_some() {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $12
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(req.version.unwrap())
- .fetch_optional(pool)
- .await?
- } else {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .fetch_optional(pool)
- .await?
- };
-
- // If versioned update returned None, there was a race condition
- if result.is_none() && req.version.is_some() {
- if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
-
- Ok(result)
-}
-
-/// Delete a contract by ID, scoped to owner.
-pub async fn delete_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Change contract phase and record event.
-///
-/// This is the simple version without version checking. Use `change_contract_phase_with_version`
-/// for explicit version conflict detection.
-pub async fn change_contract_phase_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
-) -> Result<Option<Contract>, sqlx::Error> {
- // Get current phase
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- let previous_phase = existing.phase.clone();
-
- // Update phase
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_optional(pool)
- .await?;
-
- // Record event
- if contract.is_some() {
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(pool)
- .await?;
- }
-
- Ok(contract)
-}
-
-/// Change contract phase with explicit version checking for conflict detection.
-///
-/// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions.
-/// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match.
-pub async fn change_contract_phase_with_version(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
- expected_version: Option<i32>,
-) -> Result<PhaseChangeResult, sqlx::Error> {
- // Start a transaction to ensure atomicity with row locking
- let mut tx = pool.begin().await?;
-
- // Lock the row with SELECT FOR UPDATE and get current state
- let existing: Option<Contract> = sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- FOR UPDATE
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(&mut *tx)
- .await?;
-
- let Some(existing) = existing else {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::NotFound);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected) = expected_version {
- if existing.version != expected {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::VersionConflict {
- expected,
- actual: existing.version,
- current_phase: existing.phase,
- });
- }
- }
-
- // Validate the phase transition is allowed
- let valid_phases = existing.valid_phase_ids();
- if !valid_phases.contains(&new_phase.to_string()) {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::ValidationFailed {
- reason: format!(
- "Invalid phase '{}' for contract type '{}'",
- new_phase, existing.contract_type
- ),
- missing_requirements: vec![format!(
- "Phase must be one of: {}",
- valid_phases.join(", ")
- )],
- });
- }
-
- let previous_phase = existing.phase.clone();
-
- // Update phase with version increment
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_one(&mut *tx)
- .await?;
-
- // Record event
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(&mut *tx)
- .await?;
-
- // Commit the transaction
- tx.commit().await?;
-
- Ok(PhaseChangeResult::Success(contract))
-}
-
-// =============================================================================
-// Contract Repository Functions
-// =============================================================================
-
-/// List repositories for a contract.
-pub async fn list_contract_repositories(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- SELECT *
- FROM contract_repositories
- WHERE contract_id = $1
- ORDER BY is_primary DESC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Add a remote repository to a contract.
-pub async fn add_remote_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- repository_url: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'remote', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(repository_url)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Add a local repository to a contract.
-pub async fn add_local_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- local_path: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
+/// Get task tree from a specific root task.
+pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
r#"
- INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'local', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(local_path)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Create a managed repository (daemon will create it).
-pub async fn create_managed_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
+ WITH RECURSIVE task_tree AS (
+ -- Base case: the root task
+ SELECT * FROM tasks WHERE id = $1
+ UNION ALL
+ -- Recursive case: children of current level
+ SELECT t.* FROM tasks t
+ JOIN task_tree tt ON t.parent_task_id = tt.id
)
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
- VALUES ($1, $2, 'managed', 'pending', $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Delete a repository from a contract.
-pub async fn delete_contract_repository(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_repositories
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Set a repository as primary (and clear others).
-pub async fn set_repository_primary(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- // Clear other primaries
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- // Set this one as primary
- let result = sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = true, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Update managed repository status (used by daemon).
-pub async fn update_managed_repository_status(
- pool: &PgPool,
- repo_id: Uuid,
- status: &str,
- repository_url: Option<&str>,
-) -> Result<Option<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- UPDATE contract_repositories
- SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(repo_id)
- .bind(status)
- .bind(repository_url)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Contract Task Association Functions
-// =============================================================================
-
-/// Add a task to a contract.
-pub async fn add_task_to_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = $2, updated_at = NOW()
- WHERE id = $1 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Remove a task from a contract.
-pub async fn remove_task_from_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = NULL, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// List files in a contract.
-pub async fn list_files_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<FileSummary>, sqlx::Error> {
- // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields
- let files = sqlx::query_as::<_, File>(
- r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
- FROM files
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await?;
-
- Ok(files.into_iter().map(FileSummary::from).collect())
-}
-
-/// List tasks in a contract.
-pub async fn list_tasks_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<TaskSummary>, sqlx::Error> {
- sqlx::query_as::<_, TaskSummary>(
- r#"
- SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
- t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
- FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
- WHERE t.contract_id = $1 AND t.owner_id = $2
- ORDER BY t.priority DESC, t.created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Minimal task info for worktree cleanup operations.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct TaskWorktreeInfo {
- pub id: Uuid,
- pub daemon_id: Option<Uuid>,
- pub overlay_path: Option<String>,
- /// If set, this task shares the worktree of the specified supervisor task.
- /// Should NOT have its worktree deleted during cleanup.
- pub supervisor_worktree_task_id: Option<Uuid>,
-}
-
-/// List tasks in a contract with their daemon/worktree info.
-/// Used for cleaning up worktrees when a contract is completed or deleted.
-pub async fn list_contract_tasks_with_worktree_info(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> {
- sqlx::query_as::<_, TaskWorktreeInfo>(
- r#"
- SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id
- FROM tasks
- WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL)
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-// =============================================================================
-// Contract Events
-// =============================================================================
-
-/// List events for a contract.
-pub async fn list_contract_events(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractEvent>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- SELECT *
- FROM contract_events
- WHERE contract_id = $1
- ORDER BY created_at DESC
+ SELECT * FROM task_tree
+ ORDER BY depth, created_at
"#,
)
- .bind(contract_id)
+ .bind(root_task_id)
.fetch_all(pool)
.await
}
-/// Record a contract event.
-pub async fn record_contract_event(
- pool: &PgPool,
- contract_id: Uuid,
- event_type: &str,
- event_data: Option<serde_json::Value>,
-) -> Result<ContractEvent, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- INSERT INTO contract_events (contract_id, event_type, event_data)
- VALUES ($1, $2, $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(event_type)
- .bind(event_data)
- .fetch_one(pool)
- .await
-}
-
// ============================================================================
// Task Checkpoints
// ============================================================================
@@ -3501,713 +2100,6 @@ pub async fn list_task_checkpoints(
}
// ============================================================================
-// Supervisor State
-// ============================================================================
-
-/// Create or update supervisor state for a contract.
-pub async fn upsert_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
- VALUES ($1, $2, $3, $4, $5, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisor state for a contract.
-pub async fn get_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1")
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get supervisor state by task ID.
-pub async fn get_supervisor_state_by_task(
- pool: &PgPool,
- task_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1")
- .bind(task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update supervisor conversation history.
-pub async fn update_supervisor_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- conversation_history: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET conversation_history = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(conversation_history)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor pending tasks.
-pub async fn update_supervisor_pending_tasks(
- pool: &PgPool,
- contract_id: Uuid,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_task_ids = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(pending_task_ids)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state with detailed activity tracking.
-/// Called at key save points: LLM response, task spawn, question asked, phase change.
-pub async fn update_supervisor_detailed_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- error_message = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- RETURNING *
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a spawned task ID to the supervisor's list.
-pub async fn add_supervisor_spawned_task(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET spawned_task_ids = array_append(spawned_task_ids, $1),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a pending question to the supervisor state.
-pub async fn add_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = pending_questions || $1::jsonb,
- state = 'waiting_for_user',
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Remove a pending question by ID.
-pub async fn remove_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = (
- SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb)
- FROM jsonb_array_elements(pending_questions) elem
- WHERE (elem->>'id')::uuid != $1
- ),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Comprehensive state save - used at major save points.
-pub async fn save_supervisor_state_full(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
- spawned_task_ids: &[Uuid],
- pending_questions: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (
- contract_id, task_id, conversation_history, pending_task_ids, phase,
- state, current_activity, progress, error_message, spawned_task_ids,
- pending_questions, last_activity
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- state = EXCLUDED.state,
- current_activity = EXCLUDED.current_activity,
- progress = EXCLUDED.progress,
- error_message = EXCLUDED.error_message,
- spawned_task_ids = EXCLUDED.spawned_task_ids,
- pending_questions = EXCLUDED.pending_questions,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(spawned_task_ids)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Mark supervisor as restored from a crash/interruption.
-pub async fn mark_supervisor_restored(
- pool: &PgPool,
- contract_id: Uuid,
- restoration_source: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET restoration_count = restoration_count + 1,
- last_restored_at = NOW(),
- restoration_source = $1,
- state = 'initializing',
- error_message = NULL,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(restoration_source)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisors with pending questions (for re-delivery after restoration).
-pub async fn get_supervisors_with_pending_questions(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT ss.*
- FROM supervisor_states ss
- JOIN contracts c ON c.id = ss.contract_id
- WHERE c.owner_id = $1
- AND ss.pending_questions != '[]'::jsonb
- AND c.status = 'active'
- ORDER BY ss.last_activity DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get supervisor state with full details for restoration.
-/// Includes validation info.
-pub async fn get_supervisor_state_for_restoration(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT * FROM supervisor_states WHERE contract_id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Validate spawned tasks are in expected states.
-/// Returns map of task_id -> (status, updated_at).
-pub async fn validate_spawned_tasks(
- pool: &PgPool,
- task_ids: &[Uuid],
-) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> {
- use sqlx::Row;
-
- let rows = sqlx::query(
- r#"
- SELECT id, status, updated_at
- FROM tasks
- WHERE id = ANY($1)
- "#,
- )
- .bind(task_ids)
- .fetch_all(pool)
- .await?;
-
- let mut result = std::collections::HashMap::new();
- for row in rows {
- let id: Uuid = row.get("id");
- let status: String = row.get("status");
- let updated_at: chrono::DateTime<Utc> = row.get("updated_at");
- result.insert(id, (status, updated_at));
- }
- Ok(result)
-}
-
-/// Update supervisor state when phase changes.
-pub async fn update_supervisor_phase(
- pool: &PgPool,
- contract_id: Uuid,
- new_phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET phase = $1,
- state = 'working',
- current_activity = 'Phase changed to ' || $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(new_phase)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state on heartbeat (lightweight update).
-pub async fn update_supervisor_heartbeat_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- pending_task_ids = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .bind(contract_id)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-// ============================================================================
-// Supervisor Heartbeats
-// ============================================================================
-
-/// Record a supervisor heartbeat.
-/// This creates a historical record for monitoring and dead supervisor detection.
-pub async fn create_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- contract_id: Uuid,
- state: &str,
- phase: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- INSERT INTO supervisor_heartbeats (
- supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .bind(state)
- .bind(phase)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .fetch_one(pool)
- .await
-}
-
-/// Get the latest heartbeat for a supervisor task.
-pub async fn get_latest_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
-) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT 1
- "#,
- )
- .bind(supervisor_task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get recent heartbeats for a supervisor task.
-pub async fn get_supervisor_heartbeats(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(supervisor_task_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Get recent heartbeats for a contract.
-pub async fn get_contract_supervisor_heartbeats(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE contract_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Delete old heartbeats beyond the TTL (24 hours by default).
-/// Returns the number of deleted records.
-pub async fn cleanup_old_heartbeats(
- pool: &PgPool,
- ttl_hours: i64,
-) -> Result<u64, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM supervisor_heartbeats
- WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
- "#,
- )
- .bind(ttl_hours.to_string())
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected())
-}
-
-/// Find supervisors that have not sent a heartbeat within the timeout period.
-/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
-pub async fn find_stale_supervisors(
- pool: &PgPool,
- timeout_seconds: i64,
-) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
- let rows = sqlx::query(
- r#"
- WITH latest_heartbeats AS (
- SELECT DISTINCT ON (supervisor_task_id)
- supervisor_task_id,
- contract_id,
- timestamp
- FROM supervisor_heartbeats
- ORDER BY supervisor_task_id, timestamp DESC
- )
- SELECT
- lh.supervisor_task_id,
- lh.contract_id,
- lh.timestamp
- FROM latest_heartbeats lh
- JOIN tasks t ON t.id = lh.supervisor_task_id
- WHERE t.status = 'running'
- AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
- "#,
- )
- .bind(timeout_seconds.to_string())
- .fetch_all(pool)
- .await?;
-
- let mut result = Vec::new();
- for row in rows {
- use sqlx::Row;
- let supervisor_task_id: Uuid = row.get("supervisor_task_id");
- let contract_id: Uuid = row.get("contract_id");
- let timestamp: chrono::DateTime<Utc> = row.get("timestamp");
- result.push((supervisor_task_id, contract_id, timestamp));
- }
- Ok(result)
-}
-
-// ============================================================================
-// Contract Supervisor
-// ============================================================================
-
-/// Update contract's supervisor task ID.
-pub async fn update_contract_supervisor(
- pool: &PgPool,
- contract_id: Uuid,
- supervisor_task_id: Uuid,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET supervisor_task_id = $1,
- updated_at = NOW()
- WHERE id = $2
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Mark a deliverable as complete for a specific phase.
-/// Uses JSONB operations to append the deliverable_id to the phase's array.
-pub async fn mark_deliverable_complete(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
- deliverable_id: &str,
-) -> Result<Contract, sqlx::Error> {
- // Use jsonb_set to add the deliverable to the phase's array
- // If the phase key doesn't exist, create an empty array first
- // COALESCE handles the case where the phase array doesn't exist yet
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = jsonb_set(
- completed_deliverables,
- ARRAY[$2::text],
- COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text),
- true
- ),
- updated_at = NOW()
- WHERE id = $1
- AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .bind(deliverable_id)
- .fetch_one(pool)
- .await
-}
-
-/// Clear all completed deliverables for a specific phase.
-/// Used when phase changes or deliverables need to be reset.
-pub async fn clear_phase_deliverables(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = completed_deliverables - $2,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get the supervisor task for a contract.
-pub async fn get_contract_supervisor_task(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- JOIN contracts c ON c.supervisor_task_id = t.id
- WHERE c.id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// ============================================================================
-// Task Tree Queries
-// ============================================================================
-
-/// Get full task tree for a contract.
-pub async fn get_contract_task_tree(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: root tasks (no parent)
- SELECT * FROM tasks
- WHERE contract_id = $1 AND parent_task_id IS NULL
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get task tree from a specific root task.
-pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: the root task
- SELECT * FROM tasks WHERE id = $1
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(root_task_id)
- .fetch_all(pool)
- .await
-}
-
-// ============================================================================
// Daemon Selection
// ============================================================================
@@ -4578,107 +2470,27 @@ pub async fn cleanup_old_snapshots(
pub async fn record_history_event(
pool: &PgPool,
owner_id: Uuid,
- contract_id: Option<Uuid>,
task_id: Option<Uuid>,
event_type: &str,
event_subtype: Option<&str>,
- phase: Option<&str>,
event_data: serde_json::Value,
) -> Result<HistoryEvent, sqlx::Error> {
sqlx::query_as::<_, HistoryEvent>(
r#"
- INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
+ INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data)
+ VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#
)
.bind(owner_id)
- .bind(contract_id)
.bind(task_id)
.bind(event_type)
.bind(event_subtype)
- .bind(phase)
.bind(event_data)
.fetch_one(pool)
.await
}
-/// Get contract history timeline
-pub async fn get_contract_history(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
- filters: &HistoryQueryFilters,
-) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
- let limit = filters.limit.unwrap_or(100);
-
- let mut query = String::from(
- "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
- let mut count_query = String::from(
- "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
-
- let mut param_count = 2;
-
- if filters.phase.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND phase = ${}" , param_count));
- count_query.push_str(&format!(" AND phase = ${}", param_count));
- }
-
- if filters.from.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at >= ${}", param_count));
- count_query.push_str(&format!(" AND created_at >= ${}", param_count));
- }
-
- if filters.to.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at <= ${}", param_count));
- count_query.push_str(&format!(" AND created_at <= ${}", param_count));
- }
-
- query.push_str(" ORDER BY created_at DESC");
- query.push_str(&format!(" LIMIT {}", limit));
-
- // Build and execute the query dynamically
- let mut q = sqlx::query_as::<_, HistoryEvent>(&query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- q = q.bind(phase);
- }
- if let Some(ref from) = filters.from {
- q = q.bind(from);
- }
- if let Some(ref to) = filters.to {
- q = q.bind(to);
- }
-
- let events = q.fetch_all(pool).await?;
-
- // Get total count
- let mut cq = sqlx::query_scalar::<_, i64>(&count_query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- cq = cq.bind(phase);
- }
- if let Some(ref from) = filters.from {
- cq = cq.bind(from);
- }
- if let Some(ref to) = filters.to {
- cq = cq.bind(to);
- }
-
- let count = cq.fetch_one(pool).await?;
-
- Ok((events, count))
-}
-
/// Get task history
pub async fn get_task_history(
pool: &PgPool,
@@ -4825,13 +2637,6 @@ pub async fn get_task_conversation(
Ok(messages)
}
-/// Get supervisor conversation (from supervisor_states)
-pub async fn get_supervisor_conversation_full(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- get_supervisor_state(pool, contract_id).await
-}
// =============================================================================
// Anonymous Task Cleanup Functions
@@ -4969,156 +2774,6 @@ pub async fn delete_checkpoint_patches_for_task(
Ok(result.rows_affected() as i64)
}
-// =============================================================================
-// Red Team Notifications
-// =============================================================================
-// =============================================================================
-// Supervisor Status API Helpers
-// =============================================================================
-
-/// Get supervisor status for a contract.
-/// Returns combined information from supervisor_states and tasks tables.
-pub async fn get_supervisor_status(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
- // Query to get supervisor status by joining supervisor_states with tasks
- sqlx::query_as::<_, SupervisorStatusInfo>(
- r#"
- SELECT
- ss.task_id,
- COALESCE(t.status, 'unknown') as supervisor_state,
- ss.phase,
- t.progress_summary as current_activity,
- ss.pending_task_ids,
- ss.last_activity as last_heartbeat,
- t.status as task_status,
- t.daemon_id IS NOT NULL as is_running
- FROM supervisor_states ss
- JOIN tasks t ON t.id = ss.task_id
- WHERE ss.contract_id = $1
- AND t.owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Internal struct to hold supervisor status query result
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorStatusInfo {
- pub task_id: Uuid,
- pub supervisor_state: String,
- pub phase: String,
- pub current_activity: Option<String>,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
- pub last_heartbeat: chrono::DateTime<chrono::Utc>,
- pub task_status: String,
- pub is_running: bool,
-}
-
-/// Get supervisor activity history from history_events table.
-/// This provides a timeline of supervisor activities that can serve as "heartbeats".
-pub async fn get_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i32,
- offset: i32,
-) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorActivityEntry>(
- r#"
- SELECT
- created_at as timestamp,
- COALESCE(event_subtype, 'activity') as state,
- event_data->>'activity' as activity,
- (event_data->>'progress')::INTEGER as progress,
- COALESCE(phase, 'unknown') as phase,
- CASE
- WHEN event_data->'pending_task_ids' IS NOT NULL
- THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
- ELSE ARRAY[]::UUID[]
- END as pending_task_ids
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- ORDER BY created_at DESC
- LIMIT $2 OFFSET $3
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .bind(offset)
- .fetch_all(pool)
- .await
-}
-
-/// Internal struct to hold supervisor activity entry
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorActivityEntry {
- pub timestamp: chrono::DateTime<chrono::Utc>,
- pub state: String,
- pub activity: Option<String>,
- pub progress: Option<i32>,
- pub phase: String,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
-}
-
-/// Count total supervisor activity history entries for a contract.
-pub async fn count_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<i64, sqlx::Error> {
- let result: (i64,) = sqlx::query_as(
- r#"
- SELECT COUNT(*)
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- "#,
- )
- .bind(contract_id)
- .fetch_one(pool)
- .await?;
- Ok(result.0)
-}
-
-/// Update supervisor state last_activity timestamp.
-/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
-pub async fn sync_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Helper Functions
-// =============================================================================
-
-/// Helper to truncate string to max length
-fn truncate_string(s: &str, max_len: usize) -> String {
- if s.len() <= max_len {
- s.to_string()
- } else {
- format!("{}...", &s[..max_len - 3])
- }
-}
// =============================================================================
// Directive CRUD
@@ -7031,37 +4686,6 @@ pub async fn get_running_steps_with_tasks(
.await
}
-/// A running step backed by a contract, joined with the contract's current status.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct RunningStepWithContract {
- pub step_id: Uuid,
- pub directive_id: Uuid,
- pub contract_id: Uuid,
- pub contract_status: String,
- pub contract_phase: String,
-}
-
-/// Get running steps that are backed by contracts (for contract-based monitoring).
-pub async fn get_running_steps_with_contracts(
- pool: &PgPool,
-) -> Result<Vec<RunningStepWithContract>, sqlx::Error> {
- sqlx::query_as::<_, RunningStepWithContract>(
- r#"
- SELECT
- ds.id AS step_id,
- ds.directive_id,
- ds.contract_id AS "contract_id!",
- c.status AS contract_status,
- c.phase AS contract_phase
- FROM directive_steps ds
- JOIN contracts c ON c.id = ds.contract_id
- WHERE ds.status = 'running'
- AND ds.contract_id IS NOT NULL
- "#,
- )
- .fetch_all(pool)
- .await
-}
/// An orchestrator task to check (directive with pending planning task).
#[derive(Debug, Clone, sqlx::FromRow)]
@@ -7221,25 +4845,6 @@ pub async fn link_task_to_step(
Ok(())
}
-/// Link a contract to a directive step.
-pub async fn link_contract_to_step(
- pool: &PgPool,
- step_id: Uuid,
- contract_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE directive_steps
- SET contract_id = $1
- WHERE id = $2
- "#,
- )
- .bind(contract_id)
- .bind(step_id)
- .execute(pool)
- .await?;
- Ok(())
-}
/// Set a step to 'running' status (after its task has been dispatched).
pub async fn set_step_running(