diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/db | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 603 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 1400 |
2 files changed, 1938 insertions, 65 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 5064b97..e16c43f 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -60,6 +60,8 @@ pub enum BodyElement { alt: Option<String>, caption: Option<String>, }, + /// Raw markdown content - renders as formatted markdown, edits as raw text + Markdown { content: String }, } /// File record from the database. @@ -68,6 +70,10 @@ pub enum BodyElement { pub struct File { pub id: Uuid, pub owner_id: Uuid, + /// Contract this file belongs to (optional) + pub contract_id: Option<Uuid>, + /// Phase of the contract when file was added (e.g., "research", "specify") + pub contract_phase: Option<String>, pub name: String, pub description: Option<String>, #[sqlx(json)] @@ -80,6 +86,12 @@ pub struct File { pub body: Vec<BodyElement>, /// Version number for optimistic locking pub version: i32, + /// Path to linked repository file (e.g., "README.md", "docs/design.md") + pub repo_file_path: Option<String>, + /// When the file was last synced from the repository + pub repo_synced_at: Option<DateTime<Utc>>, + /// Sync status: 'none', 'synced', 'modified', 'conflict' + pub repo_sync_status: Option<String>, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } @@ -88,14 +100,24 @@ pub struct File { #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateFileRequest { + /// Contract this file belongs to (required - files must belong to a contract) + pub contract_id: Uuid, /// Name of the file (auto-generated if not provided) pub name: Option<String>, /// Optional description pub description: Option<String>, - /// Transcript entries + /// Transcript entries (default to empty) + #[serde(default)] pub transcript: Vec<TranscriptEntry>, /// Storage location (e.g., s3://bucket/path) - not used yet pub location: Option<String>, + /// Initial body elements (e.g., from a template) + #[serde(default)] + pub body: Vec<BodyElement>, + /// Path to linked repository file (e.g., "README.md") + pub repo_file_path: Option<String>, + /// Contract phase this file belongs to (for deliverable tracking) + pub contract_phase: Option<String>, } /// Request payload for updating an existing file. @@ -114,6 +136,8 @@ pub struct UpdateFileRequest { pub body: Option<Vec<BodyElement>>, /// Version for optimistic locking (required for updates from frontend) pub version: Option<i32>, + /// Path to linked repository file (e.g., "README.md") + pub repo_file_path: Option<String>, } /// Response for file list endpoint. @@ -129,6 +153,12 @@ pub struct FileListResponse { #[serde(rename_all = "camelCase")] pub struct FileSummary { pub id: Uuid, + /// Contract this file belongs to + pub contract_id: Option<Uuid>, + /// Contract name (joined from contracts table) + pub contract_name: Option<String>, + /// Phase when file was added to contract + pub contract_phase: Option<String>, pub name: String, pub description: Option<String>, pub transcript_count: usize, @@ -136,6 +166,10 @@ pub struct FileSummary { pub duration: Option<f32>, /// Version number for optimistic locking pub version: i32, + /// Path to linked repository file + pub repo_file_path: Option<String>, + /// Sync status with repository + pub repo_sync_status: Option<String>, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } @@ -149,11 +183,16 @@ impl From<File> for FileSummary { .fold(0.0_f32, f32::max); Self { id: file.id, + contract_id: file.contract_id, + contract_name: None, // Not available from File alone, requires JOIN + contract_phase: file.contract_phase, name: file.name, description: file.description, transcript_count: file.transcript.len(), duration: if duration > 0.0 { Some(duration) } else { None }, version: file.version, + repo_file_path: file.repo_file_path, + repo_sync_status: file.repo_sync_status, created_at: file.created_at, updated_at: file.updated_at, } @@ -345,8 +384,10 @@ impl std::str::FromStr for MergeMode { pub struct Task { pub id: Uuid, pub owner_id: Uuid, + /// Contract this task belongs to (required for new tasks) + pub contract_id: Option<Uuid>, pub parent_task_id: Option<Uuid>, - /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max) + /// Depth in task hierarchy (no longer constrained) pub depth: i32, pub name: String, pub description: Option<String>, @@ -354,6 +395,11 @@ pub struct Task { pub priority: i32, pub plan: String, + // Supervisor flag + /// True for contract supervisor tasks. Only supervisors can spawn new tasks. + #[serde(default)] + pub is_supervisor: bool, + // Daemon/container info pub daemon_id: Option<Uuid>, pub container_id: Option<String>, @@ -379,6 +425,30 @@ pub struct Task { pub last_output: Option<String>, pub error_message: Option<String>, + // Git checkpoint tracking + /// Git commit SHA of the most recent checkpoint + #[serde(skip_serializing_if = "Option::is_none")] + pub last_checkpoint_sha: Option<String>, + /// Number of checkpoints created by this task + #[serde(default)] + pub checkpoint_count: i32, + /// Message from the most recent checkpoint + #[serde(skip_serializing_if = "Option::is_none")] + pub checkpoint_message: Option<String>, + + // Conversation state for resumption + /// Saved conversation context for task/supervisor resumption + #[serde(skip_serializing_if = "Option::is_none")] + pub conversation_state: Option<serde_json::Value>, + + // Daemon migration tracking + /// Previous daemon if task was migrated + #[serde(skip_serializing_if = "Option::is_none")] + pub migrated_from_daemon_id: Option<Uuid>, + /// Most recent daemon that worked on this task + #[serde(skip_serializing_if = "Option::is_none")] + pub last_active_daemon_id: Option<Uuid>, + // Timestamps pub started_at: Option<DateTime<Utc>>, pub completed_at: Option<DateTime<Utc>>, @@ -413,6 +483,12 @@ impl Task { #[serde(rename_all = "camelCase")] pub struct TaskSummary { pub id: Uuid, + /// Contract this task belongs to + pub contract_id: Option<Uuid>, + /// Contract name (joined from contracts table) + pub contract_name: Option<String>, + /// Contract phase (joined from contracts table) + pub contract_phase: Option<String>, pub parent_task_id: Option<Uuid>, /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max) pub depth: i32, @@ -422,10 +498,36 @@ pub struct TaskSummary { pub progress_summary: Option<String>, pub subtask_count: i64, pub version: i32, + /// True for contract supervisor tasks + #[serde(default)] + pub is_supervisor: bool, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } +/// Convert a full Task to a TaskSummary +impl From<Task> for TaskSummary { + fn from(task: Task) -> Self { + Self { + id: task.id, + contract_id: task.contract_id, + contract_name: None, // Not available from Task directly + contract_phase: None, // Not available from Task directly + parent_task_id: task.parent_task_id, + depth: task.depth, + name: task.name, + status: task.status, + priority: task.priority, + progress_summary: task.progress_summary, + subtask_count: 0, // Would need separate query + version: task.version, + is_supervisor: task.is_supervisor, + created_at: task.created_at, + updated_at: task.updated_at, + } + } +} + /// Response for task list endpoint #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -438,6 +540,8 @@ pub struct TaskListResponse { #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateTaskRequest { + /// Contract this task belongs to (required) + pub contract_id: Uuid, /// Name of the task pub name: String, /// Optional description @@ -446,6 +550,9 @@ pub struct CreateTaskRequest { pub plan: String, /// Parent task ID (for subtasks) pub parent_task_id: Option<Uuid>, + /// True for contract supervisor tasks. Only supervisors can spawn new tasks. + #[serde(default)] + pub is_supervisor: bool, /// Priority (higher = more urgent) #[serde(default)] pub priority: i32, @@ -466,6 +573,8 @@ pub struct CreateTaskRequest { /// Files to copy from parent task's worktree when starting #[serde(skip_serializing_if = "Option::is_none")] pub copy_files: Option<Vec<String>>, + /// Checkpoint SHA to branch from (optional) + pub checkpoint_sha: Option<String>, } /// Request payload for updating a task @@ -482,6 +591,8 @@ pub struct UpdateTaskRequest { pub error_message: Option<String>, pub merge_mode: Option<String>, pub pr_url: Option<String>, + /// Repository URL for the task (e.g., when updating supervisor with repo info) + pub repository_url: Option<String>, /// Path to user's local repository (outside ~/.makima) pub target_repo_path: Option<String>, /// Action on completion: "none", "branch", "merge", "pr" @@ -733,6 +844,47 @@ pub struct MeshChatHistoryResponse { } // ============================================================================= +// Contract Chat History Types +// ============================================================================= + +/// Conversation thread for contract chat (scoped to a specific contract) +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatConversation { + pub id: Uuid, + pub contract_id: Uuid, + pub owner_id: Uuid, + pub name: Option<String>, + pub is_active: bool, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +/// Individual message in a contract chat conversation +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatMessageRecord { + pub id: Uuid, + pub conversation_id: Uuid, + pub role: String, + pub content: String, + /// Tool calls made during this message (JSON, nullable) + pub tool_calls: Option<serde_json::Value>, + /// Pending questions requiring user response (JSON, nullable) + pub pending_questions: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// Response for contract chat history endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatHistoryResponse { + pub contract_id: Uuid, + pub conversation_id: Uuid, + pub messages: Vec<ContractChatMessageRecord>, +} + +// ============================================================================= // Merge API Types // ============================================================================= @@ -834,3 +986,450 @@ pub struct MergeCompleteCheckResponse { /// Count of skipped branches pub skipped_count: u32, } + +// ============================================================================= +// Contract Types +// ============================================================================= + +/// Contract phase for workflow progression +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum ContractPhase { + Research, + Specify, + Plan, + Execute, + Review, +} + +impl std::fmt::Display for ContractPhase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ContractPhase::Research => write!(f, "research"), + ContractPhase::Specify => write!(f, "specify"), + ContractPhase::Plan => write!(f, "plan"), + ContractPhase::Execute => write!(f, "execute"), + ContractPhase::Review => write!(f, "review"), + } + } +} + +impl std::str::FromStr for ContractPhase { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "research" => Ok(ContractPhase::Research), + "specify" => Ok(ContractPhase::Specify), + "plan" => Ok(ContractPhase::Plan), + "execute" => Ok(ContractPhase::Execute), + "review" => Ok(ContractPhase::Review), + _ => Err(format!("Unknown contract phase: {}", s)), + } + } +} + +/// Contract status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum ContractStatus { + Active, + Completed, + Archived, +} + +impl std::fmt::Display for ContractStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ContractStatus::Active => write!(f, "active"), + ContractStatus::Completed => write!(f, "completed"), + ContractStatus::Archived => write!(f, "archived"), + } + } +} + +impl std::str::FromStr for ContractStatus { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "active" => Ok(ContractStatus::Active), + "completed" => Ok(ContractStatus::Completed), + "archived" => Ok(ContractStatus::Archived), + _ => Err(format!("Unknown contract status: {}", s)), + } + } +} + +/// Repository source type +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum RepositorySourceType { + /// Existing remote repo (GitHub, GitLab, etc) + Remote, + /// Existing local repo + Local, + /// New repo created/managed by Makima daemon + Managed, +} + +impl std::fmt::Display for RepositorySourceType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RepositorySourceType::Remote => write!(f, "remote"), + RepositorySourceType::Local => write!(f, "local"), + RepositorySourceType::Managed => write!(f, "managed"), + } + } +} + +impl std::str::FromStr for RepositorySourceType { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "remote" => Ok(RepositorySourceType::Remote), + "local" => Ok(RepositorySourceType::Local), + "managed" => Ok(RepositorySourceType::Managed), + _ => Err(format!("Unknown repository source type: {}", s)), + } + } +} + +/// Repository status (for managed repos) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum RepositoryStatus { + /// Repo is usable + Ready, + /// Waiting for daemon to create + Pending, + /// Daemon is creating the repo + Creating, + /// Creation failed + Failed, +} + +impl std::fmt::Display for RepositoryStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RepositoryStatus::Ready => write!(f, "ready"), + RepositoryStatus::Pending => write!(f, "pending"), + RepositoryStatus::Creating => write!(f, "creating"), + RepositoryStatus::Failed => write!(f, "failed"), + } + } +} + +impl std::str::FromStr for RepositoryStatus { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "ready" => Ok(RepositoryStatus::Ready), + "pending" => Ok(RepositoryStatus::Pending), + "creating" => Ok(RepositoryStatus::Creating), + "failed" => Ok(RepositoryStatus::Failed), + _ => Err(format!("Unknown repository status: {}", s)), + } + } +} + +/// Contract record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Contract { + pub id: Uuid, + pub owner_id: Uuid, + pub name: String, + pub description: Option<String>, + pub phase: String, + pub status: String, + /// The long-running supervisor task that orchestrates this contract + #[serde(skip_serializing_if = "Option::is_none")] + pub supervisor_task_id: Option<Uuid>, + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +impl Contract { + /// Parse phase string to ContractPhase enum + pub fn phase_enum(&self) -> Result<ContractPhase, String> { + self.phase.parse() + } + + /// Parse status string to ContractStatus enum + pub fn status_enum(&self) -> Result<ContractStatus, String> { + self.status.parse() + } +} + +/// Contract repository record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractRepository { + pub id: Uuid, + pub contract_id: Uuid, + pub name: String, + pub repository_url: Option<String>, + pub local_path: Option<String>, + pub source_type: String, + pub status: String, + pub is_primary: bool, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +impl ContractRepository { + /// Parse source_type string to RepositorySourceType enum + pub fn source_type_enum(&self) -> Result<RepositorySourceType, String> { + self.source_type.parse() + } + + /// Parse status string to RepositoryStatus enum + pub fn status_enum(&self) -> Result<RepositoryStatus, String> { + self.status.parse() + } +} + +/// Summary of a contract for list views +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractSummary { + pub id: Uuid, + pub name: String, + pub description: Option<String>, + pub phase: String, + pub status: String, + pub file_count: i64, + pub task_count: i64, + pub repository_count: i64, + pub version: i32, + pub created_at: DateTime<Utc>, +} + +/// Contract with all relations for detail view +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractWithRelations { + #[serde(flatten)] + pub contract: Contract, + pub repositories: Vec<ContractRepository>, + pub files: Vec<FileSummary>, + pub tasks: Vec<TaskSummary>, +} + +/// Response for contract list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractListResponse { + pub contracts: Vec<ContractSummary>, + pub total: i64, +} + +/// Request payload for creating a new contract +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateContractRequest { + /// Name of the contract + pub name: String, + /// Optional description + pub description: Option<String>, + /// Initial phase to start in (defaults to "research") + /// Valid values: "research", "specify", "plan", "execute", "review" + #[serde(default)] + pub initial_phase: Option<String>, +} + +/// Request payload for updating a contract +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateContractRequest { + pub name: Option<String>, + pub description: Option<String>, + pub phase: Option<String>, + pub status: Option<String>, + /// Supervisor task ID for contract orchestration + #[serde(skip_serializing_if = "Option::is_none")] + pub supervisor_task_id: Option<Uuid>, + /// Version for optimistic locking + pub version: Option<i32>, +} + +/// Request to add a remote repository to a contract +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AddRemoteRepositoryRequest { + pub name: String, + pub repository_url: String, + #[serde(default)] + pub is_primary: bool, +} + +/// Request to add a local repository to a contract +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AddLocalRepositoryRequest { + pub name: String, + pub local_path: String, + #[serde(default)] + pub is_primary: bool, +} + +/// Request to create a managed repository (daemon will create it) +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateManagedRepositoryRequest { + pub name: String, + #[serde(default)] + pub is_primary: bool, +} + +/// Request to change contract phase +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChangePhaseRequest { + pub phase: String, +} + +/// Contract event record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractEvent { + pub id: Uuid, + pub contract_id: Uuid, + pub event_type: String, + pub previous_phase: Option<String>, + pub new_phase: Option<String>, + #[sqlx(json)] + pub event_data: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// Response for contract events list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractEventListResponse { + pub events: Vec<ContractEvent>, + pub total: i64, +} + +// ============================================================================ +// Task Checkpoints (for git checkpoint tracking) +// ============================================================================ + +/// Task checkpoint record - represents a git commit checkpoint +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskCheckpoint { + pub id: Uuid, + pub task_id: Uuid, + /// Sequential checkpoint number within this task + pub checkpoint_number: i32, + /// Git commit SHA + pub commit_sha: String, + /// Git branch name + pub branch_name: String, + /// Commit message + pub message: String, + /// Files changed in this commit: [{path, action: 'A'|'M'|'D'}] + #[sqlx(json)] + pub files_changed: Option<serde_json::Value>, + /// Lines added in this commit + pub lines_added: Option<i32>, + /// Lines removed in this commit + pub lines_removed: Option<i32>, + pub created_at: DateTime<Utc>, +} + +/// Request to create a checkpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateCheckpointRequest { + /// Commit message + pub message: String, +} + +/// Response for checkpoint list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointListResponse { + pub checkpoints: Vec<TaskCheckpoint>, + pub total: i64, +} + +// ============================================================================ +// Supervisor State (for supervisor resumability) +// ============================================================================ + +/// Supervisor state for contract supervisor tasks +/// Enables resumption after interruption +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorState { + pub id: Uuid, + pub contract_id: Uuid, + pub task_id: Uuid, + /// Full Claude conversation history for resumption + #[sqlx(json)] + pub conversation_history: serde_json::Value, + /// Last checkpoint this supervisor created + pub last_checkpoint_id: Option<Uuid>, + /// Tasks the supervisor is waiting on + #[sqlx(try_from = "Vec<Uuid>")] + pub pending_task_ids: Vec<Uuid>, + /// Current contract phase when supervisor was last active + pub phase: String, + /// When supervisor was last active + pub last_activity: DateTime<Utc>, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +/// Request to update supervisor state +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateSupervisorStateRequest { + /// Updated conversation history + pub conversation_history: Option<serde_json::Value>, + /// Tasks the supervisor is waiting on + pub pending_task_ids: Option<Vec<Uuid>>, + /// Current contract phase + pub phase: Option<String>, +} + +// ============================================================================ +// Daemon Task Assignments (for multi-daemon support) +// ============================================================================ + +/// Daemon task assignment record +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonTaskAssignment { + pub id: Uuid, + pub daemon_id: Uuid, + pub task_id: Uuid, + pub assigned_at: DateTime<Utc>, + /// Status: 'active', 'migrating', 'completed' + pub status: String, +} + +/// Extended daemon info for selection +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonWithCapacity { + pub id: Uuid, + pub owner_id: Uuid, + pub connection_id: String, + pub hostname: Option<String>, + pub machine_id: Option<String>, + pub max_concurrent_tasks: i32, + pub current_task_count: i32, + pub capacity_score: Option<i32>, + pub task_queue_length: Option<i32>, + pub supports_migration: Option<bool>, + pub status: String, + pub last_heartbeat_at: DateTime<Utc>, + pub connected_at: DateTime<Utc>, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index ce1e97d..3b911c2 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5,8 +5,12 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation, - MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, + Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, + ContractSummary, CreateCheckpointRequest, CreateContractRequest, CreateFileRequest, + CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, + FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, + TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateSupervisorStateRequest, + UpdateTaskRequest, }; /// Repository error types. @@ -52,8 +56,18 @@ fn generate_default_name() -> String { now.format("Recording - %b %d %Y %H:%M:%S").to_string() } -/// Create a new file record. -pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::Error> { +/// Internal request for creating files without contract association (e.g., audio transcription). +/// User-facing file creation should use CreateFileRequest which requires contract_id. +pub struct InternalCreateFileRequest { + pub name: Option<String>, + pub description: Option<String>, + pub transcript: Vec<super::models::TranscriptEntry>, + pub location: Option<String>, +} + +/// Create a new file record (internal use, no contract required). +/// For user-facing file creation, use create_file_for_owner which requires a contract. +pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); @@ -62,7 +76,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, r#" INSERT INTO files (name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, NULL, $5) - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(&name) @@ -78,7 +92,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 "#, @@ -92,7 +106,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files ORDER BY created_at DESC "#, @@ -144,7 +158,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 AND version = $7 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -163,7 +177,7 @@ pub async fn update_file( UPDATE files SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() WHERE id = $1 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -219,6 +233,7 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { // ============================================================================= /// Create a new file record for a specific owner. +/// Files must belong to a contract - the contract_id is required and the phase is looked up. pub async fn create_file_for_owner( pool: &PgPool, owner_id: Uuid, @@ -226,21 +241,38 @@ pub async fn create_file_for_owner( ) -> Result<File, sqlx::Error> { let name = req.name.unwrap_or_else(generate_default_name); let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); - let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); + // Use body from request (may be empty or contain template elements) + let body_json = serde_json::to_value(&req.body).unwrap_or_default(); + + // Use provided contract_phase, or look up from contract's current phase + let contract_phase: Option<String> = if req.contract_phase.is_some() { + req.contract_phase + } else { + sqlx::query_scalar( + "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2", + ) + .bind(req.contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await? + }; sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, name, description, transcript, location, summary, body) - VALUES ($1, $2, $3, $4, $5, NULL, $6) - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path) + VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9) + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(owner_id) + .bind(req.contract_id) + .bind(&contract_phase) .bind(&name) .bind(&req.description) .bind(&transcript_json) .bind(&req.location) .bind(&body_json) + .bind(&req.repo_file_path) .fetch_one(pool) .await } @@ -253,7 +285,7 @@ pub async fn get_file_for_owner( ) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, @@ -268,7 +300,7 @@ pub async fn get_file_for_owner( pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC @@ -279,6 +311,72 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F .await } +/// Database row type for file summary with contract info +#[derive(Debug, sqlx::FromRow)] +struct FileSummaryRow { + id: Uuid, + contract_id: Option<Uuid>, + contract_name: Option<String>, + contract_phase: Option<String>, + name: String, + description: Option<String>, + #[sqlx(json)] + transcript: Vec<crate::db::models::TranscriptEntry>, + version: i32, + repo_file_path: Option<String>, + repo_sync_status: Option<String>, + created_at: chrono::DateTime<chrono::Utc>, + updated_at: chrono::DateTime<chrono::Utc>, +} + +/// List file summaries for an owner with contract info (joined). +pub async fn list_file_summaries_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<FileSummary>, sqlx::Error> { + let rows = sqlx::query_as::<_, FileSummaryRow>( + r#" + SELECT + f.id, f.contract_id, c.name as contract_name, f.contract_phase, + f.name, f.description, f.transcript, f.version, + f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at + FROM files f + LEFT JOIN contracts c ON f.contract_id = c.id + WHERE f.owner_id = $1 + ORDER BY f.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| { + let duration = row + .transcript + .iter() + .map(|t| t.end) + .fold(0.0_f32, f32::max); + FileSummary { + id: row.id, + contract_id: row.contract_id, + contract_name: row.contract_name, + contract_phase: row.contract_phase, + name: row.name, + description: row.description, + transcript_count: row.transcript.len(), + duration: if duration > 0.0 { Some(duration) } else { None }, + version: row.version, + repo_file_path: row.repo_file_path, + repo_sync_status: row.repo_sync_status, + created_at: row.created_at, + updated_at: row.updated_at, + } + }) + .collect()) +} + /// Update a file by ID with optimistic locking, scoped to owner. pub async fn update_file_for_owner( pool: &PgPool, @@ -318,7 +416,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 AND version = $8 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -338,7 +436,7 @@ pub async fn update_file_for_owner( UPDATE files SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() WHERE id = $1 AND owner_id = $2 - RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at "#, ) .bind(id) @@ -511,6 +609,7 @@ pub async fn restore_file_version( summary: target.summary, body: Some(target.body), version: Some(current_version), + repo_file_path: None, }; update_file(pool, file_id, update_req).await @@ -540,26 +639,22 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq /// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). /// /// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless -/// explicitly configured. The orchestrator controls when completion steps happen. +/// explicitly configured. The supervisor controls when completion steps happen. +/// +/// Task spawning is now controlled by supervisors at the application level. +/// Depth is no longer constrained in the database. pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { // Calculate depth and inherit settings from parent if applicable - let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit repo settings + // Fetch parent task to get depth and inherit settings let parent = get_task(pool, parent_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; let new_depth = parent.depth + 1; - // Validate max depth (must be < 2, i.e., 0 or 1 only) - // Orchestrators are at depth 0, subtasks at depth 1 - // Subtasks cannot have their own children - if new_depth >= 2 { - return Err(sqlx::Error::Protocol(format!( - "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", - new_depth - ))); - } + // Subtasks inherit contract_id from parent + let contract_id = parent.contract_id.unwrap_or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); @@ -568,14 +663,15 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, let merge_mode = req.merge_mode.clone().or(parent.merge_mode); let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. - // The orchestrator integrates subtask work from their worktrees. + // The supervisor integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0 + // Top-level task: depth 0, use contract_id from request ( 0, + req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -590,20 +686,22 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - parent_task_id, depth, name, description, plan, priority, - repository_url, base_branch, target_branch, merge_mode, + contract_id, parent_task_id, depth, name, description, plan, priority, + is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING * "#, ) + .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) + .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -635,10 +733,13 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, @@ -652,10 +753,13 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -665,6 +769,25 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum .await } +/// List all tasks in a contract (for supervisor tree view). +pub async fn list_tasks_by_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * FROM tasks + WHERE contract_id = $1 AND owner_id = $2 + ORDER BY is_supervisor DESC, depth ASC, created_at ASC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -817,9 +940,9 @@ pub async fn create_task_for_owner( req: CreateTaskRequest, ) -> Result<Task, sqlx::Error> { // Calculate depth and inherit settings from parent if applicable - let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = if let Some(parent_id) = req.parent_task_id { - // Fetch parent task to get depth and inherit repo settings (must belong to same owner) + // Fetch parent task to get depth and inherit settings (must belong to same owner) let parent = get_task_for_owner(pool, parent_id, owner_id).await? .ok_or_else(|| sqlx::Error::RowNotFound)?; @@ -833,6 +956,9 @@ pub async fn create_task_for_owner( ))); } + // Subtasks inherit contract_id from parent + let contract_id = parent.contract_id.unwrap_or(req.contract_id); + // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); let base_branch = req.base_branch.clone().or(parent.base_branch); @@ -843,11 +969,12 @@ pub async fn create_task_for_owner( // The orchestrator integrates subtask work from their worktrees. let completion_action = req.completion_action.clone(); - (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0 + // Top-level task: depth 0, use contract_id from request ( 0, + req.contract_id, req.repository_url.clone(), req.base_branch.clone(), req.target_branch.clone(), @@ -862,21 +989,23 @@ pub async fn create_task_for_owner( sqlx::query_as::<_, Task>( r#" INSERT INTO tasks ( - owner_id, parent_task_id, depth, name, description, plan, priority, - repository_url, base_branch, target_branch, merge_mode, + owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, + is_supervisor, repository_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action, continue_from_task_id, copy_files ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING * "#, ) .bind(owner_id) + .bind(contract_id) .bind(req.parent_task_id) .bind(depth) .bind(&req.name) .bind(&req.description) .bind(&req.plan) .bind(req.priority) + .bind(req.is_supervisor) .bind(&repo_url) .bind(&base_branch) .bind(&target_branch) @@ -916,10 +1045,13 @@ pub async fn list_tasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL ORDER BY t.priority DESC, t.created_at DESC "#, @@ -938,10 +1070,13 @@ pub async fn list_subtasks_for_owner( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -986,6 +1121,7 @@ pub async fn update_task_for_owner( let error_message = req.error_message.or(existing.error_message); let merge_mode = req.merge_mode.or(existing.merge_mode); let pr_url = req.pr_url.or(existing.pr_url); + let repository_url = req.repository_url.or(existing.repository_url); let target_repo_path = req.target_repo_path.or(existing.target_repo_path); let completion_action = req.completion_action.or(existing.completion_action); let daemon_id = if req.clear_daemon_id { @@ -1002,8 +1138,9 @@ pub async fn update_task_for_owner( SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, - target_repo_path = $14, completion_action = $15, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $16 + target_repo_path = $14, completion_action = $15, repository_url = $16, + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $17 RETURNING * "#, ) @@ -1022,6 +1159,7 @@ pub async fn update_task_for_owner( .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) + .bind(&repository_url) .bind(req.version.unwrap()) .fetch_optional(pool) .await? @@ -1032,7 +1170,8 @@ pub async fn update_task_for_owner( SET name = $3, description = $4, plan = $5, status = $6, priority = $7, progress_summary = $8, last_output = $9, error_message = $10, merge_mode = $11, pr_url = $12, daemon_id = $13, - target_repo_path = $14, completion_action = $15, updated_at = NOW() + target_repo_path = $14, completion_action = $15, repository_url = $16, + updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * "#, @@ -1052,6 +1191,7 @@ pub async fn update_task_for_owner( .bind(daemon_id) .bind(&target_repo_path) .bind(&completion_action) + .bind(&repository_url) .fetch_optional(pool) .await? }; @@ -1328,6 +1468,26 @@ pub async fn update_daemon_status( Ok(result.rows_affected() > 0) } +/// Mark daemon as disconnected by connection_id. +pub async fn disconnect_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET status = 'disconnected', + disconnected_at = NOW() + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + /// Update daemon task count. pub async fn update_daemon_task_count( pool: &PgPool, @@ -1393,6 +1553,25 @@ pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> { Ok(result.0) } +/// Delete stale daemons that haven't sent a heartbeat within the timeout. +/// Returns the number of deleted daemons. +pub async fn delete_stale_daemons( + pool: &PgPool, + timeout_seconds: i64, +) -> Result<u64, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1 + "#, + ) + .bind(timeout_seconds) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + // ============================================================================= // Sibling Awareness Functions // ============================================================================= @@ -1408,10 +1587,13 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1426,10 +1608,13 @@ pub async fn list_sibling_tasks( sqlx::query_as::<_, TaskSummary>( r#" SELECT - t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, - t.progress_summary, t.version, t.created_at, t.updated_at, - (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 ORDER BY t.priority DESC, t.created_at DESC "#, @@ -1710,3 +1895,1092 @@ pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshCha // Create new active conversation get_or_create_active_conversation(pool, owner_id).await } + +// ============================================================================= +// Contract Chat History Functions +// ============================================================================= + +/// Get or create the active conversation for a contract. +pub async fn get_or_create_contract_conversation( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<ContractChatConversation, sqlx::Error> { + // Try to get existing active conversation for this contract + let existing = sqlx::query_as::<_, ContractChatConversation>( + r#" + SELECT * + FROM contract_chat_conversations + WHERE is_active = true AND contract_id = $1 AND owner_id = $2 + LIMIT 1 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some(conv) = existing { + return Ok(conv); + } + + // Create new conversation + sqlx::query_as::<_, ContractChatConversation>( + r#" + INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active) + VALUES ($1, $2, true) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_one(pool) + .await +} + +/// List messages for a contract conversation. +pub async fn list_contract_chat_messages( + pool: &PgPool, + conversation_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, ContractChatMessageRecord>( + r#" + SELECT * + FROM contract_chat_messages + WHERE conversation_id = $1 + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(conversation_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Add a message to a contract conversation. +pub async fn add_contract_chat_message( + pool: &PgPool, + conversation_id: Uuid, + role: &str, + content: &str, + tool_calls: Option<serde_json::Value>, + pending_questions: Option<serde_json::Value>, +) -> Result<ContractChatMessageRecord, sqlx::Error> { + sqlx::query_as::<_, ContractChatMessageRecord>( + r#" + INSERT INTO contract_chat_messages + (conversation_id, role, content, tool_calls, pending_questions) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(conversation_id) + .bind(role) + .bind(content) + .bind(tool_calls) + .bind(pending_questions) + .fetch_one(pool) + .await +} + +/// Clear contract conversation (archive existing and create new). +pub async fn clear_contract_conversation( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<ContractChatConversation, sqlx::Error> { + // Mark existing as inactive for this contract + sqlx::query( + r#" + UPDATE contract_chat_conversations + SET is_active = false, updated_at = NOW() + WHERE is_active = true AND contract_id = $1 AND owner_id = $2 + "#, + ) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + // Create new active conversation + get_or_create_contract_conversation(pool, contract_id, owner_id).await +} + +// ============================================================================= +// Contract Functions (Owner-Scoped) +// ============================================================================= + +/// Create a new contract for a specific owner. +pub async fn create_contract_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateContractRequest, +) -> Result<Contract, sqlx::Error> { + // Use provided initial_phase or default to "research" + let phase = req.initial_phase.as_deref().unwrap_or("research"); + + // Validate the phase + let valid_phases = ["research", "specify", "plan", "execute", "review"]; + if !valid_phases.contains(&phase) { + return Err(sqlx::Error::Protocol(format!( + "Invalid initial_phase '{}'. Must be one of: {}", + phase, + valid_phases.join(", ") + ))); + } + + sqlx::query_as::<_, Contract>( + r#" + INSERT INTO contracts (owner_id, name, description, phase) + VALUES ($1, $2, $3, $4) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.name) + .bind(&req.description) + .bind(phase) + .fetch_one(pool) + .await +} + +/// Get a contract by ID, scoped to owner. +pub async fn get_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Contract>, sqlx::Error> { + sqlx::query_as::<_, Contract>( + r#" + SELECT * + FROM contracts + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all contracts for an owner, ordered by created_at DESC. +pub async fn list_contracts_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<ContractSummary>, sqlx::Error> { + sqlx::query_as::<_, ContractSummary>( + r#" + SELECT + c.id, c.name, c.description, c.phase, c.status, + c.version, c.created_at, + (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, + (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, + (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count + FROM contracts c + WHERE c.owner_id = $1 + ORDER BY c.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Get contract summary by ID. +pub async fn get_contract_summary_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<ContractSummary>, sqlx::Error> { + sqlx::query_as::<_, ContractSummary>( + r#" + SELECT + c.id, c.name, c.description, c.phase, c.status, + c.version, c.created_at, + (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count, + (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count, + (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count + FROM contracts c + WHERE c.id = $1 AND c.owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Update a contract by ID with optimistic locking, scoped to owner. +pub async fn update_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateContractRequest, +) -> Result<Option<Contract>, RepositoryError> { + let existing = get_contract_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let phase = req.phase.unwrap_or(existing.phase); + let status = req.status.unwrap_or(existing.status); + let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id); + + let result = if req.version.is_some() { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET name = $3, description = $4, phase = $5, status = $6, + supervisor_task_id = $7, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $8 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&phase) + .bind(&status) + .bind(supervisor_task_id) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET name = $3, description = $4, phase = $5, status = $6, + supervisor_task_id = $7, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&phase) + .bind(&status) + .bind(supervisor_task_id) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a contract by ID, scoped to owner. +pub async fn delete_contract_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM contracts + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Change contract phase and record event. +pub async fn change_contract_phase_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + new_phase: &str, +) -> Result<Option<Contract>, sqlx::Error> { + // Get current phase + let existing = get_contract_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + let previous_phase = existing.phase.clone(); + + // Update phase + let contract = sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET phase = $3, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(new_phase) + .fetch_optional(pool) + .await?; + + // Record event + if contract.is_some() { + sqlx::query( + r#" + INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase) + VALUES ($1, 'phase_change', $2, $3) + "#, + ) + .bind(id) + .bind(&previous_phase) + .bind(new_phase) + .execute(pool) + .await?; + } + + Ok(contract) +} + +// ============================================================================= +// Contract Repository Functions +// ============================================================================= + +/// List repositories for a contract. +pub async fn list_contract_repositories( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<ContractRepository>, sqlx::Error> { + sqlx::query_as::<_, ContractRepository>( + r#" + SELECT * + FROM contract_repositories + WHERE contract_id = $1 + ORDER BY is_primary DESC, created_at ASC + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Add a remote repository to a contract. +pub async fn add_remote_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + repository_url: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary) + VALUES ($1, $2, $3, 'remote', 'ready', $4) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(repository_url) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Add a local repository to a contract. +pub async fn add_local_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + local_path: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary) + VALUES ($1, $2, $3, 'local', 'ready', $4) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(local_path) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Create a managed repository (daemon will create it). +pub async fn create_managed_repository( + pool: &PgPool, + contract_id: Uuid, + name: &str, + is_primary: bool, +) -> Result<ContractRepository, sqlx::Error> { + // If is_primary, clear other primaries first + if is_primary { + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + } + + sqlx::query_as::<_, ContractRepository>( + r#" + INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary) + VALUES ($1, $2, 'managed', 'pending', $3) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(name) + .bind(is_primary) + .fetch_one(pool) + .await +} + +/// Delete a repository from a contract. +pub async fn delete_contract_repository( + pool: &PgPool, + repo_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM contract_repositories + WHERE id = $1 AND contract_id = $2 + "#, + ) + .bind(repo_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Set a repository as primary (and clear others). +pub async fn set_repository_primary( + pool: &PgPool, + repo_id: Uuid, + contract_id: Uuid, +) -> Result<bool, sqlx::Error> { + // Clear other primaries + sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = false, updated_at = NOW() + WHERE contract_id = $1 AND is_primary = true + "#, + ) + .bind(contract_id) + .execute(pool) + .await?; + + // Set this one as primary + let result = sqlx::query( + r#" + UPDATE contract_repositories + SET is_primary = true, updated_at = NOW() + WHERE id = $1 AND contract_id = $2 + "#, + ) + .bind(repo_id) + .bind(contract_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update managed repository status (used by daemon). +pub async fn update_managed_repository_status( + pool: &PgPool, + repo_id: Uuid, + status: &str, + repository_url: Option<&str>, +) -> Result<Option<ContractRepository>, sqlx::Error> { + sqlx::query_as::<_, ContractRepository>( + r#" + UPDATE contract_repositories + SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(repo_id) + .bind(status) + .bind(repository_url) + .fetch_optional(pool) + .await +} + +// ============================================================================= +// Contract Task Association Functions +// ============================================================================= + +/// Add a task to a contract. +pub async fn add_task_to_contract( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE tasks + SET contract_id = $2, updated_at = NOW() + WHERE id = $1 AND owner_id = $3 + "#, + ) + .bind(task_id) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Remove a task from a contract. +pub async fn remove_task_from_contract( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE tasks + SET contract_id = NULL, updated_at = NOW() + WHERE id = $1 AND contract_id = $2 AND owner_id = $3 + "#, + ) + .bind(task_id) + .bind(contract_id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// List files in a contract. +pub async fn list_files_in_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<FileSummary>, sqlx::Error> { + // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields + let files = sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at + FROM files + WHERE contract_id = $1 AND owner_id = $2 + ORDER BY created_at DESC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await?; + + Ok(files.into_iter().map(FileSummary::from).collect()) +} + +/// List tasks in a contract. +pub async fn list_tasks_in_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.created_at, t.updated_at + FROM tasks t + LEFT JOIN contracts c ON t.contract_id = c.id + WHERE t.contract_id = $1 AND t.owner_id = $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await +} + +// ============================================================================= +// Contract Events +// ============================================================================= + +/// List events for a contract. +pub async fn list_contract_events( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<ContractEvent>, sqlx::Error> { + sqlx::query_as::<_, ContractEvent>( + r#" + SELECT * + FROM contract_events + WHERE contract_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Record a contract event. +pub async fn record_contract_event( + pool: &PgPool, + contract_id: Uuid, + event_type: &str, + event_data: Option<serde_json::Value>, +) -> Result<ContractEvent, sqlx::Error> { + sqlx::query_as::<_, ContractEvent>( + r#" + INSERT INTO contract_events (contract_id, event_type, event_data) + VALUES ($1, $2, $3) + RETURNING * + "#, + ) + .bind(contract_id) + .bind(event_type) + .bind(event_data) + .fetch_one(pool) + .await +} + +// ============================================================================ +// Task Checkpoints +// ============================================================================ + +/// Create a checkpoint for a task. +pub async fn create_task_checkpoint( + pool: &PgPool, + task_id: Uuid, + commit_sha: &str, + branch_name: &str, + message: &str, + files_changed: Option<serde_json::Value>, + lines_added: Option<i32>, + lines_removed: Option<i32>, +) -> Result<TaskCheckpoint, sqlx::Error> { + // Get current checkpoint count and increment + let checkpoint_number: i32 = sqlx::query_scalar( + "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1", + ) + .bind(task_id) + .fetch_one(pool) + .await?; + + // Update task's checkpoint tracking + sqlx::query( + r#" + UPDATE tasks + SET last_checkpoint_sha = $1, + checkpoint_count = $2, + checkpoint_message = $3, + updated_at = NOW() + WHERE id = $4 + "#, + ) + .bind(commit_sha) + .bind(checkpoint_number) + .bind(message) + .bind(task_id) + .execute(pool) + .await?; + + sqlx::query_as::<_, TaskCheckpoint>( + r#" + INSERT INTO task_checkpoints ( + task_id, checkpoint_number, commit_sha, branch_name, message, + files_changed, lines_added, lines_removed + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING * + "#, + ) + .bind(task_id) + .bind(checkpoint_number) + .bind(commit_sha) + .bind(branch_name) + .bind(message) + .bind(files_changed) + .bind(lines_added) + .bind(lines_removed) + .fetch_one(pool) + .await +} + +/// Get a checkpoint by ID. +pub async fn get_task_checkpoint( + pool: &PgPool, + id: Uuid, +) -> Result<Option<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get a checkpoint by commit SHA. +pub async fn get_task_checkpoint_by_sha( + pool: &PgPool, + commit_sha: &str, +) -> Result<Option<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1") + .bind(commit_sha) + .fetch_optional(pool) + .await +} + +/// List checkpoints for a task. +pub async fn list_task_checkpoints( + pool: &PgPool, + task_id: Uuid, +) -> Result<Vec<TaskCheckpoint>, sqlx::Error> { + sqlx::query_as::<_, TaskCheckpoint>( + "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC", + ) + .bind(task_id) + .fetch_all(pool) + .await +} + +// ============================================================================ +// Supervisor State +// ============================================================================ + +/// Create or update supervisor state for a contract. +pub async fn upsert_supervisor_state( + pool: &PgPool, + contract_id: Uuid, + task_id: Uuid, + conversation_history: serde_json::Value, + pending_task_ids: &[Uuid], + phase: &str, +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity) + VALUES ($1, $2, $3, $4, $5, NOW()) + ON CONFLICT (contract_id) DO UPDATE SET + task_id = EXCLUDED.task_id, + conversation_history = EXCLUDED.conversation_history, + pending_task_ids = EXCLUDED.pending_task_ids, + phase = EXCLUDED.phase, + last_activity = NOW(), + updated_at = NOW() + RETURNING * + "#, + ) + .bind(contract_id) + .bind(task_id) + .bind(conversation_history) + .bind(pending_task_ids) + .bind(phase) + .fetch_one(pool) + .await +} + +/// Get supervisor state for a contract. +pub async fn get_supervisor_state( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1") + .bind(contract_id) + .fetch_optional(pool) + .await +} + +/// Get supervisor state by task ID. +pub async fn get_supervisor_state_by_task( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1") + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Update supervisor conversation history. +pub async fn update_supervisor_conversation( + pool: &PgPool, + contract_id: Uuid, + conversation_history: serde_json::Value, +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET conversation_history = $1, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(conversation_history) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Update supervisor pending tasks. +pub async fn update_supervisor_pending_tasks( + pool: &PgPool, + contract_id: Uuid, + pending_task_ids: &[Uuid], +) -> Result<SupervisorState, sqlx::Error> { + sqlx::query_as::<_, SupervisorState>( + r#" + UPDATE supervisor_states + SET pending_task_ids = $1, + last_activity = NOW(), + updated_at = NOW() + WHERE contract_id = $2 + RETURNING * + "#, + ) + .bind(pending_task_ids) + .bind(contract_id) + .fetch_one(pool) + .await +} + +// ============================================================================ +// Contract Supervisor +// ============================================================================ + +/// Update contract's supervisor task ID. +pub async fn update_contract_supervisor( + pool: &PgPool, + contract_id: Uuid, + supervisor_task_id: Uuid, +) -> Result<Contract, sqlx::Error> { + sqlx::query_as::<_, Contract>( + r#" + UPDATE contracts + SET supervisor_task_id = $1, + updated_at = NOW() + WHERE id = $2 + RETURNING * + "#, + ) + .bind(supervisor_task_id) + .bind(contract_id) + .fetch_one(pool) + .await +} + +/// Get the supervisor task for a contract. +pub async fn get_contract_supervisor_task( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT t.* FROM tasks t + JOIN contracts c ON c.supervisor_task_id = t.id + WHERE c.id = $1 + "#, + ) + .bind(contract_id) + .fetch_optional(pool) + .await +} + +// ============================================================================ +// Task Tree Queries +// ============================================================================ + +/// Get full task tree for a contract. +pub async fn get_contract_task_tree( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + WITH RECURSIVE task_tree AS ( + -- Base case: root tasks (no parent) + SELECT * FROM tasks + WHERE contract_id = $1 AND parent_task_id IS NULL + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id + ) + SELECT * FROM task_tree + ORDER BY depth, created_at + "#, + ) + .bind(contract_id) + .fetch_all(pool) + .await +} + +/// Get task tree from a specific root task. +pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + WITH RECURSIVE task_tree AS ( + -- Base case: the root task + SELECT * FROM tasks WHERE id = $1 + UNION ALL + -- Recursive case: children of current level + SELECT t.* FROM tasks t + JOIN task_tree tt ON t.parent_task_id = tt.id + ) + SELECT * FROM task_tree + ORDER BY depth, created_at + "#, + ) + .bind(root_task_id) + .fetch_all(pool) + .await +} + +// ============================================================================ +// Daemon Selection +// ============================================================================ + +/// Get daemons with capacity info for selection. +pub async fn get_available_daemons( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 AND status = 'connected' + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Create a daemon task assignment. +pub async fn create_daemon_task_assignment( + pool: &PgPool, + daemon_id: Uuid, + task_id: Uuid, +) -> Result<DaemonTaskAssignment, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + r#" + INSERT INTO daemon_task_assignments (daemon_id, task_id) + VALUES ($1, $2) + RETURNING * + "#, + ) + .bind(daemon_id) + .bind(task_id) + .fetch_one(pool) + .await +} + +/// Update daemon task assignment status. +pub async fn update_daemon_task_assignment_status( + pool: &PgPool, + task_id: Uuid, + status: &str, +) -> Result<DaemonTaskAssignment, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + r#" + UPDATE daemon_task_assignments + SET status = $1 + WHERE task_id = $2 + RETURNING * + "#, + ) + .bind(status) + .bind(task_id) + .fetch_one(pool) + .await +} + +/// Get daemon task assignment for a task. +pub async fn get_daemon_task_assignment( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> { + sqlx::query_as::<_, DaemonTaskAssignment>( + "SELECT * FROM daemon_task_assignments WHERE task_id = $1", + ) + .bind(task_id) + .fetch_optional(pool) + .await +} |
