summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/db
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs603
-rw-r--r--makima/src/db/repository.rs1400
2 files changed, 1938 insertions, 65 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 5064b97..e16c43f 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -60,6 +60,8 @@ pub enum BodyElement {
alt: Option<String>,
caption: Option<String>,
},
+ /// Raw markdown content - renders as formatted markdown, edits as raw text
+ Markdown { content: String },
}
/// File record from the database.
@@ -68,6 +70,10 @@ pub enum BodyElement {
pub struct File {
pub id: Uuid,
pub owner_id: Uuid,
+ /// Contract this file belongs to (optional)
+ pub contract_id: Option<Uuid>,
+ /// Phase of the contract when file was added (e.g., "research", "specify")
+ pub contract_phase: Option<String>,
pub name: String,
pub description: Option<String>,
#[sqlx(json)]
@@ -80,6 +86,12 @@ pub struct File {
pub body: Vec<BodyElement>,
/// Version number for optimistic locking
pub version: i32,
+ /// Path to linked repository file (e.g., "README.md", "docs/design.md")
+ pub repo_file_path: Option<String>,
+ /// When the file was last synced from the repository
+ pub repo_synced_at: Option<DateTime<Utc>>,
+ /// Sync status: 'none', 'synced', 'modified', 'conflict'
+ pub repo_sync_status: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -88,14 +100,24 @@ pub struct File {
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateFileRequest {
+ /// Contract this file belongs to (required - files must belong to a contract)
+ pub contract_id: Uuid,
/// Name of the file (auto-generated if not provided)
pub name: Option<String>,
/// Optional description
pub description: Option<String>,
- /// Transcript entries
+ /// Transcript entries (default to empty)
+ #[serde(default)]
pub transcript: Vec<TranscriptEntry>,
/// Storage location (e.g., s3://bucket/path) - not used yet
pub location: Option<String>,
+ /// Initial body elements (e.g., from a template)
+ #[serde(default)]
+ pub body: Vec<BodyElement>,
+ /// Path to linked repository file (e.g., "README.md")
+ pub repo_file_path: Option<String>,
+ /// Contract phase this file belongs to (for deliverable tracking)
+ pub contract_phase: Option<String>,
}
/// Request payload for updating an existing file.
@@ -114,6 +136,8 @@ pub struct UpdateFileRequest {
pub body: Option<Vec<BodyElement>>,
/// Version for optimistic locking (required for updates from frontend)
pub version: Option<i32>,
+ /// Path to linked repository file (e.g., "README.md")
+ pub repo_file_path: Option<String>,
}
/// Response for file list endpoint.
@@ -129,6 +153,12 @@ pub struct FileListResponse {
#[serde(rename_all = "camelCase")]
pub struct FileSummary {
pub id: Uuid,
+ /// Contract this file belongs to
+ pub contract_id: Option<Uuid>,
+ /// Contract name (joined from contracts table)
+ pub contract_name: Option<String>,
+ /// Phase when file was added to contract
+ pub contract_phase: Option<String>,
pub name: String,
pub description: Option<String>,
pub transcript_count: usize,
@@ -136,6 +166,10 @@ pub struct FileSummary {
pub duration: Option<f32>,
/// Version number for optimistic locking
pub version: i32,
+ /// Path to linked repository file
+ pub repo_file_path: Option<String>,
+ /// Sync status with repository
+ pub repo_sync_status: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -149,11 +183,16 @@ impl From<File> for FileSummary {
.fold(0.0_f32, f32::max);
Self {
id: file.id,
+ contract_id: file.contract_id,
+ contract_name: None, // Not available from File alone, requires JOIN
+ contract_phase: file.contract_phase,
name: file.name,
description: file.description,
transcript_count: file.transcript.len(),
duration: if duration > 0.0 { Some(duration) } else { None },
version: file.version,
+ repo_file_path: file.repo_file_path,
+ repo_sync_status: file.repo_sync_status,
created_at: file.created_at,
updated_at: file.updated_at,
}
@@ -345,8 +384,10 @@ impl std::str::FromStr for MergeMode {
pub struct Task {
pub id: Uuid,
pub owner_id: Uuid,
+ /// Contract this task belongs to (required for new tasks)
+ pub contract_id: Option<Uuid>,
pub parent_task_id: Option<Uuid>,
- /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max)
+ /// Depth in task hierarchy (no longer constrained)
pub depth: i32,
pub name: String,
pub description: Option<String>,
@@ -354,6 +395,11 @@ pub struct Task {
pub priority: i32,
pub plan: String,
+ // Supervisor flag
+ /// True for contract supervisor tasks. Only supervisors can spawn new tasks.
+ #[serde(default)]
+ pub is_supervisor: bool,
+
// Daemon/container info
pub daemon_id: Option<Uuid>,
pub container_id: Option<String>,
@@ -379,6 +425,30 @@ pub struct Task {
pub last_output: Option<String>,
pub error_message: Option<String>,
+ // Git checkpoint tracking
+ /// Git commit SHA of the most recent checkpoint
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_checkpoint_sha: Option<String>,
+ /// Number of checkpoints created by this task
+ #[serde(default)]
+ pub checkpoint_count: i32,
+ /// Message from the most recent checkpoint
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub checkpoint_message: Option<String>,
+
+ // Conversation state for resumption
+ /// Saved conversation context for task/supervisor resumption
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub conversation_state: Option<serde_json::Value>,
+
+ // Daemon migration tracking
+ /// Previous daemon if task was migrated
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub migrated_from_daemon_id: Option<Uuid>,
+ /// Most recent daemon that worked on this task
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_active_daemon_id: Option<Uuid>,
+
// Timestamps
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
@@ -413,6 +483,12 @@ impl Task {
#[serde(rename_all = "camelCase")]
pub struct TaskSummary {
pub id: Uuid,
+ /// Contract this task belongs to
+ pub contract_id: Option<Uuid>,
+ /// Contract name (joined from contracts table)
+ pub contract_name: Option<String>,
+ /// Contract phase (joined from contracts table)
+ pub contract_phase: Option<String>,
pub parent_task_id: Option<Uuid>,
/// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max)
pub depth: i32,
@@ -422,10 +498,36 @@ pub struct TaskSummary {
pub progress_summary: Option<String>,
pub subtask_count: i64,
pub version: i32,
+ /// True for contract supervisor tasks
+ #[serde(default)]
+ pub is_supervisor: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
+/// Convert a full Task to a TaskSummary
+impl From<Task> for TaskSummary {
+ fn from(task: Task) -> Self {
+ Self {
+ id: task.id,
+ contract_id: task.contract_id,
+ contract_name: None, // Not available from Task directly
+ contract_phase: None, // Not available from Task directly
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ name: task.name,
+ status: task.status,
+ priority: task.priority,
+ progress_summary: task.progress_summary,
+ subtask_count: 0, // Would need separate query
+ version: task.version,
+ is_supervisor: task.is_supervisor,
+ created_at: task.created_at,
+ updated_at: task.updated_at,
+ }
+ }
+}
+
/// Response for task list endpoint
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -438,6 +540,8 @@ pub struct TaskListResponse {
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateTaskRequest {
+ /// Contract this task belongs to (required)
+ pub contract_id: Uuid,
/// Name of the task
pub name: String,
/// Optional description
@@ -446,6 +550,9 @@ pub struct CreateTaskRequest {
pub plan: String,
/// Parent task ID (for subtasks)
pub parent_task_id: Option<Uuid>,
+ /// True for contract supervisor tasks. Only supervisors can spawn new tasks.
+ #[serde(default)]
+ pub is_supervisor: bool,
/// Priority (higher = more urgent)
#[serde(default)]
pub priority: i32,
@@ -466,6 +573,8 @@ pub struct CreateTaskRequest {
/// Files to copy from parent task's worktree when starting
#[serde(skip_serializing_if = "Option::is_none")]
pub copy_files: Option<Vec<String>>,
+ /// Checkpoint SHA to branch from (optional)
+ pub checkpoint_sha: Option<String>,
}
/// Request payload for updating a task
@@ -482,6 +591,8 @@ pub struct UpdateTaskRequest {
pub error_message: Option<String>,
pub merge_mode: Option<String>,
pub pr_url: Option<String>,
+ /// Repository URL for the task (e.g., when updating supervisor with repo info)
+ pub repository_url: Option<String>,
/// Path to user's local repository (outside ~/.makima)
pub target_repo_path: Option<String>,
/// Action on completion: "none", "branch", "merge", "pr"
@@ -733,6 +844,47 @@ pub struct MeshChatHistoryResponse {
}
// =============================================================================
+// Contract Chat History Types
+// =============================================================================
+
+/// Conversation thread for contract chat (scoped to a specific contract)
+#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractChatConversation {
+ pub id: Uuid,
+ pub contract_id: Uuid,
+ pub owner_id: Uuid,
+ pub name: Option<String>,
+ pub is_active: bool,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// Individual message in a contract chat conversation
+#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractChatMessageRecord {
+ pub id: Uuid,
+ pub conversation_id: Uuid,
+ pub role: String,
+ pub content: String,
+ /// Tool calls made during this message (JSON, nullable)
+ pub tool_calls: Option<serde_json::Value>,
+ /// Pending questions requiring user response (JSON, nullable)
+ pub pending_questions: Option<serde_json::Value>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Response for contract chat history endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractChatHistoryResponse {
+ pub contract_id: Uuid,
+ pub conversation_id: Uuid,
+ pub messages: Vec<ContractChatMessageRecord>,
+}
+
+// =============================================================================
// Merge API Types
// =============================================================================
@@ -834,3 +986,450 @@ pub struct MergeCompleteCheckResponse {
/// Count of skipped branches
pub skipped_count: u32,
}
+
+// =============================================================================
+// Contract Types
+// =============================================================================
+
+/// Contract phase for workflow progression
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum ContractPhase {
+ Research,
+ Specify,
+ Plan,
+ Execute,
+ Review,
+}
+
+impl std::fmt::Display for ContractPhase {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ ContractPhase::Research => write!(f, "research"),
+ ContractPhase::Specify => write!(f, "specify"),
+ ContractPhase::Plan => write!(f, "plan"),
+ ContractPhase::Execute => write!(f, "execute"),
+ ContractPhase::Review => write!(f, "review"),
+ }
+ }
+}
+
+impl std::str::FromStr for ContractPhase {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "research" => Ok(ContractPhase::Research),
+ "specify" => Ok(ContractPhase::Specify),
+ "plan" => Ok(ContractPhase::Plan),
+ "execute" => Ok(ContractPhase::Execute),
+ "review" => Ok(ContractPhase::Review),
+ _ => Err(format!("Unknown contract phase: {}", s)),
+ }
+ }
+}
+
+/// Contract status
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum ContractStatus {
+ Active,
+ Completed,
+ Archived,
+}
+
+impl std::fmt::Display for ContractStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ ContractStatus::Active => write!(f, "active"),
+ ContractStatus::Completed => write!(f, "completed"),
+ ContractStatus::Archived => write!(f, "archived"),
+ }
+ }
+}
+
+impl std::str::FromStr for ContractStatus {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "active" => Ok(ContractStatus::Active),
+ "completed" => Ok(ContractStatus::Completed),
+ "archived" => Ok(ContractStatus::Archived),
+ _ => Err(format!("Unknown contract status: {}", s)),
+ }
+ }
+}
+
+/// Repository source type
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum RepositorySourceType {
+ /// Existing remote repo (GitHub, GitLab, etc)
+ Remote,
+ /// Existing local repo
+ Local,
+ /// New repo created/managed by Makima daemon
+ Managed,
+}
+
+impl std::fmt::Display for RepositorySourceType {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ RepositorySourceType::Remote => write!(f, "remote"),
+ RepositorySourceType::Local => write!(f, "local"),
+ RepositorySourceType::Managed => write!(f, "managed"),
+ }
+ }
+}
+
+impl std::str::FromStr for RepositorySourceType {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "remote" => Ok(RepositorySourceType::Remote),
+ "local" => Ok(RepositorySourceType::Local),
+ "managed" => Ok(RepositorySourceType::Managed),
+ _ => Err(format!("Unknown repository source type: {}", s)),
+ }
+ }
+}
+
+/// Repository status (for managed repos)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum RepositoryStatus {
+ /// Repo is usable
+ Ready,
+ /// Waiting for daemon to create
+ Pending,
+ /// Daemon is creating the repo
+ Creating,
+ /// Creation failed
+ Failed,
+}
+
+impl std::fmt::Display for RepositoryStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ RepositoryStatus::Ready => write!(f, "ready"),
+ RepositoryStatus::Pending => write!(f, "pending"),
+ RepositoryStatus::Creating => write!(f, "creating"),
+ RepositoryStatus::Failed => write!(f, "failed"),
+ }
+ }
+}
+
+impl std::str::FromStr for RepositoryStatus {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "ready" => Ok(RepositoryStatus::Ready),
+ "pending" => Ok(RepositoryStatus::Pending),
+ "creating" => Ok(RepositoryStatus::Creating),
+ "failed" => Ok(RepositoryStatus::Failed),
+ _ => Err(format!("Unknown repository status: {}", s)),
+ }
+ }
+}
+
+/// Contract record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Contract {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub name: String,
+ pub description: Option<String>,
+ pub phase: String,
+ pub status: String,
+ /// The long-running supervisor task that orchestrates this contract
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub supervisor_task_id: Option<Uuid>,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+impl Contract {
+ /// Parse phase string to ContractPhase enum
+ pub fn phase_enum(&self) -> Result<ContractPhase, String> {
+ self.phase.parse()
+ }
+
+ /// Parse status string to ContractStatus enum
+ pub fn status_enum(&self) -> Result<ContractStatus, String> {
+ self.status.parse()
+ }
+}
+
+/// Contract repository record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractRepository {
+ pub id: Uuid,
+ pub contract_id: Uuid,
+ pub name: String,
+ pub repository_url: Option<String>,
+ pub local_path: Option<String>,
+ pub source_type: String,
+ pub status: String,
+ pub is_primary: bool,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+impl ContractRepository {
+ /// Parse source_type string to RepositorySourceType enum
+ pub fn source_type_enum(&self) -> Result<RepositorySourceType, String> {
+ self.source_type.parse()
+ }
+
+ /// Parse status string to RepositoryStatus enum
+ pub fn status_enum(&self) -> Result<RepositoryStatus, String> {
+ self.status.parse()
+ }
+}
+
+/// Summary of a contract for list views
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractSummary {
+ pub id: Uuid,
+ pub name: String,
+ pub description: Option<String>,
+ pub phase: String,
+ pub status: String,
+ pub file_count: i64,
+ pub task_count: i64,
+ pub repository_count: i64,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Contract with all relations for detail view
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractWithRelations {
+ #[serde(flatten)]
+ pub contract: Contract,
+ pub repositories: Vec<ContractRepository>,
+ pub files: Vec<FileSummary>,
+ pub tasks: Vec<TaskSummary>,
+}
+
+/// Response for contract list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractListResponse {
+ pub contracts: Vec<ContractSummary>,
+ pub total: i64,
+}
+
+/// Request payload for creating a new contract
+#[derive(Debug, Clone, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateContractRequest {
+ /// Name of the contract
+ pub name: String,
+ /// Optional description
+ pub description: Option<String>,
+ /// Initial phase to start in (defaults to "research")
+ /// Valid values: "research", "specify", "plan", "execute", "review"
+ #[serde(default)]
+ pub initial_phase: Option<String>,
+}
+
+/// Request payload for updating a contract
+#[derive(Debug, Default, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateContractRequest {
+ pub name: Option<String>,
+ pub description: Option<String>,
+ pub phase: Option<String>,
+ pub status: Option<String>,
+ /// Supervisor task ID for contract orchestration
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub supervisor_task_id: Option<Uuid>,
+ /// Version for optimistic locking
+ pub version: Option<i32>,
+}
+
+/// Request to add a remote repository to a contract
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AddRemoteRepositoryRequest {
+ pub name: String,
+ pub repository_url: String,
+ #[serde(default)]
+ pub is_primary: bool,
+}
+
+/// Request to add a local repository to a contract
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AddLocalRepositoryRequest {
+ pub name: String,
+ pub local_path: String,
+ #[serde(default)]
+ pub is_primary: bool,
+}
+
+/// Request to create a managed repository (daemon will create it)
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateManagedRepositoryRequest {
+ pub name: String,
+ #[serde(default)]
+ pub is_primary: bool,
+}
+
+/// Request to change contract phase
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ChangePhaseRequest {
+ pub phase: String,
+}
+
+/// Contract event record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractEvent {
+ pub id: Uuid,
+ pub contract_id: Uuid,
+ pub event_type: String,
+ pub previous_phase: Option<String>,
+ pub new_phase: Option<String>,
+ #[sqlx(json)]
+ pub event_data: Option<serde_json::Value>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Response for contract events list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractEventListResponse {
+ pub events: Vec<ContractEvent>,
+ pub total: i64,
+}
+
+// ============================================================================
+// Task Checkpoints (for git checkpoint tracking)
+// ============================================================================
+
+/// Task checkpoint record - represents a git commit checkpoint
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskCheckpoint {
+ pub id: Uuid,
+ pub task_id: Uuid,
+ /// Sequential checkpoint number within this task
+ pub checkpoint_number: i32,
+ /// Git commit SHA
+ pub commit_sha: String,
+ /// Git branch name
+ pub branch_name: String,
+ /// Commit message
+ pub message: String,
+ /// Files changed in this commit: [{path, action: 'A'|'M'|'D'}]
+ #[sqlx(json)]
+ pub files_changed: Option<serde_json::Value>,
+ /// Lines added in this commit
+ pub lines_added: Option<i32>,
+ /// Lines removed in this commit
+ pub lines_removed: Option<i32>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Request to create a checkpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateCheckpointRequest {
+ /// Commit message
+ pub message: String,
+}
+
+/// Response for checkpoint list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckpointListResponse {
+ pub checkpoints: Vec<TaskCheckpoint>,
+ pub total: i64,
+}
+
+// ============================================================================
+// Supervisor State (for supervisor resumability)
+// ============================================================================
+
+/// Supervisor state for contract supervisor tasks
+/// Enables resumption after interruption
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorState {
+ pub id: Uuid,
+ pub contract_id: Uuid,
+ pub task_id: Uuid,
+ /// Full Claude conversation history for resumption
+ #[sqlx(json)]
+ pub conversation_history: serde_json::Value,
+ /// Last checkpoint this supervisor created
+ pub last_checkpoint_id: Option<Uuid>,
+ /// Tasks the supervisor is waiting on
+ #[sqlx(try_from = "Vec<Uuid>")]
+ pub pending_task_ids: Vec<Uuid>,
+ /// Current contract phase when supervisor was last active
+ pub phase: String,
+ /// When supervisor was last active
+ pub last_activity: DateTime<Utc>,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// Request to update supervisor state
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateSupervisorStateRequest {
+ /// Updated conversation history
+ pub conversation_history: Option<serde_json::Value>,
+ /// Tasks the supervisor is waiting on
+ pub pending_task_ids: Option<Vec<Uuid>>,
+ /// Current contract phase
+ pub phase: Option<String>,
+}
+
+// ============================================================================
+// Daemon Task Assignments (for multi-daemon support)
+// ============================================================================
+
+/// Daemon task assignment record
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DaemonTaskAssignment {
+ pub id: Uuid,
+ pub daemon_id: Uuid,
+ pub task_id: Uuid,
+ pub assigned_at: DateTime<Utc>,
+ /// Status: 'active', 'migrating', 'completed'
+ pub status: String,
+}
+
+/// Extended daemon info for selection
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DaemonWithCapacity {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub connection_id: String,
+ pub hostname: Option<String>,
+ pub machine_id: Option<String>,
+ pub max_concurrent_tasks: i32,
+ pub current_task_count: i32,
+ pub capacity_score: Option<i32>,
+ pub task_queue_length: Option<i32>,
+ pub supports_migration: Option<bool>,
+ pub status: String,
+ pub last_heartbeat_at: DateTime<Utc>,
+ pub connected_at: DateTime<Utc>,
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index ce1e97d..3b911c2 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5,8 +5,12 @@ use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
- CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation,
- MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest,
+ Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository,
+ ContractSummary, CreateCheckpointRequest, CreateContractRequest, CreateFileRequest,
+ CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary,
+ FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint,
+ TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, UpdateSupervisorStateRequest,
+ UpdateTaskRequest,
};
/// Repository error types.
@@ -52,8 +56,18 @@ fn generate_default_name() -> String {
now.format("Recording - %b %d %Y %H:%M:%S").to_string()
}
-/// Create a new file record.
-pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::Error> {
+/// Internal request for creating files without contract association (e.g., audio transcription).
+/// User-facing file creation should use CreateFileRequest which requires contract_id.
+pub struct InternalCreateFileRequest {
+ pub name: Option<String>,
+ pub description: Option<String>,
+ pub transcript: Vec<super::models::TranscriptEntry>,
+ pub location: Option<String>,
+}
+
+/// Create a new file record (internal use, no contract required).
+/// For user-facing file creation, use create_file_for_owner which requires a contract.
+pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
@@ -62,7 +76,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
r#"
INSERT INTO files (name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, NULL, $5)
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(&name)
@@ -78,7 +92,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1
"#,
@@ -92,7 +106,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
ORDER BY created_at DESC
"#,
@@ -144,7 +158,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1 AND version = $7
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -163,7 +177,7 @@ pub async fn update_file(
UPDATE files
SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
WHERE id = $1
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -219,6 +233,7 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
// =============================================================================
/// Create a new file record for a specific owner.
+/// Files must belong to a contract - the contract_id is required and the phase is looked up.
pub async fn create_file_for_owner(
pool: &PgPool,
owner_id: Uuid,
@@ -226,21 +241,38 @@ pub async fn create_file_for_owner(
) -> Result<File, sqlx::Error> {
let name = req.name.unwrap_or_else(generate_default_name);
let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
- let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
+ // Use body from request (may be empty or contain template elements)
+ let body_json = serde_json::to_value(&req.body).unwrap_or_default();
+
+ // Use provided contract_phase, or look up from contract's current phase
+ let contract_phase: Option<String> = if req.contract_phase.is_some() {
+ req.contract_phase
+ } else {
+ sqlx::query_scalar(
+ "SELECT phase FROM contracts WHERE id = $1 AND owner_id = $2",
+ )
+ .bind(req.contract_id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?
+ };
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
- VALUES ($1, $2, $3, $4, $5, NULL, $6)
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ INSERT INTO files (owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, repo_file_path)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, NULL, $8, $9)
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(owner_id)
+ .bind(req.contract_id)
+ .bind(&contract_phase)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
.bind(&req.location)
.bind(&body_json)
+ .bind(&req.repo_file_path)
.fetch_one(pool)
.await
}
@@ -253,7 +285,7 @@ pub async fn get_file_for_owner(
) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
@@ -268,7 +300,7 @@ pub async fn get_file_for_owner(
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
@@ -279,6 +311,72 @@ pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<F
.await
}
+/// Database row type for file summary with contract info
+#[derive(Debug, sqlx::FromRow)]
+struct FileSummaryRow {
+ id: Uuid,
+ contract_id: Option<Uuid>,
+ contract_name: Option<String>,
+ contract_phase: Option<String>,
+ name: String,
+ description: Option<String>,
+ #[sqlx(json)]
+ transcript: Vec<crate::db::models::TranscriptEntry>,
+ version: i32,
+ repo_file_path: Option<String>,
+ repo_sync_status: Option<String>,
+ created_at: chrono::DateTime<chrono::Utc>,
+ updated_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// List file summaries for an owner with contract info (joined).
+pub async fn list_file_summaries_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<FileSummary>, sqlx::Error> {
+ let rows = sqlx::query_as::<_, FileSummaryRow>(
+ r#"
+ SELECT
+ f.id, f.contract_id, c.name as contract_name, f.contract_phase,
+ f.name, f.description, f.transcript, f.version,
+ f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
+ FROM files f
+ LEFT JOIN contracts c ON f.contract_id = c.id
+ WHERE f.owner_id = $1
+ ORDER BY f.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(rows
+ .into_iter()
+ .map(|row| {
+ let duration = row
+ .transcript
+ .iter()
+ .map(|t| t.end)
+ .fold(0.0_f32, f32::max);
+ FileSummary {
+ id: row.id,
+ contract_id: row.contract_id,
+ contract_name: row.contract_name,
+ contract_phase: row.contract_phase,
+ name: row.name,
+ description: row.description,
+ transcript_count: row.transcript.len(),
+ duration: if duration > 0.0 { Some(duration) } else { None },
+ version: row.version,
+ repo_file_path: row.repo_file_path,
+ repo_sync_status: row.repo_sync_status,
+ created_at: row.created_at,
+ updated_at: row.updated_at,
+ }
+ })
+ .collect())
+}
+
/// Update a file by ID with optimistic locking, scoped to owner.
pub async fn update_file_for_owner(
pool: &PgPool,
@@ -318,7 +416,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2 AND version = $8
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -338,7 +436,7 @@ pub async fn update_file_for_owner(
UPDATE files
SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
- RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ RETURNING id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
"#,
)
.bind(id)
@@ -511,6 +609,7 @@ pub async fn restore_file_version(
summary: target.summary,
body: Some(target.body),
version: Some(current_version),
+ repo_file_path: None,
};
update_file(pool, file_id, update_req).await
@@ -540,26 +639,22 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
///
/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
-/// explicitly configured. The orchestrator controls when completion steps happen.
+/// explicitly configured. The supervisor controls when completion steps happen.
+///
+/// Task spawning is now controlled by supervisors at the application level.
+/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
- let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit repo settings
+ // Fetch parent task to get depth and inherit settings
let parent = get_task(pool, parent_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
let new_depth = parent.depth + 1;
- // Validate max depth (must be < 2, i.e., 0 or 1 only)
- // Orchestrators are at depth 0, subtasks at depth 1
- // Subtasks cannot have their own children
- if new_depth >= 2 {
- return Err(sqlx::Error::Protocol(format!(
- "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
- new_depth
- )));
- }
+ // Subtasks inherit contract_id from parent
+ let contract_id = parent.contract_id.unwrap_or(req.contract_id);
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
@@ -568,14 +663,15 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
// NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
- // The orchestrator integrates subtask work from their worktrees.
+ // The supervisor integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0
+ // Top-level task: depth 0, use contract_id from request
(
0,
+ req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -590,20 +686,22 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task,
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- parent_task_id, depth, name, description, plan, priority,
- repository_url, base_branch, target_branch, merge_mode,
+ contract_id, parent_task_id, depth, name, description, plan, priority,
+ is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
"#,
)
+ .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
+ .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -635,10 +733,13 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error>
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -652,10 +753,13 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -665,6 +769,25 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
.await
}
+/// List all tasks in a contract (for supervisor tree view).
+pub async fn list_tasks_by_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT * FROM tasks
+ WHERE contract_id = $1 AND owner_id = $2
+ ORDER BY is_supervisor DESC, depth ASC, created_at ASC
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
/// Update a task by ID with optimistic locking.
pub async fn update_task(
pool: &PgPool,
@@ -817,9 +940,9 @@ pub async fn create_task_for_owner(
req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
// Calculate depth and inherit settings from parent if applicable
- let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ let (depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
if let Some(parent_id) = req.parent_task_id {
- // Fetch parent task to get depth and inherit repo settings (must belong to same owner)
+ // Fetch parent task to get depth and inherit settings (must belong to same owner)
let parent = get_task_for_owner(pool, parent_id, owner_id).await?
.ok_or_else(|| sqlx::Error::RowNotFound)?;
@@ -833,6 +956,9 @@ pub async fn create_task_for_owner(
)));
}
+ // Subtasks inherit contract_id from parent
+ let contract_id = parent.contract_id.unwrap_or(req.contract_id);
+
// Inherit repo settings if not provided
let repo_url = req.repository_url.clone().or(parent.repository_url);
let base_branch = req.base_branch.clone().or(parent.base_branch);
@@ -843,11 +969,12 @@ pub async fn create_task_for_owner(
// The orchestrator integrates subtask work from their worktrees.
let completion_action = req.completion_action.clone();
- (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
} else {
- // Top-level task: depth 0
+ // Top-level task: depth 0, use contract_id from request
(
0,
+ req.contract_id,
req.repository_url.clone(),
req.base_branch.clone(),
req.target_branch.clone(),
@@ -862,21 +989,23 @@ pub async fn create_task_for_owner(
sqlx::query_as::<_, Task>(
r#"
INSERT INTO tasks (
- owner_id, parent_task_id, depth, name, description, plan, priority,
- repository_url, base_branch, target_branch, merge_mode,
+ owner_id, contract_id, parent_task_id, depth, name, description, plan, priority,
+ is_supervisor, repository_url, base_branch, target_branch, merge_mode,
target_repo_path, completion_action, continue_from_task_id, copy_files
)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
RETURNING *
"#,
)
.bind(owner_id)
+ .bind(contract_id)
.bind(req.parent_task_id)
.bind(depth)
.bind(&req.name)
.bind(&req.description)
.bind(&req.plan)
.bind(req.priority)
+ .bind(req.is_supervisor)
.bind(&repo_url)
.bind(&base_branch)
.bind(&target_branch)
@@ -916,10 +1045,13 @@ pub async fn list_tasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -938,10 +1070,13 @@ pub async fn list_subtasks_for_owner(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -986,6 +1121,7 @@ pub async fn update_task_for_owner(
let error_message = req.error_message.or(existing.error_message);
let merge_mode = req.merge_mode.or(existing.merge_mode);
let pr_url = req.pr_url.or(existing.pr_url);
+ let repository_url = req.repository_url.or(existing.repository_url);
let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
let completion_action = req.completion_action.or(existing.completion_action);
let daemon_id = if req.clear_daemon_id {
@@ -1002,8 +1138,9 @@ pub async fn update_task_for_owner(
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
- target_repo_path = $14, completion_action = $15, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $16
+ target_repo_path = $14, completion_action = $15, repository_url = $16,
+ updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $17
RETURNING *
"#,
)
@@ -1022,6 +1159,7 @@ pub async fn update_task_for_owner(
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
+ .bind(&repository_url)
.bind(req.version.unwrap())
.fetch_optional(pool)
.await?
@@ -1032,7 +1170,8 @@ pub async fn update_task_for_owner(
SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
progress_summary = $8, last_output = $9, error_message = $10,
merge_mode = $11, pr_url = $12, daemon_id = $13,
- target_repo_path = $14, completion_action = $15, updated_at = NOW()
+ target_repo_path = $14, completion_action = $15, repository_url = $16,
+ updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
"#,
@@ -1052,6 +1191,7 @@ pub async fn update_task_for_owner(
.bind(daemon_id)
.bind(&target_repo_path)
.bind(&completion_action)
+ .bind(&repository_url)
.fetch_optional(pool)
.await?
};
@@ -1328,6 +1468,26 @@ pub async fn update_daemon_status(
Ok(result.rows_affected() > 0)
}
+/// Mark daemon as disconnected by connection_id.
+pub async fn disconnect_daemon_by_connection(
+ pool: &PgPool,
+ connection_id: &str,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET status = 'disconnected',
+ disconnected_at = NOW()
+ WHERE connection_id = $1
+ "#,
+ )
+ .bind(connection_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
/// Update daemon task count.
pub async fn update_daemon_task_count(
pool: &PgPool,
@@ -1393,6 +1553,25 @@ pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
Ok(result.0)
}
+/// Delete stale daemons that haven't sent a heartbeat within the timeout.
+/// Returns the number of deleted daemons.
+pub async fn delete_stale_daemons(
+ pool: &PgPool,
+ timeout_seconds: i64,
+) -> Result<u64, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM daemons
+ WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
+ "#,
+ )
+ .bind(timeout_seconds)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
// =============================================================================
// Sibling Awareness Functions
// =============================================================================
@@ -1408,10 +1587,13 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1426,10 +1608,13 @@ pub async fn list_sibling_tasks(
sqlx::query_as::<_, TaskSummary>(
r#"
SELECT
- t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
- t.progress_summary, t.version, t.created_at, t.updated_at,
- (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
ORDER BY t.priority DESC, t.created_at DESC
"#,
@@ -1710,3 +1895,1092 @@ pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshCha
// Create new active conversation
get_or_create_active_conversation(pool, owner_id).await
}
+
+// =============================================================================
+// Contract Chat History Functions
+// =============================================================================
+
+/// Get or create the active conversation for a contract.
+pub async fn get_or_create_contract_conversation(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<ContractChatConversation, sqlx::Error> {
+ // Try to get existing active conversation for this contract
+ let existing = sqlx::query_as::<_, ContractChatConversation>(
+ r#"
+ SELECT *
+ FROM contract_chat_conversations
+ WHERE is_active = true AND contract_id = $1 AND owner_id = $2
+ LIMIT 1
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ if let Some(conv) = existing {
+ return Ok(conv);
+ }
+
+ // Create new conversation
+ sqlx::query_as::<_, ContractChatConversation>(
+ r#"
+ INSERT INTO contract_chat_conversations (contract_id, owner_id, is_active)
+ VALUES ($1, $2, true)
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// List messages for a contract conversation.
+pub async fn list_contract_chat_messages(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ limit: Option<i32>,
+) -> Result<Vec<ContractChatMessageRecord>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, ContractChatMessageRecord>(
+ r#"
+ SELECT *
+ FROM contract_chat_messages
+ WHERE conversation_id = $1
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Add a message to a contract conversation.
+pub async fn add_contract_chat_message(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ role: &str,
+ content: &str,
+ tool_calls: Option<serde_json::Value>,
+ pending_questions: Option<serde_json::Value>,
+) -> Result<ContractChatMessageRecord, sqlx::Error> {
+ sqlx::query_as::<_, ContractChatMessageRecord>(
+ r#"
+ INSERT INTO contract_chat_messages
+ (conversation_id, role, content, tool_calls, pending_questions)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING *
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(role)
+ .bind(content)
+ .bind(tool_calls)
+ .bind(pending_questions)
+ .fetch_one(pool)
+ .await
+}
+
+/// Clear contract conversation (archive existing and create new).
+pub async fn clear_contract_conversation(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<ContractChatConversation, sqlx::Error> {
+ // Mark existing as inactive for this contract
+ sqlx::query(
+ r#"
+ UPDATE contract_chat_conversations
+ SET is_active = false, updated_at = NOW()
+ WHERE is_active = true AND contract_id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ // Create new active conversation
+ get_or_create_contract_conversation(pool, contract_id, owner_id).await
+}
+
+// =============================================================================
+// Contract Functions (Owner-Scoped)
+// =============================================================================
+
+/// Create a new contract for a specific owner.
+pub async fn create_contract_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateContractRequest,
+) -> Result<Contract, sqlx::Error> {
+ // Use provided initial_phase or default to "research"
+ let phase = req.initial_phase.as_deref().unwrap_or("research");
+
+ // Validate the phase
+ let valid_phases = ["research", "specify", "plan", "execute", "review"];
+ if !valid_phases.contains(&phase) {
+ return Err(sqlx::Error::Protocol(format!(
+ "Invalid initial_phase '{}'. Must be one of: {}",
+ phase,
+ valid_phases.join(", ")
+ )));
+ }
+
+ sqlx::query_as::<_, Contract>(
+ r#"
+ INSERT INTO contracts (owner_id, name, description, phase)
+ VALUES ($1, $2, $3, $4)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(phase)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a contract by ID, scoped to owner.
+pub async fn get_contract_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<Contract>, sqlx::Error> {
+ sqlx::query_as::<_, Contract>(
+ r#"
+ SELECT *
+ FROM contracts
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all contracts for an owner, ordered by created_at DESC.
+pub async fn list_contracts_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<ContractSummary>, sqlx::Error> {
+ sqlx::query_as::<_, ContractSummary>(
+ r#"
+ SELECT
+ c.id, c.name, c.description, c.phase, c.status,
+ c.version, c.created_at,
+ (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
+ (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
+ (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
+ FROM contracts c
+ WHERE c.owner_id = $1
+ ORDER BY c.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get contract summary by ID.
+pub async fn get_contract_summary_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<ContractSummary>, sqlx::Error> {
+ sqlx::query_as::<_, ContractSummary>(
+ r#"
+ SELECT
+ c.id, c.name, c.description, c.phase, c.status,
+ c.version, c.created_at,
+ (SELECT COUNT(*) FROM files WHERE contract_id = c.id) as file_count,
+ (SELECT COUNT(*) FROM tasks WHERE contract_id = c.id) as task_count,
+ (SELECT COUNT(*) FROM contract_repositories WHERE contract_id = c.id) as repository_count
+ FROM contracts c
+ WHERE c.id = $1 AND c.owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update a contract by ID with optimistic locking, scoped to owner.
+pub async fn update_contract_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateContractRequest,
+) -> Result<Option<Contract>, RepositoryError> {
+ let existing = get_contract_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let phase = req.phase.unwrap_or(existing.phase);
+ let status = req.status.unwrap_or(existing.status);
+ let supervisor_task_id = req.supervisor_task_id.or(existing.supervisor_task_id);
+
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, Contract>(
+ r#"
+ UPDATE contracts
+ SET name = $3, description = $4, phase = $5, status = $6,
+ supervisor_task_id = $7, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $8
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&phase)
+ .bind(&status)
+ .bind(supervisor_task_id)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ sqlx::query_as::<_, Contract>(
+ r#"
+ UPDATE contracts
+ SET name = $3, description = $4, phase = $5, status = $6,
+ supervisor_task_id = $7, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&phase)
+ .bind(&status)
+ .bind(supervisor_task_id)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ if let Some(current) = get_contract_for_owner(pool, id, owner_id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a contract by ID, scoped to owner.
+pub async fn delete_contract_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM contracts
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Change contract phase and record event.
+pub async fn change_contract_phase_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ new_phase: &str,
+) -> Result<Option<Contract>, sqlx::Error> {
+ // Get current phase
+ let existing = get_contract_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ let previous_phase = existing.phase.clone();
+
+ // Update phase
+ let contract = sqlx::query_as::<_, Contract>(
+ r#"
+ UPDATE contracts
+ SET phase = $3, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(new_phase)
+ .fetch_optional(pool)
+ .await?;
+
+ // Record event
+ if contract.is_some() {
+ sqlx::query(
+ r#"
+ INSERT INTO contract_events (contract_id, event_type, previous_phase, new_phase)
+ VALUES ($1, 'phase_change', $2, $3)
+ "#,
+ )
+ .bind(id)
+ .bind(&previous_phase)
+ .bind(new_phase)
+ .execute(pool)
+ .await?;
+ }
+
+ Ok(contract)
+}
+
+// =============================================================================
+// Contract Repository Functions
+// =============================================================================
+
+/// List repositories for a contract.
+pub async fn list_contract_repositories(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Vec<ContractRepository>, sqlx::Error> {
+ sqlx::query_as::<_, ContractRepository>(
+ r#"
+ SELECT *
+ FROM contract_repositories
+ WHERE contract_id = $1
+ ORDER BY is_primary DESC, created_at ASC
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Add a remote repository to a contract.
+pub async fn add_remote_repository(
+ pool: &PgPool,
+ contract_id: Uuid,
+ name: &str,
+ repository_url: &str,
+ is_primary: bool,
+) -> Result<ContractRepository, sqlx::Error> {
+ // If is_primary, clear other primaries first
+ if is_primary {
+ sqlx::query(
+ r#"
+ UPDATE contract_repositories
+ SET is_primary = false, updated_at = NOW()
+ WHERE contract_id = $1 AND is_primary = true
+ "#,
+ )
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+ }
+
+ sqlx::query_as::<_, ContractRepository>(
+ r#"
+ INSERT INTO contract_repositories (contract_id, name, repository_url, source_type, status, is_primary)
+ VALUES ($1, $2, $3, 'remote', 'ready', $4)
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(name)
+ .bind(repository_url)
+ .bind(is_primary)
+ .fetch_one(pool)
+ .await
+}
+
+/// Add a local repository to a contract.
+pub async fn add_local_repository(
+ pool: &PgPool,
+ contract_id: Uuid,
+ name: &str,
+ local_path: &str,
+ is_primary: bool,
+) -> Result<ContractRepository, sqlx::Error> {
+ // If is_primary, clear other primaries first
+ if is_primary {
+ sqlx::query(
+ r#"
+ UPDATE contract_repositories
+ SET is_primary = false, updated_at = NOW()
+ WHERE contract_id = $1 AND is_primary = true
+ "#,
+ )
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+ }
+
+ sqlx::query_as::<_, ContractRepository>(
+ r#"
+ INSERT INTO contract_repositories (contract_id, name, local_path, source_type, status, is_primary)
+ VALUES ($1, $2, $3, 'local', 'ready', $4)
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(name)
+ .bind(local_path)
+ .bind(is_primary)
+ .fetch_one(pool)
+ .await
+}
+
+/// Create a managed repository (daemon will create it).
+pub async fn create_managed_repository(
+ pool: &PgPool,
+ contract_id: Uuid,
+ name: &str,
+ is_primary: bool,
+) -> Result<ContractRepository, sqlx::Error> {
+ // If is_primary, clear other primaries first
+ if is_primary {
+ sqlx::query(
+ r#"
+ UPDATE contract_repositories
+ SET is_primary = false, updated_at = NOW()
+ WHERE contract_id = $1 AND is_primary = true
+ "#,
+ )
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+ }
+
+ sqlx::query_as::<_, ContractRepository>(
+ r#"
+ INSERT INTO contract_repositories (contract_id, name, source_type, status, is_primary)
+ VALUES ($1, $2, 'managed', 'pending', $3)
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(name)
+ .bind(is_primary)
+ .fetch_one(pool)
+ .await
+}
+
+/// Delete a repository from a contract.
+pub async fn delete_contract_repository(
+ pool: &PgPool,
+ repo_id: Uuid,
+ contract_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM contract_repositories
+ WHERE id = $1 AND contract_id = $2
+ "#,
+ )
+ .bind(repo_id)
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Set a repository as primary (and clear others).
+pub async fn set_repository_primary(
+ pool: &PgPool,
+ repo_id: Uuid,
+ contract_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ // Clear other primaries
+ sqlx::query(
+ r#"
+ UPDATE contract_repositories
+ SET is_primary = false, updated_at = NOW()
+ WHERE contract_id = $1 AND is_primary = true
+ "#,
+ )
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+
+ // Set this one as primary
+ let result = sqlx::query(
+ r#"
+ UPDATE contract_repositories
+ SET is_primary = true, updated_at = NOW()
+ WHERE id = $1 AND contract_id = $2
+ "#,
+ )
+ .bind(repo_id)
+ .bind(contract_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update managed repository status (used by daemon).
+pub async fn update_managed_repository_status(
+ pool: &PgPool,
+ repo_id: Uuid,
+ status: &str,
+ repository_url: Option<&str>,
+) -> Result<Option<ContractRepository>, sqlx::Error> {
+ sqlx::query_as::<_, ContractRepository>(
+ r#"
+ UPDATE contract_repositories
+ SET status = $2, repository_url = COALESCE($3, repository_url), updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(repo_id)
+ .bind(status)
+ .bind(repository_url)
+ .fetch_optional(pool)
+ .await
+}
+
+// =============================================================================
+// Contract Task Association Functions
+// =============================================================================
+
+/// Add a task to a contract.
+pub async fn add_task_to_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ task_id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE tasks
+ SET contract_id = $2, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $3
+ "#,
+ )
+ .bind(task_id)
+ .bind(contract_id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Remove a task from a contract.
+pub async fn remove_task_from_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ task_id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE tasks
+ SET contract_id = NULL, updated_at = NOW()
+ WHERE id = $1 AND contract_id = $2 AND owner_id = $3
+ "#,
+ )
+ .bind(task_id)
+ .bind(contract_id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// List files in a contract.
+pub async fn list_files_in_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<FileSummary>, sqlx::Error> {
+ // Use a manual query since FileSummary doesn't have a FromRow derive with all the computed fields
+ let files = sqlx::query_as::<_, File>(
+ r#"
+ SELECT id, owner_id, contract_id, contract_phase, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
+ FROM files
+ WHERE contract_id = $1 AND owner_id = $2
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(files.into_iter().map(FileSummary::from).collect())
+}
+
+/// List tasks in a contract.
+pub async fn list_tasks_in_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.contract_id, c.name as contract_name, c.phase as contract_phase,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.created_at, t.updated_at
+ FROM tasks t
+ LEFT JOIN contracts c ON t.contract_id = c.id
+ WHERE t.contract_id = $1 AND t.owner_id = $2
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+// =============================================================================
+// Contract Events
+// =============================================================================
+
+/// List events for a contract.
+pub async fn list_contract_events(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Vec<ContractEvent>, sqlx::Error> {
+ sqlx::query_as::<_, ContractEvent>(
+ r#"
+ SELECT *
+ FROM contract_events
+ WHERE contract_id = $1
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Record a contract event.
+pub async fn record_contract_event(
+ pool: &PgPool,
+ contract_id: Uuid,
+ event_type: &str,
+ event_data: Option<serde_json::Value>,
+) -> Result<ContractEvent, sqlx::Error> {
+ sqlx::query_as::<_, ContractEvent>(
+ r#"
+ INSERT INTO contract_events (contract_id, event_type, event_data)
+ VALUES ($1, $2, $3)
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(event_type)
+ .bind(event_data)
+ .fetch_one(pool)
+ .await
+}
+
+// ============================================================================
+// Task Checkpoints
+// ============================================================================
+
+/// Create a checkpoint for a task.
+pub async fn create_task_checkpoint(
+ pool: &PgPool,
+ task_id: Uuid,
+ commit_sha: &str,
+ branch_name: &str,
+ message: &str,
+ files_changed: Option<serde_json::Value>,
+ lines_added: Option<i32>,
+ lines_removed: Option<i32>,
+) -> Result<TaskCheckpoint, sqlx::Error> {
+ // Get current checkpoint count and increment
+ let checkpoint_number: i32 = sqlx::query_scalar(
+ "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
+ )
+ .bind(task_id)
+ .fetch_one(pool)
+ .await?;
+
+ // Update task's checkpoint tracking
+ sqlx::query(
+ r#"
+ UPDATE tasks
+ SET last_checkpoint_sha = $1,
+ checkpoint_count = $2,
+ checkpoint_message = $3,
+ updated_at = NOW()
+ WHERE id = $4
+ "#,
+ )
+ .bind(commit_sha)
+ .bind(checkpoint_number)
+ .bind(message)
+ .bind(task_id)
+ .execute(pool)
+ .await?;
+
+ sqlx::query_as::<_, TaskCheckpoint>(
+ r#"
+ INSERT INTO task_checkpoints (
+ task_id, checkpoint_number, commit_sha, branch_name, message,
+ files_changed, lines_added, lines_removed
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(checkpoint_number)
+ .bind(commit_sha)
+ .bind(branch_name)
+ .bind(message)
+ .bind(files_changed)
+ .bind(lines_added)
+ .bind(lines_removed)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a checkpoint by ID.
+pub async fn get_task_checkpoint(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
+ sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE id = $1")
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a checkpoint by commit SHA.
+pub async fn get_task_checkpoint_by_sha(
+ pool: &PgPool,
+ commit_sha: &str,
+) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
+ sqlx::query_as::<_, TaskCheckpoint>("SELECT * FROM task_checkpoints WHERE commit_sha = $1")
+ .bind(commit_sha)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List checkpoints for a task.
+pub async fn list_task_checkpoints(
+ pool: &PgPool,
+ task_id: Uuid,
+) -> Result<Vec<TaskCheckpoint>, sqlx::Error> {
+ sqlx::query_as::<_, TaskCheckpoint>(
+ "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC",
+ )
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+}
+
+// ============================================================================
+// Supervisor State
+// ============================================================================
+
+/// Create or update supervisor state for a contract.
+pub async fn upsert_supervisor_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+ task_id: Uuid,
+ conversation_history: serde_json::Value,
+ pending_task_ids: &[Uuid],
+ phase: &str,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ INSERT INTO supervisor_states (contract_id, task_id, conversation_history, pending_task_ids, phase, last_activity)
+ VALUES ($1, $2, $3, $4, $5, NOW())
+ ON CONFLICT (contract_id) DO UPDATE SET
+ task_id = EXCLUDED.task_id,
+ conversation_history = EXCLUDED.conversation_history,
+ pending_task_ids = EXCLUDED.pending_task_ids,
+ phase = EXCLUDED.phase,
+ last_activity = NOW(),
+ updated_at = NOW()
+ RETURNING *
+ "#,
+ )
+ .bind(contract_id)
+ .bind(task_id)
+ .bind(conversation_history)
+ .bind(pending_task_ids)
+ .bind(phase)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get supervisor state for a contract.
+pub async fn get_supervisor_state(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE contract_id = $1")
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get supervisor state by task ID.
+pub async fn get_supervisor_state_by_task(
+ pool: &PgPool,
+ task_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>("SELECT * FROM supervisor_states WHERE task_id = $1")
+ .bind(task_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update supervisor conversation history.
+pub async fn update_supervisor_conversation(
+ pool: &PgPool,
+ contract_id: Uuid,
+ conversation_history: serde_json::Value,
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET conversation_history = $1,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(conversation_history)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Update supervisor pending tasks.
+pub async fn update_supervisor_pending_tasks(
+ pool: &PgPool,
+ contract_id: Uuid,
+ pending_task_ids: &[Uuid],
+) -> Result<SupervisorState, sqlx::Error> {
+ sqlx::query_as::<_, SupervisorState>(
+ r#"
+ UPDATE supervisor_states
+ SET pending_task_ids = $1,
+ last_activity = NOW(),
+ updated_at = NOW()
+ WHERE contract_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(pending_task_ids)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+// ============================================================================
+// Contract Supervisor
+// ============================================================================
+
+/// Update contract's supervisor task ID.
+pub async fn update_contract_supervisor(
+ pool: &PgPool,
+ contract_id: Uuid,
+ supervisor_task_id: Uuid,
+) -> Result<Contract, sqlx::Error> {
+ sqlx::query_as::<_, Contract>(
+ r#"
+ UPDATE contracts
+ SET supervisor_task_id = $1,
+ updated_at = NOW()
+ WHERE id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(supervisor_task_id)
+ .bind(contract_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get the supervisor task for a contract.
+pub async fn get_contract_supervisor_task(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT t.* FROM tasks t
+ JOIN contracts c ON c.supervisor_task_id = t.id
+ WHERE c.id = $1
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_optional(pool)
+ .await
+}
+
+// ============================================================================
+// Task Tree Queries
+// ============================================================================
+
+/// Get full task tree for a contract.
+pub async fn get_contract_task_tree(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ WITH RECURSIVE task_tree AS (
+ -- Base case: root tasks (no parent)
+ SELECT * FROM tasks
+ WHERE contract_id = $1 AND parent_task_id IS NULL
+ UNION ALL
+ -- Recursive case: children of current level
+ SELECT t.* FROM tasks t
+ JOIN task_tree tt ON t.parent_task_id = tt.id
+ )
+ SELECT * FROM task_tree
+ ORDER BY depth, created_at
+ "#,
+ )
+ .bind(contract_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get task tree from a specific root task.
+pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ WITH RECURSIVE task_tree AS (
+ -- Base case: the root task
+ SELECT * FROM tasks WHERE id = $1
+ UNION ALL
+ -- Recursive case: children of current level
+ SELECT t.* FROM tasks t
+ JOIN task_tree tt ON t.parent_task_id = tt.id
+ )
+ SELECT * FROM task_tree
+ ORDER BY depth, created_at
+ "#,
+ )
+ .bind(root_task_id)
+ .fetch_all(pool)
+ .await
+}
+
+// ============================================================================
+// Daemon Selection
+// ============================================================================
+
+/// Get daemons with capacity info for selection.
+pub async fn get_available_daemons(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
+ sqlx::query_as::<_, DaemonWithCapacity>(
+ r#"
+ SELECT id, owner_id, connection_id, hostname, machine_id,
+ max_concurrent_tasks, current_task_count,
+ capacity_score, task_queue_length, supports_migration,
+ status, last_heartbeat_at, connected_at
+ FROM daemons
+ WHERE owner_id = $1 AND status = 'connected'
+ ORDER BY
+ COALESCE(capacity_score, 100) DESC,
+ (max_concurrent_tasks - current_task_count) DESC,
+ COALESCE(task_queue_length, 0) ASC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Create a daemon task assignment.
+pub async fn create_daemon_task_assignment(
+ pool: &PgPool,
+ daemon_id: Uuid,
+ task_id: Uuid,
+) -> Result<DaemonTaskAssignment, sqlx::Error> {
+ sqlx::query_as::<_, DaemonTaskAssignment>(
+ r#"
+ INSERT INTO daemon_task_assignments (daemon_id, task_id)
+ VALUES ($1, $2)
+ RETURNING *
+ "#,
+ )
+ .bind(daemon_id)
+ .bind(task_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Update daemon task assignment status.
+pub async fn update_daemon_task_assignment_status(
+ pool: &PgPool,
+ task_id: Uuid,
+ status: &str,
+) -> Result<DaemonTaskAssignment, sqlx::Error> {
+ sqlx::query_as::<_, DaemonTaskAssignment>(
+ r#"
+ UPDATE daemon_task_assignments
+ SET status = $1
+ WHERE task_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(status)
+ .bind(task_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get daemon task assignment for a task.
+pub async fn get_daemon_task_assignment(
+ pool: &PgPool,
+ task_id: Uuid,
+) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> {
+ sqlx::query_as::<_, DaemonTaskAssignment>(
+ "SELECT * FROM daemon_task_assignments WHERE task_id = $1",
+ )
+ .bind(task_id)
+ .fetch_optional(pool)
+ .await
+}