summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs905
-rw-r--r--makima/src/db/repository.rs2537
2 files changed, 73 insertions, 3369 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 3fb9667..bfb8bf3 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -111,10 +111,6 @@ pub enum BodyElement {
pub struct File {
pub id: Uuid,
pub owner_id: Uuid,
- /// Contract this file belongs to (optional)
- pub contract_id: Option<Uuid>,
- /// Phase of the contract when file was added (e.g., "research", "specify")
- pub contract_phase: Option<String>,
pub name: String,
pub description: Option<String>,
#[sqlx(json)]
@@ -141,8 +137,6 @@ pub struct File {
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateFileRequest {
- /// Contract this file belongs to (required - files must belong to a contract)
- pub contract_id: Uuid,
/// Name of the file (auto-generated if not provided)
pub name: Option<String>,
/// Optional description
@@ -157,8 +151,6 @@ pub struct CreateFileRequest {
pub body: Vec<BodyElement>,
/// Path to linked repository file (e.g., "README.md")
pub repo_file_path: Option<String>,
- /// Contract phase this file belongs to (for deliverable tracking)
- pub contract_phase: Option<String>,
}
/// Request payload for updating an existing file.
@@ -194,12 +186,6 @@ pub struct FileListResponse {
#[serde(rename_all = "camelCase")]
pub struct FileSummary {
pub id: Uuid,
- /// Contract this file belongs to
- pub contract_id: Option<Uuid>,
- /// Contract name (joined from contracts table)
- pub contract_name: Option<String>,
- /// Phase when file was added to contract
- pub contract_phase: Option<String>,
pub name: String,
pub description: Option<String>,
pub transcript_count: usize,
@@ -224,9 +210,6 @@ impl From<File> for FileSummary {
.fold(0.0_f32, f32::max);
Self {
id: file.id,
- contract_id: file.contract_id,
- contract_name: None, // Not available from File alone, requires JOIN
- contract_phase: file.contract_phase,
name: file.name,
description: file.description,
transcript_count: file.transcript.len(),
@@ -425,8 +408,6 @@ impl std::str::FromStr for MergeMode {
pub struct Task {
pub id: Uuid,
pub owner_id: Uuid,
- /// Contract this task belongs to (required for new tasks)
- pub contract_id: Option<Uuid>,
pub parent_task_id: Option<Uuid>,
/// Depth in task hierarchy (no longer constrained)
pub depth: i32,
@@ -436,11 +417,6 @@ pub struct Task {
pub priority: i32,
pub plan: String,
- // Supervisor flag
- /// True for contract supervisor tasks. Only supervisors can spawn new tasks.
- #[serde(default)]
- pub is_supervisor: bool,
-
// Daemon/container info
pub daemon_id: Option<Uuid>,
pub container_id: Option<String>,
@@ -565,14 +541,6 @@ impl Task {
#[serde(rename_all = "camelCase")]
pub struct TaskSummary {
pub id: Uuid,
- /// Contract this task belongs to
- pub contract_id: Option<Uuid>,
- /// Contract name (joined from contracts table)
- pub contract_name: Option<String>,
- /// Contract phase (joined from contracts table)
- pub contract_phase: Option<String>,
- /// Contract status (joined from contracts table): 'active', 'completed', 'archived'
- pub contract_status: Option<String>,
pub parent_task_id: Option<Uuid>,
/// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max)
pub depth: i32,
@@ -582,9 +550,6 @@ pub struct TaskSummary {
pub progress_summary: Option<String>,
pub subtask_count: i64,
pub version: i32,
- /// True for contract supervisor tasks
- #[serde(default)]
- pub is_supervisor: bool,
/// Whether this task is hidden from the UI (user dismissed it)
#[serde(default)]
pub hidden: bool,
@@ -597,10 +562,6 @@ impl From<Task> for TaskSummary {
fn from(task: Task) -> Self {
Self {
id: task.id,
- contract_id: task.contract_id,
- contract_name: None, // Not available from Task directly
- contract_phase: None, // Not available from Task directly
- contract_status: None, // Not available from Task directly
parent_task_id: task.parent_task_id,
depth: task.depth,
name: task.name,
@@ -609,7 +570,6 @@ impl From<Task> for TaskSummary {
progress_summary: task.progress_summary,
subtask_count: 0, // Would need separate query
version: task.version,
- is_supervisor: task.is_supervisor,
hidden: task.hidden,
created_at: task.created_at,
updated_at: task.updated_at,
@@ -629,8 +589,6 @@ pub struct TaskListResponse {
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateTaskRequest {
- /// Contract this task belongs to (optional for branched/anonymous tasks)
- pub contract_id: Option<Uuid>,
/// Name of the task
pub name: String,
/// Optional description
@@ -639,9 +597,6 @@ pub struct CreateTaskRequest {
pub plan: String,
/// Parent task ID (for subtasks)
pub parent_task_id: Option<Uuid>,
- /// True for contract supervisor tasks. Only supervisors can spawn new tasks.
- #[serde(default)]
- pub is_supervisor: bool,
/// Priority (higher = more urgent)
#[serde(default)]
pub priority: i32,
@@ -668,9 +623,6 @@ pub struct CreateTaskRequest {
pub branched_from_task_id: Option<Uuid>,
/// Conversation history to initialize the task with (JSON array of messages)
pub conversation_history: Option<serde_json::Value>,
- /// Task ID whose worktree this task shares. When set, this task reuses the supervisor's
- /// worktree instead of creating its own, and should NOT have its worktree deleted during cleanup.
- pub supervisor_worktree_task_id: Option<Uuid>,
/// Directive this task belongs to (for directive-driven tasks)
pub directive_id: Option<Uuid>,
/// Directive step this task executes
@@ -935,87 +887,8 @@ pub struct TaskOutputResponse {
pub task_id: Uuid,
}
-// =============================================================================
-// Mesh Chat History Types
-// =============================================================================
-
-/// Mesh chat conversation for persisting history
-#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatConversation {
- pub id: Uuid,
- pub owner_id: Uuid,
- pub name: Option<String>,
- pub is_active: bool,
- pub created_at: DateTime<Utc>,
- pub updated_at: DateTime<Utc>,
-}
-
-/// Individual message in a mesh chat conversation
-#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatMessageRecord {
- pub id: Uuid,
- pub conversation_id: Uuid,
- pub role: String,
- pub content: String,
- pub context_type: String,
- pub context_task_id: Option<Uuid>,
- /// Tool calls made during this message (JSON, nullable)
- pub tool_calls: Option<serde_json::Value>,
- /// Pending questions requiring user response (JSON, nullable)
- pub pending_questions: Option<serde_json::Value>,
- pub created_at: DateTime<Utc>,
-}
-
-/// Response for chat history endpoint
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatHistoryResponse {
- pub conversation_id: Uuid,
- pub messages: Vec<MeshChatMessageRecord>,
-}
-
-// =============================================================================
-// Contract Chat History Types
-// =============================================================================
-
-/// Conversation thread for contract chat (scoped to a specific contract)
-#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractChatConversation {
- pub id: Uuid,
- pub contract_id: Uuid,
- pub owner_id: Uuid,
- pub name: Option<String>,
- pub is_active: bool,
- pub created_at: DateTime<Utc>,
- pub updated_at: DateTime<Utc>,
-}
-
-/// Individual message in a contract chat conversation
-#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractChatMessageRecord {
- pub id: Uuid,
- pub conversation_id: Uuid,
- pub role: String,
- pub content: String,
- /// Tool calls made during this message (JSON, nullable)
- pub tool_calls: Option<serde_json::Value>,
- /// Pending questions requiring user response (JSON, nullable)
- pub pending_questions: Option<serde_json::Value>,
- pub created_at: DateTime<Utc>,
-}
-
-/// Response for contract chat history endpoint
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractChatHistoryResponse {
- pub contract_id: Uuid,
- pub conversation_id: Uuid,
- pub messages: Vec<ContractChatMessageRecord>,
-}
+// (MeshChat* + ContractChat* types removed alongside their dead
+// tables/handlers — see migration 20260517000000.)
// =============================================================================
// Merge API Types
@@ -1120,772 +993,6 @@ pub struct MergeCompleteCheckResponse {
pub skipped_count: u32,
}
-// =============================================================================
-// Contract Type Templates (User-defined)
-// =============================================================================
-
-/// A phase definition within a contract template
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PhaseDefinition {
- /// Phase identifier (e.g., "research", "plan", "execute")
- pub id: String,
- /// Display name for the phase
- pub name: String,
- /// Order in the workflow (0-indexed)
- pub order: i32,
-}
-
-/// A deliverable definition within a phase
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct DeliverableDefinition {
- /// Deliverable identifier (e.g., "plan-document", "pull-request")
- pub id: String,
- /// Display name for the deliverable
- pub name: String,
- /// Priority: "required", "recommended", or "optional"
- #[serde(default = "default_priority")]
- pub priority: String,
-}
-
-fn default_priority() -> String {
- "required".to_string()
-}
-
-/// Phase configuration stored on a contract (copied from template at creation)
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PhaseConfig {
- /// Ordered list of phases in the workflow
- pub phases: Vec<PhaseDefinition>,
- /// Default starting phase
- pub default_phase: String,
- /// Deliverables per phase: { "phase_id": [deliverables] }
- #[serde(default)]
- pub deliverables: std::collections::HashMap<String, Vec<DeliverableDefinition>>,
-}
-
-/// Contract type template record from the database
-#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractTypeTemplateRecord {
- pub id: Uuid,
- pub owner_id: Uuid,
- pub name: String,
- pub description: Option<String>,
- #[sqlx(json)]
- pub phases: Vec<PhaseDefinition>,
- pub default_phase: String,
- #[sqlx(json)]
- pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>,
- pub version: i32,
- pub created_at: DateTime<Utc>,
- pub updated_at: DateTime<Utc>,
-}
-
-/// Request to create a new contract type template
-#[derive(Debug, Clone, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CreateTemplateRequest {
- pub name: String,
- pub description: Option<String>,
- pub phases: Vec<PhaseDefinition>,
- pub default_phase: String,
- pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>,
-}
-
-/// Request to update a contract type template
-#[derive(Debug, Clone, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct UpdateTemplateRequest {
- pub name: Option<String>,
- pub description: Option<String>,
- pub phases: Option<Vec<PhaseDefinition>>,
- pub default_phase: Option<String>,
- pub deliverables: Option<std::collections::HashMap<String, Vec<DeliverableDefinition>>>,
- /// Version for optimistic locking
- pub version: Option<i32>,
-}
-
-/// Summary of a contract type template for list views
-#[derive(Debug, Clone, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractTypeTemplateSummary {
- pub id: Uuid,
- pub name: String,
- pub description: Option<String>,
- pub phases: Vec<PhaseDefinition>,
- pub default_phase: String,
- pub is_builtin: bool,
- pub version: i32,
- pub created_at: DateTime<Utc>,
-}
-
-// =============================================================================
-// Contract Types
-// =============================================================================
-
-/// Contract type determines the workflow and required documents
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum ContractType {
- /// Simple Plan -> Execute workflow (default)
- /// - Plan phase: requires a "Plan" document
- /// - Execute phase: requires a "PR" document
- Simple,
- /// Specification-based development with TDD
- /// - Research: requires "Research Notes" document
- /// - Specify: requires "Requirements Document"
- /// - Plan: requires "Plan" document
- /// - Execute: requires "PR" document
- /// - Review: requires "Release Notes" document
- Specification,
- /// Execute-only workflow with no deliverables
- /// - Only has "execute" phase
- /// - NO deliverables at all - just execute tasks directly
- Execute,
-}
-
-impl Default for ContractType {
- fn default() -> Self {
- ContractType::Simple
- }
-}
-
-impl std::fmt::Display for ContractType {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- ContractType::Simple => write!(f, "simple"),
- ContractType::Specification => write!(f, "specification"),
- ContractType::Execute => write!(f, "execute"),
- }
- }
-}
-
-impl std::str::FromStr for ContractType {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "simple" => Ok(ContractType::Simple),
- "specification" => Ok(ContractType::Specification),
- "execute" => Ok(ContractType::Execute),
- _ => Err(format!("Unknown contract type: {}", s)),
- }
- }
-}
-
-/// Contract phase for workflow progression
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum ContractPhase {
- Research,
- Specify,
- Plan,
- Execute,
- Review,
-}
-
-impl std::fmt::Display for ContractPhase {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- ContractPhase::Research => write!(f, "research"),
- ContractPhase::Specify => write!(f, "specify"),
- ContractPhase::Plan => write!(f, "plan"),
- ContractPhase::Execute => write!(f, "execute"),
- ContractPhase::Review => write!(f, "review"),
- }
- }
-}
-
-impl std::str::FromStr for ContractPhase {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "research" => Ok(ContractPhase::Research),
- "specify" => Ok(ContractPhase::Specify),
- "plan" => Ok(ContractPhase::Plan),
- "execute" => Ok(ContractPhase::Execute),
- "review" => Ok(ContractPhase::Review),
- _ => Err(format!("Unknown contract phase: {}", s)),
- }
- }
-}
-
-/// Contract status
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum ContractStatus {
- Active,
- Completed,
- Archived,
-}
-
-impl std::fmt::Display for ContractStatus {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- ContractStatus::Active => write!(f, "active"),
- ContractStatus::Completed => write!(f, "completed"),
- ContractStatus::Archived => write!(f, "archived"),
- }
- }
-}
-
-impl std::str::FromStr for ContractStatus {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "active" => Ok(ContractStatus::Active),
- "completed" => Ok(ContractStatus::Completed),
- "archived" => Ok(ContractStatus::Archived),
- _ => Err(format!("Unknown contract status: {}", s)),
- }
- }
-}
-
-/// Repository source type
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum RepositorySourceType {
- /// Existing remote repo (GitHub, GitLab, etc)
- Remote,
- /// Existing local repo
- Local,
- /// New repo created/managed by Makima daemon
- Managed,
-}
-
-impl std::fmt::Display for RepositorySourceType {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- RepositorySourceType::Remote => write!(f, "remote"),
- RepositorySourceType::Local => write!(f, "local"),
- RepositorySourceType::Managed => write!(f, "managed"),
- }
- }
-}
-
-impl std::str::FromStr for RepositorySourceType {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "remote" => Ok(RepositorySourceType::Remote),
- "local" => Ok(RepositorySourceType::Local),
- "managed" => Ok(RepositorySourceType::Managed),
- _ => Err(format!("Unknown repository source type: {}", s)),
- }
- }
-}
-
-/// Repository status (for managed repos)
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
-pub enum RepositoryStatus {
- /// Repo is usable
- Ready,
- /// Waiting for daemon to create
- Pending,
- /// Daemon is creating the repo
- Creating,
- /// Creation failed
- Failed,
-}
-
-impl std::fmt::Display for RepositoryStatus {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- RepositoryStatus::Ready => write!(f, "ready"),
- RepositoryStatus::Pending => write!(f, "pending"),
- RepositoryStatus::Creating => write!(f, "creating"),
- RepositoryStatus::Failed => write!(f, "failed"),
- }
- }
-}
-
-impl std::str::FromStr for RepositoryStatus {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "ready" => Ok(RepositoryStatus::Ready),
- "pending" => Ok(RepositoryStatus::Pending),
- "creating" => Ok(RepositoryStatus::Creating),
- "failed" => Ok(RepositoryStatus::Failed),
- _ => Err(format!("Unknown repository status: {}", s)),
- }
- }
-}
-
-/// Contract record from the database
-#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct Contract {
- pub id: Uuid,
- pub owner_id: Uuid,
- pub name: String,
- pub description: Option<String>,
- /// Contract type: "simple" or "specification"
- pub contract_type: String,
- pub phase: String,
- pub status: String,
- /// The long-running supervisor task that orchestrates this contract
- #[serde(skip_serializing_if = "Option::is_none")]
- pub supervisor_task_id: Option<Uuid>,
- /// Whether tasks for this contract should run in autonomous loop mode.
- /// When enabled, tasks will automatically restart with --continue if they exit
- /// without a COMPLETION_GATE indicating ready: true.
- #[serde(default)]
- pub autonomous_loop: bool,
- /// Whether to wait for user confirmation before progressing to the next phase.
- /// When enabled, the supervisor will pause and ask the user to review and approve
- /// phase outputs (like plans, requirements, etc.) before continuing.
- #[serde(default)]
- pub phase_guard: bool,
- /// Completed deliverables per phase.
- /// Structure: { "plan": ["plan-document"], "execute": ["pull-request"] }
- #[sqlx(json)]
- #[serde(default)]
- pub completed_deliverables: serde_json::Value,
- /// Whether this contract operates in local-only mode.
- /// When enabled, automatic completion actions (branch, merge, pr) are skipped,
- /// allowing users to manually handle code changes via patch files or other means.
- #[serde(default)]
- pub local_only: bool,
- /// Whether to auto-merge to target branch locally when local_only mode is enabled.
- /// When both local_only and auto_merge_local are true, completed task changes will be
- /// automatically merged to the master/main branch locally (without pushing or creating PRs).
- #[serde(default)]
- pub auto_merge_local: bool,
- /// Phase configuration copied from template at contract creation (raw JSON).
- /// When present, this overrides the built-in contract type phases.
- /// Use `get_phase_config()` to get the parsed PhaseConfig.
- #[serde(skip_serializing_if = "Option::is_none")]
- pub phase_config: Option<serde_json::Value>,
- pub version: i32,
- pub created_at: DateTime<Utc>,
- pub updated_at: DateTime<Utc>,
-}
-
-impl Contract {
- /// Parse contract_type string to ContractType enum
- pub fn contract_type_enum(&self) -> Result<ContractType, String> {
- self.contract_type.parse()
- }
-
- /// Parse phase string to ContractPhase enum
- pub fn phase_enum(&self) -> Result<ContractPhase, String> {
- self.phase.parse()
- }
-
- /// Parse status string to ContractStatus enum
- pub fn status_enum(&self) -> Result<ContractStatus, String> {
- self.status.parse()
- }
-
- /// Get valid phase IDs for this contract (as strings)
- pub fn valid_phase_ids(&self) -> Vec<String> {
- // Check phase_config first (for custom templates)
- if let Some(config) = self.get_phase_config() {
- let mut phases: Vec<_> = config.phases.iter().collect();
- phases.sort_by_key(|p| p.order);
- return phases.iter().map(|p| p.id.clone()).collect();
- }
-
- // Fall back to built-in contract types
- match self.contract_type.as_str() {
- "simple" => vec!["plan".to_string(), "execute".to_string()],
- "specification" => vec![
- "research".to_string(),
- "specify".to_string(),
- "plan".to_string(),
- "execute".to_string(),
- "review".to_string(),
- ],
- "execute" => vec!["execute".to_string()],
- _ => vec!["plan".to_string(), "execute".to_string()],
- }
- }
-
- /// Get valid phases for this contract type (as ContractPhase enums)
- /// Note: For custom templates with non-standard phases, this only returns
- /// phases that map to the ContractPhase enum.
- pub fn valid_phases(&self) -> Vec<ContractPhase> {
- self.valid_phase_ids()
- .iter()
- .filter_map(|id| id.parse::<ContractPhase>().ok())
- .collect()
- }
-
- /// Get the initial phase ID for this contract type (as string)
- pub fn initial_phase_id(&self) -> String {
- // Check phase_config first (for custom templates)
- if let Some(config) = self.get_phase_config() {
- return config.default_phase.clone();
- }
-
- // Fall back to built-in contract types
- match self.contract_type.as_str() {
- "specification" => "research".to_string(),
- "execute" => "execute".to_string(),
- _ => "plan".to_string(),
- }
- }
-
- /// Get the initial phase for this contract type (as ContractPhase enum)
- pub fn initial_phase(&self) -> ContractPhase {
- self.initial_phase_id()
- .parse()
- .unwrap_or(ContractPhase::Plan)
- }
-
- /// Get the terminal phase ID for this contract type (as string)
- pub fn terminal_phase_id(&self) -> String {
- // Check phase_config first (for custom templates)
- if let Some(config) = self.get_phase_config() {
- // Last phase in sorted order is the terminal phase
- let mut phases: Vec<_> = config.phases.iter().collect();
- phases.sort_by_key(|p| p.order);
- if let Some(last) = phases.last() {
- return last.id.clone();
- }
- }
-
- // Fall back to built-in contract types
- match self.contract_type.as_str() {
- "specification" => "review".to_string(),
- _ => "execute".to_string(),
- }
- }
-
- /// Get the terminal phase for this contract type (phase where contract can be completed)
- pub fn terminal_phase(&self) -> ContractPhase {
- self.terminal_phase_id()
- .parse()
- .unwrap_or(ContractPhase::Execute)
- }
-
- /// Check if a phase ID is valid for this contract
- pub fn is_valid_phase(&self, phase_id: &str) -> bool {
- self.valid_phase_ids().contains(&phase_id.to_string())
- }
-
- /// Get the phase configuration for custom templates
- pub fn get_phase_config(&self) -> Option<PhaseConfig> {
- self.phase_config
- .as_ref()
- .and_then(|v| serde_json::from_value(v.clone()).ok())
- }
-
- /// Get completed deliverable IDs for a specific phase
- pub fn get_completed_deliverables(&self, phase: &str) -> Vec<String> {
- self.completed_deliverables
- .get(phase)
- .and_then(|v| v.as_array())
- .map(|arr| {
- arr.iter()
- .filter_map(|v| v.as_str().map(String::from))
- .collect()
- })
- .unwrap_or_default()
- }
-
- /// Check if a specific deliverable is marked as complete for a phase
- pub fn is_deliverable_complete(&self, phase: &str, deliverable_id: &str) -> bool {
- self.get_completed_deliverables(phase)
- .contains(&deliverable_id.to_string())
- }
-}
-
-/// Contract repository record from the database
-#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractRepository {
- pub id: Uuid,
- pub contract_id: Uuid,
- pub name: String,
- pub repository_url: Option<String>,
- pub local_path: Option<String>,
- pub source_type: String,
- pub status: String,
- pub is_primary: bool,
- pub created_at: DateTime<Utc>,
- pub updated_at: DateTime<Utc>,
-}
-
-impl ContractRepository {
- /// Parse source_type string to RepositorySourceType enum
- pub fn source_type_enum(&self) -> Result<RepositorySourceType, String> {
- self.source_type.parse()
- }
-
- /// Parse status string to RepositoryStatus enum
- pub fn status_enum(&self) -> Result<RepositoryStatus, String> {
- self.status.parse()
- }
-}
-
-/// Summary of a contract for list views
-#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractSummary {
- pub id: Uuid,
- pub name: String,
- pub description: Option<String>,
- /// Contract type: "simple" or "specification"
- pub contract_type: String,
- pub phase: String,
- pub status: String,
- /// Supervisor task ID for contract orchestration
- pub supervisor_task_id: Option<Uuid>,
- /// When true, tasks do not auto-execute completion actions and work stays in worktrees.
- #[serde(default)]
- pub local_only: bool,
- /// When true with local_only, automatically merge completed tasks to target branch locally.
- #[serde(default)]
- pub auto_merge_local: bool,
- pub file_count: i64,
- pub task_count: i64,
- pub repository_count: i64,
- pub version: i32,
- pub created_at: DateTime<Utc>,
-}
-
-/// Contract with all relations for detail view
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractWithRelations {
- #[serde(flatten)]
- pub contract: Contract,
- pub repositories: Vec<ContractRepository>,
- pub files: Vec<FileSummary>,
- pub tasks: Vec<TaskSummary>,
-}
-
-/// Response for contract list endpoint
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractListResponse {
- pub contracts: Vec<ContractSummary>,
- pub total: i64,
-}
-
-/// Request payload for creating a new contract
-#[derive(Debug, Clone, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CreateContractRequest {
- /// Name of the contract
- pub name: String,
- /// Optional description
- pub description: Option<String>,
- /// Contract type: "simple" (default), "specification", "execute", or a custom template name.
- /// For built-in types:
- /// - simple: Plan -> Execute workflow
- /// - specification: Research -> Specify -> Plan -> Execute -> Review
- /// - execute: Execute only
- /// For custom templates, use the template name or provide template_id.
- #[serde(default)]
- pub contract_type: Option<String>,
- /// UUID of a custom template to use. If provided, this takes precedence over contract_type.
- /// The template's phase configuration will be copied to the contract.
- #[serde(default)]
- pub template_id: Option<Uuid>,
- /// Initial phase to start in (defaults based on contract_type or template)
- /// - simple: defaults to "plan"
- /// - specification: defaults to "research"
- #[serde(default)]
- pub initial_phase: Option<String>,
- /// Enable autonomous loop mode for tasks in this contract.
- /// When enabled, tasks automatically restart with --continue if they exit
- /// without a COMPLETION_GATE indicating ready: true.
- #[serde(default)]
- pub autonomous_loop: Option<bool>,
- /// Enable phase guard mode for this contract.
- /// When enabled, the supervisor will pause and ask the user to review and approve
- /// phase outputs before progressing to the next phase.
- #[serde(default)]
- pub phase_guard: Option<bool>,
- /// Enable local-only mode for this contract.
- /// When enabled, automatic completion actions (branch, merge, pr) are skipped,
- /// allowing users to manually handle code changes via patch files or other means.
- #[serde(default)]
- pub local_only: Option<bool>,
- /// Enable auto-merge to target branch locally when local_only mode is enabled.
- /// When both local_only and auto_merge_local are true, completed task changes will be
- /// automatically merged to the master/main branch locally (without pushing or creating PRs).
- #[serde(default)]
- pub auto_merge_local: Option<bool>,
-}
-
-/// Request payload for updating a contract
-#[derive(Debug, Default, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct UpdateContractRequest {
- pub name: Option<String>,
- pub description: Option<String>,
- pub phase: Option<String>,
- pub status: Option<String>,
- /// Supervisor task ID for contract orchestration
- #[serde(skip_serializing_if = "Option::is_none")]
- pub supervisor_task_id: Option<Uuid>,
- /// Enable or disable autonomous loop mode for tasks in this contract.
- #[serde(default)]
- pub autonomous_loop: Option<bool>,
- /// Enable or disable phase guard mode for this contract.
- /// When enabled, the supervisor will pause and ask the user to review and approve
- /// phase outputs before progressing to the next phase.
- #[serde(default)]
- pub phase_guard: Option<bool>,
- /// Enable or disable local-only mode for this contract.
- /// When enabled, automatic completion actions (branch, merge, pr) are skipped,
- /// allowing users to manually handle code changes via patch files or other means.
- #[serde(default)]
- pub local_only: Option<bool>,
- /// Enable or disable auto-merge to target branch locally when local_only mode is enabled.
- /// When both local_only and auto_merge_local are true, completed task changes will be
- /// automatically merged to the master/main branch locally (without pushing or creating PRs).
- #[serde(default)]
- pub auto_merge_local: Option<bool>,
- /// Version for optimistic locking
- pub version: Option<i32>,
-}
-
-/// Request to add a remote repository to a contract
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AddRemoteRepositoryRequest {
- pub name: String,
- pub repository_url: String,
- #[serde(default)]
- pub is_primary: bool,
-}
-
-/// Request to add a local repository to a contract
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AddLocalRepositoryRequest {
- pub name: String,
- pub local_path: String,
- #[serde(default)]
- pub is_primary: bool,
-}
-
-/// Request to create a managed repository (daemon will create it)
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CreateManagedRepositoryRequest {
- pub name: String,
- #[serde(default)]
- pub is_primary: bool,
-}
-
-/// Request to change contract phase
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ChangePhaseRequest {
- pub phase: String,
- /// If phase_guard is enabled, this must be true to confirm the transition.
- /// If not provided or false, returns phase deliverables for review.
- #[serde(default)]
- pub confirmed: Option<bool>,
- /// User feedback for changes (used when not confirming)
- #[serde(skip_serializing_if = "Option::is_none")]
- pub feedback: Option<String>,
- /// Expected version for optimistic locking. If provided, the phase change
- /// will only succeed if the current contract version matches.
- #[serde(skip_serializing_if = "Option::is_none")]
- pub expected_version: Option<i32>,
-}
-
-/// Result of a phase change operation, supporting explicit conflict detection.
-#[derive(Debug, Clone)]
-pub enum PhaseChangeResult {
- /// Phase change succeeded, returning the updated contract
- Success(Contract),
- /// Version conflict: the contract was modified concurrently
- VersionConflict {
- /// The version the client expected
- expected: i32,
- /// The actual current version in the database
- actual: i32,
- /// The current phase of the contract
- current_phase: String,
- },
- /// Validation failed (e.g., invalid phase transition)
- ValidationFailed {
- /// Human-readable reason for the failure
- reason: String,
- /// List of missing requirements for the phase transition
- missing_requirements: Vec<String>,
- },
- /// The caller is not authorized to change this contract's phase
- Unauthorized,
- /// The contract was not found
- NotFound,
-}
-
-/// Response for phase transition when phase_guard is enabled
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PhaseTransitionRequest {
- /// Current contract phase
- pub current_phase: String,
- /// Requested next phase
- pub next_phase: String,
- /// Summary of phase deliverables/outputs
- pub deliverables_summary: String,
- /// List of files created in this phase
- pub phase_files: Vec<PhaseFileInfo>,
- /// List of completed tasks in this phase
- pub phase_tasks: Vec<PhaseTaskInfo>,
- /// Whether user confirmation is required
- pub requires_confirmation: bool,
- /// Unique ID for tracking this transition request
- pub transition_id: String,
-}
-
-/// File info for phase transition review
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PhaseFileInfo {
- pub id: Uuid,
- pub name: String,
- pub description: Option<String>,
-}
-
-/// Task info for phase transition review
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PhaseTaskInfo {
- pub id: Uuid,
- pub name: String,
- pub status: String,
-}
-
-/// Contract event record from the database
-#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractEvent {
- pub id: Uuid,
- pub contract_id: Uuid,
- pub event_type: String,
- pub previous_phase: Option<String>,
- pub new_phase: Option<String>,
- #[sqlx(json)]
- pub event_data: Option<serde_json::Value>,
- pub created_at: DateTime<Utc>,
-}
-
-/// Response for contract events list endpoint
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ContractEventListResponse {
- pub events: Vec<ContractEvent>,
- pub total: i64,
-}
// ============================================================================
// Task Checkpoints (for git checkpoint tracking)
@@ -2173,11 +1280,9 @@ pub struct ConversationSnapshot {
pub struct HistoryEvent {
pub id: Uuid,
pub owner_id: Uuid,
- pub contract_id: Option<Uuid>,
pub task_id: Option<Uuid>,
pub event_type: String,
pub event_subtype: Option<String>,
- pub phase: Option<String>,
#[sqlx(json)]
pub event_data: serde_json::Value,
pub created_at: DateTime<Utc>,
@@ -2221,7 +1326,6 @@ pub struct ToolCallInfo {
#[derive(Debug, Deserialize, ToSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct HistoryQueryFilters {
- pub phase: Option<String>,
pub event_types: Option<Vec<String>>,
#[serde(default, deserialize_with = "flexible_datetime::deserialize")]
pub from: Option<DateTime<Utc>>,
@@ -2766,11 +1870,6 @@ pub struct DirectiveStep {
/// Status: pending, ready, running, completed, failed, skipped
pub status: String,
pub task_id: Option<Uuid>,
- /// Optional contract ID for contract-backed execution.
- pub contract_id: Option<Uuid>,
- /// Optional contract type (e.g. "simple", "specification", "execute").
- /// When set, the orchestrator creates a contract instead of a standalone task.
- pub contract_type: Option<String>,
pub order_index: i32,
pub generation: i32,
pub started_at: Option<DateTime<Utc>>,
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index ee4b561..d453f99 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -6,21 +6,17 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation,
- ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary,
- ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
- CreateContractRequest, CreateFileRequest, CreateTaskRequest,
- CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition, Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
+ CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot,
+ CreateFileRequest, CreateTaskRequest,
+ Daemon, DaemonTaskAssignment, DaemonWithCapacity,
+ Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
CreateDirectiveRequest, CreateDirectiveStepRequest,
UpdateDirectiveRequest, UpdateDirectiveStepRequest,
CreateOrderRequest, Order, UpdateOrderRequest,
CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
- MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
- PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
- Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
- UpdateFileRequest, UpdateTaskRequest, UpdateTemplateRequest,
+ Task, TaskCheckpoint, TaskEvent, TaskSummary,
+ UpdateFileRequest, UpdateTaskRequest,
};
/// Repository error types.
@@ -89,7 +85,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
@@ -105,7 +101,7 @@ pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Resul
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#,
@@ -119,7 +115,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#,
@@ -171,7 +167,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -190,7 +186,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -246,7 +242,6 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
// =============================================================================
/// Create a new file record for a specific owner.
-/// Files must belong to a contract - the contract_id is required and the phase is looked up.
pub async fn create_file_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -254,32 +249,16 @@ pub async fn create_file_for_owner(
) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
- // Use body from request (may be empty or contain template elements)
let body_json = serde_json::to_value(&req.body).unwrap_or_default();
- // Use provided contract_phase, or look up from contract's current phase
- let contract_phase: Option<String> = if req.contract_phase.is_some() {
- req.contract_phase
- } else {
- sqlx::query_scalar(
- "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2",
- )
- .bind(req.contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?
- };
-
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
- VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path)
+ VALUES ($1, $2, $3, $4, $5, NULL, $6, $7)
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(owner_id)
- .bind(req.contract_id)
- .bind(&contract_phase)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
@@ -298,7 +277,7 @@ pub async fn get_file_for_owner(
) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
@@ -313,7 +292,7 @@ pub async fn get_file_for_owner(
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
@@ -324,13 +303,10 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F
.await
}
-/// Database row type for file summary with contract info
+/// Database row type for file summary
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
id: Uuid,
- contract_id: Option<Uuid>,
- contract_name: Option<String>,
- contract_phase: Option<String>,
name: String,
description: Option<String>,
#[sqlx(json)]
@@ -342,7 +318,7 @@ struct FileSummaryRow {
updated_at: chrono::DateTime<chrono::Utc>,
}
-/// List file summaries for an owner with contract info (joined).
+/// List file summaries for an owner.
pub async fn list_file_summaries_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -350,11 +326,9 @@ pub async fn list_file_summaries_for_owner(
let rows = sqlx::query_as::<_, FileSummaryRow>(
r#"
SELECT
- f.id, f.contract_id, c.name as contract_name, f.contract_phase,
- f.name, f.description, f.transcript, f.version,
+ f.id, f.name, f.description, f.transcript, f.version,
f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
FROM files f
- LEFT JOIN contracts c ON f.contract_id = c.id
WHERE f.owner_id = $1
ORDER BY f.created_at DESC
"#,
@@ -373,9 +347,6 @@ pub async fn list_file_summaries_for_owner(
.fold(0.0_f32, f32::max);
FileSummary {
id: row.id,
- contract_id: row.contract_id,
- contract_name: row.contract_name,
- contract_phase: row.contract_phase,
name: row.name,
description: row.description,
transcript_count: row.transcript.len(),
@@ -429,7 +400,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -449,7 +420,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
- RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -657,34 +628,24 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -699,23 +660,21 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id
+ branched_from_task_id, conversation_state
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -726,7 +685,6 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.fetch_one(pool)
.await
}
@@ -751,14 +709,12 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error>
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -772,14 +728,12 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -790,73 +744,6 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
}
/// List all tasks in a contract (for supervisor tree view).
-pub async fn list_tasks_by_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT * FROM tasks
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY is_supervisor DESC, depth ASC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get pending tasks for a contract (non-supervisor tasks only).
-/// Includes tasks that were interrupted (retry candidates).
-/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
-pub async fn get_pending_tasks_for_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- WHERE t.contract_id = $1 AND t.owner_id = $2
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY
- t.interrupted_at DESC NULLS LAST,
- t.priority DESC,
- t.created_at ASC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get all contracts that have pending tasks awaiting retry.
-/// Returns tuples of (contract_id, owner_id) for contracts with retryable tasks.
-pub async fn get_all_pending_task_contracts(
- pool: &PgPool,
-) -> Result<Vec<(Uuid, Uuid)>, sqlx::Error> {
- sqlx::query_as::<_, (Uuid, Uuid)>(
- r#"
- SELECT DISTINCT t.contract_id, t.owner_id
- FROM tasks t
- WHERE t.contract_id IS NOT NULL
- AND t.status = 'pending'
- AND t.retry_count < t.max_retries
- AND t.is_supervisor = false
- ORDER BY t.owner_id, t.contract_id
- "#,
- )
- .fetch_all(pool)
- .await
-}
-
-/// Mark a task as pending for retry after daemon failure.
-/// Increments retry count and adds the failed daemon to exclusion list.
pub async fn mark_task_for_retry(
pool: &PgPool,
task_id: Uuid,
@@ -1061,16 +948,13 @@ pub async fn create_task_for_owner(
owner_id: Uuid,
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
- // Calculate depth and inherit settings from parent if applicable
- let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ // Calculate depth + inherit settings from parent if applicable.
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
-
- // Validate max depth
if new_depth >= 2 {
return Err(sqlx::Error::Protocol(format!(
"Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
@@ -1078,25 +962,17 @@ pub async fn create_task_for_owner(
)));
}
- // Subtasks inherit contract_id from parent (or use request contract_id if parent has none)
- let contract_id = parent.contract_id.or(req.contract_id);
-
- // Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
let target_branch = req.target_branch.clone().or(parent.target_branch);
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
- // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0, use contract_id from request (may be None for branched tasks)
(
0,
- req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -1108,14 +984,11 @@ pub async fn create_task_for_owner(
let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
- // Resolve the directive_document_id. Tasks plumbed through this builder
- // currently have no way to specify a document explicitly (we don't want
- // to widen `CreateTaskRequest` for this — every call site would have to
- // change). Instead, when the task is directive-driven (directive_id is
- // set) we attach it to that directive's most recently-updated active
- // document so the task lands under that document's tasks/ subfolder in
- // the sidebar. Resolution failures are non-fatal — the task still gets
- // created with directive_document_id = NULL, matching legacy behaviour.
+ // Resolve directive_document_id from the directive's currently-
+ // active contract row (directive_documents table) so the task
+ // lands under the right tasks/ subfolder in the sidebar. Failures
+ // are non-fatal — the task is created with NULL document_id and
+ // the sidebar tolerates that.
let directive_document_id = match req.directive_id {
Some(directive_id) => resolve_active_document_for_directive(pool, directive_id)
.await
@@ -1126,25 +999,23 @@ pub async fn create_task_for_owner(
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
- is_supervisor, repository_url, base_branch, target_branch, merge_mode,
+ owner_id, parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files,
- branched_from_task_id, conversation_state, supervisor_worktree_task_id,
+ branched_from_task_id, conversation_state,
directive_id, directive_step_id, directive_document_id
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
RETURNING *
"#,
)
.bind(owner_id)
- .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
- .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -1155,7 +1026,6 @@ pub async fn create_task_for_owner(
.bind(&copy_files_json)
.bind(&req.branched_from_task_id)
.bind(&req.conversation_history)
- .bind(&req.supervisor_worktree_task_id)
.bind(&req.directive_id)
.bind(&req.directive_step_id)
.bind(&directive_document_id)
@@ -1226,14 +1096,12 @@ pub async fn list_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1335,14 +1203,12 @@ pub async fn list_ephemeral_directive_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.directive_step_id IS NULL
@@ -1369,14 +1235,12 @@ pub async fn list_tmp_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1
AND t.directive_id = $2
AND t.parent_task_id IS NULL
@@ -1399,14 +1263,12 @@ pub async fn list_subtasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1920,14 +1782,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1942,14 +1802,12 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
+ t.id,
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
+ t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -2121,1287 +1979,28 @@ pub async fn complete_task(
Ok(task)
}
-// =============================================================================
-// Mesh Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for an owner.
-pub async fn get_or_create_active_conversation(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<MeshChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this owner
- let existing = sqlx::query_as::<_, MeshChatConversation>(
- r#"
- SELECT *
- FROM mesh_chat_conversations
- WHERE is_active = true AND owner_id = $1
- LIMIT 1
- "#,
- )
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, MeshChatConversation>(
- r#"
- INSERT INTO mesh_chat_conversations (owner_id, is_active)
- VALUES ($1, true)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a conversation.
-pub async fn list_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- SELECT *
- FROM mesh_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a conversation.
-#[allow(clippy::too_many_arguments)]
-pub async fn add_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- context_type: &str,
- context_task_id: Option<Uuid>,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<MeshChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, MeshChatMessageRecord>(
- r#"
- INSERT INTO mesh_chat_messages
- (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(context_type)
- .bind(context_task_id)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear conversation (archive existing and create new).
-pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
- // Mark existing as inactive for this owner
- sqlx::query(
- r#"
- UPDATE mesh_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND owner_id = $1
- "#,
- )
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_active_conversation(pool, owner_id).await
-}
-
-// =============================================================================
-// Contract Chat History Functions
-// =============================================================================
-
-/// Get or create the active conversation for a contract.
-pub async fn get_or_create_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Try to get existing active conversation for this contract
- let existing = sqlx::query_as::<_, ContractChatConversation>(
- r#"
- SELECT *
- FROM contract_chat_conversations
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- LIMIT 1
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await?;
-
- if let Some(conv) = existing {
- return Ok(conv);
- }
-
- // Create new conversation
- sqlx::query_as::<_, ContractChatConversation>(
- r#"
- INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
- VALUES ($1, $2, true)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_one(pool)
- .await
-}
-
-/// List messages for a contract conversation.
-pub async fn list_contract_chat_messages(
- pool: &PgPool,
- conversation_id: Uuid,
- limit: Option<i32>,
-) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
- let limit = limit.unwrap_or(100);
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- SELECT *
- FROM contract_chat_messages
- WHERE conversation_id = $1
- ORDER BY created_at ASC
- LIMIT $2
- "#,
- )
- .bind(conversation_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Add a message to a contract conversation.
-pub async fn add_contract_chat_message(
- pool: &PgPool,
- conversation_id: Uuid,
- role: &str,
- content: &str,
- tool_calls: Option<serde_json::Value>,
- pending_questions: Option<serde_json::Value>,
-) -> Result<ContractChatMessageRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractChatMessageRecord>(
- r#"
- INSERT INTO contract_chat_messages
- (conversation_id, role, content, tool_calls, pending_questions)
- VALUES ($1, $2, $3, $4, $5)
- RETURNING *
- "#,
- )
- .bind(conversation_id)
- .bind(role)
- .bind(content)
- .bind(tool_calls)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Clear contract conversation (archive existing and create new).
-pub async fn clear_contract_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<ContractChatConversation, sqlx::Error> {
- // Mark existing as inactive for this contract
- sqlx::query(
- r#"
- UPDATE contract_chat_conversations
- SET is_active = false, updated_at = NOW()
- WHERE is_active = true AND contract_id = $1 AND owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- // Create new active conversation
- get_or_create_contract_conversation(pool, contract_id, owner_id).await
-}
-
-// =============================================================================
-// Contract Type Template Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract type template for a specific owner.
-pub async fn create_template_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateTemplateRequest,
-) -> Result<ContractTypeTemplateRecord, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- INSERT INTO contract_type_templates (owner_id, name, description, phases, default_phase, deliverables)
- VALUES ($1, $2, $3, $4, $5, $6)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(serde_json::to_value(&req.phases).unwrap_or_default())
- .bind(&req.default_phase)
- .bind(match &req.deliverables {
- Some(d) => serde_json::to_value(d).ok(),
- None => None,
- })
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract type template by ID, scoped to owner.
-pub async fn get_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get a contract type template by ID (internal use, no owner scoping).
-pub async fn get_template_by_id(
- pool: &PgPool,
- id: Uuid,
-) -> Result<Option<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE id = $1
- "#,
- )
- .bind(id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contract type templates for an owner, ordered by name.
-pub async fn list_templates_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractTypeTemplateRecord>, sqlx::Error> {
- sqlx::query_as::<_, ContractTypeTemplateRecord>(
- r#"
- SELECT *
- FROM contract_type_templates
- WHERE owner_id = $1
- ORDER BY name ASC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Update a contract type template for an owner.
-pub async fn update_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateTemplateRequest,
-) -> Result<Option<ContractTypeTemplateRecord>, RepositoryError> {
- // Build dynamic update query
- let mut query = String::from("UPDATE contract_type_templates SET updated_at = NOW()");
- let mut param_idx = 3; // $1 = id, $2 = owner_id
-
- if req.name.is_some() {
- query.push_str(&format!(", name = ${}", param_idx));
- param_idx += 1;
- }
- if req.description.is_some() {
- query.push_str(&format!(", description = ${}", param_idx));
- param_idx += 1;
- }
- if req.phases.is_some() {
- query.push_str(&format!(", phases = ${}", param_idx));
- param_idx += 1;
- }
- if req.default_phase.is_some() {
- query.push_str(&format!(", default_phase = ${}", param_idx));
- param_idx += 1;
- }
- if req.deliverables.is_some() {
- query.push_str(&format!(", deliverables = ${}", param_idx));
- param_idx += 1;
- }
-
- // Optimistic locking
- if req.version.is_some() {
- query.push_str(&format!(", version = version + 1 WHERE id = $1 AND owner_id = $2 AND version = ${}", param_idx));
- } else {
- query.push_str(", version = version + 1 WHERE id = $1 AND owner_id = $2");
- }
- query.push_str(" RETURNING *");
-
- let mut sql_query = sqlx::query_as::<_, ContractTypeTemplateRecord>(&query);
- sql_query = sql_query.bind(id).bind(owner_id);
-
- if let Some(ref name) = req.name {
- sql_query = sql_query.bind(name);
- }
- if let Some(ref description) = req.description {
- sql_query = sql_query.bind(description);
- }
- if let Some(ref phases) = req.phases {
- sql_query = sql_query.bind(serde_json::to_value(phases).unwrap_or_default());
- }
- if let Some(ref default_phase) = req.default_phase {
- sql_query = sql_query.bind(default_phase);
- }
- if let Some(ref deliverables) = req.deliverables {
- sql_query = sql_query.bind(serde_json::to_value(deliverables).unwrap_or_default());
- }
- if let Some(version) = req.version {
- sql_query = sql_query.bind(version);
- }
-
- match sql_query.fetch_optional(pool).await {
- Ok(result) => {
- if result.is_none() && req.version.is_some() {
- // Check if it's a version conflict
- if let Some(current) = get_template_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
- Ok(result)
- }
- Err(e) => Err(RepositoryError::Database(e)),
- }
-}
-
-/// Delete a contract type template for an owner.
-pub async fn delete_template_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_type_templates
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Helper function to build PhaseConfig from a template.
-pub fn build_phase_config_from_template(template: &ContractTypeTemplateRecord) -> PhaseConfig {
- PhaseConfig {
- phases: template.phases.clone(),
- default_phase: template.default_phase.clone(),
- deliverables: template.deliverables.clone().unwrap_or_default(),
- }
-}
-
-/// Helper function to build PhaseConfig for built-in contract types.
-pub fn build_phase_config_for_builtin(contract_type: &str) -> PhaseConfig {
- match contract_type {
- "simple" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 0 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 1 },
- ],
- default_phase: "plan".to_string(),
- deliverables: [
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "specification" => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "research".to_string(), name: "Research".to_string(), order: 0 },
- PhaseDefinition { id: "specify".to_string(), name: "Specify".to_string(), order: 1 },
- PhaseDefinition { id: "plan".to_string(), name: "Plan".to_string(), order: 2 },
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 3 },
- PhaseDefinition { id: "review".to_string(), name: "Review".to_string(), order: 4 },
- ],
- default_phase: "research".to_string(),
- deliverables: [
- ("research".to_string(), vec![DeliverableDefinition {
- id: "research-notes".to_string(),
- name: "Research Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ("specify".to_string(), vec![DeliverableDefinition {
- id: "requirements-document".to_string(),
- name: "Requirements Document".to_string(),
- priority: "required".to_string(),
- }]),
- ("plan".to_string(), vec![DeliverableDefinition {
- id: "plan-document".to_string(),
- name: "Plan".to_string(),
- priority: "required".to_string(),
- }]),
- ("execute".to_string(), vec![DeliverableDefinition {
- id: "pull-request".to_string(),
- name: "Pull Request".to_string(),
- priority: "required".to_string(),
- }]),
- ("review".to_string(), vec![DeliverableDefinition {
- id: "release-notes".to_string(),
- name: "Release Notes".to_string(),
- priority: "required".to_string(),
- }]),
- ].into_iter().collect(),
- },
- "execute" | _ => PhaseConfig {
- phases: vec![
- PhaseDefinition { id: "execute".to_string(), name: "Execute".to_string(), order: 0 },
- ],
- default_phase: "execute".to_string(),
- deliverables: std::collections::HashMap::new(),
- },
- }
-}
-
-// =============================================================================
-// Contract Functions (Owner-Scoped)
-// =============================================================================
-
-/// Create a new contract for a specific owner.
-/// Supports both built-in contract types (simple, specification, execute) and custom templates.
-pub async fn create_contract_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
- req: CreateContractRequest,
-) -> Result<Contract, sqlx::Error> {
- // Determine phase configuration based on template_id or contract_type
- let (phase_config, contract_type_str, default_phase): (PhaseConfig, String, String) =
- if let Some(template_id) = req.template_id {
- // Look up the custom template
- let template = get_template_by_id(pool, template_id)
- .await?
- .ok_or_else(|| {
- sqlx::Error::Protocol(format!("Template not found: {}", template_id))
- })?;
-
- let config = build_phase_config_from_template(&template);
- let default = config.default_phase.clone();
- // For custom templates, store the template name as the contract_type
- (config, template.name.clone(), default)
- } else {
- // Use built-in contract type
- let contract_type = req.contract_type.as_deref().unwrap_or("simple");
-
- // Validate contract type
- let valid_types = ["simple", "specification", "execute"];
- if !valid_types.contains(&contract_type) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid contract_type '{}'. Must be one of: {} or provide a template_id",
- contract_type,
- valid_types.join(", ")
- )));
- }
-
- let config = build_phase_config_for_builtin(contract_type);
- let default = config.default_phase.clone();
- (config, contract_type.to_string(), default)
- };
-
- // Get valid phase IDs from the configuration
- let valid_phase_ids: Vec<String> = phase_config.phases.iter().map(|p| p.id.clone()).collect();
-
- // Use provided initial_phase or default based on contract type/template
- let phase = req.initial_phase.as_deref().unwrap_or(&default_phase);
-
- // Validate the phase is valid for this contract type/template
- if !valid_phase_ids.contains(&phase.to_string()) {
- return Err(sqlx::Error::Protocol(format!(
- "Invalid initial_phase '{}' for contract type '{}'. Must be one of: {}",
- phase,
- contract_type_str,
- valid_phase_ids.join(", ")
- )));
- }
-
- let autonomous_loop = req.autonomous_loop.unwrap_or(false);
- let phase_guard = req.phase_guard.unwrap_or(false);
- let local_only = req.local_only.unwrap_or(false);
- let auto_merge_local = req.auto_merge_local.unwrap_or(false);
-
- // Serialize phase_config to JSON
- let phase_config_json = serde_json::to_value(&phase_config).ok();
-
- sqlx::query_as::<_, Contract>(
- r#"
- INSERT INTO contracts (owner_id, name, description, contract_type, phase, autonomous_loop, phase_guard, local_only, auto_merge_local, phase_config)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- RETURNING *
- "#,
- )
- .bind(owner_id)
- .bind(&req.name)
- .bind(&req.description)
- .bind(&contract_type_str)
- .bind(phase)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(phase_config_json)
- .fetch_one(pool)
- .await
-}
-
-/// Get a contract by ID, scoped to owner.
-pub async fn get_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<Contract>, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// List all contracts for an owner, ordered by created_at DESC.
-pub async fn list_contracts_for_owner(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.owner_id = $1
- ORDER BY c.created_at DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get contract summary by ID.
-pub async fn get_contract_summary_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<ContractSummary>, sqlx::Error> {
- sqlx::query_as::<_, ContractSummary>(
- r#"
- SELECT
- c.id, c.name, c.description, c.contract_type, c.phase, c.status,
- c.supervisor_task_id, c.local_only, c.auto_merge_local, c.version, c.created_at,
- (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
- (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
- (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
- FROM contracts c
- WHERE c.id = $1 AND c.owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update a contract by ID with optimistic locking, scoped to owner.
-pub async fn update_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- req: UpdateContractRequest,
-) -> Result<Option<Contract>, RepositoryError> {
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected_version) = req.version {
- if existing.version != expected_version {
- return Err(RepositoryError::VersionConflict {
- expected: expected_version,
- actual: existing.version,
- });
- }
- }
-
- // Apply updates
- let name = req.name.unwrap_or(existing.name);
- let description = req.description.or(existing.description);
- let phase = req.phase.unwrap_or(existing.phase);
- let status = req.status.unwrap_or(existing.status);
- let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
- let autonomous_loop = req.autonomous_loop.unwrap_or(existing.autonomous_loop);
- let phase_guard = req.phase_guard.unwrap_or(existing.phase_guard);
- let local_only = req.local_only.unwrap_or(existing.local_only);
- let auto_merge_local = req.auto_merge_local.unwrap_or(existing.auto_merge_local);
-
- let result = if req.version.is_some() {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $12
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .bind(req.version.unwrap())
- .fetch_optional(pool)
- .await?
- } else {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET name = $3, description = $4, phase = $5, status = $6,
- supervisor_task_id = $7, autonomous_loop = $8, phase_guard = $9, local_only = $10, auto_merge_local = $11, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(&name)
- .bind(&description)
- .bind(&phase)
- .bind(&status)
- .bind(supervisor_task_id)
- .bind(autonomous_loop)
- .bind(phase_guard)
- .bind(local_only)
- .bind(auto_merge_local)
- .fetch_optional(pool)
- .await?
- };
-
- // If versioned update returned None, there was a race condition
- if result.is_none() && req.version.is_some() {
- if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? {
- return Err(RepositoryError::VersionConflict {
- expected: req.version.unwrap(),
- actual: current.version,
- });
- }
- }
-
- Ok(result)
-}
-
-/// Delete a contract by ID, scoped to owner.
-pub async fn delete_contract_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contracts
- WHERE id = $1 AND owner_id = $2
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Change contract phase and record event.
-///
-/// This is the simple version without version checking. Use `change_contract_phase_with_version`
-/// for explicit version conflict detection.
-pub async fn change_contract_phase_for_owner(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
-) -> Result<Option<Contract>, sqlx::Error> {
- // Get current phase
- let existing = get_contract_for_owner(pool, id, owner_id).await?;
- let Some(existing) = existing else {
- return Ok(None);
- };
-
- let previous_phase = existing.phase.clone();
-
- // Update phase
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_optional(pool)
- .await?;
-
- // Record event
- if contract.is_some() {
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(pool)
- .await?;
- }
-
- Ok(contract)
-}
-
-/// Change contract phase with explicit version checking for conflict detection.
-///
-/// Uses `SELECT ... FOR UPDATE` to lock the row and prevent race conditions.
-/// Returns `PhaseChangeResult::VersionConflict` if the expected version doesn't match.
-pub async fn change_contract_phase_with_version(
- pool: &PgPool,
- id: Uuid,
- owner_id: Uuid,
- new_phase: &str,
- expected_version: Option<i32>,
-) -> Result<PhaseChangeResult, sqlx::Error> {
- // Start a transaction to ensure atomicity with row locking
- let mut tx = pool.begin().await?;
-
- // Lock the row with SELECT FOR UPDATE and get current state
- let existing: Option<Contract> = sqlx::query_as::<_, Contract>(
- r#"
- SELECT *
- FROM contracts
- WHERE id = $1 AND owner_id = $2
- FOR UPDATE
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .fetch_optional(&mut *tx)
- .await?;
-
- let Some(existing) = existing else {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::NotFound);
- };
-
- // Check version if provided (optimistic locking)
- if let Some(expected) = expected_version {
- if existing.version != expected {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::VersionConflict {
- expected,
- actual: existing.version,
- current_phase: existing.phase,
- });
- }
- }
-
- // Validate the phase transition is allowed
- let valid_phases = existing.valid_phase_ids();
- if !valid_phases.contains(&new_phase.to_string()) {
- tx.rollback().await?;
- return Ok(PhaseChangeResult::ValidationFailed {
- reason: format!(
- "Invalid phase '{}' for contract type '{}'",
- new_phase, existing.contract_type
- ),
- missing_requirements: vec![format!(
- "Phase must be one of: {}",
- valid_phases.join(", ")
- )],
- });
- }
-
- let previous_phase = existing.phase.clone();
-
- // Update phase with version increment
- let contract = sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET phase = $3, version = version + 1, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING *
- "#,
- )
- .bind(id)
- .bind(owner_id)
- .bind(new_phase)
- .fetch_one(&mut *tx)
- .await?;
-
- // Record event
- sqlx::query(
- r#"
- INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
- VALUES ($1, 'phase_change', $2, $3)
- "#,
- )
- .bind(id)
- .bind(&previous_phase)
- .bind(new_phase)
- .execute(&mut *tx)
- .await?;
-
- // Commit the transaction
- tx.commit().await?;
-
- Ok(PhaseChangeResult::Success(contract))
-}
-
-// =============================================================================
-// Contract Repository Functions
-// =============================================================================
-
-/// List repositories for a contract.
-pub async fn list_contract_repositories(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- SELECT *
- FROM contract_repositories
- WHERE contract_id = $1
- ORDER BY is_primary DESC, created_at ASC
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Add a remote repository to a contract.
-pub async fn add_remote_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- repository_url: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'remote', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(repository_url)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Add a local repository to a contract.
-pub async fn add_local_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- local_path: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
+/// Get task tree from a specific root task.
+pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
r#"
- INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
- VALUES ($1, $2, $3, 'local', 'ready', $4)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(local_path)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Create a managed repository (daemon will create it).
-pub async fn create_managed_repository(
- pool: &PgPool,
- contract_id: Uuid,
- name: &str,
- is_primary: bool,
-) -> Result<ContractRepository, sqlx::Error> {
- // If is_primary, clear other primaries first
- if is_primary {
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
+ WITH RECURSIVE task_tree AS (
+ -- Base case: the root task
+ SELECT * FROM tasks WHERE id = $1
+ UNION ALL
+ -- Recursive case: children of current level
+ SELECT t.* FROM tasks t
+ JOIN task_tree tt ON t.parent_task_id = tt.id
)
- .bind(contract_id)
- .execute(pool)
- .await?;
- }
-
- sqlx::query_as::<_, ContractRepository>(
- r#"
- INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
- VALUES ($1, $2, 'managed', 'pending', $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(name)
- .bind(is_primary)
- .fetch_one(pool)
- .await
-}
-
-/// Delete a repository from a contract.
-pub async fn delete_contract_repository(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM contract_repositories
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Set a repository as primary (and clear others).
-pub async fn set_repository_primary(
- pool: &PgPool,
- repo_id: Uuid,
- contract_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- // Clear other primaries
- sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = false, updated_at = NOW()
- WHERE contract_id = $1 AND is_primary = true
- "#,
- )
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- // Set this one as primary
- let result = sqlx::query(
- r#"
- UPDATE contract_repositories
- SET is_primary = true, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2
- "#,
- )
- .bind(repo_id)
- .bind(contract_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Update managed repository status (used by daemon).
-pub async fn update_managed_repository_status(
- pool: &PgPool,
- repo_id: Uuid,
- status: &str,
- repository_url: Option<&str>,
-) -> Result<Option<ContractRepository>, sqlx::Error> {
- sqlx::query_as::<_, ContractRepository>(
- r#"
- UPDATE contract_repositories
- SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(repo_id)
- .bind(status)
- .bind(repository_url)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Contract Task Association Functions
-// =============================================================================
-
-/// Add a task to a contract.
-pub async fn add_task_to_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = $2, updated_at = NOW()
- WHERE id = $1 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Remove a task from a contract.
-pub async fn remove_task_from_contract(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- owner_id: Uuid,
-) -> Result<bool, sqlx::Error> {
- let result = sqlx::query(
- r#"
- UPDATE tasks
- SET contract_id = NULL, updated_at = NOW()
- WHERE id = $1 AND contract_id = $2 AND owner_id = $3
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .bind(owner_id)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// List files in a contract.
-pub async fn list_files_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<FileSummary>, sqlx::Error> {
- // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields
- let files = sqlx::query_as::<_, File>(
- r#"
- SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
- FROM files
- WHERE contract_id = $1 AND owner_id = $2
- ORDER BY created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await?;
-
- Ok(files.into_iter().map(FileSummary::from).collect())
-}
-
-/// List tasks in a contract.
-pub async fn list_tasks_in_contract(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Vec<TaskSummary>, sqlx::Error> {
- sqlx::query_as::<_, TaskSummary>(
- r#"
- SELECT
- t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
- c.status as contract_status,
- t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
- FROM tasks t
- LEFT JOIN contracts c ON t.contract_id = c.id
- WHERE t.contract_id = $1 AND t.owner_id = $2
- ORDER BY t.priority DESC, t.created_at DESC
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Minimal task info for worktree cleanup operations.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct TaskWorktreeInfo {
- pub id: Uuid,
- pub daemon_id: Option<Uuid>,
- pub overlay_path: Option<String>,
- /// If set, this task shares the worktree of the specified supervisor task.
- /// Should NOT have its worktree deleted during cleanup.
- pub supervisor_worktree_task_id: Option<Uuid>,
-}
-
-/// List tasks in a contract with their daemon/worktree info.
-/// Used for cleaning up worktrees when a contract is completed or deleted.
-pub async fn list_contract_tasks_with_worktree_info(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<TaskWorktreeInfo>, sqlx::Error> {
- sqlx::query_as::<_, TaskWorktreeInfo>(
- r#"
- SELECT id, daemon_id, overlay_path, supervisor_worktree_task_id
- FROM tasks
- WHERE contract_id = $1 AND (daemon_id IS NOT NULL OR overlay_path IS NOT NULL)
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-// =============================================================================
-// Contract Events
-// =============================================================================
-
-/// List events for a contract.
-pub async fn list_contract_events(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<ContractEvent>, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- SELECT *
- FROM contract_events
- WHERE contract_id = $1
- ORDER BY created_at DESC
+ SELECT * FROM task_tree
+ ORDER BY depth, created_at
"#,
)
- .bind(contract_id)
+ .bind(root_task_id)
.fetch_all(pool)
.await
}
-/// Record a contract event.
-pub async fn record_contract_event(
- pool: &PgPool,
- contract_id: Uuid,
- event_type: &str,
- event_data: Option<serde_json::Value>,
-) -> Result<ContractEvent, sqlx::Error> {
- sqlx::query_as::<_, ContractEvent>(
- r#"
- INSERT INTO contract_events (contract_id, event_type, event_data)
- VALUES ($1, $2, $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(event_type)
- .bind(event_data)
- .fetch_one(pool)
- .await
-}
-
// ============================================================================
// Task Checkpoints
// ============================================================================
@@ -3501,713 +2100,6 @@ pub async fn list_task_checkpoints(
}
// ============================================================================
-// Supervisor State
-// ============================================================================
-
-/// Create or update supervisor state for a contract.
-pub async fn upsert_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
- VALUES ($1, $2, $3, $4, $5, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisor state for a contract.
-pub async fn get_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1")
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get supervisor state by task ID.
-pub async fn get_supervisor_state_by_task(
- pool: &PgPool,
- task_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1")
- .bind(task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Update supervisor conversation history.
-pub async fn update_supervisor_conversation(
- pool: &PgPool,
- contract_id: Uuid,
- conversation_history: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET conversation_history = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(conversation_history)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor pending tasks.
-pub async fn update_supervisor_pending_tasks(
- pool: &PgPool,
- contract_id: Uuid,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_task_ids = $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(pending_task_ids)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state with detailed activity tracking.
-/// Called at key save points: LLM response, task spawn, question asked, phase change.
-pub async fn update_supervisor_detailed_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- error_message = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- RETURNING *
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a spawned task ID to the supervisor's list.
-pub async fn add_supervisor_spawned_task(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET spawned_task_ids = array_append(spawned_task_ids, $1),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Add a pending question to the supervisor state.
-pub async fn add_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = pending_questions || $1::jsonb,
- state = 'waiting_for_user',
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Remove a pending question by ID.
-pub async fn remove_supervisor_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question_id: Uuid,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET pending_questions = (
- SELECT COALESCE(jsonb_agg(elem), '[]'::jsonb)
- FROM jsonb_array_elements(pending_questions) elem
- WHERE (elem->>'id')::uuid != $1
- ),
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(question_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Comprehensive state save - used at major save points.
-pub async fn save_supervisor_state_full(
- pool: &PgPool,
- contract_id: Uuid,
- task_id: Uuid,
- conversation_history: serde_json::Value,
- pending_task_ids: &[Uuid],
- phase: &str,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- error_message: Option<&str>,
- spawned_task_ids: &[Uuid],
- pending_questions: serde_json::Value,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- INSERT INTO supervisor_states (
- contract_id, task_id, conversation_history, pending_task_ids, phase,
- state, current_activity, progress, error_message, spawned_task_ids,
- pending_questions, last_activity
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
- ON CONFLICT (contract_id) DO UPDATE SET
- task_id = EXCLUDED.task_id,
- conversation_history = EXCLUDED.conversation_history,
- pending_task_ids = EXCLUDED.pending_task_ids,
- phase = EXCLUDED.phase,
- state = EXCLUDED.state,
- current_activity = EXCLUDED.current_activity,
- progress = EXCLUDED.progress,
- error_message = EXCLUDED.error_message,
- spawned_task_ids = EXCLUDED.spawned_task_ids,
- pending_questions = EXCLUDED.pending_questions,
- last_activity = NOW(),
- updated_at = NOW()
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(task_id)
- .bind(conversation_history)
- .bind(pending_task_ids)
- .bind(phase)
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(error_message)
- .bind(spawned_task_ids)
- .bind(pending_questions)
- .fetch_one(pool)
- .await
-}
-
-/// Mark supervisor as restored from a crash/interruption.
-pub async fn mark_supervisor_restored(
- pool: &PgPool,
- contract_id: Uuid,
- restoration_source: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET restoration_count = restoration_count + 1,
- last_restored_at = NOW(),
- restoration_source = $1,
- state = 'initializing',
- error_message = NULL,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(restoration_source)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Get supervisors with pending questions (for re-delivery after restoration).
-pub async fn get_supervisors_with_pending_questions(
- pool: &PgPool,
- owner_id: Uuid,
-) -> Result<Vec<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT ss.*
- FROM supervisor_states ss
- JOIN contracts c ON c.id = ss.contract_id
- WHERE c.owner_id = $1
- AND ss.pending_questions != '[]'::jsonb
- AND c.status = 'active'
- ORDER BY ss.last_activity DESC
- "#,
- )
- .bind(owner_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get supervisor state with full details for restoration.
-/// Includes validation info.
-pub async fn get_supervisor_state_for_restoration(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- SELECT * FROM supervisor_states WHERE contract_id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Validate spawned tasks are in expected states.
-/// Returns map of task_id -> (status, updated_at).
-pub async fn validate_spawned_tasks(
- pool: &PgPool,
- task_ids: &[Uuid],
-) -> Result<std::collections::HashMap<Uuid, (String, chrono::DateTime<Utc>)>, sqlx::Error> {
- use sqlx::Row;
-
- let rows = sqlx::query(
- r#"
- SELECT id, status, updated_at
- FROM tasks
- WHERE id = ANY($1)
- "#,
- )
- .bind(task_ids)
- .fetch_all(pool)
- .await?;
-
- let mut result = std::collections::HashMap::new();
- for row in rows {
- let id: Uuid = row.get("id");
- let status: String = row.get("status");
- let updated_at: chrono::DateTime<Utc> = row.get("updated_at");
- result.insert(id, (status, updated_at));
- }
- Ok(result)
-}
-
-/// Update supervisor state when phase changes.
-pub async fn update_supervisor_phase(
- pool: &PgPool,
- contract_id: Uuid,
- new_phase: &str,
-) -> Result<SupervisorState, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET phase = $1,
- state = 'working',
- current_activity = 'Phase changed to ' || $1,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $2
- RETURNING *
- "#,
- )
- .bind(new_phase)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Update supervisor state on heartbeat (lightweight update).
-pub async fn update_supervisor_heartbeat_state(
- pool: &PgPool,
- contract_id: Uuid,
- state: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE supervisor_states
- SET state = $1,
- current_activity = $2,
- progress = $3,
- pending_task_ids = $4,
- last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $5
- "#,
- )
- .bind(state)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .bind(contract_id)
- .execute(pool)
- .await?;
- Ok(())
-}
-
-// ============================================================================
-// Supervisor Heartbeats
-// ============================================================================
-
-/// Record a supervisor heartbeat.
-/// This creates a historical record for monitoring and dead supervisor detection.
-pub async fn create_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- contract_id: Uuid,
- state: &str,
- phase: &str,
- current_activity: Option<&str>,
- progress: i32,
- pending_task_ids: &[Uuid],
-) -> Result<SupervisorHeartbeatRecord, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- INSERT INTO supervisor_heartbeats (
- supervisor_task_id, contract_id, state, phase, current_activity, progress, pending_task_ids, timestamp
- )
- VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .bind(state)
- .bind(phase)
- .bind(current_activity)
- .bind(progress)
- .bind(pending_task_ids)
- .fetch_one(pool)
- .await
-}
-
-/// Get the latest heartbeat for a supervisor task.
-pub async fn get_latest_supervisor_heartbeat(
- pool: &PgPool,
- supervisor_task_id: Uuid,
-) -> Result<Option<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT 1
- "#,
- )
- .bind(supervisor_task_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Get recent heartbeats for a supervisor task.
-pub async fn get_supervisor_heartbeats(
- pool: &PgPool,
- supervisor_task_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE supervisor_task_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(supervisor_task_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Get recent heartbeats for a contract.
-pub async fn get_contract_supervisor_heartbeats(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i64,
-) -> Result<Vec<SupervisorHeartbeatRecord>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorHeartbeatRecord>(
- r#"
- SELECT * FROM supervisor_heartbeats
- WHERE contract_id = $1
- ORDER BY timestamp DESC
- LIMIT $2
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .fetch_all(pool)
- .await
-}
-
-/// Delete old heartbeats beyond the TTL (24 hours by default).
-/// Returns the number of deleted records.
-pub async fn cleanup_old_heartbeats(
- pool: &PgPool,
- ttl_hours: i64,
-) -> Result<u64, sqlx::Error> {
- let result = sqlx::query(
- r#"
- DELETE FROM supervisor_heartbeats
- WHERE timestamp < NOW() - ($1 || ' hours')::INTERVAL
- "#,
- )
- .bind(ttl_hours.to_string())
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected())
-}
-
-/// Find supervisors that have not sent a heartbeat within the timeout period.
-/// Returns list of (supervisor_task_id, contract_id, last_heartbeat_timestamp).
-pub async fn find_stale_supervisors(
- pool: &PgPool,
- timeout_seconds: i64,
-) -> Result<Vec<(Uuid, Uuid, chrono::DateTime<Utc>)>, sqlx::Error> {
- let rows = sqlx::query(
- r#"
- WITH latest_heartbeats AS (
- SELECT DISTINCT ON (supervisor_task_id)
- supervisor_task_id,
- contract_id,
- timestamp
- FROM supervisor_heartbeats
- ORDER BY supervisor_task_id, timestamp DESC
- )
- SELECT
- lh.supervisor_task_id,
- lh.contract_id,
- lh.timestamp
- FROM latest_heartbeats lh
- JOIN tasks t ON t.id = lh.supervisor_task_id
- WHERE t.status = 'running'
- AND lh.timestamp < NOW() - ($1 || ' seconds')::INTERVAL
- "#,
- )
- .bind(timeout_seconds.to_string())
- .fetch_all(pool)
- .await?;
-
- let mut result = Vec::new();
- for row in rows {
- use sqlx::Row;
- let supervisor_task_id: Uuid = row.get("supervisor_task_id");
- let contract_id: Uuid = row.get("contract_id");
- let timestamp: chrono::DateTime<Utc> = row.get("timestamp");
- result.push((supervisor_task_id, contract_id, timestamp));
- }
- Ok(result)
-}
-
-// ============================================================================
-// Contract Supervisor
-// ============================================================================
-
-/// Update contract's supervisor task ID.
-pub async fn update_contract_supervisor(
- pool: &PgPool,
- contract_id: Uuid,
- supervisor_task_id: Uuid,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET supervisor_task_id = $1,
- updated_at = NOW()
- WHERE id = $2
- RETURNING *
- "#,
- )
- .bind(supervisor_task_id)
- .bind(contract_id)
- .fetch_one(pool)
- .await
-}
-
-/// Mark a deliverable as complete for a specific phase.
-/// Uses JSONB operations to append the deliverable_id to the phase's array.
-pub async fn mark_deliverable_complete(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
- deliverable_id: &str,
-) -> Result<Contract, sqlx::Error> {
- // Use jsonb_set to add the deliverable to the phase's array
- // If the phase key doesn't exist, create an empty array first
- // COALESCE handles the case where the phase array doesn't exist yet
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = jsonb_set(
- completed_deliverables,
- ARRAY[$2::text],
- COALESCE(completed_deliverables->$2, '[]'::jsonb) || to_jsonb($3::text),
- true
- ),
- updated_at = NOW()
- WHERE id = $1
- AND NOT (COALESCE(completed_deliverables->$2, '[]'::jsonb) ? $3)
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .bind(deliverable_id)
- .fetch_one(pool)
- .await
-}
-
-/// Clear all completed deliverables for a specific phase.
-/// Used when phase changes or deliverables need to be reset.
-pub async fn clear_phase_deliverables(
- pool: &PgPool,
- contract_id: Uuid,
- phase: &str,
-) -> Result<Contract, sqlx::Error> {
- sqlx::query_as::<_, Contract>(
- r#"
- UPDATE contracts
- SET completed_deliverables = completed_deliverables - $2,
- updated_at = NOW()
- WHERE id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .bind(phase)
- .fetch_one(pool)
- .await
-}
-
-/// Get the supervisor task for a contract.
-pub async fn get_contract_supervisor_task(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- SELECT t.* FROM tasks t
- JOIN contracts c ON c.supervisor_task_id = t.id
- WHERE c.id = $1
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// ============================================================================
-// Task Tree Queries
-// ============================================================================
-
-/// Get full task tree for a contract.
-pub async fn get_contract_task_tree(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: root tasks (no parent)
- SELECT * FROM tasks
- WHERE contract_id = $1 AND parent_task_id IS NULL
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(contract_id)
- .fetch_all(pool)
- .await
-}
-
-/// Get task tree from a specific root task.
-pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
- sqlx::query_as::<_, Task>(
- r#"
- WITH RECURSIVE task_tree AS (
- -- Base case: the root task
- SELECT * FROM tasks WHERE id = $1
- UNION ALL
- -- Recursive case: children of current level
- SELECT t.* FROM tasks t
- JOIN task_tree tt ON t.parent_task_id = tt.id
- )
- SELECT * FROM task_tree
- ORDER BY depth, created_at
- "#,
- )
- .bind(root_task_id)
- .fetch_all(pool)
- .await
-}
-
-// ============================================================================
// Daemon Selection
// ============================================================================
@@ -4578,107 +2470,27 @@ pub async fn cleanup_old_snapshots(
pub async fn record_history_event(
pool: &PgPool,
owner_id: Uuid,
- contract_id: Option<Uuid>,
task_id: Option<Uuid>,
event_type: &str,
event_subtype: Option<&str>,
- phase: Option<&str>,
event_data: serde_json::Value,
) -> Result<HistoryEvent, sqlx::Error> {
sqlx::query_as::<_, HistoryEvent>(
r#"
- INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
+ INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data)
+ VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#
)
.bind(owner_id)
- .bind(contract_id)
.bind(task_id)
.bind(event_type)
.bind(event_subtype)
- .bind(phase)
.bind(event_data)
.fetch_one(pool)
.await
}
-/// Get contract history timeline
-pub async fn get_contract_history(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
- filters: &HistoryQueryFilters,
-) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
- let limit = filters.limit.unwrap_or(100);
-
- let mut query = String::from(
- "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
- let mut count_query = String::from(
- "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2"
- );
-
- let mut param_count = 2;
-
- if filters.phase.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND phase = ${}" , param_count));
- count_query.push_str(&format!(" AND phase = ${}", param_count));
- }
-
- if filters.from.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at >= ${}", param_count));
- count_query.push_str(&format!(" AND created_at >= ${}", param_count));
- }
-
- if filters.to.is_some() {
- param_count += 1;
- query.push_str(&format!(" AND created_at <= ${}", param_count));
- count_query.push_str(&format!(" AND created_at <= ${}", param_count));
- }
-
- query.push_str(" ORDER BY created_at DESC");
- query.push_str(&format!(" LIMIT {}", limit));
-
- // Build and execute the query dynamically
- let mut q = sqlx::query_as::<_, HistoryEvent>(&query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- q = q.bind(phase);
- }
- if let Some(ref from) = filters.from {
- q = q.bind(from);
- }
- if let Some(ref to) = filters.to {
- q = q.bind(to);
- }
-
- let events = q.fetch_all(pool).await?;
-
- // Get total count
- let mut cq = sqlx::query_scalar::<_, i64>(&count_query)
- .bind(contract_id)
- .bind(owner_id);
-
- if let Some(ref phase) = filters.phase {
- cq = cq.bind(phase);
- }
- if let Some(ref from) = filters.from {
- cq = cq.bind(from);
- }
- if let Some(ref to) = filters.to {
- cq = cq.bind(to);
- }
-
- let count = cq.fetch_one(pool).await?;
-
- Ok((events, count))
-}
-
/// Get task history
pub async fn get_task_history(
pool: &PgPool,
@@ -4825,13 +2637,6 @@ pub async fn get_task_conversation(
Ok(messages)
}
-/// Get supervisor conversation (from supervisor_states)
-pub async fn get_supervisor_conversation_full(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- get_supervisor_state(pool, contract_id).await
-}
// =============================================================================
// Anonymous Task Cleanup Functions
@@ -4969,156 +2774,6 @@ pub async fn delete_checkpoint_patches_for_task(
Ok(result.rows_affected() as i64)
}
-// =============================================================================
-// Red Team Notifications
-// =============================================================================
-// =============================================================================
-// Supervisor Status API Helpers
-// =============================================================================
-
-/// Get supervisor status for a contract.
-/// Returns combined information from supervisor_states and tasks tables.
-pub async fn get_supervisor_status(
- pool: &PgPool,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<SupervisorStatusInfo>, sqlx::Error> {
- // Query to get supervisor status by joining supervisor_states with tasks
- sqlx::query_as::<_, SupervisorStatusInfo>(
- r#"
- SELECT
- ss.task_id,
- COALESCE(t.status, 'unknown') as supervisor_state,
- ss.phase,
- t.progress_summary as current_activity,
- ss.pending_task_ids,
- ss.last_activity as last_heartbeat,
- t.status as task_status,
- t.daemon_id IS NOT NULL as is_running
- FROM supervisor_states ss
- JOIN tasks t ON t.id = ss.task_id
- WHERE ss.contract_id = $1
- AND t.owner_id = $2
- "#,
- )
- .bind(contract_id)
- .bind(owner_id)
- .fetch_optional(pool)
- .await
-}
-
-/// Internal struct to hold supervisor status query result
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorStatusInfo {
- pub task_id: Uuid,
- pub supervisor_state: String,
- pub phase: String,
- pub current_activity: Option<String>,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
- pub last_heartbeat: chrono::DateTime<chrono::Utc>,
- pub task_status: String,
- pub is_running: bool,
-}
-
-/// Get supervisor activity history from history_events table.
-/// This provides a timeline of supervisor activities that can serve as "heartbeats".
-pub async fn get_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
- limit: i32,
- offset: i32,
-) -> Result<Vec<SupervisorActivityEntry>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorActivityEntry>(
- r#"
- SELECT
- created_at as timestamp,
- COALESCE(event_subtype, 'activity') as state,
- event_data->>'activity' as activity,
- (event_data->>'progress')::INTEGER as progress,
- COALESCE(phase, 'unknown') as phase,
- CASE
- WHEN event_data->'pending_task_ids' IS NOT NULL
- THEN ARRAY(SELECT jsonb_array_elements_text(event_data->'pending_task_ids'))::UUID[]
- ELSE ARRAY[]::UUID[]
- END as pending_task_ids
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- ORDER BY created_at DESC
- LIMIT $2 OFFSET $3
- "#,
- )
- .bind(contract_id)
- .bind(limit)
- .bind(offset)
- .fetch_all(pool)
- .await
-}
-
-/// Internal struct to hold supervisor activity entry
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct SupervisorActivityEntry {
- pub timestamp: chrono::DateTime<chrono::Utc>,
- pub state: String,
- pub activity: Option<String>,
- pub progress: Option<i32>,
- pub phase: String,
- #[sqlx(try_from = "Vec<Uuid>")]
- pub pending_task_ids: Vec<Uuid>,
-}
-
-/// Count total supervisor activity history entries for a contract.
-pub async fn count_supervisor_activity_history(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<i64, sqlx::Error> {
- let result: (i64,) = sqlx::query_as(
- r#"
- SELECT COUNT(*)
- FROM history_events
- WHERE contract_id = $1
- AND event_type IN ('supervisor', 'phase', 'task')
- "#,
- )
- .bind(contract_id)
- .fetch_one(pool)
- .await?;
- Ok(result.0)
-}
-
-/// Update supervisor state last_activity timestamp.
-/// This acts as a "sync" operation to refresh the supervisor's heartbeat.
-pub async fn sync_supervisor_state(
- pool: &PgPool,
- contract_id: Uuid,
-) -> Result<Option<SupervisorState>, sqlx::Error> {
- sqlx::query_as::<_, SupervisorState>(
- r#"
- UPDATE supervisor_states
- SET last_activity = NOW(),
- updated_at = NOW()
- WHERE contract_id = $1
- RETURNING *
- "#,
- )
- .bind(contract_id)
- .fetch_optional(pool)
- .await
-}
-
-// =============================================================================
-// Helper Functions
-// =============================================================================
-
-/// Helper to truncate string to max length
-fn truncate_string(s: &str, max_len: usize) -> String {
- if s.len() <= max_len {
- s.to_string()
- } else {
- format!("{}...", &s[..max_len - 3])
- }
-}
// =============================================================================
// Directive CRUD
@@ -7031,37 +4686,6 @@ pub async fn get_running_steps_with_tasks(
.await
}
-/// A running step backed by a contract, joined with the contract's current status.
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct RunningStepWithContract {
- pub step_id: Uuid,
- pub directive_id: Uuid,
- pub contract_id: Uuid,
- pub contract_status: String,
- pub contract_phase: String,
-}
-
-/// Get running steps that are backed by contracts (for contract-based monitoring).
-pub async fn get_running_steps_with_contracts(
- pool: &PgPool,
-) -> Result<Vec<RunningStepWithContract>, sqlx::Error> {
- sqlx::query_as::<_, RunningStepWithContract>(
- r#"
- SELECT
- ds.id AS step_id,
- ds.directive_id,
- ds.contract_id AS "contract_id!",
- c.status AS contract_status,
- c.phase AS contract_phase
- FROM directive_steps ds
- JOIN contracts c ON c.id = ds.contract_id
- WHERE ds.status = 'running'
- AND ds.contract_id IS NOT NULL
- "#,
- )
- .fetch_all(pool)
- .await
-}
/// An orchestrator task to check (directive with pending planning task).
#[derive(Debug, Clone, sqlx::FromRow)]
@@ -7221,25 +4845,6 @@ pub async fn link_task_to_step(
Ok(())
}
-/// Link a contract to a directive step.
-pub async fn link_contract_to_step(
- pool: &PgPool,
- step_id: Uuid,
- contract_id: Uuid,
-) -> Result<(), sqlx::Error> {
- sqlx::query(
- r#"
- UPDATE directive_steps
- SET contract_id = $1
- WHERE id = $2
- "#,
- )
- .bind(contract_id)
- .bind(step_id)
- .execute(pool)
- .await?;
- Ok(())
-}
/// Set a step to 'running' status (after its task has been dispatched).
pub async fn set_step_running(