diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 589 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 1393 |
2 files changed, 1959 insertions, 23 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 617e590..5064b97 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -36,6 +36,16 @@ pub enum BodyElement { Heading { level: u8, text: String }, /// Paragraph text Paragraph { text: String }, + /// Code block with optional language + Code { + language: Option<String>, + content: String, + }, + /// List (ordered or unordered) + List { + ordered: bool, + items: Vec<String>, + }, /// Chart visualization Chart { #[serde(rename = "chartType")] @@ -245,3 +255,582 @@ pub struct RestoreVersionRequest { /// The current version (for optimistic locking) pub current_version: i32, } + +// ============================================================================= +// Mesh/Task Types +// ============================================================================= + +/// Task status for orchestrating Claude Code instances +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum TaskStatus { + Pending, + Running, + Paused, + Blocked, + Done, + Failed, + Merged, +} + +impl std::fmt::Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskStatus::Pending => write!(f, "pending"), + TaskStatus::Running => write!(f, "running"), + TaskStatus::Paused => write!(f, "paused"), + TaskStatus::Blocked => write!(f, "blocked"), + TaskStatus::Done => write!(f, "done"), + TaskStatus::Failed => write!(f, "failed"), + TaskStatus::Merged => write!(f, "merged"), + } + } +} + +impl std::str::FromStr for TaskStatus { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "pending" => Ok(TaskStatus::Pending), + "running" => Ok(TaskStatus::Running), + "paused" => Ok(TaskStatus::Paused), + "blocked" => Ok(TaskStatus::Blocked), + "done" => Ok(TaskStatus::Done), + "failed" => Ok(TaskStatus::Failed), + "merged" => Ok(TaskStatus::Merged), + _ => Err(format!("Unknown task status: {}", s)), + } + } +} + +/// Merge mode for task completion +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum MergeMode { + /// Create a PR for review + Pr, + /// Auto-merge to target branch + Auto, + /// Manual merge by user + Manual, +} + +impl std::fmt::Display for MergeMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MergeMode::Pr => write!(f, "pr"), + MergeMode::Auto => write!(f, "auto"), + MergeMode::Manual => write!(f, "manual"), + } + } +} + +impl std::str::FromStr for MergeMode { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "pr" => Ok(MergeMode::Pr), + "auto" => Ok(MergeMode::Auto), + "manual" => Ok(MergeMode::Manual), + _ => Err(format!("Unknown merge mode: {}", s)), + } + } +} + +/// Task record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Task { + pub id: Uuid, + pub owner_id: Uuid, + pub parent_task_id: Option<Uuid>, + /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max) + pub depth: i32, + pub name: String, + pub description: Option<String>, + pub status: String, + pub priority: i32, + pub plan: String, + + // Daemon/container info + pub daemon_id: Option<Uuid>, + pub container_id: Option<String>, + pub overlay_path: Option<String>, + + // Repository info + pub repository_url: Option<String>, + pub base_branch: Option<String>, + pub target_branch: Option<String>, + + // Merge settings + pub merge_mode: Option<String>, + pub pr_url: Option<String>, + + // Completion action settings + /// Path to user's local repository (outside ~/.makima) + pub target_repo_path: Option<String>, + /// Action on completion: "none", "branch", "merge", "pr" + pub completion_action: Option<String>, + + // Progress tracking + pub progress_summary: Option<String>, + pub last_output: Option<String>, + pub error_message: Option<String>, + + // Timestamps + pub started_at: Option<DateTime<Utc>>, + pub completed_at: Option<DateTime<Utc>>, + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, + + // Task continuation + /// Task ID to continue from (copy worktree from this task when starting). + /// Used for sequential subtask dependencies. + #[serde(skip_serializing_if = "Option::is_none")] + pub continue_from_task_id: Option<Uuid>, + /// Files to copy from parent task's worktree when starting. + #[serde(skip_serializing_if = "Option::is_none")] + pub copy_files: Option<serde_json::Value>, +} + +impl Task { + /// Parse status string to TaskStatus enum + pub fn status_enum(&self) -> Result<TaskStatus, String> { + self.status.parse() + } + + /// Parse merge_mode string to MergeMode enum + pub fn merge_mode_enum(&self) -> Option<Result<MergeMode, String>> { + self.merge_mode.as_ref().map(|s| s.parse()) + } +} + +/// Summary of a task for list views +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskSummary { + pub id: Uuid, + pub parent_task_id: Option<Uuid>, + /// Depth in task hierarchy: 0=orchestrator (top-level), 1=subtask (max) + pub depth: i32, + pub name: String, + pub status: String, + pub priority: i32, + pub progress_summary: Option<String>, + pub subtask_count: i64, + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +/// Response for task list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskListResponse { + pub tasks: Vec<TaskSummary>, + pub total: i64, +} + +/// Request payload for creating a new task +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateTaskRequest { + /// Name of the task + pub name: String, + /// Optional description + pub description: Option<String>, + /// The plan/instructions for Claude Code + pub plan: String, + /// Parent task ID (for subtasks) + pub parent_task_id: Option<Uuid>, + /// Priority (higher = more urgent) + #[serde(default)] + pub priority: i32, + /// Repository URL + pub repository_url: Option<String>, + /// Base branch for overlay + pub base_branch: Option<String>, + /// Target branch to merge into + pub target_branch: Option<String>, + /// Merge mode (pr, auto, manual) + pub merge_mode: Option<String>, + /// Path to user's local repository (outside ~/.makima) + pub target_repo_path: Option<String>, + /// Action on completion: "none", "branch", "merge", "pr" + pub completion_action: Option<String>, + /// Task ID to continue from (copy worktree from this task when starting) + pub continue_from_task_id: Option<Uuid>, + /// Files to copy from parent task's worktree when starting + #[serde(skip_serializing_if = "Option::is_none")] + pub copy_files: Option<Vec<String>>, +} + +/// Request payload for updating a task +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateTaskRequest { + pub name: Option<String>, + pub description: Option<String>, + pub plan: Option<String>, + pub status: Option<String>, + pub priority: Option<i32>, + pub progress_summary: Option<String>, + pub last_output: Option<String>, + pub error_message: Option<String>, + pub merge_mode: Option<String>, + pub pr_url: Option<String>, + /// Path to user's local repository (outside ~/.makima) + pub target_repo_path: Option<String>, + /// Action on completion: "none", "branch", "merge", "pr" + pub completion_action: Option<String>, + /// The daemon currently running this task + pub daemon_id: Option<Uuid>, + /// Explicitly clear daemon_id (set to NULL) + #[serde(default)] + pub clear_daemon_id: bool, + /// Version for optimistic locking + pub version: Option<i32>, +} + +/// Task with its subtasks for detail view +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskWithSubtasks { + #[serde(flatten)] + pub task: Task, + pub subtasks: Vec<TaskSummary>, +} + +/// Request to send a message to a running task's stdin. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SendMessageRequest { + /// The message to send to the task's stdin. + pub message: String, +} + +// ============================================================================= +// Daemon Types +// ============================================================================= + +/// Daemon status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum DaemonStatus { + Connected, + Disconnected, + Unhealthy, +} + +impl std::fmt::Display for DaemonStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DaemonStatus::Connected => write!(f, "connected"), + DaemonStatus::Disconnected => write!(f, "disconnected"), + DaemonStatus::Unhealthy => write!(f, "unhealthy"), + } + } +} + +impl std::str::FromStr for DaemonStatus { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "connected" => Ok(DaemonStatus::Connected), + "disconnected" => Ok(DaemonStatus::Disconnected), + "unhealthy" => Ok(DaemonStatus::Unhealthy), + _ => Err(format!("Unknown daemon status: {}", s)), + } + } +} + +/// Connected daemon record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Daemon { + pub id: Uuid, + pub owner_id: Uuid, + pub connection_id: String, + pub hostname: Option<String>, + pub machine_id: Option<String>, + pub max_concurrent_tasks: i32, + pub current_task_count: i32, + pub status: String, + pub last_heartbeat_at: DateTime<Utc>, + pub connected_at: DateTime<Utc>, + pub disconnected_at: Option<DateTime<Utc>>, +} + +impl Daemon { + /// Parse status string to DaemonStatus enum + pub fn status_enum(&self) -> Result<DaemonStatus, String> { + self.status.parse() + } +} + +/// Response for daemon list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonListResponse { + pub daemons: Vec<Daemon>, + pub total: i64, +} + +/// Response for daemon directories endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonDirectoriesResponse { + /// List of suggested directories from connected daemons + pub directories: Vec<DaemonDirectory>, +} + +/// A suggested directory from a daemon +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonDirectory { + /// Path to the directory + pub path: String, + /// Display label for the directory + pub label: String, + /// Type of directory: "working", "makima", "worktrees" + pub directory_type: String, + /// Daemon hostname this directory is from + pub hostname: Option<String>, + /// Whether the directory already exists (for validation) + #[serde(skip_serializing_if = "Option::is_none")] + pub exists: Option<bool>, +} + +// ============================================================================= +// Task Event Types +// ============================================================================= + +/// Task event record from the database +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskEvent { + pub id: Uuid, + pub task_id: Uuid, + pub event_type: String, + pub previous_status: Option<String>, + pub new_status: Option<String>, + #[sqlx(json)] + pub event_data: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// Response for task events list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskEventListResponse { + pub events: Vec<TaskEvent>, + pub total: i64, +} + +/// A single output entry from a Claude Code task +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskOutputEntry { + pub id: Uuid, + pub task_id: Uuid, + /// Message type: "assistant", "tool_use", "tool_result", "result", "system", "error", "raw" + pub message_type: String, + /// Main text content + pub content: String, + /// Tool name if tool_use message + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_name: Option<String>, + /// Tool input JSON if tool_use message + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_input: Option<serde_json::Value>, + /// Whether tool result was an error + #[serde(skip_serializing_if = "Option::is_none")] + pub is_error: Option<bool>, + /// Cost in USD if result message + #[serde(skip_serializing_if = "Option::is_none")] + pub cost_usd: Option<f64>, + /// Duration in ms if result message + #[serde(skip_serializing_if = "Option::is_none")] + pub duration_ms: Option<u64>, + /// Timestamp when this output was recorded + pub created_at: DateTime<Utc>, +} + +impl TaskOutputEntry { + /// Convert a TaskEvent with event_type='output' to a TaskOutputEntry + pub fn from_task_event(event: TaskEvent) -> Option<Self> { + if event.event_type != "output" { + return None; + } + let data = event.event_data?; + Some(Self { + id: event.id, + task_id: event.task_id, + message_type: data.get("messageType")?.as_str()?.to_string(), + content: data.get("content")?.as_str().unwrap_or("").to_string(), + tool_name: data.get("toolName").and_then(|v| v.as_str()).map(|s| s.to_string()), + tool_input: data.get("toolInput").cloned(), + is_error: data.get("isError").and_then(|v| v.as_bool()), + cost_usd: data.get("costUsd").and_then(|v| v.as_f64()), + duration_ms: data.get("durationMs").and_then(|v| v.as_u64()), + created_at: event.created_at, + }) + } +} + +/// Response for task output history endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskOutputResponse { + pub entries: Vec<TaskOutputEntry>, + pub total: usize, + pub task_id: Uuid, +} + +// ============================================================================= +// Mesh Chat History Types +// ============================================================================= + +/// Mesh chat conversation for persisting history +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatConversation { + pub id: Uuid, + pub owner_id: Uuid, + pub name: Option<String>, + pub is_active: bool, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +/// Individual message in a mesh chat conversation +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatMessageRecord { + pub id: Uuid, + pub conversation_id: Uuid, + pub role: String, + pub content: String, + pub context_type: String, + pub context_task_id: Option<Uuid>, + /// Tool calls made during this message (JSON, nullable) + pub tool_calls: Option<serde_json::Value>, + /// Pending questions requiring user response (JSON, nullable) + pub pending_questions: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// Response for chat history endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatHistoryResponse { + pub conversation_id: Uuid, + pub messages: Vec<MeshChatMessageRecord>, +} + +// ============================================================================= +// Merge API Types +// ============================================================================= + +/// Information about a task branch +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchInfo { + /// Full branch name + pub name: String, + /// Task ID extracted from branch name (if parseable) + pub task_id: Option<Uuid>, + /// Whether this branch has been merged + pub is_merged: bool, + /// Short SHA of the last commit + pub last_commit: String, + /// Subject line of the last commit + pub last_commit_message: String, +} + +/// Response for branch list endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchListResponse { + pub branches: Vec<BranchInfo>, +} + +/// Request to start a merge +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeStartRequest { + /// Branch name to merge + pub source_branch: String, +} + +/// Current merge state +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeStatusResponse { + /// Whether a merge is in progress + pub in_progress: bool, + /// Branch being merged (if in progress) + pub source_branch: Option<String>, + /// Files with unresolved conflicts + pub conflicted_files: Vec<String>, +} + +/// Request to resolve a conflict +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeResolveRequest { + /// File path to resolve + pub file: String, + /// Resolution strategy: "ours" or "theirs" + pub strategy: String, +} + +/// Request to commit a merge +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeCommitRequest { + /// Commit message + pub message: String, +} + +/// Request to skip a subtask branch +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeSkipRequest { + /// Subtask ID to skip + pub subtask_id: Uuid, + /// Reason for skipping + pub reason: String, +} + +/// Result of a merge operation +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeResultResponse { + /// Whether the operation succeeded + pub success: bool, + /// Human-readable message + pub message: String, + /// Commit SHA (if a commit was created) + pub commit_sha: Option<String>, + /// Conflicted files (if conflicts occurred) + pub conflicts: Option<Vec<String>>, +} + +/// Response to check if all branches are merged +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeCompleteCheckResponse { + /// Whether the orchestrator can mark itself as complete + pub can_complete: bool, + /// Branches not yet merged or skipped + pub unmerged_branches: Vec<String>, + /// Count of merged branches + pub merged_count: u32, + /// Count of skipped branches + pub skipped_count: u32, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 4137ba6..ce1e97d 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4,10 +4,10 @@ use chrono::Utc; use sqlx::PgPool; use uuid::Uuid; -use super::models::{CreateFileRequest, File, FileVersion, UpdateFileRequest}; - -/// Default owner ID for anonymous users. -pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002); +use super::models::{ + CreateFileRequest, CreateTaskRequest, Daemon, File, FileVersion, MeshChatConversation, + MeshChatMessageRecord, Task, TaskEvent, TaskSummary, UpdateFileRequest, UpdateTaskRequest, +}; /// Repository error types. #[derive(Debug)] @@ -60,12 +60,11 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, sqlx::query_as::<_, File>( r#" - INSERT INTO files (owner_id, name, description, transcript, location, summary, body) - VALUES ($1, $2, $3, $4, $5, NULL, $6) + INSERT INTO files (name, description, transcript, location, summary, body) + VALUES ($1, $2, $3, $4, NULL, $5) RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&req.description) .bind(&transcript_json) @@ -81,26 +80,23 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files - WHERE id = $1 AND owner_id = $2 + WHERE id = $1 "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .fetch_optional(pool) .await } -/// List all files for the owner, ordered by created_at DESC. +/// List all files, ordered by created_at DESC. pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files - WHERE owner_id = $1 ORDER BY created_at DESC "#, ) - .bind(ANONYMOUS_OWNER_ID) .fetch_all(pool) .await } @@ -146,13 +142,12 @@ pub async fn update_file( sqlx::query_as::<_, File>( r#" UPDATE files - SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 AND version = $8 + SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() + WHERE id = $1 AND version = $7 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&description) .bind(&transcript_json) @@ -166,13 +161,12 @@ pub async fn update_file( sqlx::query_as::<_, File>( r#" UPDATE files - SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 + SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW() + WHERE id = $1 RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .bind(&name) .bind(&description) .bind(&transcript_json) @@ -201,21 +195,19 @@ pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { let result = sqlx::query( r#" DELETE FROM files - WHERE id = $1 AND owner_id = $2 + WHERE id = $1 "#, ) .bind(id) - .bind(ANONYMOUS_OWNER_ID) .execute(pool) .await?; Ok(result.rows_affected() > 0) } -/// Count total files for owner. +/// Count total files. pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { - let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files WHERE owner_id = $1") - .bind(ANONYMOUS_OWNER_ID) + let result: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files") .fetch_one(pool) .await?; @@ -223,6 +215,178 @@ pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> { } // ============================================================================= +// Owner-Scoped File Functions +// ============================================================================= + +/// Create a new file record for a specific owner. +pub async fn create_file_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateFileRequest, +) -> Result<File, sqlx::Error> { + let name = req.name.unwrap_or_else(generate_default_name); + let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default(); + let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap(); + + sqlx::query_as::<_, File>( + r#" + INSERT INTO files (owner_id, name, description, transcript, location, summary, body) + VALUES ($1, $2, $3, $4, $5, NULL, $6) + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(owner_id) + .bind(&name) + .bind(&req.description) + .bind(&transcript_json) + .bind(&req.location) + .bind(&body_json) + .fetch_one(pool) + .await +} + +/// Get a file by ID, scoped to owner. +pub async fn get_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<File>, sqlx::Error> { + sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + FROM files + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all files for an owner, ordered by created_at DESC. +pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> { + sqlx::query_as::<_, File>( + r#" + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + FROM files + WHERE owner_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a file by ID with optimistic locking, scoped to owner. +pub async fn update_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateFileRequest, +) -> Result<Option<File>, RepositoryError> { + // Get the existing file first (scoped to owner) + let existing = get_file_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let transcript = req.transcript.unwrap_or(existing.transcript); + let transcript_json = serde_json::to_value(&transcript).unwrap_or_default(); + let summary = req.summary.or(existing.summary); + let body = req.body.unwrap_or(existing.body); + let body_json = serde_json::to_value(&body).unwrap_or_default(); + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $8 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + // No version check for internal updates + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + // Re-fetch to get the actual version + if let Some(current) = get_file_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a file by ID, scoped to owner. +pub async fn delete_file_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM files + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= // Version History Functions // ============================================================================= @@ -363,3 +527,1186 @@ pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sq Ok(result.0) } + +// ============================================================================= +// Task Functions +// ============================================================================= + +/// Create a new task. +/// +/// If creating a subtask (parent_task_id is set) and repository settings are not provided, +/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode, +/// and target_repo_path from the parent task. Depth is calculated from parent and limited +/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1). +/// +/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless +/// explicitly configured. The orchestrator controls when completion steps happen. +pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> { + // Calculate depth and inherit settings from parent if applicable + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + if let Some(parent_id) = req.parent_task_id { + // Fetch parent task to get depth and inherit repo settings + let parent = get_task(pool, parent_id).await? + .ok_or_else(|| sqlx::Error::RowNotFound)?; + + let new_depth = parent.depth + 1; + + // Validate max depth (must be < 2, i.e., 0 or 1 only) + // Orchestrators are at depth 0, subtasks at depth 1 + // Subtasks cannot have their own children + if new_depth >= 2 { + return Err(sqlx::Error::Protocol(format!( + "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", + new_depth + ))); + } + + // Inherit repo settings if not provided + let repo_url = req.repository_url.clone().or(parent.repository_url); + let base_branch = req.base_branch.clone().or(parent.base_branch); + let target_branch = req.target_branch.clone().or(parent.target_branch); + let merge_mode = req.merge_mode.clone().or(parent.merge_mode); + let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); + // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. + // The orchestrator integrates subtask work from their worktrees. + let completion_action = req.completion_action.clone(); + + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + } else { + // Top-level task: depth 0 + ( + 0, + req.repository_url.clone(), + req.base_branch.clone(), + req.target_branch.clone(), + req.merge_mode.clone(), + req.target_repo_path.clone(), + req.completion_action.clone(), + ) + }; + + let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + + sqlx::query_as::<_, Task>( + r#" + INSERT INTO tasks ( + parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, + target_repo_path, completion_action, continue_from_task_id, copy_files + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + RETURNING * + "#, + ) + .bind(req.parent_task_id) + .bind(depth) + .bind(&req.name) + .bind(&req.description) + .bind(&req.plan) + .bind(req.priority) + .bind(&repo_url) + .bind(&base_branch) + .bind(&target_branch) + .bind(&merge_mode) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(&req.continue_from_task_id) + .bind(©_files_json) + .fetch_one(pool) + .await +} + +/// Get a task by ID. +pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List all top-level tasks (no parent), ordered by created_at DESC. +pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id IS NULL + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .fetch_all(pool) + .await +} + +/// List subtasks of a parent task. +pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id = $1 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(parent_id) + .fetch_all(pool) + .await +} + +/// Update a task by ID with optimistic locking. +pub async fn update_task( + pool: &PgPool, + id: Uuid, + req: UpdateTaskRequest, +) -> Result<Option<Task>, RepositoryError> { + // Get the existing task first + let existing = get_task(pool, id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let plan = req.plan.unwrap_or(existing.plan); + let status = req.status.unwrap_or(existing.status); + let priority = req.priority.unwrap_or(existing.priority); + let progress_summary = req.progress_summary.or(existing.progress_summary); + let last_output = req.last_output.or(existing.last_output); + let error_message = req.error_message.or(existing.error_message); + let merge_mode = req.merge_mode.or(existing.merge_mode); + let pr_url = req.pr_url.or(existing.pr_url); + let target_repo_path = req.target_repo_path.or(existing.target_repo_path); + let completion_action = req.completion_action.or(existing.completion_action); + // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing + let daemon_id = if req.clear_daemon_id { + None + } else { + req.daemon_id.or(existing.daemon_id) + }; + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $2, description = $3, plan = $4, status = $5, priority = $6, + progress_summary = $7, last_output = $8, error_message = $9, + merge_mode = $10, pr_url = $11, daemon_id = $12, + target_repo_path = $13, completion_action = $14, updated_at = NOW() + WHERE id = $1 AND version = $15 + RETURNING * + "#, + ) + .bind(id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $2, description = $3, plan = $4, status = $5, priority = $6, + progress_summary = $7, last_output = $8, error_message = $9, + merge_mode = $10, pr_url = $11, daemon_id = $12, + target_repo_path = $13, completion_action = $14, updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_task(pool, id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a task by ID. +pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Count total tasks. +pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL", + ) + .fetch_one(pool) + .await?; + + Ok(result.0) +} + +// ============================================================================= +// Owner-Scoped Task Functions +// ============================================================================= + +/// Create a new task for a specific owner. +pub async fn create_task_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateTaskRequest, +) -> Result<Task, sqlx::Error> { + // Calculate depth and inherit settings from parent if applicable + let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) = + if let Some(parent_id) = req.parent_task_id { + // Fetch parent task to get depth and inherit repo settings (must belong to same owner) + let parent = get_task_for_owner(pool, parent_id, owner_id).await? + .ok_or_else(|| sqlx::Error::RowNotFound)?; + + let new_depth = parent.depth + 1; + + // Validate max depth + if new_depth >= 2 { + return Err(sqlx::Error::Protocol(format!( + "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.", + new_depth + ))); + } + + // Inherit repo settings if not provided + let repo_url = req.repository_url.clone().or(parent.repository_url); + let base_branch = req.base_branch.clone().or(parent.base_branch); + let target_branch = req.target_branch.clone().or(parent.target_branch); + let merge_mode = req.merge_mode.clone().or(parent.merge_mode); + let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path); + // NOTE: completion_action is NOT inherited - subtasks should not auto-merge. + // The orchestrator integrates subtask work from their worktrees. + let completion_action = req.completion_action.clone(); + + (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) + } else { + // Top-level task: depth 0 + ( + 0, + req.repository_url.clone(), + req.base_branch.clone(), + req.target_branch.clone(), + req.merge_mode.clone(), + req.target_repo_path.clone(), + req.completion_action.clone(), + ) + }; + + let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default()); + + sqlx::query_as::<_, Task>( + r#" + INSERT INTO tasks ( + owner_id, parent_task_id, depth, name, description, plan, priority, + repository_url, base_branch, target_branch, merge_mode, + target_repo_path, completion_action, continue_from_task_id, copy_files + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(req.parent_task_id) + .bind(depth) + .bind(&req.name) + .bind(&req.description) + .bind(&req.plan) + .bind(req.priority) + .bind(&repo_url) + .bind(&base_branch) + .bind(&target_branch) + .bind(&merge_mode) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(&req.continue_from_task_id) + .bind(©_files_json) + .fetch_one(pool) + .await +} + +/// Get a task by ID, scoped to owner. +pub async fn get_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// List all top-level tasks (no parent) for an owner, ordered by created_at DESC. +pub async fn list_tasks_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.owner_id = $1 AND t.parent_task_id IS NULL + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// List subtasks of a parent task, scoped to owner. +pub async fn list_subtasks_for_owner( + pool: &PgPool, + parent_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.owner_id = $1 AND t.parent_task_id = $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(owner_id) + .bind(parent_id) + .fetch_all(pool) + .await +} + +/// Update a task by ID with optimistic locking, scoped to owner. +pub async fn update_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, + req: UpdateTaskRequest, +) -> Result<Option<Task>, RepositoryError> { + // Get the existing task first (scoped to owner) + let existing = get_task_for_owner(pool, id, owner_id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + + // Apply updates + let name = req.name.unwrap_or(existing.name); + let description = req.description.or(existing.description); + let plan = req.plan.unwrap_or(existing.plan); + let status = req.status.unwrap_or(existing.status); + let priority = req.priority.unwrap_or(existing.priority); + let progress_summary = req.progress_summary.or(existing.progress_summary); + let last_output = req.last_output.or(existing.last_output); + let error_message = req.error_message.or(existing.error_message); + let merge_mode = req.merge_mode.or(existing.merge_mode); + let pr_url = req.pr_url.or(existing.pr_url); + let target_repo_path = req.target_repo_path.or(existing.target_repo_path); + let completion_action = req.completion_action.or(existing.completion_action); + let daemon_id = if req.clear_daemon_id { + None + } else { + req.daemon_id.or(existing.daemon_id) + }; + + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $3, description = $4, plan = $5, status = $6, priority = $7, + progress_summary = $8, last_output = $9, error_message = $10, + merge_mode = $11, pr_url = $12, daemon_id = $13, + target_repo_path = $14, completion_action = $15, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $16 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET name = $3, description = $4, plan = $5, status = $6, priority = $7, + progress_summary = $8, last_output = $9, error_message = $10, + merge_mode = $11, pr_url = $12, daemon_id = $13, + target_repo_path = $14, completion_action = $15, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(&name) + .bind(&description) + .bind(&plan) + .bind(&status) + .bind(priority) + .bind(&progress_summary) + .bind(&last_output) + .bind(&error_message) + .bind(&merge_mode) + .bind(&pr_url) + .bind(daemon_id) + .bind(&target_repo_path) + .bind(&completion_action) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + if let Some(current) = get_task_for_owner(pool, id, owner_id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) +} + +/// Delete a task by ID, scoped to owner. +pub async fn delete_task_for_owner( + pool: &PgPool, + id: Uuid, + owner_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update task status and record event. +pub async fn update_task_status( + pool: &PgPool, + id: Uuid, + new_status: &str, + event_data: Option<serde_json::Value>, +) -> Result<Option<Task>, sqlx::Error> { + // Get existing status + let existing = get_task(pool, id).await?; + let Some(existing) = existing else { + return Ok(None); + }; + + let previous_status = existing.status.clone(); + + // Update task status + let task = sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = $2, updated_at = NOW(), + started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END, + completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END + WHERE id = $1 + RETURNING * + "#, + ) + .bind(id) + .bind(new_status) + .fetch_optional(pool) + .await?; + + // Record event + if task.is_some() { + let _ = create_task_event( + pool, + id, + "status_change", + Some(&previous_status), + Some(new_status), + event_data, + ) + .await; + } + + Ok(task) +} + +// ============================================================================= +// Task Event Functions +// ============================================================================= + +/// Create a task event. +pub async fn create_task_event( + pool: &PgPool, + task_id: Uuid, + event_type: &str, + previous_status: Option<&str>, + new_status: Option<&str>, + event_data: Option<serde_json::Value>, +) -> Result<TaskEvent, sqlx::Error> { + sqlx::query_as::<_, TaskEvent>( + r#" + INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(task_id) + .bind(event_type) + .bind(previous_status) + .bind(new_status) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// List events for a task. +pub async fn list_task_events( + pool: &PgPool, + task_id: Uuid, + limit: Option<i64>, +) -> Result<Vec<TaskEvent>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * + FROM task_events + WHERE task_id = $1 + ORDER BY created_at DESC + LIMIT $2 + "#, + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +// ============================================================================= +// Daemon Functions +// ============================================================================= + +/// Register a new daemon connection. +pub async fn register_daemon( + pool: &PgPool, + owner_id: Uuid, + connection_id: &str, + hostname: Option<&str>, + machine_id: Option<&str>, + max_concurrent_tasks: i32, +) -> Result<Daemon, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(connection_id) + .bind(hostname) + .bind(machine_id) + .bind(max_concurrent_tasks) + .fetch_one(pool) + .await +} + +/// Get a daemon by ID. +pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get a daemon by connection ID. +pub async fn get_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .fetch_optional(pool) + .await +} + +/// List all daemons. +pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + ORDER BY connected_at DESC + "#, + ) + .fetch_all(pool) + .await +} + +/// List daemons for a specific owner. +pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE owner_id = $1 + ORDER BY connected_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Get a daemon by ID for a specific owner. +pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> { + sqlx::query_as::<_, Daemon>( + r#" + SELECT * + FROM daemons + WHERE id = $1 AND owner_id = $2 + "#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Update daemon heartbeat. +pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET last_heartbeat_at = NOW(), status = 'connected' + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update daemon status. +pub async fn update_daemon_status( + pool: &PgPool, + id: Uuid, + status: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET status = $2, + disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END + WHERE id = $1 + "#, + ) + .bind(id) + .bind(status) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update daemon task count. +pub async fn update_daemon_task_count( + pool: &PgPool, + id: Uuid, + delta: i32, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE daemons + SET current_task_count = GREATEST(0, current_task_count + $2) + WHERE id = $1 + "#, + ) + .bind(id) + .bind(delta) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Delete a daemon by ID. +pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE id = $1 + "#, + ) + .bind(id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Delete a daemon by connection ID. +pub async fn delete_daemon_by_connection( + pool: &PgPool, + connection_id: &str, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM daemons + WHERE connection_id = $1 + "#, + ) + .bind(connection_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Count connected daemons. +pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM daemons WHERE status = 'connected'", + ) + .fetch_one(pool) + .await?; + + Ok(result.0) +} + +// ============================================================================= +// Sibling Awareness Functions +// ============================================================================= + +/// List sibling tasks (tasks with the same parent, excluding the given task). +pub async fn list_sibling_tasks( + pool: &PgPool, + task_id: Uuid, + parent_id: Option<Uuid>, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + match parent_id { + Some(parent) => { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id = $1 AND t.id != $2 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(parent) + .bind(task_id) + .fetch_all(pool) + .await + } + None => { + // Top-level tasks (no parent) - siblings are other top-level tasks + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, t.version, t.created_at, t.updated_at, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count + FROM tasks t + WHERE t.parent_task_id IS NULL AND t.id != $1 + ORDER BY t.priority DESC, t.created_at DESC + "#, + ) + .bind(task_id) + .fetch_all(pool) + .await + } + } +} + +/// Get running sibling tasks (for context injection). +pub async fn get_running_siblings( + pool: &PgPool, + owner_id: Uuid, + task_id: Uuid, + parent_id: Option<Uuid>, +) -> Result<Vec<Task>, sqlx::Error> { + match parent_id { + Some(parent) => { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks t + WHERE t.owner_id = $1 + AND t.parent_task_id = $2 + AND t.id != $3 + AND t.status = 'running' + ORDER BY t.priority DESC + "#, + ) + .bind(owner_id) + .bind(parent) + .bind(task_id) + .fetch_all(pool) + .await + } + None => { + sqlx::query_as::<_, Task>( + r#" + SELECT * + FROM tasks t + WHERE t.owner_id = $1 + AND t.parent_task_id IS NULL + AND t.id != $2 + AND t.status = 'running' + ORDER BY t.priority DESC + "#, + ) + .bind(owner_id) + .bind(task_id) + .fetch_all(pool) + .await + } + } +} + +/// Get task with its siblings for context awareness. +pub async fn get_task_with_siblings( + pool: &PgPool, + id: Uuid, +) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> { + let task = get_task(pool, id).await?; + let Some(task) = task else { + return Ok(None); + }; + + let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?; + Ok(Some((task, siblings))) +} + +// ============================================================================= +// Task Output Persistence Functions +// ============================================================================= + +/// Save task output to the database. +/// This stores output in the task_events table with event_type='output'. +pub async fn save_task_output( + pool: &PgPool, + task_id: Uuid, + message_type: &str, + content: &str, + tool_name: Option<&str>, + tool_input: Option<serde_json::Value>, + is_error: Option<bool>, + cost_usd: Option<f64>, + duration_ms: Option<u64>, +) -> Result<TaskEvent, sqlx::Error> { + let event_data = serde_json::json!({ + "messageType": message_type, + "content": content, + "toolName": tool_name, + "toolInput": tool_input, + "isError": is_error, + "costUsd": cost_usd, + "durationMs": duration_ms, + }); + + create_task_event(pool, task_id, "output", None, None, Some(event_data)).await +} + +/// Get task output from the database. +/// Retrieves all output events for a task, ordered by creation time. +pub async fn get_task_output( + pool: &PgPool, + task_id: Uuid, + limit: Option<i64>, +) -> Result<Vec<TaskEvent>, sqlx::Error> { + let limit = limit.unwrap_or(1000); + sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * + FROM task_events + WHERE task_id = $1 AND event_type = 'output' + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Update task completion status with error message. +/// Sets the task status to 'done' or 'failed' and records completion time. +pub async fn complete_task( + pool: &PgPool, + task_id: Uuid, + success: bool, + error_message: Option<&str>, +) -> Result<Option<Task>, sqlx::Error> { + let status = if success { "done" } else { "failed" }; + + let task = sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = $2, + error_message = COALESCE($3, error_message), + completed_at = NOW(), + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(task_id) + .bind(status) + .bind(error_message) + .fetch_optional(pool) + .await?; + + // Record completion event + if task.is_some() { + let event_data = serde_json::json!({ + "success": success, + "errorMessage": error_message, + }); + let _ = create_task_event( + pool, + task_id, + "complete", + Some("running"), + Some(status), + Some(event_data), + ) + .await; + } + + Ok(task) +} + +// ============================================================================= +// Mesh Chat History Functions +// ============================================================================= + +/// Get or create the active conversation for an owner. +pub async fn get_or_create_active_conversation( + pool: &PgPool, + owner_id: Uuid, +) -> Result<MeshChatConversation, sqlx::Error> { + // Try to get existing active conversation for this owner + let existing = sqlx::query_as::<_, MeshChatConversation>( + r#" + SELECT * + FROM mesh_chat_conversations + WHERE is_active = true AND owner_id = $1 + LIMIT 1 + "#, + ) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + if let Some(conv) = existing { + return Ok(conv); + } + + // Create new conversation + sqlx::query_as::<_, MeshChatConversation>( + r#" + INSERT INTO mesh_chat_conversations (owner_id, is_active) + VALUES ($1, true) + RETURNING * + "#, + ) + .bind(owner_id) + .fetch_one(pool) + .await +} + +/// List messages for a conversation. +pub async fn list_chat_messages( + pool: &PgPool, + conversation_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<MeshChatMessageRecord>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, MeshChatMessageRecord>( + r#" + SELECT * + FROM mesh_chat_messages + WHERE conversation_id = $1 + ORDER BY created_at ASC + LIMIT $2 + "#, + ) + .bind(conversation_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Add a message to a conversation. +#[allow(clippy::too_many_arguments)] +pub async fn add_chat_message( + pool: &PgPool, + conversation_id: Uuid, + role: &str, + content: &str, + context_type: &str, + context_task_id: Option<Uuid>, + tool_calls: Option<serde_json::Value>, + pending_questions: Option<serde_json::Value>, +) -> Result<MeshChatMessageRecord, sqlx::Error> { + sqlx::query_as::<_, MeshChatMessageRecord>( + r#" + INSERT INTO mesh_chat_messages + (conversation_id, role, content, context_type, context_task_id, tool_calls, pending_questions) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "#, + ) + .bind(conversation_id) + .bind(role) + .bind(content) + .bind(context_type) + .bind(context_task_id) + .bind(tool_calls) + .bind(pending_questions) + .fetch_one(pool) + .await +} + +/// Clear conversation (archive existing and create new). +pub async fn clear_conversation(pool: &PgPool, owner_id: Uuid) -> Result<MeshChatConversation, sqlx::Error> { + // Mark existing as inactive for this owner + sqlx::query( + r#" + UPDATE mesh_chat_conversations + SET is_active = false, updated_at = NOW() + WHERE is_active = true AND owner_id = $1 + "#, + ) + .bind(owner_id) + .execute(pool) + .await?; + + // Create new active conversation + get_or_create_active_conversation(pool, owner_id).await +} |
