diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 905 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 2537 |
2 files changed, 73 insertions, 3369 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 3fb9667..bfb8bf3 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -111,10 +111,6 @@ pub enum BodyElement { pub struct File { pub id: Uuid, pub owner_id: Uuid, - /// Contract this file belongs to (optional) - pub contract_id: Option<Uuid>, - /// Phase of the contract when file was added (e.g., "research", "specify") - pub contract_phase: Option<String>, pub name: String, pub description: Option<String>, #[sqlx(json)] @@ -141,8 +137,6 @@ pub struct File { #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateFileRequest { - /// Contract this file belongs to (required - files must belong to a contract) - pub contract_id: Uuid, /// Name of the file (auto-generated if not provided) pub name: Option<String>, /// Optional description @@ -157,8 +151,6 @@ pub struct CreateFileRequest { pub body: Vec<BodyElement>, /// Path to linked repository file (e.g., "README.md") pub repo_file_path: Option<String>, - /// Contract phase this file belongs to (for deliverable tracking) - pub contract_phase: Option<String>, } /// Request payload for updating an existing file. @@ -194,12 +186,6 @@ pub struct FileListResponse { #[serde(rename_all = "camelCase")] pub struct FileSummary { pub id: Uuid, - /// Contract this file belongs to - pub contract_id: Option<Uuid>, - /// Contract name (joined from contracts table) - pub contract_name: Option<String>, - /// Phase when file was added to contract - pub contract_phase: Option<String>, pub name: String, pub description: Option<String>, pub transcript_count: usize, @@ -224,9 +210,6 @@ impl From<File> for FileSummary { .fold(0.0_f32, f32::max); Self { id: file.id, - contract_id: file.contract_id, - contract_name: None, // Not available from File alone, requires JOIN - contract_phase: file.contract_phase, name: file.name, description: file.description, transcript_count: file.transcript.len(), @@ -425,8 +408,6 @@ impl std::str::FromStr for MergeMode { pub struct Task { pub id: Uuid, pub owner_id: Uuid, - /// Contract this task belongs to (required for new tasks) - pub contract_id: Option<Uuid>, pub parent_task_id: Option<Uuid>, /// Depth in task hierarchy (no longer constrained) pub depth: i32, @@ -436,11 +417,6 @@ pub struct Task { pub priority: i32, pub plan: String, - // Supervisor flag - /// True for contract supervisor tasks. Only supervisors can spawn new tasks. - #[serde(default)] - pub is_supervisor: bool, - // Daemon/container info pub daemon_id: Option<Uuid>, pub container_id: Option<String>, @@ -565,14 +541,6 @@ impl Task { #[serde(rename_all = "camelCase")] pub struct TaskSummary { pub id: Uuid, - /// Contract this task belongs to - pub contract_id: Option<Uuid>, - /// Contract name (joined from contracts table) - pub contract_name: Option<String>, - /// Contract phase (joined from contracts table) - pub contract_phase: Option<String>, - /// Contract status (joined from contracts table): 'active', 'completed', 'archived' - pub contract_status: Option<String>, pub parent_task_id: Option<Uuid>, /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max) pub depth: i32, @@ -582,9 +550,6 @@ pub struct TaskSummary { pub progress_summary: Option<String>, pub subtask_count: i64, pub version: i32, - /// True for contract supervisor tasks - #[serde(default)] - pub is_supervisor: bool, /// Whether this task is hidden from the UI (user dismissed it) #[serde(default)] pub hidden: bool, @@ -597,10 +562,6 @@ impl From<Task> for TaskSummary { fn from(task: Task) -> Self { Self { id: task.id, - contract_id: task.contract_id, - contract_name: None, // Not available from Task directly - contract_phase: None, // Not available from Task directly - contract_status: None, // Not available from Task directly parent_task_id: task.parent_task_id, depth: task.depth, name: task.name, @@ -609,7 +570,6 @@ impl From<Task> for TaskSummary { progress_summary: task.progress_summary, subtask_count: 0, // Would need separate query version: task.version, - is_supervisor: task.is_supervisor, hidden: task.hidden, created_at: task.created_at, updated_at: task.updated_at, @@ -629,8 +589,6 @@ pub struct TaskListResponse { #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateTaskRequest { - /// Contract this task belongs to (optional for branched/anonymous tasks) - pub contract_id: Option<Uuid>, /// Name of the task pub name: String, /// Optional description @@ -639,9 +597,6 @@ pub struct CreateTaskRequest { pub plan: String, /// Parent task ID (for subtasks) pub parent_task_id: Option<Uuid>, - /// True for contract supervisor tasks. Only supervisors can spawn new tasks. - #[serde(default)] - pub is_supervisor: bool, /// Priority (higher = more urgent) #[serde(default)] pub priority: i32, @@ -668,9 +623,6 @@ pub struct CreateTaskRequest { pub branched_from_task_id: Option<Uuid>, /// Conversation history to initialize the task with (JSON array of messages) pub conversation_history: Option<serde_json::Value>, - /// Task ID whose worktree this task shares. When set, this task reuses the supervisor's - /// worktree instead of creating its own, and should NOT have its worktree deleted during cleanup. - pub supervisor_worktree_task_id: Option<Uuid>, /// Directive this task belongs to (for directive-driven tasks) pub directive_id: Option<Uuid>, /// Directive step this task executes @@ -935,87 +887,8 @@ pub struct TaskOutputResponse { pub task_id: Uuid, } -// ============================================================================= -// Mesh Chat History Types -// ============================================================================= - -/// Mesh chat conversation for persisting history -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatConversation { - pub id: Uuid, - pub owner_id: Uuid, - pub name: Option<String>, - pub is_active: bool, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Individual message in a mesh chat conversation -#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatMessageRecord { - pub id: Uuid, - pub conversation_id: Uuid, - pub role: String, - pub content: String, - pub context_type: String, - pub context_task_id: Option<Uuid>, - /// Tool calls made during this message (JSON, nullable) - pub tool_calls: Option<serde_json::Value>, - /// Pending questions requiring user response (JSON, nullable) - pub pending_questions: Option<serde_json::Value>, - pub created_at: DateTime<Utc>, -} - -/// Response for chat history endpoint -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatHistoryResponse { - pub conversation_id: Uuid, - pub messages: Vec<MeshChatMessageRecord>, -} - -// ============================================================================= -// Contract Chat History Types -// ============================================================================= - -/// Conversation thread for contract chat (scoped to a specific contract) -#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractChatConversation { - pub id: Uuid, - pub contract_id: Uuid, - pub owner_id: Uuid, - pub name: Option<String>, - pub is_active: bool, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Individual message in a contract chat conversation -#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractChatMessageRecord { - pub id: Uuid, - pub conversation_id: Uuid, - pub role: String, - pub content: String, - /// Tool calls made during this message (JSON, nullable) - pub tool_calls: Option<serde_json::Value>, - /// Pending questions requiring user response (JSON, nullable) - pub pending_questions: Option<serde_json::Value>, - pub created_at: DateTime<Utc>, -} - -/// Response for contract chat history endpoint -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractChatHistoryResponse { - pub contract_id: Uuid, - pub conversation_id: Uuid, - pub messages: Vec<ContractChatMessageRecord>, -} +// (MeshChat* + ContractChat* types removed alongside their dead +// tables/handlers — see migration 20260517000000.) // ============================================================================= // Merge API Types @@ -1120,772 +993,6 @@ pub struct MergeCompleteCheckResponse { pub skipped_count: u32, } -// ============================================================================= -// Contract Type Templates (User-defined) -// ============================================================================= - -/// A phase definition within a contract template -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PhaseDefinition { - /// Phase identifier (e.g., "research", "plan", "execute") - pub id: String, - /// Display name for the phase - pub name: String, - /// Order in the workflow (0-indexed) - pub order: i32, -} - -/// A deliverable definition within a phase -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct DeliverableDefinition { - /// Deliverable identifier (e.g., "plan-document", "pull-request") - pub id: String, - /// Display name for the deliverable - pub name: String, - /// Priority: "required", "recommended", or "optional" - #[serde(default = "default_priority")] - pub priority: String, -} - -fn default_priority() -> String { - "required".to_string() -} - -/// Phase configuration stored on a contract (copied from template at creation) -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PhaseConfig { - /// Ordered list of phases in the workflow - pub phases: Vec<PhaseDefinition>, - /// Default starting phase - pub default_phase: String, - /// Deliverables per phase: { "phase_id": [deliverables] } - #[serde(default)] - pub deliverables: std::collections::HashMap<String, Vec<DeliverableDefinition>>, -} - -/// Contract type template record from the database -#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractTypeTemplateRecord { - pub id: Uuid, - pub owner_id: Uuid, - pub name: String, - pub description: Option<String>, - #[sqlx(json)] - pub phases: Vec<PhaseDefinition>, - pub default_phase: String, - #[sqlx(json)] - pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>, - pub version: i32, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -/// Request to create a new contract type template -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateTemplateRequest { - pub name: String, - pub description: Option<String>, - pub phases: Vec<PhaseDefinition>, - pub default_phase: String, - pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>, -} - -/// Request to update a contract type template -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct UpdateTemplateRequest { - pub name: Option<String>, - pub description: Option<String>, - pub phases: Option<Vec<PhaseDefinition>>, - pub default_phase: Option<String>, - pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>, - /// Version for optimistic locking - pub version: Option<i32>, -} - -/// Summary of a contract type template for list views -#[derive(Debug, Clone, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractTypeTemplateSummary { - pub id: Uuid, - pub name: String, - pub description: Option<String>, - pub phases: Vec<PhaseDefinition>, - pub default_phase: String, - pub is_builtin: bool, - pub version: i32, - pub created_at: DateTime<Utc>, -} - -// ============================================================================= -// Contract Types -// ============================================================================= - -/// Contract type determines the workflow and required documents -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum ContractType { - /// Simple Plan -> Execute workflow (default) - /// - Plan phase: requires a "Plan" document - /// - Execute phase: requires a "PR" document - Simple, - /// Specification-based development with TDD - /// - Research: requires "Research Notes" document - /// - Specify: requires "Requirements Document" - /// - Plan: requires "Plan" document - /// - Execute: requires "PR" document - /// - Review: requires "Release Notes" document - Specification, - /// Execute-only workflow with no deliverables - /// - Only has "execute" phase - /// - NO deliverables at all - just execute tasks directly - Execute, -} - -impl Default for ContractType { - fn default() -> Self { - ContractType::Simple - } -} - -impl std::fmt::Display for ContractType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ContractType::Simple => write!(f, "simple"), - ContractType::Specification => write!(f, "specification"), - ContractType::Execute => write!(f, "execute"), - } - } -} - -impl std::str::FromStr for ContractType { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "simple" => Ok(ContractType::Simple), - "specification" => Ok(ContractType::Specification), - "execute" => Ok(ContractType::Execute), - _ => Err(format!("Unknown contract type: {}", s)), - } - } -} - -/// Contract phase for workflow progression -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum ContractPhase { - Research, - Specify, - Plan, - Execute, - Review, -} - -impl std::fmt::Display for ContractPhase { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ContractPhase::Research => write!(f, "research"), - ContractPhase::Specify => write!(f, "specify"), - ContractPhase::Plan => write!(f, "plan"), - ContractPhase::Execute => write!(f, "execute"), - ContractPhase::Review => write!(f, "review"), - } - } -} - -impl std::str::FromStr for ContractPhase { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "research" => Ok(ContractPhase::Research), - "specify" => Ok(ContractPhase::Specify), - "plan" => Ok(ContractPhase::Plan), - "execute" => Ok(ContractPhase::Execute), - "review" => Ok(ContractPhase::Review), - _ => Err(format!("Unknown contract phase: {}", s)), - } - } -} - -/// Contract status -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum ContractStatus { - Active, - Completed, - Archived, -} - -impl std::fmt::Display for ContractStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ContractStatus::Active => write!(f, "active"), - ContractStatus::Completed => write!(f, "completed"), - ContractStatus::Archived => write!(f, "archived"), - } - } -} - -impl std::str::FromStr for ContractStatus { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "active" => Ok(ContractStatus::Active), - "completed" => Ok(ContractStatus::Completed), - "archived" => Ok(ContractStatus::Archived), - _ => Err(format!("Unknown contract status: {}", s)), - } - } -} - -/// Repository source type -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum RepositorySourceType { - /// Existing remote repo (GitHub, GitLab, etc) - Remote, - /// Existing local repo - Local, - /// New repo created/managed by Makima daemon - Managed, -} - -impl std::fmt::Display for RepositorySourceType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RepositorySourceType::Remote => write!(f, "remote"), - RepositorySourceType::Local => write!(f, "local"), - RepositorySourceType::Managed => write!(f, "managed"), - } - } -} - -impl std::str::FromStr for RepositorySourceType { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "remote" => Ok(RepositorySourceType::Remote), - "local" => Ok(RepositorySourceType::Local), - "managed" => Ok(RepositorySourceType::Managed), - _ => Err(format!("Unknown repository source type: {}", s)), - } - } -} - -/// Repository status (for managed repos) -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum RepositoryStatus { - /// Repo is usable - Ready, - /// Waiting for daemon to create - Pending, - /// Daemon is creating the repo - Creating, - /// Creation failed - Failed, -} - -impl std::fmt::Display for RepositoryStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RepositoryStatus::Ready => write!(f, "ready"), - RepositoryStatus::Pending => write!(f, "pending"), - RepositoryStatus::Creating => write!(f, "creating"), - RepositoryStatus::Failed => write!(f, "failed"), - } - } -} - -impl std::str::FromStr for RepositoryStatus { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "ready" => Ok(RepositoryStatus::Ready), - "pending" => Ok(RepositoryStatus::Pending), - "creating" => Ok(RepositoryStatus::Creating), - "failed" => Ok(RepositoryStatus::Failed), - _ => Err(format!("Unknown repository status: {}", s)), - } - } -} - -/// Contract record from the database -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct Contract { - pub id: Uuid, - pub owner_id: Uuid, - pub name: String, - pub description: Option<String>, - /// Contract type: "simple" or "specification" - pub contract_type: String, - pub phase: String, - pub status: String, - /// The long-running supervisor task that orchestrates this contract - #[serde(skip_serializing_if = "Option::is_none")] - pub supervisor_task_id: Option<Uuid>, - /// Whether tasks for this contract should run in autonomous loop mode. - /// When enabled, tasks will automatically restart with --continue if they exit - /// without a COMPLETION_GATE indicating ready: true. - #[serde(default)] - pub autonomous_loop: bool, - /// Whether to wait for user confirmation before progressing to the next phase. - /// When enabled, the supervisor will pause and ask the user to review and approve - /// phase outputs (like plans, requirements, etc.) before continuing. - #[serde(default)] - pub phase_guard: bool, - /// Completed deliverables per phase. - /// Structure: { "plan": ["plan-document"], "execute": ["pull-request"] } - #[sqlx(json)] - #[serde(default)] - pub completed_deliverables: serde_json::Value, - /// Whether this contract operates in local-only mode. - /// When enabled, automatic completion actions (branch, merge, pr) are skipped, - /// allowing users to manually handle code changes via patch files or other means. - #[serde(default)] - pub local_only: bool, - /// Whether to auto-merge to target branch locally when local_only mode is enabled. - /// When both local_only and auto_merge_local are true, completed task changes will be - /// automatically merged to the master/main branch locally (without pushing or creating PRs). - #[serde(default)] - pub auto_merge_local: bool, - /// Phase configuration copied from template at contract creation (raw JSON). - /// When present, this overrides the built-in contract type phases. - /// Use `get_phase_config()` to get the parsed PhaseConfig. - #[serde(skip_serializing_if = "Option::is_none")] - pub phase_config: Option<serde_json::Value>, - pub version: i32, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -impl Contract { - /// Parse contract_type string to ContractType enum - pub fn contract_type_enum(&self) -> Result<ContractType, String> { - self.contract_type.parse() - } - - /// Parse phase string to ContractPhase enum - pub fn phase_enum(&self) -> Result<ContractPhase, String> { - self.phase.parse() - } - - /// Parse status string to ContractStatus enum - pub fn status_enum(&self) -> Result<ContractStatus, String> { - self.status.parse() - } - - /// Get valid phase IDs for this contract (as strings) - pub fn valid_phase_ids(&self) -> Vec<String> { - // Check phase_config first (for custom templates) - if let Some(config) = self.get_phase_config() { - let mut phases: Vec<_> = config.phases.iter().collect(); - phases.sort_by_key(|p| p.order); - return phases.iter().map(|p| p.id.clone()).collect(); - } - - // Fall back to built-in contract types - match self.contract_type.as_str() { - "simple" => vec!["plan".to_string(), "execute".to_string()], - "specification" => vec![ - "research".to_string(), - "specify".to_string(), - "plan".to_string(), - "execute".to_string(), - "review".to_string(), - ], - "execute" => vec!["execute".to_string()], - _ => vec!["plan".to_string(), "execute".to_string()], - } - } - - /// Get valid phases for this contract type (as ContractPhase enums) - /// Note: For custom templates with non-standard phases, this only returns - /// phases that map to the ContractPhase enum. - pub fn valid_phases(&self) -> Vec<ContractPhase> { - self.valid_phase_ids() - .iter() - .filter_map(|id| id.parse::<ContractPhase>().ok()) - .collect() - } - - /// Get the initial phase ID for this contract type (as string) - pub fn initial_phase_id(&self) -> String { - // Check phase_config first (for custom templates) - if let Some(config) = self.get_phase_config() { - return config.default_phase.clone(); - } - - // Fall back to built-in contract types - match self.contract_type.as_str() { - "specification" => "research".to_string(), - "execute" => "execute".to_string(), - _ => "plan".to_string(), - } - } - - /// Get the initial phase for this contract type (as ContractPhase enum) - pub fn initial_phase(&self) -> ContractPhase { - self.initial_phase_id() - .parse() - .unwrap_or(ContractPhase::Plan) - } - - /// Get the terminal phase ID for this contract type (as string) - pub fn terminal_phase_id(&self) -> String { - // Check phase_config first (for custom templates) - if let Some(config) = self.get_phase_config() { - // Last phase in sorted order is the terminal phase - let mut phases: Vec<_> = config.phases.iter().collect(); - phases.sort_by_key(|p| p.order); - if let Some(last) = phases.last() { - return last.id.clone(); - } - } - - // Fall back to built-in contract types - match self.contract_type.as_str() { - "specification" => "review".to_string(), - _ => "execute".to_string(), - } - } - - /// Get the terminal phase for this contract type (phase where contract can be completed) - pub fn terminal_phase(&self) -> ContractPhase { - self.terminal_phase_id() - .parse() - .unwrap_or(ContractPhase::Execute) - } - - /// Check if a phase ID is valid for this contract - pub fn is_valid_phase(&self, phase_id: &str) -> bool { - self.valid_phase_ids().contains(&phase_id.to_string()) - } - - /// Get the phase configuration for custom templates - pub fn get_phase_config(&self) -> Option<PhaseConfig> { - self.phase_config - .as_ref() - .and_then(|v| serde_json::from_value(v.clone()).ok()) - } - - /// Get completed deliverable IDs for a specific phase - pub fn get_completed_deliverables(&self, phase: &str) -> Vec<String> { - self.completed_deliverables - .get(phase) - .and_then(|v| v.as_array()) - .map(|arr| { - arr.iter() - .filter_map(|v| v.as_str().map(String::from)) - .collect() - }) - .unwrap_or_default() - } - - /// Check if a specific deliverable is marked as complete for a phase - pub fn is_deliverable_complete(&self, phase: &str, deliverable_id: &str) -> bool { - self.get_completed_deliverables(phase) - .contains(&deliverable_id.to_string()) - } -} - -/// Contract repository record from the database -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractRepository { - pub id: Uuid, - pub contract_id: Uuid, - pub name: String, - pub repository_url: Option<String>, - pub local_path: Option<String>, - pub source_type: String, - pub status: String, - pub is_primary: bool, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -impl ContractRepository { - /// Parse source_type string to RepositorySourceType enum - pub fn source_type_enum(&self) -> Result<RepositorySourceType, String> { - self.source_type.parse() - } - - /// Parse status string to RepositoryStatus enum - pub fn status_enum(&self) -> Result<RepositoryStatus, String> { - self.status.parse() - } -} - -/// Summary of a contract for list views -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractSummary { - pub id: Uuid, - pub name: String, - pub description: Option<String>, - /// Contract type: "simple" or "specification" - pub contract_type: String, - pub phase: String, - pub status: String, - /// Supervisor task ID for contract orchestration - pub supervisor_task_id: Option<Uuid>, - /// When true, tasks do not auto-execute completion actions and work stays in worktrees. - #[serde(default)] - pub local_only: bool, - /// When true with local_only, automatically merge completed tasks to target branch locally. - #[serde(default)] - pub auto_merge_local: bool, - pub file_count: i64, - pub task_count: i64, - pub repository_count: i64, - pub version: i32, - pub created_at: DateTime<Utc>, -} - -/// Contract with all relations for detail view -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractWithRelations { - #[serde(flatten)] - pub contract: Contract, - pub repositories: Vec<ContractRepository>, - pub files: Vec<FileSummary>, - pub tasks: Vec<TaskSummary>, -} - -/// Response for contract list endpoint -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractListResponse { - pub contracts: Vec<ContractSummary>, - pub total: i64, -} - -/// Request payload for creating a new contract -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateContractRequest { - /// Name of the contract - pub name: String, - /// Optional description - pub description: Option<String>, - /// Contract type: "simple" (default), "specification", "execute", or a custom template name. - /// For built-in types: - /// - simple: Plan -> Execute workflow - /// - specification: Research -> Specify -> Plan -> Execute -> Review - /// - execute: Execute only - /// For custom templates, use the template name or provide template_id. - #[serde(default)] - pub contract_type: Option<String>, - /// UUID of a custom template to use. If provided, this takes precedence over contract_type. - /// The template's phase configuration will be copied to the contract. - #[serde(default)] - pub template_id: Option<Uuid>, - /// Initial phase to start in (defaults based on contract_type or template) - /// - simple: defaults to "plan" - /// - specification: defaults to "research" - #[serde(default)] - pub initial_phase: Option<String>, - /// Enable autonomous loop mode for tasks in this contract. - /// When enabled, tasks automatically restart with --continue if they exit - /// without a COMPLETION_GATE indicating ready: true. - #[serde(default)] - pub autonomous_loop: Option<bool>, - /// Enable phase guard mode for this contract. - /// When enabled, the supervisor will pause and ask the user to review and approve - /// phase outputs before progressing to the next phase. - #[serde(default)] - pub phase_guard: Option<bool>, - /// Enable local-only mode for this contract. - /// When enabled, automatic completion actions (branch, merge, pr) are skipped, - /// allowing users to manually handle code changes via patch files or other means. - #[serde(default)] - pub local_only: Option<bool>, - /// Enable auto-merge to target branch locally when local_only mode is enabled. - /// When both local_only and auto_merge_local are true, completed task changes will be - /// automatically merged to the master/main branch locally (without pushing or creating PRs). - #[serde(default)] - pub auto_merge_local: Option<bool>, -} - -/// Request payload for updating a contract -#[derive(Debug, Default, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct UpdateContractRequest { - pub name: Option<String>, - pub description: Option<String>, - pub phase: Option<String>, - pub status: Option<String>, - /// Supervisor task ID for contract orchestration - #[serde(skip_serializing_if = "Option::is_none")] - pub supervisor_task_id: Option<Uuid>, - /// Enable or disable autonomous loop mode for tasks in this contract. - #[serde(default)] - pub autonomous_loop: Option<bool>, - /// Enable or disable phase guard mode for this contract. - /// When enabled, the supervisor will pause and ask the user to review and approve - /// phase outputs before progressing to the next phase. - #[serde(default)] - pub phase_guard: Option<bool>, - /// Enable or disable local-only mode for this contract. - /// When enabled, automatic completion actions (branch, merge, pr) are skipped, - /// allowing users to manually handle code changes via patch files or other means. - #[serde(default)] - pub local_only: Option<bool>, - /// Enable or disable auto-merge to target branch locally when local_only mode is enabled. - /// When both local_only and auto_merge_local are true, completed task changes will be - /// automatically merged to the master/main branch locally (without pushing or creating PRs). - #[serde(default)] - pub auto_merge_local: Option<bool>, - /// Version for optimistic locking - pub version: Option<i32>, -} - -/// Request to add a remote repository to a contract -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AddRemoteRepositoryRequest { - pub name: String, - pub repository_url: String, - #[serde(default)] - pub is_primary: bool, -} - -/// Request to add a local repository to a contract -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AddLocalRepositoryRequest { - pub name: String, - pub local_path: String, - #[serde(default)] - pub is_primary: bool, -} - -/// Request to create a managed repository (daemon will create it) -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateManagedRepositoryRequest { - pub name: String, - #[serde(default)] - pub is_primary: bool, -} - -/// Request to change contract phase -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChangePhaseRequest { - pub phase: String, - /// If phase_guard is enabled, this must be true to confirm the transition. - /// If not provided or false, returns phase deliverables for review. - #[serde(default)] - pub confirmed: Option<bool>, - /// User feedback for changes (used when not confirming) - #[serde(skip_serializing_if = "Option::is_none")] - pub feedback: Option<String>, - /// Expected version for optimistic locking. If provided, the phase change - /// will only succeed if the current contract version matches. - #[serde(skip_serializing_if = "Option::is_none")] - pub expected_version: Option<i32>, -} - -/// Result of a phase change operation, supporting explicit conflict detection. -#[derive(Debug, Clone)] -pub enum PhaseChangeResult { - /// Phase change succeeded, returning the updated contract - Success(Contract), - /// Version conflict: the contract was modified concurrently - VersionConflict { - /// The version the client expected - expected: i32, - /// The actual current version in the database - actual: i32, - /// The current phase of the contract - current_phase: String, - }, - /// Validation failed (e.g., invalid phase transition) - ValidationFailed { - /// Human-readable reason for the failure - reason: String, - /// List of missing requirements for the phase transition - missing_requirements: Vec<String>, - }, - /// The caller is not authorized to change this contract's phase - Unauthorized, - /// The contract was not found - NotFound, -} - -/// Response for phase transition when phase_guard is enabled -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PhaseTransitionRequest { - /// Current contract phase - pub current_phase: String, - /// Requested next phase - pub next_phase: String, - /// Summary of phase deliverables/outputs - pub deliverables_summary: String, - /// List of files created in this phase - pub phase_files: Vec<PhaseFileInfo>, - /// List of completed tasks in this phase - pub phase_tasks: Vec<PhaseTaskInfo>, - /// Whether user confirmation is required - pub requires_confirmation: bool, - /// Unique ID for tracking this transition request - pub transition_id: String, -} - -/// File info for phase transition review -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PhaseFileInfo { - pub id: Uuid, - pub name: String, - pub description: Option<String>, -} - -/// Task info for phase transition review -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PhaseTaskInfo { - pub id: Uuid, - pub name: String, - pub status: String, -} - -/// Contract event record from the database -#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractEvent { - pub id: Uuid, - pub contract_id: Uuid, - pub event_type: String, - pub previous_phase: Option<String>, - pub new_phase: Option<String>, - #[sqlx(json)] - pub event_data: Option<serde_json::Value>, - pub created_at: DateTime<Utc>, -} - -/// Response for contract events list endpoint -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ContractEventListResponse { - pub events: Vec<ContractEvent>, - pub total: i64, -} // ============================================================================ // Task Checkpoints (for git checkpoint tracking) @@ -2173,11 +1280,9 @@ pub struct ConversationSnapshot { pub struct HistoryEvent { pub id: Uuid, pub owner_id: Uuid, - pub contract_id: Option<Uuid>, pub task_id: Option<Uuid>, pub event_type: String, pub event_subtype: Option<String>, - pub phase: Option<String>, #[sqlx(json)] pub event_data: serde_json::Value, pub created_at: DateTime<Utc>, @@ -2221,7 +1326,6 @@ pub struct ToolCallInfo { #[derive(Debug, Deserialize, ToSchema, Default)] #[serde(rename_all = "camelCase")] pub struct HistoryQueryFilters { - pub phase: Option<String>, pub event_types: Option<Vec<String>>, #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub from: Option<DateTime<Utc>>, @@ -2766,11 +1870,6 @@ pub struct DirectiveStep { /// Status: pending, ready, running, completed, failed, skipped pub status: String, pub task_id: Option<Uuid>, - /// Optional contract ID for contract-backed execution. - pub contract_id: Option<Uuid>, - /// Optional contract type (e.g. "simple", "specification", "execute"). - /// When set, the orchestrator creates a contract instead of a standalone task. - pub contract_type: Option<String>, pub order_index: i32, pub generation: i32, pub started_at: Option<DateTime<Utc>>, diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index ee4b561..d453f99 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,21 +6,17 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, - ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, - ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, - CreateContractRequest, CreateFileRequest, CreateTaskRequest, - CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, + CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot, + CreateFileRequest, CreateTaskRequest, + Daemon, DaemonTaskAssignment, DaemonWithCapacity, + Directive, DirectiveDocument, DirectiveStep, DirectiveSummary, CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest, UpdateDirectiveStepRequest, CreateOrderRequest, Order, UpdateOrderRequest, CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, - MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, - PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, - Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, - UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest, + Task, TaskCheckpoint, TaskEvent, TaskSummary, + UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. @@ -89,7 +85,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) @@ -105,7 +101,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, @@ -119,7 +115,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, @@ -171,7 +167,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -190,7 +186,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -246,7 +242,6 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { // ============================================================================= /// Create a new file record for a specific owner. -/// Files must belong to a contract - the contract_id is required and the phase is looked up. pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, @@ -254,32 +249,16 @@ pub async fn create_file_for_owner( ) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); - // Use body from request (may be empty or contain template elements) let body_json = serde_json::to_value(&req.body).unwrap_or_default(); - // Use provided contract_phase, or look up from contract's current phase - let contract_phase: Option<String> = if req.contract_phase.is_some() { - req.contract_phase - } else { - sqlx::query_scalar( - "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", - ) - .bind(req.contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await? - }; - sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) - VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path) + VALUES ($1, $2, $3, $4, $5, NULL, $6, $7) + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) - .bind(req.contract_id) - .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) @@ -298,7 +277,7 @@ pub async fn get_file_for_owner( ) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, @@ -313,7 +292,7 @@ pub async fn get_file_for_owner( pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC @@ -324,13 +303,10 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F .await } -/// Database row type for file summary with contract info +/// Database row type for file summary #[derive(Debug, sqlx::FromRow)] struct FileSummaryRow { id: Uuid, - contract_id: Option<Uuid>, - contract_name: Option<String>, - contract_phase: Option<String>, name: String, description: Option<String>, #[sqlx(json)] @@ -342,7 +318,7 @@ struct FileSummaryRow { updated_at: chrono::DateTime<chrono::Utc>, } -/// List file summaries for an owner with contract info (joined). +/// List file summaries for an owner. pub async fn list_file_summaries_for_owner( pool: &PgPool, owner_id: Uuid, @@ -350,11 +326,9 @@ pub async fn list_file_summaries_for_owner( let rows = sqlx::query_as::<_, FileSummaryRow>( r#" SELECT - f.id, f.contract_id, c.name as contract_name, f.contract_phase, - f.name, f.description, f.transcript, f.version, + f.id, f.name, f.description, f.transcript, f.version, f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at FROM files f - LEFT JOIN contracts c ON f.contract_id = c.id WHERE f.owner_id = $1 ORDER BY f.created_at DESC "#, @@ -373,9 +347,6 @@ pub async fn list_file_summaries_for_owner( .fold(0.0_f32, f32::max); FileSummary { id: row.id, - contract_id: row.contract_id, - contract_name: row.contract_name, - contract_phase: row.contract_phase, name: row.name, description: row.description, transcript_count: row.transcript.len(), @@ -429,7 +400,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -449,7 +420,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 - RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -657,34 +628,24 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq /// Task spawning is now controlled by supervisors at the application level. /// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { - // Calculate depth and inherit settings from parent if applicable - let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + // Calculate depth + inherit settings from parent if applicable. + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - - // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) - let contract_id = parent.contract_id.or(req.contract_id); - - // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); - // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, - req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -699,23 +660,21 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - contract_id, parent_task_id, depth, name, description, plan, priority, - is_supervisor, repository_url, base_branch, target_branch, merge_mode, + parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state, supervisor_worktree_task_id + branched_from_task_id, conversation_state ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) - .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) - .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -726,7 +685,6 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) - .bind(&req.supervisor_worktree_task_id) .fetch_one(pool) .await } @@ -751,14 +709,12 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, @@ -772,14 +728,12 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -790,73 +744,6 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum } /// List all tasks in a contract (for supervisor tree view). -pub async fn list_tasks_by_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT * FROM tasks - WHERE contract_id = $1 AND owner_id = $2 - ORDER BY is_supervisor DESC, depth ASC, created_at ASC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get pending tasks for a contract (non-supervisor tasks only). -/// Includes tasks that were interrupted (retry candidates). -/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. -pub async fn get_pending_tasks_for_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT t.* FROM tasks t - WHERE t.contract_id = $1 AND t.owner_id = $2 - AND t.status = 'pending' - AND t.retry_count < t.max_retries - AND t.is_supervisor = false - ORDER BY - t.interrupted_at DESC NULLS LAST, - t.priority DESC, - t.created_at ASC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get all contracts that have pending tasks awaiting retry. -/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks. -pub async fn get_all_pending_task_contracts( - pool: &PgPool, -) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> { - sqlx::query_as::<_, (Uuid, Uuid)>( - r#" - SELECT DISTINCT t.contract_id, t.owner_id - FROM tasks t - WHERE t.contract_id IS NOT NULL - AND t.status = 'pending' - AND t.retry_count < t.max_retries - AND t.is_supervisor = false - ORDER BY t.owner_id, t.contract_id - "#, - ) - .fetch_all(pool) - .await -} - -/// Mark a task as pending for retry after daemon failure. -/// Increments retry count and adds the failed daemon to exclusion list. pub async fn mark_task_for_retry( pool: &PgPool, task_id: Uuid, @@ -1061,16 +948,13 @@ pub async fn create_task_for_owner( owner_id: Uuid, req: CreateTaskRequest, ) -> Result<Task, sqlx::Error> { - // Calculate depth and inherit settings from parent if applicable - let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + // Calculate depth + inherit settings from parent if applicable. + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - - // Validate max depth if new_depth >= 2 { return Err(sqlx::Error::Protocol(format!( "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", @@ -1078,25 +962,17 @@ pub async fn create_task_for_owner( ))); } - // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) - let contract_id = parent.contract_id.or(req.contract_id); - - // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); let target_branch = req.target_branch.clone().or(parent.target_branch); let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); - // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, - req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -1108,14 +984,11 @@ pub async fn create_task_for_owner( let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); - // Resolve the directive_document_id. Tasks plumbed through this builder - // currently have no way to specify a document explicitly (we don't want - // to widen `CreateTaskRequest` for this — every call site would have to - // change). Instead, when the task is directive-driven (directive_id is - // set) we attach it to that directive's most recently-updated active - // document so the task lands under that document's tasks/ subfolder in - // the sidebar. Resolution failures are non-fatal — the task still gets - // created with directive_document_id = NULL, matching legacy behaviour. + // Resolve directive_document_id from the directive's currently- + // active contract row (directive_documents table) so the task + // lands under the right tasks/ subfolder in the sidebar. Failures + // are non-fatal — the task is created with NULL document_id and + // the sidebar tolerates that. let directive_document_id = match req.directive_id { Some(directive_id) => resolve_active_document_for_directive(pool, directive_id) .await @@ -1126,25 +999,23 @@ pub async fn create_task_for_owner( sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, - is_supervisor, repository_url, base_branch, target_branch, merge_mode, + owner_id, parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files, - branched_from_task_id, conversation_state, supervisor_worktree_task_id, + branched_from_task_id, conversation_state, directive_id, directive_step_id, directive_document_id ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING * "#, ) .bind(owner_id) - .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) - .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -1155,7 +1026,6 @@ pub async fn create_task_for_owner( .bind(©_files_json) .bind(&req.branched_from_task_id) .bind(&req.conversation_history) - .bind(&req.supervisor_worktree_task_id) .bind(&req.directive_id) .bind(&req.directive_step_id) .bind(&directive_document_id) @@ -1226,14 +1096,12 @@ pub async fn list_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1335,14 +1203,12 @@ pub async fn list_ephemeral_directive_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.directive_step_id IS NULL @@ -1369,14 +1235,12 @@ pub async fn list_tmp_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.directive_id = $2 AND t.parent_task_id IS NULL @@ -1399,14 +1263,12 @@ pub async fn list_subtasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1920,14 +1782,12 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1942,14 +1802,12 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at + t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -2121,1287 +1979,28 @@ pub async fn complete_task( Ok(task) } -// ============================================================================= -// Mesh Chat History Functions -// ============================================================================= - -/// Get or create the active conversation for an owner. -pub async fn get_or_create_active_conversation( - pool: &PgPool, - owner_id: Uuid, -) -> Result<MeshChatConversation, sqlx::Error> { - // Try to get existing active conversation for this owner - let existing = sqlx::query_as::<_, MeshChatConversation>( - r#" - SELECT * - FROM mesh_chat_conversations - WHERE is_active = true AND owner_id = $1 - LIMIT 1 - "#, - ) - .bind(owner_id) - .fetch_optional(pool) - .await?; - - if let Some(conv) = existing { - return Ok(conv); - } - - // Create new conversation - sqlx::query_as::<_, MeshChatConversation>( - r#" - INSERT INTO mesh_chat_conversations (owner_id, is_active) - VALUES ($1, true) - RETURNING * - "#, - ) - .bind(owner_id) - .fetch_one(pool) - .await -} - -/// List messages for a conversation. -pub async fn list_chat_messages( - pool: &PgPool, - conversation_id: Uuid, - limit: Option<i32>, -) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> { - let limit = limit.unwrap_or(100); - sqlx::query_as::<_, MeshChatMessageRecord>( - r#" - SELECT * - FROM mesh_chat_messages - WHERE conversation_id = $1 - ORDER BY created_at ASC - LIMIT $2 - "#, - ) - .bind(conversation_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Add a message to a conversation. -#[allow(clippy::too_many_arguments)] -pub async fn add_chat_message( - pool: &PgPool, - conversation_id: Uuid, - role: &str, - content: &str, - context_type: &str, - context_task_id: Option<Uuid>, - tool_calls: Option<serde_json::Value>, - pending_questions: Option<serde_json::Value>, -) -> Result<MeshChatMessageRecord, sqlx::Error> { - sqlx::query_as::<_, MeshChatMessageRecord>( - r#" - INSERT INTO mesh_chat_messages - (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) - VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING * - "#, - ) - .bind(conversation_id) - .bind(role) - .bind(content) - .bind(context_type) - .bind(context_task_id) - .bind(tool_calls) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Clear conversation (archive existing and create new). -pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> { - // Mark existing as inactive for this owner - sqlx::query( - r#" - UPDATE mesh_chat_conversations - SET is_active = false, updated_at = NOW() - WHERE is_active = true AND owner_id = $1 - "#, - ) - .bind(owner_id) - .execute(pool) - .await?; - - // Create new active conversation - get_or_create_active_conversation(pool, owner_id).await -} - -// ============================================================================= -// Contract Chat History Functions -// ============================================================================= - -/// Get or create the active conversation for a contract. -pub async fn get_or_create_contract_conversation( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<ContractChatConversation, sqlx::Error> { - // Try to get existing active conversation for this contract - let existing = sqlx::query_as::<_, ContractChatConversation>( - r#" - SELECT * - FROM contract_chat_conversations - WHERE is_active = true AND contract_id = $1 AND owner_id = $2 - LIMIT 1 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await?; - - if let Some(conv) = existing { - return Ok(conv); - } - - // Create new conversation - sqlx::query_as::<_, ContractChatConversation>( - r#" - INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active) - VALUES ($1, $2, true) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_one(pool) - .await -} - -/// List messages for a contract conversation. -pub async fn list_contract_chat_messages( - pool: &PgPool, - conversation_id: Uuid, - limit: Option<i32>, -) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> { - let limit = limit.unwrap_or(100); - sqlx::query_as::<_, ContractChatMessageRecord>( - r#" - SELECT * - FROM contract_chat_messages - WHERE conversation_id = $1 - ORDER BY created_at ASC - LIMIT $2 - "#, - ) - .bind(conversation_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Add a message to a contract conversation. -pub async fn add_contract_chat_message( - pool: &PgPool, - conversation_id: Uuid, - role: &str, - content: &str, - tool_calls: Option<serde_json::Value>, - pending_questions: Option<serde_json::Value>, -) -> Result<ContractChatMessageRecord, sqlx::Error> { - sqlx::query_as::<_, ContractChatMessageRecord>( - r#" - INSERT INTO contract_chat_messages - (conversation_id, role, content, tool_calls, pending_questions) - VALUES ($1, $2, $3, $4, $5) - RETURNING * - "#, - ) - .bind(conversation_id) - .bind(role) - .bind(content) - .bind(tool_calls) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Clear contract conversation (archive existing and create new). -pub async fn clear_contract_conversation( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<ContractChatConversation, sqlx::Error> { - // Mark existing as inactive for this contract - sqlx::query( - r#" - UPDATE contract_chat_conversations - SET is_active = false, updated_at = NOW() - WHERE is_active = true AND contract_id = $1 AND owner_id = $2 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - // Create new active conversation - get_or_create_contract_conversation(pool, contract_id, owner_id).await -} - -// ============================================================================= -// Contract Type Template Functions (Owner-Scoped) -// ============================================================================= - -/// Create a new contract type template for a specific owner. -pub async fn create_template_for_owner( - pool: &PgPool, - owner_id: Uuid, - req: CreateTemplateRequest, -) -> Result<ContractTypeTemplateRecord, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables) - VALUES ($1, $2, $3, $4, $5, $6) - RETURNING * - "#, - ) - .bind(owner_id) - .bind(&req.name) - .bind(&req.description) - .bind(serde_json::to_value(&req.phases).unwrap_or_default()) - .bind(&req.default_phase) - .bind(match &req.deliverables { - Some(d) => serde_json::to_value(d).ok(), - None => None, - }) - .fetch_one(pool) - .await -} - -/// Get a contract type template by ID, scoped to owner. -pub async fn get_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Get a contract type template by ID (internal use, no owner scoping). -pub async fn get_template_by_id( - pool: &PgPool, - id: Uuid, -) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE id = $1 - "#, - ) - .bind(id) - .fetch_optional(pool) - .await -} - -/// List all contract type templates for an owner, ordered by name. -pub async fn list_templates_for_owner( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> { - sqlx::query_as::<_, ContractTypeTemplateRecord>( - r#" - SELECT * - FROM contract_type_templates - WHERE owner_id = $1 - ORDER BY name ASC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Update a contract type template for an owner. -pub async fn update_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - req: UpdateTemplateRequest, -) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> { - // Build dynamic update query - let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()"); - let mut param_idx = 3; // $1 = id, $2 = owner_id - - if req.name.is_some() { - query.push_str(&format!(", name = ${}", param_idx)); - param_idx += 1; - } - if req.description.is_some() { - query.push_str(&format!(", description = ${}", param_idx)); - param_idx += 1; - } - if req.phases.is_some() { - query.push_str(&format!(", phases = ${}", param_idx)); - param_idx += 1; - } - if req.default_phase.is_some() { - query.push_str(&format!(", default_phase = ${}", param_idx)); - param_idx += 1; - } - if req.deliverables.is_some() { - query.push_str(&format!(", deliverables = ${}", param_idx)); - param_idx += 1; - } - - // Optimistic locking - if req.version.is_some() { - query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx)); - } else { - query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2"); - } - query.push_str(" RETURNING *"); - - let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query); - sql_query = sql_query.bind(id).bind(owner_id); - - if let Some(ref name) = req.name { - sql_query = sql_query.bind(name); - } - if let Some(ref description) = req.description { - sql_query = sql_query.bind(description); - } - if let Some(ref phases) = req.phases { - sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default()); - } - if let Some(ref default_phase) = req.default_phase { - sql_query = sql_query.bind(default_phase); - } - if let Some(ref deliverables) = req.deliverables { - sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default()); - } - if let Some(version) = req.version { - sql_query = sql_query.bind(version); - } - - match sql_query.fetch_optional(pool).await { - Ok(result) => { - if result.is_none() && req.version.is_some() { - // Check if it's a version conflict - if let Some(current) = get_template_for_owner(pool, id, owner_id).await? { - return Err(RepositoryError::VersionConflict { - expected: req.version.unwrap(), - actual: current.version, - }); - } - } - Ok(result) - } - Err(e) => Err(RepositoryError::Database(e)), - } -} - -/// Delete a contract type template for an owner. -pub async fn delete_template_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contract_type_templates - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Helper function to build PhaseConfig from a template. -pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig { - PhaseConfig { - phases: template.phases.clone(), - default_phase: template.default_phase.clone(), - deliverables: template.deliverables.clone().unwrap_or_default(), - } -} - -/// Helper function to build PhaseConfig for built-in contract types. -pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig { - match contract_type { - "simple" => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 }, - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 }, - ], - default_phase: "plan".to_string(), - deliverables: [ - ("plan".to_string(), vec![DeliverableDefinition { - id: "plan-document".to_string(), - name: "Plan".to_string(), - priority: "required".to_string(), - }]), - ("execute".to_string(), vec![DeliverableDefinition { - id: "pull-request".to_string(), - name: "Pull Request".to_string(), - priority: "required".to_string(), - }]), - ].into_iter().collect(), - }, - "specification" => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 }, - PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 }, - PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 }, - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 }, - PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 }, - ], - default_phase: "research".to_string(), - deliverables: [ - ("research".to_string(), vec![DeliverableDefinition { - id: "research-notes".to_string(), - name: "Research Notes".to_string(), - priority: "required".to_string(), - }]), - ("specify".to_string(), vec![DeliverableDefinition { - id: "requirements-document".to_string(), - name: "Requirements Document".to_string(), - priority: "required".to_string(), - }]), - ("plan".to_string(), vec![DeliverableDefinition { - id: "plan-document".to_string(), - name: "Plan".to_string(), - priority: "required".to_string(), - }]), - ("execute".to_string(), vec![DeliverableDefinition { - id: "pull-request".to_string(), - name: "Pull Request".to_string(), - priority: "required".to_string(), - }]), - ("review".to_string(), vec![DeliverableDefinition { - id: "release-notes".to_string(), - name: "Release Notes".to_string(), - priority: "required".to_string(), - }]), - ].into_iter().collect(), - }, - "execute" | _ => PhaseConfig { - phases: vec![ - PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 }, - ], - default_phase: "execute".to_string(), - deliverables: std::collections::HashMap::new(), - }, - } -} - -// ============================================================================= -// Contract Functions (Owner-Scoped) -// ============================================================================= - -/// Create a new contract for a specific owner. -/// Supports both built-in contract types (simple, specification, execute) and custom templates. -pub async fn create_contract_for_owner( - pool: &PgPool, - owner_id: Uuid, - req: CreateContractRequest, -) -> Result<Contract, sqlx::Error> { - // Determine phase configuration based on template_id or contract_type - let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) = - if let Some(template_id) = req.template_id { - // Look up the custom template - let template = get_template_by_id(pool, template_id) - .await? - .ok_or_else(|| { - sqlx::Error::Protocol(format!("Template not found: {}", template_id)) - })?; - - let config = build_phase_config_from_template(&template); - let default = config.default_phase.clone(); - // For custom templates, store the template name as the contract_type - (config, template.name.clone(), default) - } else { - // Use built-in contract type - let contract_type = req.contract_type.as_deref().unwrap_or("simple"); - - // Validate contract type - let valid_types = ["simple", "specification", "execute"]; - if !valid_types.contains(&contract_type) { - return Err(sqlx::Error::Protocol(format!( - "Invalid contract_type '{}'. Must be one of: {} or provide a template_id", - contract_type, - valid_types.join(", ") - ))); - } - - let config = build_phase_config_for_builtin(contract_type); - let default = config.default_phase.clone(); - (config, contract_type.to_string(), default) - }; - - // Get valid phase IDs from the configuration - let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect(); - - // Use provided initial_phase or default based on contract type/template - let phase = req.initial_phase.as_deref().unwrap_or(&default_phase); - - // Validate the phase is valid for this contract type/template - if !valid_phase_ids.contains(&phase.to_string()) { - return Err(sqlx::Error::Protocol(format!( - "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}", - phase, - contract_type_str, - valid_phase_ids.join(", ") - ))); - } - - let autonomous_loop = req.autonomous_loop.unwrap_or(false); - let phase_guard = req.phase_guard.unwrap_or(false); - let local_only = req.local_only.unwrap_or(false); - let auto_merge_local = req.auto_merge_local.unwrap_or(false); - - // Serialize phase_config to JSON - let phase_config_json = serde_json::to_value(&phase_config).ok(); - - sqlx::query_as::<_, Contract>( - r#" - INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - RETURNING * - "#, - ) - .bind(owner_id) - .bind(&req.name) - .bind(&req.description) - .bind(&contract_type_str) - .bind(phase) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .bind(phase_config_json) - .fetch_one(pool) - .await -} - -/// Get a contract by ID, scoped to owner. -pub async fn get_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<Contract>, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - SELECT * - FROM contracts - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// List all contracts for an owner, ordered by created_at DESC. -pub async fn list_contracts_for_owner( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<ContractSummary>, sqlx::Error> { - sqlx::query_as::<_, ContractSummary>( - r#" - SELECT - c.id, c.name, c.description, c.contract_type, c.phase, c.status, - c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, - (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, - (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, - (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count - FROM contracts c - WHERE c.owner_id = $1 - ORDER BY c.created_at DESC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get contract summary by ID. -pub async fn get_contract_summary_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<Option<ContractSummary>, sqlx::Error> { - sqlx::query_as::<_, ContractSummary>( - r#" - SELECT - c.id, c.name, c.description, c.contract_type, c.phase, c.status, - c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at, - (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, - (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, - (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count - FROM contracts c - WHERE c.id = $1 AND c.owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Update a contract by ID with optimistic locking, scoped to owner. -pub async fn update_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - req: UpdateContractRequest, -) -> Result<Option<Contract>, RepositoryError> { - let existing = get_contract_for_owner(pool, id, owner_id).await?; - let Some(existing) = existing else { - return Ok(None); - }; - - // Check version if provided (optimistic locking) - if let Some(expected_version) = req.version { - if existing.version != expected_version { - return Err(RepositoryError::VersionConflict { - expected: expected_version, - actual: existing.version, - }); - } - } - - // Apply updates - let name = req.name.unwrap_or(existing.name); - let description = req.description.or(existing.description); - let phase = req.phase.unwrap_or(existing.phase); - let status = req.status.unwrap_or(existing.status); - let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id); - let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop); - let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard); - let local_only = req.local_only.unwrap_or(existing.local_only); - let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local); - - let result = if req.version.is_some() { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET name = $3, description = $4, phase = $5, status = $6, - supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $12 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&name) - .bind(&description) - .bind(&phase) - .bind(&status) - .bind(supervisor_task_id) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .bind(req.version.unwrap()) - .fetch_optional(pool) - .await? - } else { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET name = $3, description = $4, phase = $5, status = $6, - supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(&name) - .bind(&description) - .bind(&phase) - .bind(&status) - .bind(supervisor_task_id) - .bind(autonomous_loop) - .bind(phase_guard) - .bind(local_only) - .bind(auto_merge_local) - .fetch_optional(pool) - .await? - }; - - // If versioned update returned None, there was a race condition - if result.is_none() && req.version.is_some() { - if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? { - return Err(RepositoryError::VersionConflict { - expected: req.version.unwrap(), - actual: current.version, - }); - } - } - - Ok(result) -} - -/// Delete a contract by ID, scoped to owner. -pub async fn delete_contract_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contracts - WHERE id = $1 AND owner_id = $2 - "#, - ) - .bind(id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Change contract phase and record event. -/// -/// This is the simple version without version checking. Use `change_contract_phase_with_version` -/// for explicit version conflict detection. -pub async fn change_contract_phase_for_owner( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - new_phase: &str, -) -> Result<Option<Contract>, sqlx::Error> { - // Get current phase - let existing = get_contract_for_owner(pool, id, owner_id).await?; - let Some(existing) = existing else { - return Ok(None); - }; - - let previous_phase = existing.phase.clone(); - - // Update phase - let contract = sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET phase = $3, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(new_phase) - .fetch_optional(pool) - .await?; - - // Record event - if contract.is_some() { - sqlx::query( - r#" - INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) - VALUES ($1, 'phase_change', $2, $3) - "#, - ) - .bind(id) - .bind(&previous_phase) - .bind(new_phase) - .execute(pool) - .await?; - } - - Ok(contract) -} - -/// Change contract phase with explicit version checking for conflict detection. -/// -/// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions. -/// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match. -pub async fn change_contract_phase_with_version( - pool: &PgPool, - id: Uuid, - owner_id: Uuid, - new_phase: &str, - expected_version: Option<i32>, -) -> Result<PhaseChangeResult, sqlx::Error> { - // Start a transaction to ensure atomicity with row locking - let mut tx = pool.begin().await?; - - // Lock the row with SELECT FOR UPDATE and get current state - let existing: Option<Contract> = sqlx::query_as::<_, Contract>( - r#" - SELECT * - FROM contracts - WHERE id = $1 AND owner_id = $2 - FOR UPDATE - "#, - ) - .bind(id) - .bind(owner_id) - .fetch_optional(&mut *tx) - .await?; - - let Some(existing) = existing else { - tx.rollback().await?; - return Ok(PhaseChangeResult::NotFound); - }; - - // Check version if provided (optimistic locking) - if let Some(expected) = expected_version { - if existing.version != expected { - tx.rollback().await?; - return Ok(PhaseChangeResult::VersionConflict { - expected, - actual: existing.version, - current_phase: existing.phase, - }); - } - } - - // Validate the phase transition is allowed - let valid_phases = existing.valid_phase_ids(); - if !valid_phases.contains(&new_phase.to_string()) { - tx.rollback().await?; - return Ok(PhaseChangeResult::ValidationFailed { - reason: format!( - "Invalid phase '{}' for contract type '{}'", - new_phase, existing.contract_type - ), - missing_requirements: vec![format!( - "Phase must be one of: {}", - valid_phases.join(", ") - )], - }); - } - - let previous_phase = existing.phase.clone(); - - // Update phase with version increment - let contract = sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET phase = $3, version = version + 1, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING * - "#, - ) - .bind(id) - .bind(owner_id) - .bind(new_phase) - .fetch_one(&mut *tx) - .await?; - - // Record event - sqlx::query( - r#" - INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) - VALUES ($1, 'phase_change', $2, $3) - "#, - ) - .bind(id) - .bind(&previous_phase) - .bind(new_phase) - .execute(&mut *tx) - .await?; - - // Commit the transaction - tx.commit().await?; - - Ok(PhaseChangeResult::Success(contract)) -} - -// ============================================================================= -// Contract Repository Functions -// ============================================================================= - -/// List repositories for a contract. -pub async fn list_contract_repositories( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<ContractRepository>, sqlx::Error> { - sqlx::query_as::<_, ContractRepository>( - r#" - SELECT * - FROM contract_repositories - WHERE contract_id = $1 - ORDER BY is_primary DESC, created_at ASC - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -/// Add a remote repository to a contract. -pub async fn add_remote_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - repository_url: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - } - sqlx::query_as::<_, ContractRepository>( - r#" - INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) - VALUES ($1, $2, $3, 'remote', 'ready', $4) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(repository_url) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Add a local repository to a contract. -pub async fn add_local_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - local_path: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - } - - sqlx::query_as::<_, ContractRepository>( +/// Get task tree from a specific root task. +pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( r#" - INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) - VALUES ($1, $2, $3, 'local', 'ready', $4) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(local_path) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Create a managed repository (daemon will create it). -pub async fn create_managed_repository( - pool: &PgPool, - contract_id: Uuid, - name: &str, - is_primary: bool, -) -> Result<ContractRepository, sqlx::Error> { - // If is_primary, clear other primaries first - if is_primary { - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, + WITH RECURSIVE task_tree AS ( + -- Base case: the root task + SELECT * FROM tasks WHERE id = $1 + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id ) - .bind(contract_id) - .execute(pool) - .await?; - } - - sqlx::query_as::<_, ContractRepository>( - r#" - INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) - VALUES ($1, $2, 'managed', 'pending', $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(name) - .bind(is_primary) - .fetch_one(pool) - .await -} - -/// Delete a repository from a contract. -pub async fn delete_contract_repository( - pool: &PgPool, - repo_id: Uuid, - contract_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM contract_repositories - WHERE id = $1 AND contract_id = $2 - "#, - ) - .bind(repo_id) - .bind(contract_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Set a repository as primary (and clear others). -pub async fn set_repository_primary( - pool: &PgPool, - repo_id: Uuid, - contract_id: Uuid, -) -> Result<bool, sqlx::Error> { - // Clear other primaries - sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = false, updated_at = NOW() - WHERE contract_id = $1 AND is_primary = true - "#, - ) - .bind(contract_id) - .execute(pool) - .await?; - - // Set this one as primary - let result = sqlx::query( - r#" - UPDATE contract_repositories - SET is_primary = true, updated_at = NOW() - WHERE id = $1 AND contract_id = $2 - "#, - ) - .bind(repo_id) - .bind(contract_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Update managed repository status (used by daemon). -pub async fn update_managed_repository_status( - pool: &PgPool, - repo_id: Uuid, - status: &str, - repository_url: Option<&str>, -) -> Result<Option<ContractRepository>, sqlx::Error> { - sqlx::query_as::<_, ContractRepository>( - r#" - UPDATE contract_repositories - SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(repo_id) - .bind(status) - .bind(repository_url) - .fetch_optional(pool) - .await -} - -// ============================================================================= -// Contract Task Association Functions -// ============================================================================= - -/// Add a task to a contract. -pub async fn add_task_to_contract( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - UPDATE tasks - SET contract_id = $2, updated_at = NOW() - WHERE id = $1 AND owner_id = $3 - "#, - ) - .bind(task_id) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Remove a task from a contract. -pub async fn remove_task_from_contract( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - owner_id: Uuid, -) -> Result<bool, sqlx::Error> { - let result = sqlx::query( - r#" - UPDATE tasks - SET contract_id = NULL, updated_at = NOW() - WHERE id = $1 AND contract_id = $2 AND owner_id = $3 - "#, - ) - .bind(task_id) - .bind(contract_id) - .bind(owner_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// List files in a contract. -pub async fn list_files_in_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<FileSummary>, sqlx::Error> { - // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields - let files = sqlx::query_as::<_, File>( - r#" - SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at - FROM files - WHERE contract_id = $1 AND owner_id = $2 - ORDER BY created_at DESC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await?; - - Ok(files.into_iter().map(FileSummary::from).collect()) -} - -/// List tasks in a contract. -pub async fn list_tasks_in_contract( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Vec<TaskSummary>, sqlx::Error> { - sqlx::query_as::<_, TaskSummary>( - r#" - SELECT - t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, - c.status as contract_status, - t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at - FROM tasks t - LEFT JOIN contracts c ON t.contract_id = c.id - WHERE t.contract_id = $1 AND t.owner_id = $2 - ORDER BY t.priority DESC, t.created_at DESC - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Minimal task info for worktree cleanup operations. -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct TaskWorktreeInfo { - pub id: Uuid, - pub daemon_id: Option<Uuid>, - pub overlay_path: Option<String>, - /// If set, this task shares the worktree of the specified supervisor task. - /// Should NOT have its worktree deleted during cleanup. - pub supervisor_worktree_task_id: Option<Uuid>, -} - -/// List tasks in a contract with their daemon/worktree info. -/// Used for cleaning up worktrees when a contract is completed or deleted. -pub async fn list_contract_tasks_with_worktree_info( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> { - sqlx::query_as::<_, TaskWorktreeInfo>( - r#" - SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id - FROM tasks - WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL) - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -// ============================================================================= -// Contract Events -// ============================================================================= - -/// List events for a contract. -pub async fn list_contract_events( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<ContractEvent>, sqlx::Error> { - sqlx::query_as::<_, ContractEvent>( - r#" - SELECT * - FROM contract_events - WHERE contract_id = $1 - ORDER BY created_at DESC + SELECT * FROM task_tree + ORDER BY depth, created_at "#, ) - .bind(contract_id) + .bind(root_task_id) .fetch_all(pool) .await } -/// Record a contract event. -pub async fn record_contract_event( - pool: &PgPool, - contract_id: Uuid, - event_type: &str, - event_data: Option<serde_json::Value>, -) -> Result<ContractEvent, sqlx::Error> { - sqlx::query_as::<_, ContractEvent>( - r#" - INSERT INTO contract_events (contract_id, event_type, event_data) - VALUES ($1, $2, $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(event_type) - .bind(event_data) - .fetch_one(pool) - .await -} - // ============================================================================ // Task Checkpoints // ============================================================================ @@ -3501,713 +2100,6 @@ pub async fn list_task_checkpoints( } // ============================================================================ -// Supervisor State -// ============================================================================ - -/// Create or update supervisor state for a contract. -pub async fn upsert_supervisor_state( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - conversation_history: serde_json::Value, - pending_task_ids: &[Uuid], - phase: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) - VALUES ($1, $2, $3, $4, $5, NOW()) - ON CONFLICT (contract_id) DO UPDATE SET - task_id = EXCLUDED.task_id, - conversation_history = EXCLUDED.conversation_history, - pending_task_ids = EXCLUDED.pending_task_ids, - phase = EXCLUDED.phase, - last_activity = NOW(), - updated_at = NOW() - RETURNING * - "#, - ) - .bind(contract_id) - .bind(task_id) - .bind(conversation_history) - .bind(pending_task_ids) - .bind(phase) - .fetch_one(pool) - .await -} - -/// Get supervisor state for a contract. -pub async fn get_supervisor_state( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1") - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Get supervisor state by task ID. -pub async fn get_supervisor_state_by_task( - pool: &PgPool, - task_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1") - .bind(task_id) - .fetch_optional(pool) - .await -} - -/// Update supervisor conversation history. -pub async fn update_supervisor_conversation( - pool: &PgPool, - contract_id: Uuid, - conversation_history: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET conversation_history = $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(conversation_history) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor pending tasks. -pub async fn update_supervisor_pending_tasks( - pool: &PgPool, - contract_id: Uuid, - pending_task_ids: &[Uuid], -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_task_ids = $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(pending_task_ids) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor state with detailed activity tracking. -/// Called at key save points: LLM response, task spawn, question asked, phase change. -pub async fn update_supervisor_detailed_state( - pool: &PgPool, - contract_id: Uuid, - state: &str, - current_activity: Option<&str>, - progress: i32, - error_message: Option<&str>, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET state = $1, - current_activity = $2, - progress = $3, - error_message = $4, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $5 - RETURNING * - "#, - ) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(error_message) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Add a spawned task ID to the supervisor's list. -pub async fn add_supervisor_spawned_task( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET spawned_task_ids = array_append(spawned_task_ids, $1), - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(task_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Add a pending question to the supervisor state. -pub async fn add_supervisor_pending_question( - pool: &PgPool, - contract_id: Uuid, - question: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_questions = pending_questions || $1::jsonb, - state = 'waiting_for_user', - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(question) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Remove a pending question by ID. -pub async fn remove_supervisor_pending_question( - pool: &PgPool, - contract_id: Uuid, - question_id: Uuid, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET pending_questions = ( - SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb) - FROM jsonb_array_elements(pending_questions) elem - WHERE (elem->>'id')::uuid != $1 - ), - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(question_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Comprehensive state save - used at major save points. -pub async fn save_supervisor_state_full( - pool: &PgPool, - contract_id: Uuid, - task_id: Uuid, - conversation_history: serde_json::Value, - pending_task_ids: &[Uuid], - phase: &str, - state: &str, - current_activity: Option<&str>, - progress: i32, - error_message: Option<&str>, - spawned_task_ids: &[Uuid], - pending_questions: serde_json::Value, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - INSERT INTO supervisor_states ( - contract_id, task_id, conversation_history, pending_task_ids, phase, - state, current_activity, progress, error_message, spawned_task_ids, - pending_questions, last_activity - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) - ON CONFLICT (contract_id) DO UPDATE SET - task_id = EXCLUDED.task_id, - conversation_history = EXCLUDED.conversation_history, - pending_task_ids = EXCLUDED.pending_task_ids, - phase = EXCLUDED.phase, - state = EXCLUDED.state, - current_activity = EXCLUDED.current_activity, - progress = EXCLUDED.progress, - error_message = EXCLUDED.error_message, - spawned_task_ids = EXCLUDED.spawned_task_ids, - pending_questions = EXCLUDED.pending_questions, - last_activity = NOW(), - updated_at = NOW() - RETURNING * - "#, - ) - .bind(contract_id) - .bind(task_id) - .bind(conversation_history) - .bind(pending_task_ids) - .bind(phase) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(error_message) - .bind(spawned_task_ids) - .bind(pending_questions) - .fetch_one(pool) - .await -} - -/// Mark supervisor as restored from a crash/interruption. -pub async fn mark_supervisor_restored( - pool: &PgPool, - contract_id: Uuid, - restoration_source: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET restoration_count = restoration_count + 1, - last_restored_at = NOW(), - restoration_source = $1, - state = 'initializing', - error_message = NULL, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(restoration_source) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Get supervisors with pending questions (for re-delivery after restoration). -pub async fn get_supervisors_with_pending_questions( - pool: &PgPool, - owner_id: Uuid, -) -> Result<Vec<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - SELECT ss.* - FROM supervisor_states ss - JOIN contracts c ON c.id = ss.contract_id - WHERE c.owner_id = $1 - AND ss.pending_questions != '[]'::jsonb - AND c.status = 'active' - ORDER BY ss.last_activity DESC - "#, - ) - .bind(owner_id) - .fetch_all(pool) - .await -} - -/// Get supervisor state with full details for restoration. -/// Includes validation info. -pub async fn get_supervisor_state_for_restoration( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - SELECT * FROM supervisor_states WHERE contract_id = $1 - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -/// Validate spawned tasks are in expected states. -/// Returns map of task_id -> (status, updated_at). -pub async fn validate_spawned_tasks( - pool: &PgPool, - task_ids: &[Uuid], -) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> { - use sqlx::Row; - - let rows = sqlx::query( - r#" - SELECT id, status, updated_at - FROM tasks - WHERE id = ANY($1) - "#, - ) - .bind(task_ids) - .fetch_all(pool) - .await?; - - let mut result = std::collections::HashMap::new(); - for row in rows { - let id: Uuid = row.get("id"); - let status: String = row.get("status"); - let updated_at: chrono::DateTime<Utc> = row.get("updated_at"); - result.insert(id, (status, updated_at)); - } - Ok(result) -} - -/// Update supervisor state when phase changes. -pub async fn update_supervisor_phase( - pool: &PgPool, - contract_id: Uuid, - new_phase: &str, -) -> Result<SupervisorState, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET phase = $1, - state = 'working', - current_activity = 'Phase changed to ' || $1, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $2 - RETURNING * - "#, - ) - .bind(new_phase) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Update supervisor state on heartbeat (lightweight update). -pub async fn update_supervisor_heartbeat_state( - pool: &PgPool, - contract_id: Uuid, - state: &str, - current_activity: Option<&str>, - progress: i32, - pending_task_ids: &[Uuid], -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - UPDATE supervisor_states - SET state = $1, - current_activity = $2, - progress = $3, - pending_task_ids = $4, - last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $5 - "#, - ) - .bind(state) - .bind(current_activity) - .bind(progress) - .bind(pending_task_ids) - .bind(contract_id) - .execute(pool) - .await?; - Ok(()) -} - -// ============================================================================ -// Supervisor Heartbeats -// ============================================================================ - -/// Record a supervisor heartbeat. -/// This creates a historical record for monitoring and dead supervisor detection. -pub async fn create_supervisor_heartbeat( - pool: &PgPool, - supervisor_task_id: Uuid, - contract_id: Uuid, - state: &str, - phase: &str, - current_activity: Option<&str>, - progress: i32, - pending_task_ids: &[Uuid], -) -> Result<SupervisorHeartbeatRecord, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - INSERT INTO supervisor_heartbeats ( - supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) - RETURNING * - "#, - ) - .bind(supervisor_task_id) - .bind(contract_id) - .bind(state) - .bind(phase) - .bind(current_activity) - .bind(progress) - .bind(pending_task_ids) - .fetch_one(pool) - .await -} - -/// Get the latest heartbeat for a supervisor task. -pub async fn get_latest_supervisor_heartbeat( - pool: &PgPool, - supervisor_task_id: Uuid, -) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE supervisor_task_id = $1 - ORDER BY timestamp DESC - LIMIT 1 - "#, - ) - .bind(supervisor_task_id) - .fetch_optional(pool) - .await -} - -/// Get recent heartbeats for a supervisor task. -pub async fn get_supervisor_heartbeats( - pool: &PgPool, - supervisor_task_id: Uuid, - limit: i64, -) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE supervisor_task_id = $1 - ORDER BY timestamp DESC - LIMIT $2 - "#, - ) - .bind(supervisor_task_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Get recent heartbeats for a contract. -pub async fn get_contract_supervisor_heartbeats( - pool: &PgPool, - contract_id: Uuid, - limit: i64, -) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> { - sqlx::query_as::<_, SupervisorHeartbeatRecord>( - r#" - SELECT * FROM supervisor_heartbeats - WHERE contract_id = $1 - ORDER BY timestamp DESC - LIMIT $2 - "#, - ) - .bind(contract_id) - .bind(limit) - .fetch_all(pool) - .await -} - -/// Delete old heartbeats beyond the TTL (24 hours by default). -/// Returns the number of deleted records. -pub async fn cleanup_old_heartbeats( - pool: &PgPool, - ttl_hours: i64, -) -> Result<u64, sqlx::Error> { - let result = sqlx::query( - r#" - DELETE FROM supervisor_heartbeats - WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL - "#, - ) - .bind(ttl_hours.to_string()) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} - -/// Find supervisors that have not sent a heartbeat within the timeout period. -/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp). -pub async fn find_stale_supervisors( - pool: &PgPool, - timeout_seconds: i64, -) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> { - let rows = sqlx::query( - r#" - WITH latest_heartbeats AS ( - SELECT DISTINCT ON (supervisor_task_id) - supervisor_task_id, - contract_id, - timestamp - FROM supervisor_heartbeats - ORDER BY supervisor_task_id, timestamp DESC - ) - SELECT - lh.supervisor_task_id, - lh.contract_id, - lh.timestamp - FROM latest_heartbeats lh - JOIN tasks t ON t.id = lh.supervisor_task_id - WHERE t.status = 'running' - AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL - "#, - ) - .bind(timeout_seconds.to_string()) - .fetch_all(pool) - .await?; - - let mut result = Vec::new(); - for row in rows { - use sqlx::Row; - let supervisor_task_id: Uuid = row.get("supervisor_task_id"); - let contract_id: Uuid = row.get("contract_id"); - let timestamp: chrono::DateTime<Utc> = row.get("timestamp"); - result.push((supervisor_task_id, contract_id, timestamp)); - } - Ok(result) -} - -// ============================================================================ -// Contract Supervisor -// ============================================================================ - -/// Update contract's supervisor task ID. -pub async fn update_contract_supervisor( - pool: &PgPool, - contract_id: Uuid, - supervisor_task_id: Uuid, -) -> Result<Contract, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET supervisor_task_id = $1, - updated_at = NOW() - WHERE id = $2 - RETURNING * - "#, - ) - .bind(supervisor_task_id) - .bind(contract_id) - .fetch_one(pool) - .await -} - -/// Mark a deliverable as complete for a specific phase. -/// Uses JSONB operations to append the deliverable_id to the phase's array. -pub async fn mark_deliverable_complete( - pool: &PgPool, - contract_id: Uuid, - phase: &str, - deliverable_id: &str, -) -> Result<Contract, sqlx::Error> { - // Use jsonb_set to add the deliverable to the phase's array - // If the phase key doesn't exist, create an empty array first - // COALESCE handles the case where the phase array doesn't exist yet - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET completed_deliverables = jsonb_set( - completed_deliverables, - ARRAY[$2::text], - COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text), - true - ), - updated_at = NOW() - WHERE id = $1 - AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3) - RETURNING * - "#, - ) - .bind(contract_id) - .bind(phase) - .bind(deliverable_id) - .fetch_one(pool) - .await -} - -/// Clear all completed deliverables for a specific phase. -/// Used when phase changes or deliverables need to be reset. -pub async fn clear_phase_deliverables( - pool: &PgPool, - contract_id: Uuid, - phase: &str, -) -> Result<Contract, sqlx::Error> { - sqlx::query_as::<_, Contract>( - r#" - UPDATE contracts - SET completed_deliverables = completed_deliverables - $2, - updated_at = NOW() - WHERE id = $1 - RETURNING * - "#, - ) - .bind(contract_id) - .bind(phase) - .fetch_one(pool) - .await -} - -/// Get the supervisor task for a contract. -pub async fn get_contract_supervisor_task( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - SELECT t.* FROM tasks t - JOIN contracts c ON c.supervisor_task_id = t.id - WHERE c.id = $1 - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -// ============================================================================ -// Task Tree Queries -// ============================================================================ - -/// Get full task tree for a contract. -pub async fn get_contract_task_tree( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - WITH RECURSIVE task_tree AS ( - -- Base case: root tasks (no parent) - SELECT * FROM tasks - WHERE contract_id = $1 AND parent_task_id IS NULL - UNION ALL - -- Recursive case: children of current level - SELECT t.* FROM tasks t - JOIN task_tree tt ON t.parent_task_id = tt.id - ) - SELECT * FROM task_tree - ORDER BY depth, created_at - "#, - ) - .bind(contract_id) - .fetch_all(pool) - .await -} - -/// Get task tree from a specific root task. -pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { - sqlx::query_as::<_, Task>( - r#" - WITH RECURSIVE task_tree AS ( - -- Base case: the root task - SELECT * FROM tasks WHERE id = $1 - UNION ALL - -- Recursive case: children of current level - SELECT t.* FROM tasks t - JOIN task_tree tt ON t.parent_task_id = tt.id - ) - SELECT * FROM task_tree - ORDER BY depth, created_at - "#, - ) - .bind(root_task_id) - .fetch_all(pool) - .await -} - -// ============================================================================ // Daemon Selection // ============================================================================ @@ -4578,107 +2470,27 @@ pub async fn cleanup_old_snapshots( pub async fn record_history_event( pool: &PgPool, owner_id: Uuid, - contract_id: Option<Uuid>, task_id: Option<Uuid>, event_type: &str, event_subtype: Option<&str>, - phase: Option<&str>, event_data: serde_json::Value, ) -> Result<HistoryEvent, sqlx::Error> { sqlx::query_as::<_, HistoryEvent>( r#" - INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data) + VALUES ($1, $2, $3, $4, $5) RETURNING * "# ) .bind(owner_id) - .bind(contract_id) .bind(task_id) .bind(event_type) .bind(event_subtype) - .bind(phase) .bind(event_data) .fetch_one(pool) .await } -/// Get contract history timeline -pub async fn get_contract_history( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, - filters: &HistoryQueryFilters, -) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> { - let limit = filters.limit.unwrap_or(100); - - let mut query = String::from( - "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" - ); - let mut count_query = String::from( - "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" - ); - - let mut param_count = 2; - - if filters.phase.is_some() { - param_count += 1; - query.push_str(&format!(" AND phase = ${}" , param_count)); - count_query.push_str(&format!(" AND phase = ${}", param_count)); - } - - if filters.from.is_some() { - param_count += 1; - query.push_str(&format!(" AND created_at >= ${}", param_count)); - count_query.push_str(&format!(" AND created_at >= ${}", param_count)); - } - - if filters.to.is_some() { - param_count += 1; - query.push_str(&format!(" AND created_at <= ${}", param_count)); - count_query.push_str(&format!(" AND created_at <= ${}", param_count)); - } - - query.push_str(" ORDER BY created_at DESC"); - query.push_str(&format!(" LIMIT {}", limit)); - - // Build and execute the query dynamically - let mut q = sqlx::query_as::<_, HistoryEvent>(&query) - .bind(contract_id) - .bind(owner_id); - - if let Some(ref phase) = filters.phase { - q = q.bind(phase); - } - if let Some(ref from) = filters.from { - q = q.bind(from); - } - if let Some(ref to) = filters.to { - q = q.bind(to); - } - - let events = q.fetch_all(pool).await?; - - // Get total count - let mut cq = sqlx::query_scalar::<_, i64>(&count_query) - .bind(contract_id) - .bind(owner_id); - - if let Some(ref phase) = filters.phase { - cq = cq.bind(phase); - } - if let Some(ref from) = filters.from { - cq = cq.bind(from); - } - if let Some(ref to) = filters.to { - cq = cq.bind(to); - } - - let count = cq.fetch_one(pool).await?; - - Ok((events, count)) -} - /// Get task history pub async fn get_task_history( pool: &PgPool, @@ -4825,13 +2637,6 @@ pub async fn get_task_conversation( Ok(messages) } -/// Get supervisor conversation (from supervisor_states) -pub async fn get_supervisor_conversation_full( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - get_supervisor_state(pool, contract_id).await -} // ============================================================================= // Anonymous Task Cleanup Functions @@ -4969,156 +2774,6 @@ pub async fn delete_checkpoint_patches_for_task( Ok(result.rows_affected() as i64) } -// ============================================================================= -// Red Team Notifications -// ============================================================================= -// ============================================================================= -// Supervisor Status API Helpers -// ============================================================================= - -/// Get supervisor status for a contract. -/// Returns combined information from supervisor_states and tasks tables. -pub async fn get_supervisor_status( - pool: &PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> { - // Query to get supervisor status by joining supervisor_states with tasks - sqlx::query_as::<_, SupervisorStatusInfo>( - r#" - SELECT - ss.task_id, - COALESCE(t.status, 'unknown') as supervisor_state, - ss.phase, - t.progress_summary as current_activity, - ss.pending_task_ids, - ss.last_activity as last_heartbeat, - t.status as task_status, - t.daemon_id IS NOT NULL as is_running - FROM supervisor_states ss - JOIN tasks t ON t.id = ss.task_id - WHERE ss.contract_id = $1 - AND t.owner_id = $2 - "#, - ) - .bind(contract_id) - .bind(owner_id) - .fetch_optional(pool) - .await -} - -/// Internal struct to hold supervisor status query result -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct SupervisorStatusInfo { - pub task_id: Uuid, - pub supervisor_state: String, - pub phase: String, - pub current_activity: Option<String>, - #[sqlx(try_from = "Vec<Uuid>")] - pub pending_task_ids: Vec<Uuid>, - pub last_heartbeat: chrono::DateTime<chrono::Utc>, - pub task_status: String, - pub is_running: bool, -} - -/// Get supervisor activity history from history_events table. -/// This provides a timeline of supervisor activities that can serve as "heartbeats". -pub async fn get_supervisor_activity_history( - pool: &PgPool, - contract_id: Uuid, - limit: i32, - offset: i32, -) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> { - sqlx::query_as::<_, SupervisorActivityEntry>( - r#" - SELECT - created_at as timestamp, - COALESCE(event_subtype, 'activity') as state, - event_data->>'activity' as activity, - (event_data->>'progress')::INTEGER as progress, - COALESCE(phase, 'unknown') as phase, - CASE - WHEN event_data->'pending_task_ids' IS NOT NULL - THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[] - ELSE ARRAY[]::UUID[] - END as pending_task_ids - FROM history_events - WHERE contract_id = $1 - AND event_type IN ('supervisor', 'phase', 'task') - ORDER BY created_at DESC - LIMIT $2 OFFSET $3 - "#, - ) - .bind(contract_id) - .bind(limit) - .bind(offset) - .fetch_all(pool) - .await -} - -/// Internal struct to hold supervisor activity entry -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct SupervisorActivityEntry { - pub timestamp: chrono::DateTime<chrono::Utc>, - pub state: String, - pub activity: Option<String>, - pub progress: Option<i32>, - pub phase: String, - #[sqlx(try_from = "Vec<Uuid>")] - pub pending_task_ids: Vec<Uuid>, -} - -/// Count total supervisor activity history entries for a contract. -pub async fn count_supervisor_activity_history( - pool: &PgPool, - contract_id: Uuid, -) -> Result<i64, sqlx::Error> { - let result: (i64,) = sqlx::query_as( - r#" - SELECT COUNT(*) - FROM history_events - WHERE contract_id = $1 - AND event_type IN ('supervisor', 'phase', 'task') - "#, - ) - .bind(contract_id) - .fetch_one(pool) - .await?; - Ok(result.0) -} - -/// Update supervisor state last_activity timestamp. -/// This acts as a "sync" operation to refresh the supervisor's heartbeat. -pub async fn sync_supervisor_state( - pool: &PgPool, - contract_id: Uuid, -) -> Result<Option<SupervisorState>, sqlx::Error> { - sqlx::query_as::<_, SupervisorState>( - r#" - UPDATE supervisor_states - SET last_activity = NOW(), - updated_at = NOW() - WHERE contract_id = $1 - RETURNING * - "#, - ) - .bind(contract_id) - .fetch_optional(pool) - .await -} - -// ============================================================================= -// Helper Functions -// ============================================================================= - -/// Helper to truncate string to max length -fn truncate_string(s: &str, max_len: usize) -> String { - if s.len() <= max_len { - s.to_string() - } else { - format!("{}...", &s[..max_len - 3]) - } -} // ============================================================================= // Directive CRUD @@ -7031,37 +4686,6 @@ pub async fn get_running_steps_with_tasks( .await } -/// A running step backed by a contract, joined with the contract's current status. -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct RunningStepWithContract { - pub step_id: Uuid, - pub directive_id: Uuid, - pub contract_id: Uuid, - pub contract_status: String, - pub contract_phase: String, -} - -/// Get running steps that are backed by contracts (for contract-based monitoring). -pub async fn get_running_steps_with_contracts( - pool: &PgPool, -) -> Result<Vec<RunningStepWithContract>, sqlx::Error> { - sqlx::query_as::<_, RunningStepWithContract>( - r#" - SELECT - ds.id AS step_id, - ds.directive_id, - ds.contract_id AS "contract_id!", - c.status AS contract_status, - c.phase AS contract_phase - FROM directive_steps ds - JOIN contracts c ON c.id = ds.contract_id - WHERE ds.status = 'running' - AND ds.contract_id IS NOT NULL - "#, - ) - .fetch_all(pool) - .await -} /// An orchestrator task to check (directive with pending planning task). #[derive(Debug, Clone, sqlx::FromRow)] @@ -7221,25 +4845,6 @@ pub async fn link_task_to_step( Ok(()) } -/// Link a contract to a directive step. -pub async fn link_contract_to_step( - pool: &PgPool, - step_id: Uuid, - contract_id: Uuid, -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - UPDATE directive_steps - SET contract_id = $1 - WHERE id = $2 - "#, - ) - .bind(contract_id) - .bind(step_id) - .execute(pool) - .await?; - Ok(()) -} /// Set a step to 'running' status (after its task has been dispatched). pub async fn set_step_running( |
