summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-05-18 01:21:30 +0100
committerGitHub <noreply@github.com>2026-05-18 01:21:30 +0100
commitf240675da99bc7705e473b8f70a2628812aa4c10 (patch)
tree3ee2d24b431ccb8cd1a3013c86b34a5782a3e224 /makima/src/db/repository.rs
parent0d996cf7590e3e52f424859c7d6f0e68640f119e (diff)
downloadsoryu-master.tar.gz
soryu-master.zip
chore: drop legacy contracts + supervisor task-grouping (#136)HEADmaster
The contracts table, supervisor task type, and all their backing machinery have been inert for several PRs. The directives system reads its own active contract body for spec text, and PR #135 removed the last LLM surface that spawned supervisors. This PR wipes the dead surface in one shot — the user authorised a DB wipe, so the migration drops every legacy table with CASCADE rather than carrying forward stub rows. Net change: −12k LOC across handlers, repository, state, models, the TUI, and the listen module. What's gone: - contracts, contract_chat_*, contract_events, contract_repositories, contract_type_templates tables. - supervisor_states, supervisor_heartbeats tables. - mesh_chat_conversations, mesh_chat_messages tables. - tasks.contract_id/is_supervisor/supervisor_task_id/supervisor_worktree_task_id columns. - directive_steps.contract_id/contract_type columns. - files.contract_id/contract_phase columns. - history_events.contract_id/phase columns. - The Contract/Supervisor/MeshChat handler + model + repository surface, plus the daemon TUI views that read them. - The standalone listen.rs websocket handler (orphaned with the LLM). What stays: - mesh_supervisor handler: trimmed to just the questions + orders backchannel used by `makima directive ask` / `create-order` (kept the URL prefix for CLI client compat). - directive_documents (the user-facing "contracts" surface). - pending_questions in-memory state for the directive Ask flow. cargo check, cargo test --lib (68 passed), tsc, and vite build all clean. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs2537
1 files changed, 71 insertions, 2466 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index ee4b561..d453f99 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,21 +6,17 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
- ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
- ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
- CreateContractRequest, CreateFileRequest, CreateTaskRequest,
- CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
+ CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot,
+ CreateFileRequest, CreateTaskRequest,
+ Daemon, DaemonTaskAssignment, DaemonWithCapacity,
+ Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
CreateDirectiveRequest, CreateDirectiveStepRequest,
UpdateDirectiveRequest, UpdateDirectiveStepRequest,
CreateOrderRequest, Order, UpdateOrderRequest,
CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
- MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
- PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
- Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
- UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary,
+ UpdateFileRequest, UpdateTaskRequest,
};
/// Repository error types.
@@ -89,7 +85,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
@@ -105,7 +101,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#,
@@ -119,7 +115,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#,
@@ -171,7 +167,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -190,7 +186,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -246,7 +242,6 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
// =============================================================================
/// Create a new file record for a specific owner.
-/// Files must belong to a contract - the contract_id is required and the phase is looked up.
pub async fn create_file_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -254,32 +249,16 @@ pub async fn create_file_for_owner(
) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
- // Use body from request (may be empty or contain template elements)
let body_json = serde_json::to_value(&req.body).unwrap_or_default();
- // Use provided contract_phase, or look up from contract's current phase
- let contract_phase: Option<String> = if req.contract_phase.is_some() {
- req.contract_phase
- } else {
- sqlx::query_scalar(
- "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2",
- )
- .bind(req.contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?
- };
-
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
- VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path)
+ VALUES ($1, $2, $3, $4, $5, NULL, $6, $7)
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(owner_id)
- .bind(req.contract_id)
- .bind(&contract_phase)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
@@ -298,7 +277,7 @@ pub async fn get_file_for_owner(
) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
@@ -313,7 +292,7 @@ pub async fn get_file_for_owner(
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
@@ -324,13 +303,10 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F
.await
}
-/// Database row type for file summary with contract info
+/// Database row type for file summary
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
id: Uuid,
- contract_id: Option<Uuid>,
- contract_name: Option<String>,
- contract_phase: Option<String>,
name: String,
description: Option<String>,
#[sqlx(json)]
@@ -342,7 +318,7 @@ struct FileSummaryRow {
updated_at: chrono::DateTime<chrono::Utc>,
}
-/// List file summaries for an owner with contract info (joined).
+/// List file summaries for an owner.
pub async fn list_file_summaries_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -350,11 +326,9 @@ pub async fn list_file_summaries_for_owner(
let rows = sqlx::query_as::<_, FileSummaryRow>(
r#"
SELECT
- f.id, f.contract_id, c.name as contract_name, f.contract_phase,
- f.name, f.description, f.transcript, f.version,
+ f.id, f.name, f.description, f.transcript, f.version,
f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
FROM files f
- LEFT JOIN contracts c ON f.contract_id = c.id
WHERE f.owner_id = $1
ORDER BY f.created_at DESC
"#,
@@ -373,9 +347,6 @@ pub async fn list_file_summaries_for_owner(
.fold(0.0_f32, f32::max);
FileSummary {
id: row.id,
- contract_id: row.contract_id,
- contract_name: row.contract_name,
- contract_phase: row.contract_phase,
name: row.name,
description: row.description,
transcript_count: row.transcript.len(),
@@ -429,7 +400,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -449,7 +420,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -657,34 +628,24 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -699,23 +660,21 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id
+ branched_from_task_id, conversation_state
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -726,7 +685,6 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.fetch_one(pool)
.await
}
@@ -751,14 +709,12 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error>
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -772,14 +728,12 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -790,73 +744,6 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
}
/// List all tasks in a contract (for supervisor tree view).
-pub async fn list_tasks_by_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT * FROM tasks
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY is_supervisor DESC, depth ASC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get pending tasks for a contract (non-supervisor tasks only).
-/// Includes tasks that were interrupted (retry candidates).
-/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
-pub async fn get_pending_tasks_for_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- WHERE t.contract_id = $1 AND t.owner_id = $2
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY
- t.interrupted_at DESC NULLS LAST,
- t.priority DESC,
- t.created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get all contracts that have pending tasks awaiting retry.
-/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks.
-pub async fn get_all_pending_task_contracts(
- pool: &PgPool,
-) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
- sqlx::query_as::<_, (Uuid, Uuid)>(
- r#"
- SELECT DISTINCT t.contract_id, t.owner_id
- FROM tasks t
- WHERE t.contract_id IS NOT NULL
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY t.owner_id, t.contract_id
- "#,
- )
- .fetch_all(pool)
- .await
-}
-
-/// Mark a task as pending for retry after daemon failure.
-/// Increments retry count and adds the failed daemon to exclusion list.
pub async fn mark_task_for_retry(
pool: &PgPool,
task_id: Uuid,
@@ -1061,16 +948,13 @@ pub async fn create_task_for_owner(
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
@@ -1078,25 +962,17 @@ pub async fn create_task_for_owner(
)));
}
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -1108,14 +984,11 @@ pub async fn create_task_for_owner(
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
- // Resolve the directive_document_id. Tasks plumbed through this builder
- // currently have no way to specify a document explicitly (we don't want
- // to widen `CreateTaskRequest` for this — every call site would have to
- // change). Instead, when the task is directive-driven (directive_id is
- // set) we attach it to that directive's most recently-updated active
- // document so the task lands under that document's tasks/ subfolder in
- // the sidebar. Resolution failures are non-fatal — the task still gets
- // created with directive_document_id = NULL, matching legacy behaviour.
+ // Resolve directive_document_id from the directive's currently-
+ // active contract row (directive_documents table) so the task
+ // lands under the right tasks/ subfolder in the sidebar. Failures
+ // are non-fatal — the task is created with NULL document_id and
+ // the sidebar tolerates that.
let directive_document_id = match req.directive_id {
Some(directive_id) => resolve_active_document_for_directive(pool, directive_id)
.await
@@ -1126,25 +999,23 @@ pub async fn create_task_for_owner(
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ owner_id, parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id,
+ branched_from_task_id, conversation_state,
directive_id, directive_step_id, directive_document_id
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
RETURNING *
"#,
)
.bind(owner_id)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -1155,7 +1026,6 @@ pub async fn create_task_for_owner(
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.bind(&req.directive_id)
.bind(&req.directive_step_id)
.bind(&directive_document_id)
@@ -1226,14 +1096,12 @@ pub async fn list_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1335,14 +1203,12 @@ pub async fn list_ephemeral_directive_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.directive_step_id IS NULL
@@ -1369,14 +1235,12 @@ pub async fn list_tmp_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.parent_task_id IS NULL
@@ -1399,14 +1263,12 @@ pub async fn list_subtasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1920,14 +1782,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1942,14 +1802,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -2121,1287 +1979,28 @@ pub async fn complete_task(
Ok(task)
}
-// =============================================================================
-// Mesh Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for an owner.
-pub async fn get_or_create_active_conversation(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<MeshChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this owner
- let existing = sqlx::query_as::<_, MeshChatConversation>(
- r#"
- SELECT *
- FROM mesh_chat_conversations
- WHERE is_active = true AND owner_id = $1
- LIMIT 1
- "#,
- )
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, MeshChatConversation>(
- r#"
- INSERT INTO mesh_chat_conversations (owner_id, is_active)
- VALUES ($1, true)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a conversation.
-pub async fn list_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- SELECT *
- FROM mesh_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a conversation.
-#[allow(clippy::too_many_arguments)]
-pub async fn add_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- context_type: &str,
- context_task_id: Option<Uuid>,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<MeshChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- INSERT INTO mesh_chat_messages
- (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(context_type)
- .bind(context_task_id)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear conversation (archive existing and create new).
-pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
- // Mark existing as inactive for this owner
- sqlx::query(
- r#"
- UPDATE mesh_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND owner_id = $1
- "#,
- )
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_active_conversation(pool, owner_id).await
-}
-
-// =============================================================================
-// Contract Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for a contract.
-pub async fn get_or_create_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this contract
- let existing = sqlx::query_as::<_, ContractChatConversation>(
- r#"
- SELECT *
- FROM contract_chat_conversations
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- LIMIT 1
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, ContractChatConversation>(
- r#"
- INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
- VALUES ($1, $2, true)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a contract conversation.
-pub async fn list_contract_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- SELECT *
- FROM contract_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a contract conversation.
-pub async fn add_contract_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<ContractChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- INSERT INTO contract_chat_messages
- (conversation_id, role, content, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear contract conversation (archive existing and create new).
-pub async fn clear_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Mark existing as inactive for this contract
- sqlx::query(
- r#"
- UPDATE contract_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_contract_conversation(pool, contract_id, owner_id).await
-}
-
-// =============================================================================
-// Contract Type Template Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract type template for a specific owner.
-pub async fn create_template_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateTemplateRequest,
-) -> Result<ContractTypeTemplateRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables)
- VALUES ($1, $2, $3, $4, $5, $6)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(serde_json::to_value(&req.phases).unwrap_or_default())
- .bind(&req.default_phase)
- .bind(match &req.deliverables {
- Some(d) => serde_json::to_value(d).ok(),
- None => None,
- })
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract type template by ID, scoped to owner.
-pub async fn get_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get a contract type template by ID (internal use, no owner scoping).
-pub async fn get_template_by_id(
- pool: &PgPool,
- id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1
- "#,
- )
- .bind(id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contract type templates for an owner, ordered by name.
-pub async fn list_templates_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE owner_id = $1
- ORDER BY name ASC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Update a contract type template for an owner.
-pub async fn update_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateTemplateRequest,
-) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> {
- // Build dynamic update query
- let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()");
- let mut param_idx = 3; // $1 = id, $2 = owner_id
-
- if req.name.is_some() {
- query.push_str(&format!(", name = ${}", param_idx));
- param_idx += 1;
- }
- if req.description.is_some() {
- query.push_str(&format!(", description = ${}", param_idx));
- param_idx += 1;
- }
- if req.phases.is_some() {
- query.push_str(&format!(", phases = ${}", param_idx));
- param_idx += 1;
- }
- if req.default_phase.is_some() {
- query.push_str(&format!(", default_phase = ${}", param_idx));
- param_idx += 1;
- }
- if req.deliverables.is_some() {
- query.push_str(&format!(", deliverables = ${}", param_idx));
- param_idx += 1;
- }
-
- // Optimistic locking
- if req.version.is_some() {
- query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx));
- } else {
- query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2");
- }
- query.push_str(" RETURNING *");
-
- let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query);
- sql_query = sql_query.bind(id).bind(owner_id);
-
- if let Some(ref name) = req.name {
- sql_query = sql_query.bind(name);
- }
- if let Some(ref description) = req.description {
- sql_query = sql_query.bind(description);
- }
- if let Some(ref phases) = req.phases {
- sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default());
- }
- if let Some(ref default_phase) = req.default_phase {
- sql_query = sql_query.bind(default_phase);
- }
- if let Some(ref deliverables) = req.deliverables {
- sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default());
- }
- if let Some(version) = req.version {
- sql_query = sql_query.bind(version);
- }
-
- match sql_query.fetch_optional(pool).await {
- Ok(result) => {
- if result.is_none() && req.version.is_some() {
- // Check if it's a version conflict
- if let Some(current) = get_template_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
- Ok(result)
- }
- Err(e) => Err(RepositoryError::Database(e)),
- }
-}
-
-/// Delete a contract type template for an owner.
-pub async fn delete_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Helper function to build PhaseConfig from a template.
-pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig {
- PhaseConfig {
- phases: template.phases.clone(),
- default_phase: template.default_phase.clone(),
- deliverables: template.deliverables.clone().unwrap_or_default(),
- }
-}
-
-/// Helper function to build PhaseConfig for built-in contract types.
-pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig {
- match contract_type {
- "simple" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 },
- ],
- default_phase: "plan".to_string(),
- deliverables: [
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "specification" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 },
- PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 },
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 },
- PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 },
- ],
- default_phase: "research".to_string(),
- deliverables: [
- ("research".to_string(), vec![DeliverableDefinition {
- id: "research-notes".to_string(),
- name: "Research Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ("specify".to_string(), vec![DeliverableDefinition {
- id: "requirements-document".to_string(),
- name: "Requirements Document".to_string(),
- priority: "required".to_string(),
- }]),
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ("review".to_string(), vec![DeliverableDefinition {
- id: "release-notes".to_string(),
- name: "Release Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "execute" | _ => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 },
- ],
- default_phase: "execute".to_string(),
- deliverables: std::collections::HashMap::new(),
- },
- }
-}
-
-// =============================================================================
-// Contract Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract for a specific owner.
-/// Supports both built-in contract types (simple, specification, execute) and custom templates.
-pub async fn create_contract_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateContractRequest,
-) -> Result<Contract, sqlx::Error> {
- // Determine phase configuration based on template_id or contract_type
- let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) =
- if let Some(template_id) = req.template_id {
- // Look up the custom template
- let template = get_template_by_id(pool, template_id)
- .await?
- .ok_or_else(|| {
- sqlx::Error::Protocol(format!("Template not found: {}", template_id))
- })?;
-
- let config = build_phase_config_from_template(&template);
- let default = config.default_phase.clone();
- // For custom templates, store the template name as the contract_type
- (config, template.name.clone(), default)
- } else {
- // Use built-in contract type
- let contract_type = req.contract_type.as_deref().unwrap_or("simple");
-
- // Validate contract type
- let valid_types = ["simple", "specification", "execute"];
- if !valid_types.contains(&contract_type) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid contract_type '{}'. Must be one of: {} or provide a template_id",
- contract_type,
- valid_types.join(", ")
- )));
- }
-
- let config = build_phase_config_for_builtin(contract_type);
- let default = config.default_phase.clone();
- (config, contract_type.to_string(), default)
- };
-
- // Get valid phase IDs from the configuration
- let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect();
-
- // Use provided initial_phase or default based on contract type/template
- let phase = req.initial_phase.as_deref().unwrap_or(&default_phase);
-
- // Validate the phase is valid for this contract type/template
- if !valid_phase_ids.contains(&phase.to_string()) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
- phase,
- contract_type_str,
- valid_phase_ids.join(", ")
- )));
- }
-
- let autonomous_loop = req.autonomous_loop.unwrap_or(false);
- let phase_guard = req.phase_guard.unwrap_or(false);
- let local_only = req.local_only.unwrap_or(false);
- let auto_merge_local = req.auto_merge_local.unwrap_or(false);
-
- // Serialize phase_config to JSON
- let phase_config_json = serde_json::to_value(&phase_config).ok();
-
- sqlx::query_as::<_, Contract>(
- r#"
- INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(&contract_type_str)
- .bind(phase)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(phase_config_json)
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract by ID, scoped to owner.
-pub async fn get_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<Contract>, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contracts for an owner, ordered by created_at DESC.
-pub async fn list_contracts_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.owner_id = $1
- ORDER BY c.created_at DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get contract summary by ID.
-pub async fn get_contract_summary_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.id = $1 AND c.owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update a contract by ID with optimistic locking, scoped to owner.
-pub async fn update_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateContractRequest,
-) -> Result<Option<Contract>, RepositoryError> {
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected_version) = req.version {
- if existing.version != expected_version {
- return Err(RepositoryError::VersionConflict {
- expected: expected_version,
- actual: existing.version,
- });
- }
- }
-
- // Apply updates
- let name = req.name.unwrap_or(existing.name);
- let description = req.description.or(existing.description);
- let phase = req.phase.unwrap_or(existing.phase);
- let status = req.status.unwrap_or(existing.status);
- let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
- let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop);
- let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard);
- let local_only = req.local_only.unwrap_or(existing.local_only);
- let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local);
-
- let result = if req.version.is_some() {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $12
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(req.version.unwrap())
- .fetch_optional(pool)
- .await?
- } else {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .fetch_optional(pool)
- .await?
- };
-
- // If versioned update returned None, there was a race condition
- if result.is_none() && req.version.is_some() {
- if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
-
- Ok(result)
-}
-
-/// Delete a contract by ID, scoped to owner.
-pub async fn delete_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Change contract phase and record event.
-///
-/// This is the simple version without version checking. Use `change_contract_phase_with_version`
-/// for explicit version conflict detection.
-pub async fn change_contract_phase_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
-) -> Result<Option<Contract>, sqlx::Error> {
- // Get current phase
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- let previous_phase = existing.phase.clone();
-
- // Update phase
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_optional(pool)
- .await?;
-
- // Record event
- if contract.is_some() {
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(pool)
- .await?;
- }
-
- Ok(contract)
-}
-
-/// Change contract phase with explicit version checking for conflict detection.
-///
-/// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions.
-/// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match.
-pub async fn change_contract_phase_with_version(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
- expected_version: Option<i32>,
-) -> Result<PhaseChangeResult, sqlx::Error> {
- // Start a transaction to ensure atomicity with row locking
- let mut tx = pool.begin().await?;
-
- // Lock the row with SELECT FOR UPDATE and get current state
- let existing: Option<Contract> = sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- FOR UPDATE
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(&mut *tx)
- .await?;
-
- let Some(existing) = existing else {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::NotFound);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected) = expected_version {
- if existing.version != expected {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::VersionConflict {
- expected,
- actual: existing.version,
- current_phase: existing.phase,
- });
- }
- }
-
- // Validate the phase transition is allowed
- let valid_phases = existing.valid_phase_ids();
- if !valid_phases.contains(&new_phase.to_string()) {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::ValidationFailed {
- reason: format!(
- "Invalid phase '{}' for contract type '{}'",
- new_phase, existing.contract_type
- ),
- missing_requirements: vec![format!(
- "Phase must be one of: {}",
- valid_phases.join(", ")
- )],
- });
- }
-
- let previous_phase = existing.phase.clone();
-
- // Update phase with version increment
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_one(&mut *tx)
- .await?;
-
- // Record event
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(&mut *tx)
- .await?;
-
- // Commit the transaction
- tx.commit().await?;
-
- Ok(PhaseChangeResult::Success(contract))
-}
-
-// =============================================================================
-// Contract Repository Functions
-// =============================================================================
-
-/// List repositories for a contract.
-pub async fn list_contract_repositories(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- SELECT *
- FROM contract_repositories
- WHERE contract_id = $1
- ORDER BY is_primary DESC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Add a remote repository to a contract.
-pub async fn add_remote_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- repository_url: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'remote', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(repository_url)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Add a local repository to a contract.
-pub async fn add_local_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- local_path: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
+/// Get task tree from a specific root task.
+pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
r#"
- INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'local', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(local_path)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Create a managed repository (daemon will create it).
-pub async fn create_managed_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
+ WITH RECURSIVE task_tree AS (
+ -- Base case: the root task
+ SELECT * FROM tasks WHERE id = $1
+ UNION ALL
+ -- Recursive case: children of current level
+ SELECT t.* FROM tasks t
+ JOIN task_tree tt ON t.parent_task_id = tt.id
)
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
- VALUES ($1, $2, 'managed', 'pending', $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Delete a repository from a contract.
-pub async fn delete_contract_repository(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_repositories
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Set a repository as primary (and clear others).
-pub async fn set_repository_primary(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- // Clear other primaries
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- // Set this one as primary
- let result = sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = true, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Update managed repository status (used by daemon).
-pub async fn update_managed_repository_status(
- pool: &PgPool,
- repo_id: Uuid,
- status: &str,
- repository_url: Option<&str>,
-) -> Result<Option<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- UPDATE contract_repositories
- SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(repo_id)
- .bind(status)
- .bind(repository_url)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Contract Task Association Functions
-// =============================================================================
-
-/// Add a task to a contract.
-pub async fn add_task_to_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = $2, updated_at = NOW()
- WHERE id = $1 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Remove a task from a contract.
-pub async fn remove_task_from_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = NULL, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// List files in a contract.
-pub async fn list_files_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<FileSummary>, sqlx::Error> {
- // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields
- let files = sqlx::query_as::<_, File>(
- r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
- FROM files
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await?;
-
- Ok(files.into_iter().map(FileSummary::from).collect())
-}
-
-/// List tasks in a contract.
-pub async fn list_tasks_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<TaskSummary>, sqlx::Error> {
- sqlx::query_as::<_, TaskSummary>(
- r#"
- SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
- t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
- FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
- WHERE t.contract_id = $1 AND t.owner_id = $2
- ORDER BY t.priority DESC, t.created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Minimal task info for worktree cleanup operations.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct TaskWorktreeInfo {
- pub id: Uuid,
- pub daemon_id: Option<Uuid>,
- pub overlay_path: Option<String>,
- /// If set, this task shares the worktree of the specified supervisor task.
- /// Should NOT have its worktree deleted during cleanup.
- pub supervisor_worktree_task_id: Option<Uuid>,
-}
-
-/// List tasks in a contract with their daemon/worktree info.
-/// Used for cleaning up worktrees when a contract is completed or deleted.
-pub async fn list_contract_tasks_with_worktree_info(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> {
- sqlx::query_as::<_, TaskWorktreeInfo>(
- r#"
- SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id
- FROM tasks
- WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL)
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-// =============================================================================
-// Contract Events
-// =============================================================================
-
-/// List events for a contract.
-pub async fn list_contract_events(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractEvent>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- SELECT *
- FROM contract_events
- WHERE contract_id = $1
- ORDER BY created_at DESC
+ SELECT * FROM task_tree
+ ORDER BY depth, created_at
"#,
)
- .bind(contract_id)
+ .bind(root_task_id)
.fetch_all(pool)
.await
}
-/// Record a contract event.
-pub async fn record_contract_event(
- pool: &PgPool,
- contract_id: Uuid,
- event_type: &str,
- event_data: Option<serde_json::Value>,
-) -> Result<ContractEvent, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- INSERT INTO contract_events (contract_id, event_type, event_data)
- VALUES ($1, $2, $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(event_type)
- .bind(event_data)
- .fetch_one(pool)
- .await
-}
-
// ============================================================================
// Task Checkpoints
// ============================================================================
@@ -3501,713 +2100,6 @@ pub async fn list_task_checkpoints(
}
// ============================================================================
-// Supervisor State
-// ============================================================================
-
-/// Create or update supervisor state for a contract.
-pub async fn upsert_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
- VALUES ($1, $2, $3, $4, $5, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisor state for a contract.
-pub async fn get_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1")
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get supervisor state by task ID.
-pub async fn get_supervisor_state_by_task(
- pool: &PgPool,
- task_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1")
- .bind(task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update supervisor conversation history.
-pub async fn update_supervisor_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- conversation_history: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET conversation_history = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(conversation_history)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor pending tasks.
-pub async fn update_supervisor_pending_tasks(
- pool: &PgPool,
- contract_id: Uuid,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_task_ids = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(pending_task_ids)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state with detailed activity tracking.
-/// Called at key save points: LLM response, task spawn, question asked, phase change.
-pub async fn update_supervisor_detailed_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- error_message = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- RETURNING *
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a spawned task ID to the supervisor's list.
-pub async fn add_supervisor_spawned_task(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET spawned_task_ids = array_append(spawned_task_ids, $1),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a pending question to the supervisor state.
-pub async fn add_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = pending_questions || $1::jsonb,
- state = 'waiting_for_user',
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Remove a pending question by ID.
-pub async fn remove_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = (
- SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb)
- FROM jsonb_array_elements(pending_questions) elem
- WHERE (elem->>'id')::uuid != $1
- ),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Comprehensive state save - used at major save points.
-pub async fn save_supervisor_state_full(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
- spawned_task_ids: &[Uuid],
- pending_questions: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (
- contract_id, task_id, conversation_history, pending_task_ids, phase,
- state, current_activity, progress, error_message, spawned_task_ids,
- pending_questions, last_activity
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- state = EXCLUDED.state,
- current_activity = EXCLUDED.current_activity,
- progress = EXCLUDED.progress,
- error_message = EXCLUDED.error_message,
- spawned_task_ids = EXCLUDED.spawned_task_ids,
- pending_questions = EXCLUDED.pending_questions,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(spawned_task_ids)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Mark supervisor as restored from a crash/interruption.
-pub async fn mark_supervisor_restored(
- pool: &PgPool,
- contract_id: Uuid,
- restoration_source: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET restoration_count = restoration_count + 1,
- last_restored_at = NOW(),
- restoration_source = $1,
- state = 'initializing',
- error_message = NULL,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(restoration_source)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisors with pending questions (for re-delivery after restoration).
-pub async fn get_supervisors_with_pending_questions(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT ss.*
- FROM supervisor_states ss
- JOIN contracts c ON c.id = ss.contract_id
- WHERE c.owner_id = $1
- AND ss.pending_questions != '[]'::jsonb
- AND c.status = 'active'
- ORDER BY ss.last_activity DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get supervisor state with full details for restoration.
-/// Includes validation info.
-pub async fn get_supervisor_state_for_restoration(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT * FROM supervisor_states WHERE contract_id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Validate spawned tasks are in expected states.
-/// Returns map of task_id -> (status, updated_at).
-pub async fn validate_spawned_tasks(
- pool: &PgPool,
- task_ids: &[Uuid],
-) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> {
- use sqlx::Row;
-
- let rows = sqlx::query(
- r#"
- SELECT id, status, updated_at
- FROM tasks
- WHERE id = ANY($1)
- "#,
- )
- .bind(task_ids)
- .fetch_all(pool)
- .await?;
-
- let mut result = std::collections::HashMap::new();
- for row in rows {
- let id: Uuid = row.get("id");
- let status: String = row.get("status");
- let updated_at: chrono::DateTime<Utc> = row.get("updated_at");
- result.insert(id, (status, updated_at));
- }
- Ok(result)
-}
-
-/// Update supervisor state when phase changes.
-pub async fn update_supervisor_phase(
- pool: &PgPool,
- contract_id: Uuid,
- new_phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET phase = $1,
- state = 'working',
- current_activity = 'Phase changed to ' || $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(new_phase)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state on heartbeat (lightweight update).
-pub async fn update_supervisor_heartbeat_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- pending_task_ids = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .bind(contract_id)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-// ============================================================================
-// Supervisor Heartbeats
-// ============================================================================
-
-/// Record a supervisor heartbeat.
-/// This creates a historical record for monitoring and dead supervisor detection.
-pub async fn create_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- contract_id: Uuid,
- state: &str,
- phase: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- INSERT INTO supervisor_heartbeats (
- supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .bind(state)
- .bind(phase)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .fetch_one(pool)
- .await
-}
-
-/// Get the latest heartbeat for a supervisor task.
-pub async fn get_latest_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
-) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT 1
- "#,
- )
- .bind(supervisor_task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get recent heartbeats for a supervisor task.
-pub async fn get_supervisor_heartbeats(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(supervisor_task_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Get recent heartbeats for a contract.
-pub async fn get_contract_supervisor_heartbeats(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE contract_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Delete old heartbeats beyond the TTL (24 hours by default).
-/// Returns the number of deleted records.
-pub async fn cleanup_old_heartbeats(
- pool: &PgPool,
- ttl_hours: i64,
-) -> Result<u64, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM supervisor_heartbeats
- WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
- "#,
- )
- .bind(ttl_hours.to_string())
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected())
-}
-
-/// Find supervisors that have not sent a heartbeat within the timeout period.
-/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
-pub async fn find_stale_supervisors(
- pool: &PgPool,
- timeout_seconds: i64,
-) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
- let rows = sqlx::query(
- r#"
- WITH latest_heartbeats AS (
- SELECT DISTINCT ON (supervisor_task_id)
- supervisor_task_id,
- contract_id,
- timestamp
- FROM supervisor_heartbeats
- ORDER BY supervisor_task_id, timestamp DESC
- )
- SELECT
- lh.supervisor_task_id,
- lh.contract_id,
- lh.timestamp
- FROM latest_heartbeats lh
- JOIN tasks t ON t.id = lh.supervisor_task_id
- WHERE t.status = 'running'
- AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
- "#,
- )
- .bind(timeout_seconds.to_string())
- .fetch_all(pool)
- .await?;
-
- let mut result = Vec::new();
- for row in rows {
- use sqlx::Row;
- let supervisor_task_id: Uuid = row.get("supervisor_task_id");
- let contract_id: Uuid = row.get("contract_id");
- let timestamp: chrono::DateTime<Utc> = row.get("timestamp");
- result.push((supervisor_task_id, contract_id, timestamp));
- }
- Ok(result)
-}
-
-// ============================================================================
-// Contract Supervisor
-// ============================================================================
-
-/// Update contract's supervisor task ID.
-pub async fn update_contract_supervisor(
- pool: &PgPool,
- contract_id: Uuid,
- supervisor_task_id: Uuid,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET supervisor_task_id = $1,
- updated_at = NOW()
- WHERE id = $2
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Mark a deliverable as complete for a specific phase.
-/// Uses JSONB operations to append the deliverable_id to the phase's array.
-pub async fn mark_deliverable_complete(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
- deliverable_id: &str,
-) -> Result<Contract, sqlx::Error> {
- // Use jsonb_set to add the deliverable to the phase's array
- // If the phase key doesn't exist, create an empty array first
- // COALESCE handles the case where the phase array doesn't exist yet
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = jsonb_set(
- completed_deliverables,
- ARRAY[$2::text],
- COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text),
- true
- ),
- updated_at = NOW()
- WHERE id = $1
- AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .bind(deliverable_id)
- .fetch_one(pool)
- .await
-}
-
-/// Clear all completed deliverables for a specific phase.
-/// Used when phase changes or deliverables need to be reset.
-pub async fn clear_phase_deliverables(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = completed_deliverables - $2,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get the supervisor task for a contract.
-pub async fn get_contract_supervisor_task(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- JOIN contracts c ON c.supervisor_task_id = t.id
- WHERE c.id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// ============================================================================
-// Task Tree Queries
-// ============================================================================
-
-/// Get full task tree for a contract.
-pub async fn get_contract_task_tree(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: root tasks (no parent)
- SELECT * FROM tasks
- WHERE contract_id = $1 AND parent_task_id IS NULL
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get task tree from a specific root task.
-pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: the root task
- SELECT * FROM tasks WHERE id = $1
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(root_task_id)
- .fetch_all(pool)
- .await
-}
-
-// ============================================================================
// Daemon Selection
// ============================================================================
@@ -4578,107 +2470,27 @@ pub async fn cleanup_old_snapshots(
pub async fn record_history_event(
pool: &PgPool,
owner_id: Uuid,
- contract_id: Option<Uuid>,
task_id: Option<Uuid>,
event_type: &str,
event_subtype: Option<&str>,
- phase: Option<&str>,
event_data: serde_json::Value,
) -> Result<HistoryEvent, sqlx::Error> {
sqlx::query_as::<_, HistoryEvent>(
r#"
- INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
+ INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data)
+ VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#
)
.bind(owner_id)
- .bind(contract_id)
.bind(task_id)
.bind(event_type)
.bind(event_subtype)
- .bind(phase)
.bind(event_data)
.fetch_one(pool)
.await
}
-/// Get contract history timeline
-pub async fn get_contract_history(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
- filters: &HistoryQueryFilters,
-) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
- let limit = filters.limit.unwrap_or(100);
-
- let mut query = String::from(
- "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
- let mut count_query = String::from(
- "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
-
- let mut param_count = 2;
-
- if filters.phase.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND phase = ${}" , param_count));
- count_query.push_str(&format!(" AND phase = ${}", param_count));
- }
-
- if filters.from.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at >= ${}", param_count));
- count_query.push_str(&format!(" AND created_at >= ${}", param_count));
- }
-
- if filters.to.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at <= ${}", param_count));
- count_query.push_str(&format!(" AND created_at <= ${}", param_count));
- }
-
- query.push_str(" ORDER BY created_at DESC");
- query.push_str(&format!(" LIMIT {}", limit));
-
- // Build and execute the query dynamically
- let mut q = sqlx::query_as::<_, HistoryEvent>(&query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- q = q.bind(phase);
- }
- if let Some(ref from) = filters.from {
- q = q.bind(from);
- }
- if let Some(ref to) = filters.to {
- q = q.bind(to);
- }
-
- let events = q.fetch_all(pool).await?;
-
- // Get total count
- let mut cq = sqlx::query_scalar::<_, i64>(&count_query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- cq = cq.bind(phase);
- }
- if let Some(ref from) = filters.from {
- cq = cq.bind(from);
- }
- if let Some(ref to) = filters.to {
- cq = cq.bind(to);
- }
-
- let count = cq.fetch_one(pool).await?;
-
- Ok((events, count))
-}
-
/// Get task history
pub async fn get_task_history(
pool: &PgPool,
@@ -4825,13 +2637,6 @@ pub async fn get_task_conversation(
Ok(messages)
}
-/// Get supervisor conversation (from supervisor_states)
-pub async fn get_supervisor_conversation_full(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- get_supervisor_state(pool, contract_id).await
-}
// =============================================================================
// Anonymous Task Cleanup Functions
@@ -4969,156 +2774,6 @@ pub async fn delete_checkpoint_patches_for_task(
Ok(result.rows_affected() as i64)
}
-// =============================================================================
-// Red Team Notifications
-// =============================================================================
-// =============================================================================
-// Supervisor Status API Helpers
-// =============================================================================
-
-/// Get supervisor status for a contract.
-/// Returns combined information from supervisor_states and tasks tables.
-pub async fn get_supervisor_status(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
- // Query to get supervisor status by joining supervisor_states with tasks
- sqlx::query_as::<_, SupervisorStatusInfo>(
- r#"
- SELECT
- ss.task_id,
- COALESCE(t.status, 'unknown') as supervisor_state,
- ss.phase,
- t.progress_summary as current_activity,
- ss.pending_task_ids,
- ss.last_activity as last_heartbeat,
- t.status as task_status,
- t.daemon_id IS NOT NULL as is_running
- FROM supervisor_states ss
- JOIN tasks t ON t.id = ss.task_id
- WHERE ss.contract_id = $1
- AND t.owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Internal struct to hold supervisor status query result
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorStatusInfo {
- pub task_id: Uuid,
- pub supervisor_state: String,
- pub phase: String,
- pub current_activity: Option<String>,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
- pub last_heartbeat: chrono::DateTime<chrono::Utc>,
- pub task_status: String,
- pub is_running: bool,
-}
-
-/// Get supervisor activity history from history_events table.
-/// This provides a timeline of supervisor activities that can serve as "heartbeats".
-pub async fn get_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i32,
- offset: i32,
-) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorActivityEntry>(
- r#"
- SELECT
- created_at as timestamp,
- COALESCE(event_subtype, 'activity') as state,
- event_data->>'activity' as activity,
- (event_data->>'progress')::INTEGER as progress,
- COALESCE(phase, 'unknown') as phase,
- CASE
- WHEN event_data->'pending_task_ids' IS NOT NULL
- THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
- ELSE ARRAY[]::UUID[]
- END as pending_task_ids
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- ORDER BY created_at DESC
- LIMIT $2 OFFSET $3
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .bind(offset)
- .fetch_all(pool)
- .await
-}
-
-/// Internal struct to hold supervisor activity entry
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorActivityEntry {
- pub timestamp: chrono::DateTime<chrono::Utc>,
- pub state: String,
- pub activity: Option<String>,
- pub progress: Option<i32>,
- pub phase: String,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
-}
-
-/// Count total supervisor activity history entries for a contract.
-pub async fn count_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<i64, sqlx::Error> {
- let result: (i64,) = sqlx::query_as(
- r#"
- SELECT COUNT(*)
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- "#,
- )
- .bind(contract_id)
- .fetch_one(pool)
- .await?;
- Ok(result.0)
-}
-
-/// Update supervisor state last_activity timestamp.
-/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
-pub async fn sync_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Helper Functions
-// =============================================================================
-
-/// Helper to truncate string to max length
-fn truncate_string(s: &str, max_len: usize) -> String {
- if s.len() <= max_len {
- s.to_string()
- } else {
- format!("{}...", &s[..max_len - 3])
- }
-}
// =============================================================================
// Directive CRUD
@@ -7031,37 +4686,6 @@ pub async fn get_running_steps_with_tasks(
.await
}
-/// A running step backed by a contract, joined with the contract's current status.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct RunningStepWithContract {
- pub step_id: Uuid,
- pub directive_id: Uuid,
- pub contract_id: Uuid,
- pub contract_status: String,
- pub contract_phase: String,
-}
-
-/// Get running steps that are backed by contracts (for contract-based monitoring).
-pub async fn get_running_steps_with_contracts(
- pool: &PgPool,
-) -> Result<Vec<RunningStepWithContract>, sqlx::Error> {
- sqlx::query_as::<_, RunningStepWithContract>(
- r#"
- SELECT
- ds.id AS step_id,
- ds.directive_id,
- ds.contract_id AS "contract_id!",
- c.status AS contract_status,
- c.phase AS contract_phase
- FROM directive_steps ds
- JOIN contracts c ON c.id = ds.contract_id
- WHERE ds.status = 'running'
- AND ds.contract_id IS NOT NULL
- "#,
- )
- .fetch_all(pool)
- .await
-}
/// An orchestrator task to check (directive with pending planning task).
#[derive(Debug, Clone, sqlx::FromRow)]
@@ -7221,25 +4845,6 @@ pub async fn link_task_to_step(
Ok(())
}
-/// Link a contract to a directive step.
-pub async fn link_contract_to_step(
- pool: &PgPool,
- step_id: Uuid,
- contract_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE directive_steps
- SET contract_id = $1
- WHERE id = $2
- "#,
- )
- .bind(contract_id)
- .bind(step_id)
- .execute(pool)
- .await?;
- Ok(())
-}
/// Set a step to 'running' status (after its task has been dispatched).
pub async fn set_step_running(