summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs589
-rw-r--r--makima/src/db/repository.rs1393
2 files changed, 1959 insertions, 23 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 617e590..5064b97 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -36,6 +36,16 @@ pub enum BodyElement {
Heading { level: u8, text: String },
/// Paragraph text
Paragraph { text: String },
+ /// Code block with optional language
+ Code {
+ language: Option<String>,
+ content: String,
+ },
+ /// List (ordered or unordered)
+ List {
+ ordered: bool,
+ items: Vec<String>,
+ },
/// Chart visualization
Chart {
#[serde(rename = "chartType")]
@@ -245,3 +255,582 @@ pub struct RestoreVersionRequest {
/// The current version (for optimistic locking)
pub current_version: i32,
}
+
+// =============================================================================
+// Mesh/Task Types
+// =============================================================================
+
+/// Task status for orchestrating Claude Code instances
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum TaskStatus {
+ Pending,
+ Running,
+ Paused,
+ Blocked,
+ Done,
+ Failed,
+ Merged,
+}
+
+impl std::fmt::Display for TaskStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskStatus::Pending => write!(f, "pending"),
+ TaskStatus::Running => write!(f, "running"),
+ TaskStatus::Paused => write!(f, "paused"),
+ TaskStatus::Blocked => write!(f, "blocked"),
+ TaskStatus::Done => write!(f, "done"),
+ TaskStatus::Failed => write!(f, "failed"),
+ TaskStatus::Merged => write!(f, "merged"),
+ }
+ }
+}
+
+impl std::str::FromStr for TaskStatus {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "pending" => Ok(TaskStatus::Pending),
+ "running" => Ok(TaskStatus::Running),
+ "paused" => Ok(TaskStatus::Paused),
+ "blocked" => Ok(TaskStatus::Blocked),
+ "done" => Ok(TaskStatus::Done),
+ "failed" => Ok(TaskStatus::Failed),
+ "merged" => Ok(TaskStatus::Merged),
+ _ => Err(format!("Unknown task status: {}", s)),
+ }
+ }
+}
+
+/// Merge mode for task completion
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum MergeMode {
+ /// Create a PR for review
+ Pr,
+ /// Auto-merge to target branch
+ Auto,
+ /// Manual merge by user
+ Manual,
+}
+
+impl std::fmt::Display for MergeMode {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ MergeMode::Pr => write!(f, "pr"),
+ MergeMode::Auto => write!(f, "auto"),
+ MergeMode::Manual => write!(f, "manual"),
+ }
+ }
+}
+
+impl std::str::FromStr for MergeMode {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "pr" => Ok(MergeMode::Pr),
+ "auto" => Ok(MergeMode::Auto),
+ "manual" => Ok(MergeMode::Manual),
+ _ => Err(format!("Unknown merge mode: {}", s)),
+ }
+ }
+}
+
+/// Task record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Task {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub parent_task_id: Option<Uuid>,
+ /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max)
+ pub depth: i32,
+ pub name: String,
+ pub description: Option<String>,
+ pub status: String,
+ pub priority: i32,
+ pub plan: String,
+
+ // Daemon/container info
+ pub daemon_id: Option<Uuid>,
+ pub container_id: Option<String>,
+ pub overlay_path: Option<String>,
+
+ // Repository info
+ pub repository_url: Option<String>,
+ pub base_branch: Option<String>,
+ pub target_branch: Option<String>,
+
+ // Merge settings
+ pub merge_mode: Option<String>,
+ pub pr_url: Option<String>,
+
+ // Completion action settings
+ /// Path to user's local repository (outside ~/.makima)
+ pub target_repo_path: Option<String>,
+ /// Action on completion: "none", "branch", "merge", "pr"
+ pub completion_action: Option<String>,
+
+ // Progress tracking
+ pub progress_summary: Option<String>,
+ pub last_output: Option<String>,
+ pub error_message: Option<String>,
+
+ // Timestamps
+ pub started_at: Option<DateTime<Utc>>,
+ pub completed_at: Option<DateTime<Utc>>,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+
+ // Task continuation
+ /// Task ID to continue from (copy worktree from this task when starting).
+ /// Used for sequential subtask dependencies.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub continue_from_task_id: Option<Uuid>,
+ /// Files to copy from parent task's worktree when starting.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub copy_files: Option<serde_json::Value>,
+}
+
+impl Task {
+ /// Parse status string to TaskStatus enum
+ pub fn status_enum(&self) -> Result<TaskStatus, String> {
+ self.status.parse()
+ }
+
+ /// Parse merge_mode string to MergeMode enum
+ pub fn merge_mode_enum(&self) -> Option<Result<MergeMode, String>> {
+ self.merge_mode.as_ref().map(|s| s.parse())
+ }
+}
+
+/// Summary of a task for list views
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskSummary {
+ pub id: Uuid,
+ pub parent_task_id: Option<Uuid>,
+ /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max)
+ pub depth: i32,
+ pub name: String,
+ pub status: String,
+ pub priority: i32,
+ pub progress_summary: Option<String>,
+ pub subtask_count: i64,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// Response for task list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskListResponse {
+ pub tasks: Vec<TaskSummary>,
+ pub total: i64,
+}
+
+/// Request payload for creating a new task
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateTaskRequest {
+ /// Name of the task
+ pub name: String,
+ /// Optional description
+ pub description: Option<String>,
+ /// The plan/instructions for Claude Code
+ pub plan: String,
+ /// Parent task ID (for subtasks)
+ pub parent_task_id: Option<Uuid>,
+ /// Priority (higher = more urgent)
+ #[serde(default)]
+ pub priority: i32,
+ /// Repository URL
+ pub repository_url: Option<String>,
+ /// Base branch for overlay
+ pub base_branch: Option<String>,
+ /// Target branch to merge into
+ pub target_branch: Option<String>,
+ /// Merge mode (pr, auto, manual)
+ pub merge_mode: Option<String>,
+ /// Path to user's local repository (outside ~/.makima)
+ pub target_repo_path: Option<String>,
+ /// Action on completion: "none", "branch", "merge", "pr"
+ pub completion_action: Option<String>,
+ /// Task ID to continue from (copy worktree from this task when starting)
+ pub continue_from_task_id: Option<Uuid>,
+ /// Files to copy from parent task's worktree when starting
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub copy_files: Option<Vec<String>>,
+}
+
+/// Request payload for updating a task
+#[derive(Debug, Default, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateTaskRequest {
+ pub name: Option<String>,
+ pub description: Option<String>,
+ pub plan: Option<String>,
+ pub status: Option<String>,
+ pub priority: Option<i32>,
+ pub progress_summary: Option<String>,
+ pub last_output: Option<String>,
+ pub error_message: Option<String>,
+ pub merge_mode: Option<String>,
+ pub pr_url: Option<String>,
+ /// Path to user's local repository (outside ~/.makima)
+ pub target_repo_path: Option<String>,
+ /// Action on completion: "none", "branch", "merge", "pr"
+ pub completion_action: Option<String>,
+ /// The daemon currently running this task
+ pub daemon_id: Option<Uuid>,
+ /// Explicitly clear daemon_id (set to NULL)
+ #[serde(default)]
+ pub clear_daemon_id: bool,
+ /// Version for optimistic locking
+ pub version: Option<i32>,
+}
+
+/// Task with its subtasks for detail view
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskWithSubtasks {
+ #[serde(flatten)]
+ pub task: Task,
+ pub subtasks: Vec<TaskSummary>,
+}
+
+/// Request to send a message to a running task's stdin.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SendMessageRequest {
+ /// The message to send to the task's stdin.
+ pub message: String,
+}
+
+// =============================================================================
+// Daemon Types
+// =============================================================================
+
+/// Daemon status
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "lowercase")]
+pub enum DaemonStatus {
+ Connected,
+ Disconnected,
+ Unhealthy,
+}
+
+impl std::fmt::Display for DaemonStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ DaemonStatus::Connected => write!(f, "connected"),
+ DaemonStatus::Disconnected => write!(f, "disconnected"),
+ DaemonStatus::Unhealthy => write!(f, "unhealthy"),
+ }
+ }
+}
+
+impl std::str::FromStr for DaemonStatus {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "connected" => Ok(DaemonStatus::Connected),
+ "disconnected" => Ok(DaemonStatus::Disconnected),
+ "unhealthy" => Ok(DaemonStatus::Unhealthy),
+ _ => Err(format!("Unknown daemon status: {}", s)),
+ }
+ }
+}
+
+/// Connected daemon record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Daemon {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub connection_id: String,
+ pub hostname: Option<String>,
+ pub machine_id: Option<String>,
+ pub max_concurrent_tasks: i32,
+ pub current_task_count: i32,
+ pub status: String,
+ pub last_heartbeat_at: DateTime<Utc>,
+ pub connected_at: DateTime<Utc>,
+ pub disconnected_at: Option<DateTime<Utc>>,
+}
+
+impl Daemon {
+ /// Parse status string to DaemonStatus enum
+ pub fn status_enum(&self) -> Result<DaemonStatus, String> {
+ self.status.parse()
+ }
+}
+
+/// Response for daemon list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DaemonListResponse {
+ pub daemons: Vec<Daemon>,
+ pub total: i64,
+}
+
+/// Response for daemon directories endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DaemonDirectoriesResponse {
+ /// List of suggested directories from connected daemons
+ pub directories: Vec<DaemonDirectory>,
+}
+
+/// A suggested directory from a daemon
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DaemonDirectory {
+ /// Path to the directory
+ pub path: String,
+ /// Display label for the directory
+ pub label: String,
+ /// Type of directory: "working", "makima", "worktrees"
+ pub directory_type: String,
+ /// Daemon hostname this directory is from
+ pub hostname: Option<String>,
+ /// Whether the directory already exists (for validation)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub exists: Option<bool>,
+}
+
+// =============================================================================
+// Task Event Types
+// =============================================================================
+
+/// Task event record from the database
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskEvent {
+ pub id: Uuid,
+ pub task_id: Uuid,
+ pub event_type: String,
+ pub previous_status: Option<String>,
+ pub new_status: Option<String>,
+ #[sqlx(json)]
+ pub event_data: Option<serde_json::Value>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Response for task events list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskEventListResponse {
+ pub events: Vec<TaskEvent>,
+ pub total: i64,
+}
+
+/// A single output entry from a Claude Code task
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskOutputEntry {
+ pub id: Uuid,
+ pub task_id: Uuid,
+ /// Message type: "assistant", "tool_use", "tool_result", "result", "system", "error", "raw"
+ pub message_type: String,
+ /// Main text content
+ pub content: String,
+ /// Tool name if tool_use message
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_name: Option<String>,
+ /// Tool input JSON if tool_use message
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_input: Option<serde_json::Value>,
+ /// Whether tool result was an error
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub is_error: Option<bool>,
+ /// Cost in USD if result message
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub cost_usd: Option<f64>,
+ /// Duration in ms if result message
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub duration_ms: Option<u64>,
+ /// Timestamp when this output was recorded
+ pub created_at: DateTime<Utc>,
+}
+
+impl TaskOutputEntry {
+ /// Convert a TaskEvent with event_type='output' to a TaskOutputEntry
+ pub fn from_task_event(event: TaskEvent) -> Option<Self> {
+ if event.event_type != "output" {
+ return None;
+ }
+ let data = event.event_data?;
+ Some(Self {
+ id: event.id,
+ task_id: event.task_id,
+ message_type: data.get("messageType")?.as_str()?.to_string(),
+ content: data.get("content")?.as_str().unwrap_or("").to_string(),
+ tool_name: data.get("toolName").and_then(|v| v.as_str()).map(|s| s.to_string()),
+ tool_input: data.get("toolInput").cloned(),
+ is_error: data.get("isError").and_then(|v| v.as_bool()),
+ cost_usd: data.get("costUsd").and_then(|v| v.as_f64()),
+ duration_ms: data.get("durationMs").and_then(|v| v.as_u64()),
+ created_at: event.created_at,
+ })
+ }
+}
+
+/// Response for task output history endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskOutputResponse {
+ pub entries: Vec<TaskOutputEntry>,
+ pub total: usize,
+ pub task_id: Uuid,
+}
+
+// =============================================================================
+// Mesh Chat History Types
+// =============================================================================
+
+/// Mesh chat conversation for persisting history
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatConversation {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub name: Option<String>,
+ pub is_active: bool,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// Individual message in a mesh chat conversation
+#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatMessageRecord {
+ pub id: Uuid,
+ pub conversation_id: Uuid,
+ pub role: String,
+ pub content: String,
+ pub context_type: String,
+ pub context_task_id: Option<Uuid>,
+ /// Tool calls made during this message (JSON, nullable)
+ pub tool_calls: Option<serde_json::Value>,
+ /// Pending questions requiring user response (JSON, nullable)
+ pub pending_questions: Option<serde_json::Value>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Response for chat history endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatHistoryResponse {
+ pub conversation_id: Uuid,
+ pub messages: Vec<MeshChatMessageRecord>,
+}
+
+// =============================================================================
+// Merge API Types
+// =============================================================================
+
+/// Information about a task branch
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct BranchInfo {
+ /// Full branch name
+ pub name: String,
+ /// Task ID extracted from branch name (if parseable)
+ pub task_id: Option<Uuid>,
+ /// Whether this branch has been merged
+ pub is_merged: bool,
+ /// Short SHA of the last commit
+ pub last_commit: String,
+ /// Subject line of the last commit
+ pub last_commit_message: String,
+}
+
+/// Response for branch list endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct BranchListResponse {
+ pub branches: Vec<BranchInfo>,
+}
+
+/// Request to start a merge
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeStartRequest {
+ /// Branch name to merge
+ pub source_branch: String,
+}
+
+/// Current merge state
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeStatusResponse {
+ /// Whether a merge is in progress
+ pub in_progress: bool,
+ /// Branch being merged (if in progress)
+ pub source_branch: Option<String>,
+ /// Files with unresolved conflicts
+ pub conflicted_files: Vec<String>,
+}
+
+/// Request to resolve a conflict
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeResolveRequest {
+ /// File path to resolve
+ pub file: String,
+ /// Resolution strategy: "ours" or "theirs"
+ pub strategy: String,
+}
+
+/// Request to commit a merge
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeCommitRequest {
+ /// Commit message
+ pub message: String,
+}
+
+/// Request to skip a subtask branch
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeSkipRequest {
+ /// Subtask ID to skip
+ pub subtask_id: Uuid,
+ /// Reason for skipping
+ pub reason: String,
+}
+
+/// Result of a merge operation
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeResultResponse {
+ /// Whether the operation succeeded
+ pub success: bool,
+ /// Human-readable message
+ pub message: String,
+ /// Commit SHA (if a commit was created)
+ pub commit_sha: Option<String>,
+ /// Conflicted files (if conflicts occurred)
+ pub conflicts: Option<Vec<String>>,
+}
+
+/// Response to check if all branches are merged
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeCompleteCheckResponse {
+ /// Whether the orchestrator can mark itself as complete
+ pub can_complete: bool,
+ /// Branches not yet merged or skipped
+ pub unmerged_branches: Vec<String>,
+ /// Count of merged branches
+ pub merged_count: u32,
+ /// Count of skipped branches
+ pub skipped_count: u32,
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 4137ba6..ce1e97d 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -4,10 +4,10 @@ use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;
-use super::models::{CreateFileRequest, File, FileVersion, UpdateFileRequest};
-
-/// Default owner ID for anonymous users.
-pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002);
+use super::models::{
+ CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation,
+ MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest,
+};
/// Repository error types.
#[derive(Debug)]
@@ -60,12 +60,11 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
sqlx::query_as::<_, File>(
r#"
- INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
- VALUES ($1, $2, $3, $4, $5, NULL, $6)
+ INSERT INTO files (name, description, transcript, location, summary, body)
+ VALUES ($1, $2, $3, $4, NULL, $5)
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&req.description)
.bind(&transcript_json)
@@ -81,26 +80,23 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
- WHERE id = $1 AND owner_id = $2
+ WHERE id = $1
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.fetch_optional(pool)
.await
}
-/// List all files for the owner, ordered by created_at DESC.
+/// List all files, ordered by created_at DESC.
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
- WHERE owner_id = $1
ORDER BY created_at DESC
"#,
)
- .bind(ANONYMOUS_OWNER_ID)
.fetch_all(pool)
.await
}
@@ -146,13 +142,12 @@ pub async fn update_file(
sqlx::query_as::<_, File>(
r#"
UPDATE files
- SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2 AND version = $8
+ SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
+ WHERE id = $1 AND version = $7
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
@@ -166,13 +161,12 @@ pub async fn update_file(
sqlx::query_as::<_, File>(
r#"
UPDATE files
- SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
+ SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
+ WHERE id = $1
RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.bind(&name)
.bind(&description)
.bind(&transcript_json)
@@ -201,21 +195,19 @@ pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM files
- WHERE id = $1 AND owner_id = $2
+ WHERE id = $1
"#,
)
.bind(id)
- .bind(ANONYMOUS_OWNER_ID)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
-/// Count total files for owner.
+/// Count total files.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
- let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files WHERE owner_id = $1")
- .bind(ANONYMOUS_OWNER_ID)
+ let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
.fetch_one(pool)
.await?;
@@ -223,6 +215,178 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
}
// =============================================================================
+// Owner-Scoped File Functions
+// =============================================================================
+
+/// Create a new file record for a specific owner.
+pub async fn create_file_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateFileRequest,
+) -> Result<File, sqlx::Error> {
+ let name = req.name.unwrap_or_else(generate_default_name);
+ let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
+ let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();
+
+ sqlx::query_as::<_, File>(
+ r#"
+ INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
+ VALUES ($1, $2, $3, $4, $5, NULL, $6)
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&req.description)
+ .bind(&transcript_json)
+ .bind(&req.location)
+ .bind(&body_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a file by ID, scoped to owner.
+pub async fn get_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<File>, sqlx::Error> {
+ sqlx::query_as::<_, File>(
+ r#"
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ FROM files
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all files for an owner, ordered by created_at DESC.
+pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
+ sqlx::query_as::<_, File>(
+ r#"
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ FROM files
+ WHERE owner_id = $1
+ ORDER BY created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a file by ID with optimistic locking, scoped to owner.
+pub async fn update_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateFileRequest,
+) -> Result<Option<File>, RepositoryError> {
+ // Get the existing file first (scoped to owner)
+ let existing = get_file_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let transcript = req.transcript.unwrap_or(existing.transcript);
+ let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
+ let summary = req.summary.or(existing.summary);
+ let body = req.body.unwrap_or(existing.body);
+ let body_json = serde_json::to_value(&body).unwrap_or_default();
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $8
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ // No version check for internal updates
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ // Re-fetch to get the actual version
+ if let Some(current) = get_file_for_owner(pool, id, owner_id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a file by ID, scoped to owner.
+pub async fn delete_file_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM files
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
// Version History Functions
// =============================================================================
@@ -363,3 +527,1186 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq
Ok(result.0)
}
+
+// =============================================================================
+// Task Functions
+// =============================================================================
+
+/// Create a new task.
+///
+/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
+/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
+/// and target_repo_path from the parent task. Depth is calculated from parent and limited
+/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
+///
+/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
+/// explicitly configured. The orchestrator controls when completion steps happen.
+pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
+ // Calculate depth and inherit settings from parent if applicable
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ if let Some(parent_id) = req.parent_task_id {
+ // Fetch parent task to get depth and inherit repo settings
+ let parent = get_task(pool, parent_id).await?
+ .ok_or_else(|| sqlx::Error::RowNotFound)?;
+
+ let new_depth = parent.depth + 1;
+
+ // Validate max depth (must be < 2, i.e., 0 or 1 only)
+ // Orchestrators are at depth 0, subtasks at depth 1
+ // Subtasks cannot have their own children
+ if new_depth >= 2 {
+ return Err(sqlx::Error::Protocol(format!(
+ "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
+ new_depth
+ )));
+ }
+
+ // Inherit repo settings if not provided
+ let repo_url = req.repository_url.clone().or(parent.repository_url);
+ let base_branch = req.base_branch.clone().or(parent.base_branch);
+ let target_branch = req.target_branch.clone().or(parent.target_branch);
+ let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
+ let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
+ // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
+ // The orchestrator integrates subtask work from their worktrees.
+ let completion_action = req.completion_action.clone();
+
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ } else {
+ // Top-level task: depth 0
+ (
+ 0,
+ req.repository_url.clone(),
+ req.base_branch.clone(),
+ req.target_branch.clone(),
+ req.merge_mode.clone(),
+ req.target_repo_path.clone(),
+ req.completion_action.clone(),
+ )
+ };
+
+ let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+
+ sqlx::query_as::<_, Task>(
+ r#"
+ INSERT INTO tasks (
+ parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
+ target_repo_path, completion_action, continue_from_task_id, copy_files
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
+ RETURNING *
+ "#,
+ )
+ .bind(req.parent_task_id)
+ .bind(depth)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.plan)
+ .bind(req.priority)
+ .bind(&repo_url)
+ .bind(&base_branch)
+ .bind(&target_branch)
+ .bind(&merge_mode)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(&req.continue_from_task_id)
+ .bind(&copy_files_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a task by ID.
+pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all top-level tasks (no parent), ordered by created_at DESC.
+pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id IS NULL
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// List subtasks of a parent task.
+pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id = $1
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(parent_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a task by ID with optimistic locking.
+pub async fn update_task(
+ pool: &PgPool,
+ id: Uuid,
+ req: UpdateTaskRequest,
+) -> Result<Option<Task>, RepositoryError> {
+ // Get the existing task first
+ let existing = get_task(pool, id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let plan = req.plan.unwrap_or(existing.plan);
+ let status = req.status.unwrap_or(existing.status);
+ let priority = req.priority.unwrap_or(existing.priority);
+ let progress_summary = req.progress_summary.or(existing.progress_summary);
+ let last_output = req.last_output.or(existing.last_output);
+ let error_message = req.error_message.or(existing.error_message);
+ let merge_mode = req.merge_mode.or(existing.merge_mode);
+ let pr_url = req.pr_url.or(existing.pr_url);
+ let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
+ let completion_action = req.completion_action.or(existing.completion_action);
+ // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing
+ let daemon_id = if req.clear_daemon_id {
+ None
+ } else {
+ req.daemon_id.or(existing.daemon_id)
+ };
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
+ progress_summary = $7, last_output = $8, error_message = $9,
+ merge_mode = $10, pr_url = $11, daemon_id = $12,
+ target_repo_path = $13, completion_action = $14, updated_at = NOW()
+ WHERE id = $1 AND version = $15
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
+ progress_summary = $7, last_output = $8, error_message = $9,
+ merge_mode = $10, pr_url = $11, daemon_id = $12,
+ target_repo_path = $13, completion_action = $14, updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ if let Some(current) = get_task(pool, id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a task by ID.
+pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM tasks
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Count total tasks.
+pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL",
+ )
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0)
+}
+
+// =============================================================================
+// Owner-Scoped Task Functions
+// =============================================================================
+
+/// Create a new task for a specific owner.
+pub async fn create_task_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateTaskRequest,
+) -> Result<Task, sqlx::Error> {
+ // Calculate depth and inherit settings from parent if applicable
+ let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
+ if let Some(parent_id) = req.parent_task_id {
+ // Fetch parent task to get depth and inherit repo settings (must belong to same owner)
+ let parent = get_task_for_owner(pool, parent_id, owner_id).await?
+ .ok_or_else(|| sqlx::Error::RowNotFound)?;
+
+ let new_depth = parent.depth + 1;
+
+ // Validate max depth
+ if new_depth >= 2 {
+ return Err(sqlx::Error::Protocol(format!(
+ "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
+ new_depth
+ )));
+ }
+
+ // Inherit repo settings if not provided
+ let repo_url = req.repository_url.clone().or(parent.repository_url);
+ let base_branch = req.base_branch.clone().or(parent.base_branch);
+ let target_branch = req.target_branch.clone().or(parent.target_branch);
+ let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
+ let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
+ // NOTE: completion_action is NOT inherited - subtasks should not auto-merge.
+ // The orchestrator integrates subtask work from their worktrees.
+ let completion_action = req.completion_action.clone();
+
+ (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
+ } else {
+ // Top-level task: depth 0
+ (
+ 0,
+ req.repository_url.clone(),
+ req.base_branch.clone(),
+ req.target_branch.clone(),
+ req.merge_mode.clone(),
+ req.target_repo_path.clone(),
+ req.completion_action.clone(),
+ )
+ };
+
+ let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());
+
+ sqlx::query_as::<_, Task>(
+ r#"
+ INSERT INTO tasks (
+ owner_id, parent_task_id, depth, name, description, plan, priority,
+ repository_url, base_branch, target_branch, merge_mode,
+ target_repo_path, completion_action, continue_from_task_id, copy_files
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(req.parent_task_id)
+ .bind(depth)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.plan)
+ .bind(req.priority)
+ .bind(&repo_url)
+ .bind(&base_branch)
+ .bind(&target_branch)
+ .bind(&merge_mode)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(&req.continue_from_task_id)
+ .bind(&copy_files_json)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a task by ID, scoped to owner.
+pub async fn get_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all top-level tasks (no parent) for an owner, ordered by created_at DESC.
+pub async fn list_tasks_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// List subtasks of a parent task, scoped to owner.
+pub async fn list_subtasks_for_owner(
+ pool: &PgPool,
+ parent_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.owner_id = $1 AND t.parent_task_id = $2
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(parent_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a task by ID with optimistic locking, scoped to owner.
+pub async fn update_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+ req: UpdateTaskRequest,
+) -> Result<Option<Task>, RepositoryError> {
+ // Get the existing task first (scoped to owner)
+ let existing = get_task_for_owner(pool, id, owner_id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
+ // Apply updates
+ let name = req.name.unwrap_or(existing.name);
+ let description = req.description.or(existing.description);
+ let plan = req.plan.unwrap_or(existing.plan);
+ let status = req.status.unwrap_or(existing.status);
+ let priority = req.priority.unwrap_or(existing.priority);
+ let progress_summary = req.progress_summary.or(existing.progress_summary);
+ let last_output = req.last_output.or(existing.last_output);
+ let error_message = req.error_message.or(existing.error_message);
+ let merge_mode = req.merge_mode.or(existing.merge_mode);
+ let pr_url = req.pr_url.or(existing.pr_url);
+ let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
+ let completion_action = req.completion_action.or(existing.completion_action);
+ let daemon_id = if req.clear_daemon_id {
+ None
+ } else {
+ req.daemon_id.or(existing.daemon_id)
+ };
+
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
+ progress_summary = $8, last_output = $9, error_message = $10,
+ merge_mode = $11, pr_url = $12, daemon_id = $13,
+ target_repo_path = $14, completion_action = $15, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $16
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
+ progress_summary = $8, last_output = $9, error_message = $10,
+ merge_mode = $11, pr_url = $12, daemon_id = $13,
+ target_repo_path = $14, completion_action = $15, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(&name)
+ .bind(&description)
+ .bind(&plan)
+ .bind(&status)
+ .bind(priority)
+ .bind(&progress_summary)
+ .bind(&last_output)
+ .bind(&error_message)
+ .bind(&merge_mode)
+ .bind(&pr_url)
+ .bind(daemon_id)
+ .bind(&target_repo_path)
+ .bind(&completion_action)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ if let Some(current) = get_task_for_owner(pool, id, owner_id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
+}
+
+/// Delete a task by ID, scoped to owner.
+pub async fn delete_task_for_owner(
+ pool: &PgPool,
+ id: Uuid,
+ owner_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM tasks
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update task status and record event.
+pub async fn update_task_status(
+ pool: &PgPool,
+ id: Uuid,
+ new_status: &str,
+ event_data: Option<serde_json::Value>,
+) -> Result<Option<Task>, sqlx::Error> {
+ // Get existing status
+ let existing = get_task(pool, id).await?;
+ let Some(existing) = existing else {
+ return Ok(None);
+ };
+
+ let previous_status = existing.status.clone();
+
+ // Update task status
+ let task = sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = $2, updated_at = NOW(),
+ started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
+ completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(new_status)
+ .fetch_optional(pool)
+ .await?;
+
+ // Record event
+ if task.is_some() {
+ let _ = create_task_event(
+ pool,
+ id,
+ "status_change",
+ Some(&previous_status),
+ Some(new_status),
+ event_data,
+ )
+ .await;
+ }
+
+ Ok(task)
+}
+
+// =============================================================================
+// Task Event Functions
+// =============================================================================
+
+/// Create a task event.
+pub async fn create_task_event(
+ pool: &PgPool,
+ task_id: Uuid,
+ event_type: &str,
+ previous_status: Option<&str>,
+ new_status: Option<&str>,
+ event_data: Option<serde_json::Value>,
+) -> Result<TaskEvent, sqlx::Error> {
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(event_type)
+ .bind(previous_status)
+ .bind(new_status)
+ .bind(event_data)
+ .fetch_one(pool)
+ .await
+}
+
+/// List events for a task.
+pub async fn list_task_events(
+ pool: &PgPool,
+ task_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<TaskEvent>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ SELECT *
+ FROM task_events
+ WHERE task_id = $1
+ ORDER BY created_at DESC
+ LIMIT $2
+ "#,
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+// =============================================================================
+// Daemon Functions
+// =============================================================================
+
+/// Register a new daemon connection.
+pub async fn register_daemon(
+ pool: &PgPool,
+ owner_id: Uuid,
+ connection_id: &str,
+ hostname: Option<&str>,
+ machine_id: Option<&str>,
+ max_concurrent_tasks: i32,
+) -> Result<Daemon, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(connection_id)
+ .bind(hostname)
+ .bind(machine_id)
+ .bind(max_concurrent_tasks)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a daemon by ID.
+pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a daemon by connection ID.
+pub async fn get_daemon_by_connection(
+ pool: &PgPool,
+ connection_id: &str,
+) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE connection_id = $1
+ "#,
+ )
+ .bind(connection_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List all daemons.
+pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ ORDER BY connected_at DESC
+ "#,
+ )
+ .fetch_all(pool)
+ .await
+}
+
+/// List daemons for a specific owner.
+pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE owner_id = $1
+ ORDER BY connected_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Get a daemon by ID for a specific owner.
+pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
+ sqlx::query_as::<_, Daemon>(
+ r#"
+ SELECT *
+ FROM daemons
+ WHERE id = $1 AND owner_id = $2
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Update daemon heartbeat.
+pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET last_heartbeat_at = NOW(), status = 'connected'
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update daemon status.
+pub async fn update_daemon_status(
+ pool: &PgPool,
+ id: Uuid,
+ status: &str,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET status = $2,
+ disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .bind(status)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update daemon task count.
+pub async fn update_daemon_task_count(
+ pool: &PgPool,
+ id: Uuid,
+ delta: i32,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE daemons
+ SET current_task_count = GREATEST(0, current_task_count + $2)
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .bind(delta)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Delete a daemon by ID.
+pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM daemons
+ WHERE id = $1
+ "#,
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Delete a daemon by connection ID.
+pub async fn delete_daemon_by_connection(
+ pool: &PgPool,
+ connection_id: &str,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM daemons
+ WHERE connection_id = $1
+ "#,
+ )
+ .bind(connection_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Count connected daemons.
+pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ "SELECT COUNT(*) FROM daemons WHERE status = 'connected'",
+ )
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0)
+}
+
+// =============================================================================
+// Sibling Awareness Functions
+// =============================================================================
+
+/// List sibling tasks (tasks with the same parent, excluding the given task).
+pub async fn list_sibling_tasks(
+ pool: &PgPool,
+ task_id: Uuid,
+ parent_id: Option<Uuid>,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ match parent_id {
+ Some(parent) => {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id = $1 AND t.id != $2
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(parent)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ None => {
+ // Top-level tasks (no parent) - siblings are other top-level tasks
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary, t.version, t.created_at, t.updated_at,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count
+ FROM tasks t
+ WHERE t.parent_task_id IS NULL AND t.id != $1
+ ORDER BY t.priority DESC, t.created_at DESC
+ "#,
+ )
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ }
+}
+
+/// Get running sibling tasks (for context injection).
+pub async fn get_running_siblings(
+ pool: &PgPool,
+ owner_id: Uuid,
+ task_id: Uuid,
+ parent_id: Option<Uuid>,
+) -> Result<Vec<Task>, sqlx::Error> {
+ match parent_id {
+ Some(parent) => {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks t
+ WHERE t.owner_id = $1
+ AND t.parent_task_id = $2
+ AND t.id != $3
+ AND t.status = 'running'
+ ORDER BY t.priority DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(parent)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ None => {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT *
+ FROM tasks t
+ WHERE t.owner_id = $1
+ AND t.parent_task_id IS NULL
+ AND t.id != $2
+ AND t.status = 'running'
+ ORDER BY t.priority DESC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(task_id)
+ .fetch_all(pool)
+ .await
+ }
+ }
+}
+
+/// Get task with its siblings for context awareness.
+pub async fn get_task_with_siblings(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
+ let task = get_task(pool, id).await?;
+ let Some(task) = task else {
+ return Ok(None);
+ };
+
+ let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
+ Ok(Some((task, siblings)))
+}
+
+// =============================================================================
+// Task Output Persistence Functions
+// =============================================================================
+
+/// Save task output to the database.
+/// This stores output in the task_events table with event_type='output'.
+pub async fn save_task_output(
+ pool: &PgPool,
+ task_id: Uuid,
+ message_type: &str,
+ content: &str,
+ tool_name: Option<&str>,
+ tool_input: Option<serde_json::Value>,
+ is_error: Option<bool>,
+ cost_usd: Option<f64>,
+ duration_ms: Option<u64>,
+) -> Result<TaskEvent, sqlx::Error> {
+ let event_data = serde_json::json!({
+ "messageType": message_type,
+ "content": content,
+ "toolName": tool_name,
+ "toolInput": tool_input,
+ "isError": is_error,
+ "costUsd": cost_usd,
+ "durationMs": duration_ms,
+ });
+
+ create_task_event(pool, task_id, "output", None, None, Some(event_data)).await
+}
+
+/// Get task output from the database.
+/// Retrieves all output events for a task, ordered by creation time.
+pub async fn get_task_output(
+ pool: &PgPool,
+ task_id: Uuid,
+ limit: Option<i64>,
+) -> Result<Vec<TaskEvent>, sqlx::Error> {
+ let limit = limit.unwrap_or(1000);
+ sqlx::query_as::<_, TaskEvent>(
+ r#"
+ SELECT *
+ FROM task_events
+ WHERE task_id = $1 AND event_type = 'output'
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#,
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update task completion status with error message.
+/// Sets the task status to 'done' or 'failed' and records completion time.
+pub async fn complete_task(
+ pool: &PgPool,
+ task_id: Uuid,
+ success: bool,
+ error_message: Option<&str>,
+) -> Result<Option<Task>, sqlx::Error> {
+ let status = if success { "done" } else { "failed" };
+
+ let task = sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = $2,
+ error_message = COALESCE($3, error_message),
+ completed_at = NOW(),
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(status)
+ .bind(error_message)
+ .fetch_optional(pool)
+ .await?;
+
+ // Record completion event
+ if task.is_some() {
+ let event_data = serde_json::json!({
+ "success": success,
+ "errorMessage": error_message,
+ });
+ let _ = create_task_event(
+ pool,
+ task_id,
+ "complete",
+ Some("running"),
+ Some(status),
+ Some(event_data),
+ )
+ .await;
+ }
+
+ Ok(task)
+}
+
+// =============================================================================
+// Mesh Chat History Functions
+// =============================================================================
+
+/// Get or create the active conversation for an owner.
+pub async fn get_or_create_active_conversation(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<MeshChatConversation, sqlx::Error> {
+ // Try to get existing active conversation for this owner
+ let existing = sqlx::query_as::<_, MeshChatConversation>(
+ r#"
+ SELECT *
+ FROM mesh_chat_conversations
+ WHERE is_active = true AND owner_id = $1
+ LIMIT 1
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ if let Some(conv) = existing {
+ return Ok(conv);
+ }
+
+ // Create new conversation
+ sqlx::query_as::<_, MeshChatConversation>(
+ r#"
+ INSERT INTO mesh_chat_conversations (owner_id, is_active)
+ VALUES ($1, true)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_one(pool)
+ .await
+}
+
+/// List messages for a conversation.
+pub async fn list_chat_messages(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ limit: Option<i32>,
+) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, MeshChatMessageRecord>(
+ r#"
+ SELECT *
+ FROM mesh_chat_messages
+ WHERE conversation_id = $1
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Add a message to a conversation.
+#[allow(clippy::too_many_arguments)]
+pub async fn add_chat_message(
+ pool: &PgPool,
+ conversation_id: Uuid,
+ role: &str,
+ content: &str,
+ context_type: &str,
+ context_task_id: Option<Uuid>,
+ tool_calls: Option<serde_json::Value>,
+ pending_questions: Option<serde_json::Value>,
+) -> Result<MeshChatMessageRecord, sqlx::Error> {
+ sqlx::query_as::<_, MeshChatMessageRecord>(
+ r#"
+ INSERT INTO mesh_chat_messages
+ (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#,
+ )
+ .bind(conversation_id)
+ .bind(role)
+ .bind(content)
+ .bind(context_type)
+ .bind(context_task_id)
+ .bind(tool_calls)
+ .bind(pending_questions)
+ .fetch_one(pool)
+ .await
+}
+
+/// Clear conversation (archive existing and create new).
+pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> {
+ // Mark existing as inactive for this owner
+ sqlx::query(
+ r#"
+ UPDATE mesh_chat_conversations
+ SET is_active = false, updated_at = NOW()
+ WHERE is_active = true AND owner_id = $1
+ "#,
+ )
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ // Create new active conversation
+ get_or_create_active_conversation(pool, owner_id).await
+}