//! Application state holding shared ML models and database pool. use std::sync::Arc; use dashmap::DashMap; use sqlx::PgPool; use tokio::sync::{broadcast, mpsc, oneshot, Mutex, OnceCell}; use uuid::Uuid; use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer}; use crate::server::auth::{AuthConfig, JwtVerifier}; use crate::tts::TtsEngine; /// Notification payload for file updates (broadcast to WebSocket subscribers). #[derive(Debug, Clone)] pub struct FileUpdateNotification { /// ID of the updated file pub file_id: Uuid, /// New version number after update pub version: i32, /// List of fields that were updated pub updated_fields: Vec, /// Source of the update: "user", "llm", or "system" pub updated_by: String, } // ============================================================================= // Task/Mesh Notifications // ============================================================================= /// Notification payload for task updates (broadcast to WebSocket subscribers). #[derive(Debug, Clone)] pub struct TaskUpdateNotification { /// ID of the updated task pub task_id: Uuid, /// Owner ID for data isolation (notifications are scoped to owner) pub owner_id: Option, /// New version number after update pub version: i32, /// Current task status pub status: String, /// List of fields that were updated pub updated_fields: Vec, /// Source of the update: "user", "daemon", or "system" pub updated_by: String, } /// Notification for streaming task output from Claude Code containers. 
// NOTE(review): bare `Option` fields (and `Option>`) below carry no type
// arguments in this listing — they look stripped; confirm against the
// original source.
//
// Serialized with camelCase keys. Published through
// `AppState::broadcast_task_output`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskOutputNotification {
    /// ID of the task producing output
    pub task_id: Uuid,
    /// Owner ID for data isolation (notifications are scoped to owner).
    /// Never serialized (`#[serde(skip)]`).
    #[serde(skip)]
    pub owner_id: Option,
    /// Type of message: "assistant", "tool_use", "tool_result", "result", "system", "error", "raw"
    pub message_type: String,
    /// Main text content of the message
    pub content: String,
    /// Tool name if this is a tool_use message (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_name: Option,
    /// Tool input (JSON) if this is a tool_use message (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_input: Option,
    /// Whether tool result was an error (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_error: Option,
    /// Cost in USD if this is a result message (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option,
    /// Duration in milliseconds if this is a result message (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option,
    /// Whether this is a partial line (more coming) or complete
    pub is_partial: bool,
}

/// Notification for task completion events (for supervisor tasks to monitor).
///
/// Serialized with camelCase keys; every optional field is left out of the
/// output when it is `None`. Published through
/// `AppState::broadcast_task_completion`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskCompletionNotification {
    /// ID of the completed task
    pub task_id: Uuid,
    /// Owner ID for data isolation. Never serialized (`#[serde(skip)]`).
    #[serde(skip)]
    pub owner_id: Option,
    /// Contract ID if task belongs to a contract
    #[serde(skip_serializing_if = "Option::is_none")]
    pub contract_id: Option,
    /// Parent task ID (to notify parent/supervisor)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_task_id: Option,
    /// Final status: "done", "failed", etc.
    pub status: String,
    /// Summary of task output/results
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_summary: Option,
    /// Path to the task's worktree (for reading files)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worktree_path: Option,
    /// Error message if task failed
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_message: Option,
}

/// Notification for merge operation results.
///
/// In-process only: this type derives no serde traits. Published through
/// `AppState::broadcast_merge_result`.
#[derive(Debug, Clone)]
pub struct MergeResultNotification {
    /// ID of the task that was merged
    pub task_id: Uuid,
    /// Whether the merge succeeded
    pub success: bool,
    /// Message describing the result
    pub message: String,
    /// Commit SHA if merge succeeded
    pub commit_sha: Option,
    /// List of conflicting files if merge failed due to conflicts
    pub conflicts: Option>,
}

/// Notification for PR creation results.
///
/// In-process only: this type derives no serde traits. Published through
/// `AppState::broadcast_pr_result`.
#[derive(Debug, Clone)]
pub struct PrResultNotification {
    /// ID of the task for which PR was created
    pub task_id: Uuid,
    /// Whether the PR creation succeeded
    pub success: bool,
    /// Message describing the result
    pub message: String,
    /// PR URL if creation succeeded
    pub pr_url: Option,
    /// PR number if creation succeeded
    pub pr_number: Option,
}

/// Notification for supervisor questions requiring user feedback.
// NOTE(review): bare `Option`, `Vec`, `Option>`, `chrono::DateTime` and
// `mpsc::Sender` below carry no type arguments in this listing — they look
// stripped; confirm against the original source.
//
// Serialized and deserialized with camelCase keys. Published through
// `AppState::broadcast_supervisor_question`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SupervisorQuestionNotification {
    /// Unique ID for this question
    pub question_id: Uuid,
    /// Supervisor task that asked the question
    pub task_id: Uuid,
    /// Contract this question relates to (Uuid::nil() for directive context)
    pub contract_id: Uuid,
    /// Directive this question relates to (if from a directive task)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub directive_id: Option,
    /// Owner ID for data isolation. Skipped in both directions
    /// (`#[serde(skip)]`), so a deserialized value gets the field's default.
    #[serde(skip)]
    pub owner_id: Option,
    /// The question text
    pub question: String,
    /// Optional choices for the user (if empty, free-form text response).
    /// An empty list is omitted on output and a missing key is accepted on input.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub choices: Vec,
    /// Context about what phase/action this relates to
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option,
    /// Whether this question is still pending
    pub pending: bool,
    /// When the question was asked
    pub created_at: chrono::DateTime,
    /// Whether multiple choices can be selected (defaults to `false` when absent on input)
    #[serde(default)]
    pub multi_select: bool,
}

/// Stored supervisor question for persistence
///
/// This is the value kept in `AppState::pending_questions`; unlike
/// `SupervisorQuestionNotification` it has a mandatory `owner_id` and also
/// records the question type.
#[derive(Debug, Clone)]
pub struct PendingSupervisorQuestion {
    pub question_id: Uuid,
    pub task_id: Uuid,
    pub contract_id: Uuid,
    /// Directive this question relates to (if from a directive task)
    pub directive_id: Option,
    pub owner_id: Uuid,
    pub question: String,
    pub choices: Vec,
    pub context: Option,
    pub created_at: chrono::DateTime,
    /// Whether multiple choices can be selected
    pub multi_select: bool,
    /// Question type: general, phase_confirmation, or contract_complete
    pub question_type: String,
}

/// Response to a supervisor question
#[derive(Debug, Clone)]
pub struct SupervisorQuestionResponse {
    pub question_id: Uuid,
    pub response: String,
    pub responded_at: chrono::DateTime,
}

/// Worktree diff response from daemon
///
/// Serialize-only, camelCase keys.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeDiffResponse {
    pub task_id: Uuid,
    pub success: bool,
    pub diff: String,
    pub error: Option,
}

/// Worktree info response from daemon
///
/// Serialize-only, camelCase keys.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeInfoResponse {
    pub task_id: Uuid,
    pub success: bool,
    pub worktree_path: Option,
    pub exists: bool,
    pub files_changed: i32,
    pub insertions: i32,
    pub deletions: i32,
    pub files: Option,
    pub branch: Option,
    pub head_sha: Option,
    pub error: Option,
}

/// Task diff result from daemon
///
/// Serializable and deserializable, camelCase keys.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResult {
    pub task_id: Uuid,
    pub success: bool,
    pub diff: Option,
    pub error: Option,
}

/// Worktree commit response from daemon
///
/// Serializable and deserializable, camelCase keys.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeCommitResponse {
    pub task_id: Uuid,
    pub success: bool,
    pub commit_sha: Option,
    pub error: Option,
}

/// Command sent from server to daemon.
///
/// Internally tagged: the variant name is written to a `"type"` key in
/// camelCase (for example `SpawnTask` becomes `"spawnTask"`). The
/// container-level `rename_all` only renames the variants, which is why
/// every multi-word field carries its own `#[serde(rename = "...")]`.
///
/// NOTE(review): the enum derives `Serialize` only, so the `default` entries
/// in the field attributes below have no effect here — presumably they mirror
/// the daemon-side definition; confirm before removing them.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonCommand {
    /// Confirm successful authentication
    Authenticated {
        #[serde(rename = "daemonId")]
        daemon_id: Uuid,
    },

    /// Spawn a new task in a container
    SpawnTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Human-readable task name (used for commit messages)
        #[serde(rename = "taskName")]
        task_name: String,
        plan: String,
        #[serde(rename = "repoUrl")]
        repo_url: Option,
        #[serde(rename = "baseBranch")]
        base_branch: Option,
        /// Target branch to merge into (used for completion actions)
        #[serde(rename = "targetBranch")]
        target_branch: Option,
        /// Parent task ID if this is a subtask
        #[serde(rename = "parentTaskId")]
        parent_task_id: Option,
        /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask)
        depth: i32,
        /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks)
        #[serde(rename = "isOrchestrator")]
        is_orchestrator: bool,
        /// Path to user's local repository (outside ~/.makima) for completion actions
        #[serde(rename = "targetRepoPath")]
        target_repo_path: Option,
        /// Action on completion: "none", "branch", "merge", "pr"
        #[serde(rename = "completionAction")]
        completion_action: Option,
        /// Task ID to continue from (copy worktree from this task)
        #[serde(rename = "continueFromTaskId")]
        continue_from_task_id: Option,
        /// Files to copy from parent task's worktree
        #[serde(rename = "copyFiles")]
        copy_files: Option>,
        /// Contract ID if this task is associated with a contract
        #[serde(rename = "contractId")]
        contract_id: Option,
        /// Whether this task is a supervisor (long-running contract orchestrator)
        #[serde(rename = "isSupervisor")]
        is_supervisor: bool,
        /// Whether to run in autonomous loop mode
        #[serde(rename = "autonomousLoop", default)]
        autonomous_loop: bool,
        /// Whether to resume from a previous session using --continue flag
        #[serde(rename = "resumeSession", default)]
        resume_session: bool,
        /// Conversation history for fallback when worktree doesn't exist
        #[serde(rename = "conversationHistory", default)]
        conversation_history: Option,
        /// Base64-encoded gzip-compressed patch for worktree recovery
        /// (omitted from output when `None`)
        #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")]
        patch_data: Option,
        /// Commit SHA to apply the patch on top of (omitted from output when `None`)
        #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
        patch_base_sha: Option,
        /// Whether the contract is in local-only mode (skips automatic completion actions)
        #[serde(rename = "localOnly", default)]
        local_only: bool,
        /// Whether to auto-merge to target branch locally when local_only mode is enabled
        #[serde(rename = "autoMergeLocal", default)]
        auto_merge_local: bool,
        /// Task ID to share worktree with (supervisor's task ID). If Some, use that task's
        /// worktree instead of creating a new one. Omitted from output when `None`.
        #[serde(rename = "supervisorWorktreeTaskId", default, skip_serializing_if = "Option::is_none")]
        supervisor_worktree_task_id: Option,
        /// Directive ID if this task is associated with a directive
        /// (omitted from output when `None`)
        #[serde(rename = "directiveId", default, skip_serializing_if = "Option::is_none")]
        directive_id: Option,
    },

    /// Pause a running task
    PauseTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Resume a paused task
    ResumeTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Interrupt a task (gracefully or forced)
    InterruptTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        graceful: bool,
    },

    /// Send a message to a running task
    SendMessage {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: String,
    },

    /// Inject context about sibling task progress
    InjectSiblingContext {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "siblingTaskId")]
        sibling_task_id: Uuid,
        #[serde(rename = "siblingName")]
        sibling_name: String,
        #[serde(rename = "siblingStatus")]
        sibling_status: String,
        #[serde(rename = "progressSummary")]
        progress_summary: Option,
        #[serde(rename = "changedFiles")]
        changed_files: Vec,
    },

    // =========================================================================
    // Merge Commands (for orchestrators to merge subtask branches)
    // =========================================================================

    /// List all subtask branches for a task
    ListBranches {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Start merging a subtask branch
    MergeStart {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "sourceBranch")]
        source_branch: String,
    },

    /// Get current merge status
    MergeStatus {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Resolve a merge conflict
    MergeResolve {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        file: String,
        /// "ours" or "theirs"
        strategy: String,
    },

    /// Commit the current merge
    MergeCommit {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: String,
    },

    /// Abort the current merge
    MergeAbort {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Skip merging a subtask branch (mark as intentionally not merged)
    MergeSkip {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "subtaskId")]
        subtask_id: Uuid,
        reason: String,
    },

    /// Check if all subtask branches have been merged or skipped (completion gate)
    CheckMergeComplete {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    // =========================================================================
    // Completion Action Commands
    // =========================================================================

    /// Retry a completion action for a completed task
    RetryCompletionAction {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Human-readable task name (used for commit messages)
        #[serde(rename = "taskName")]
        task_name: String,
        /// The action to execute: "branch", "merge", or "pr"
        action: String,
        /// Path to the target repository
        #[serde(rename = "targetRepoPath")]
        target_repo_path: String,
        /// Target branch to merge into (for merge/pr actions)
        #[serde(rename = "targetBranch")]
        target_branch: Option,
    },

    /// Clone worktree to a target directory
    CloneWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Path to the target directory
        #[serde(rename = "targetDir")]
        target_dir: String,
    },

    /// Check if a target directory exists
    CheckTargetExists {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Path to check
        #[serde(rename = "targetDir")]
        target_dir: String,
    },

    // =========================================================================
    // Contract File Commands
    // =========================================================================

    /// Read a file from a repository linked to a contract
    ReadRepoFile {
        /// Request ID for correlating response
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Contract ID (used for logging/context)
        #[serde(rename = "contractId")]
        contract_id: Uuid,
        /// Path to the file within the repository
        #[serde(rename = "filePath")]
        file_path: String,
        /// Full repository path on daemon's filesystem
        #[serde(rename = "repoPath")]
        repo_path: String,
    },

    // =========================================================================
    // Supervisor Git Commands
    // =========================================================================

    /// Create a new branch in a task's worktree
    CreateBranch {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "branchName")]
        branch_name: String,
        /// Optional reference to create branch from (task_id or SHA)
        #[serde(rename = "fromRef")]
        from_ref: Option,
    },

    /// Merge a task's changes to a target branch
    MergeTaskToTarget {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Target branch to merge into (default: task's base branch)
        #[serde(rename = "targetBranch")]
        target_branch: Option,
        /// Whether to squash commits
        squash: bool,
    },

    /// Create a pull request for a task's changes
    CreatePR {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        title: String,
        body: Option,
        /// Base branch for the PR. If None, will be auto-detected from the repo.
        #[serde(rename = "baseBranch")]
        base_branch: Option,
        /// Source branch name to push and create PR from
        branch: String,
    },

    /// Get the diff for a task's changes
    GetTaskDiff {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Get worktree information (files, stats, branch) for a task
    GetWorktreeInfo {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Commit changes in a task worktree
    CommitWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: Option,
    },

    /// Create a git checkpoint (stage changes, commit, record stats)
    CreateCheckpoint {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Commit message for the checkpoint
        message: String,
    },

    /// Clean up a task's worktree (used when contract is completed/deleted)
    CleanupWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether to delete the associated branch
        #[serde(rename = "deleteBranch")]
        delete_branch: bool,
    },

    /// Inherit git config (user.email, user.name) from a directory
    InheritGitConfig {
        /// Directory to read git config from (defaults to daemon's working directory)
        #[serde(rename = "sourceDir")]
        source_dir: Option,
    },

    /// Error response
    Error { code: String, message: String },

    /// Restart the daemon process
    RestartDaemon,

    /// Trigger OAuth re-authentication on this daemon
    TriggerReauth {
        #[serde(rename = "requestId")]
        request_id: Uuid,
    },

    /// Submit auth code for pending reauth
    SubmitAuthCode {
        #[serde(rename = "requestId")]
        request_id: Uuid,
        code: String,
    },

    /// Apply a patch to a task's worktree (for cross-daemon merge).
    /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon.
    ApplyPatchToWorktree {
        /// Target task whose worktree should be patched.
        #[serde(rename = "targetTaskId")]
        target_task_id: Uuid,
        /// Source task that generated the patch (for logging).
        #[serde(rename = "sourceTaskId")]
        source_task_id: Uuid,
        /// Base64-gzipped patch data.
        #[serde(rename = "patchData")]
        patch_data: String,
        /// Base commit SHA for the patch.
        #[serde(rename = "baseSha")]
        base_sha: String,
    },
}

/// Active daemon connection info stored in state.
///
/// Derives `Debug` only (no `Clone`), so entries are shared by reference
/// from `AppState::daemon_connections`.
#[derive(Debug)]
pub struct DaemonConnectionInfo {
    /// Database ID of the daemon
    pub id: Uuid,
    /// Owner ID for data isolation (from API key authentication)
    pub owner_id: Uuid,
    /// WebSocket connection identifier
    pub connection_id: String,
    /// Daemon hostname
    pub hostname: Option,
    /// Machine identifier
    pub machine_id: Option,
    /// Channel to send commands to this daemon
    pub command_sender: mpsc::Sender,
    /// Current working directory of the daemon
    pub working_directory: Option,
    /// Path to ~/.makima/home directory on daemon (for cloning completed work)
    pub home_directory: Option,
    /// Path to worktrees directory (~/.makima/worktrees) on daemon
    pub worktrees_directory: Option,
}

/// Status of a daemon reauth request (stored in state for polling).
// NOTE(review): bare `Option`, `Mutex`, `OnceCell`, `broadcast::Sender`,
// `DashMap`, `DashMap>` and `std::sync::Arc` below carry no type arguments in
// this listing — they look stripped; confirm against the original source.
//
// NOTE(review): no `rename_all` here, so this type is (de)serialized with the
// snake_case field names as written, unlike the camelCase payloads defined
// earlier in this file — confirm that is intentional.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DaemonReauthStatus {
    pub request_id: Uuid,
    pub status: String,
    pub login_url: Option,
    pub error: Option,
}

/// Configuration paths for ML models (used for lazy loading).
///
/// The first three paths are read by `AppState::get_ml_models`; the
/// Chatterbox directory is read by `AppState::get_tts_engine`.
#[derive(Clone)]
pub struct ModelConfig {
    pub parakeet_model_dir: String,
    pub parakeet_eou_dir: String,
    pub sortformer_model_path: String,
    pub chatterbox_model_dir: String,
}

/// Lazily-loaded ML models.
///
/// Each model sits behind its own `Mutex` (the `tokio::sync` one imported at
/// the top of the file).
pub struct MlModels {
    pub parakeet: Mutex,
    pub parakeet_eou: Mutex,
    pub sortformer: Mutex,
}

/// Shared application state containing ML models and database pool.
///
/// Models are lazily loaded on first use to speed up server startup.
pub struct AppState {
    /// ML model configuration (paths for lazy loading)
    pub model_config: Option,
    /// Lazily-loaded ML models (initialized on first Listen connection)
    pub ml_models: OnceCell,
    /// Optional database connection pool
    pub db_pool: Option,
    /// Broadcast channel for file update notifications
    pub file_updates: broadcast::Sender,
    /// Broadcast channel for task update notifications
    pub task_updates: broadcast::Sender,
    /// Broadcast channel for task output streaming
    pub task_output: broadcast::Sender,
    /// Broadcast channel for task completion notifications (for supervisors)
    pub task_completions: broadcast::Sender,
    /// Broadcast channel for supervisor question notifications
    pub supervisor_questions: broadcast::Sender,
    /// Broadcast channel for merge result notifications
    pub merge_results: broadcast::Sender,
    /// Broadcast channel for PR creation result notifications
    pub pr_results: broadcast::Sender,
    /// Pending supervisor questions awaiting user response (keyed by question_id)
    pub pending_questions: DashMap,
    /// Responses to supervisor questions (keyed by question_id)
    pub question_responses: DashMap,
    /// Active daemon connections (keyed by connection_id)
    pub daemon_connections: DashMap,
    /// Tool keys for orchestrator API access (key -> task_id)
    pub tool_keys: DashMap,
    /// JWT verifier for Supabase authentication (None if not configured)
    pub jwt_verifier: Option,
    /// Pending worktree info requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_info: DashMap>,
    /// Pending task diff requests awaiting daemon response (keyed by task_id)
    pub pending_task_diff: DashMap>,
    /// Pending worktree diff requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_diff: DashMap>,
    /// Pending worktree commit requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_commit: DashMap>,
    /// Lazily-loaded TTS engine (initialized on first Speak connection)
    pub tts_engine: OnceCell>,
    /// Daemon reauth status storage (keyed by (daemon_id, request_id))
    pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>,
    /// Signal used to nudge the directive reconciler to run a tick immediately
    /// (e.g. after a goal update) rather than waiting up to 15s for the next
    /// interval. The reconciler loop in `server::mod` awaits `notified()` in
    /// parallel with its interval; handlers call `kick_directive_reconciler()`.
    pub directive_kick: std::sync::Arc,
}

impl AppState {
    /// Create AppState WITHOUT ML model configuration. Listen and Speak
    /// endpoints will return "not configured" errors if used; everything
    /// else (mesh, directives, files, contracts-free CRUD) works normally.
    /// This is the constructor used by the slim Dockerfile.
    pub fn new_slim() -> Self {
        Self::new_inner(None)
    }

    /// Create AppState with ML model configuration for lazy loading.
    ///
    /// The four paths are stored in a `ModelConfig`: Listen needs all
    /// three of parakeet/parakeet_eou/sortformer; Speak needs chatterbox.
    /// NOTE(review): earlier wording said to "pass None" to disable a model
    /// family, but every parameter is a plain `&str`; call `new_slim()` to
    /// run without any ML model configuration.
    ///
    /// Models are NOT loaded at startup — they're loaded on first use.
pub fn new( parakeet_model_dir: &str, parakeet_eou_dir: &str, sortformer_model_path: &str, chatterbox_model_dir: &str, ) -> Self { Self::new_inner(Some(ModelConfig { parakeet_model_dir: parakeet_model_dir.to_string(), parakeet_eou_dir: parakeet_eou_dir.to_string(), sortformer_model_path: sortformer_model_path.to_string(), chatterbox_model_dir: chatterbox_model_dir.to_string(), })) } /// Internal constructor — model_config can be None for the slim build. fn new_inner(model_config: Option) -> Self { // Create broadcast channels with buffer for 256 messages let (file_updates, _) = broadcast::channel(256); let (task_updates, _) = broadcast::channel(256); let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users let (merge_results, _) = broadcast::channel(256); // For merge operation results let (pr_results, _) = broadcast::channel(256); // For PR creation results // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) let jwt_verifier = match AuthConfig::from_env() { Some(config) => match JwtVerifier::new(config) { Ok(verifier) => { tracing::info!("JWT authentication configured"); Some(verifier) } Err(e) => { tracing::error!("Failed to initialize JWT verifier: {}", e); None } }, None => { // Log which env vars are missing let has_url = std::env::var("SUPABASE_URL").is_ok(); let has_public_key = std::env::var("SUPABASE_JWT_PUBLIC_KEY").is_ok(); let has_secret = std::env::var("SUPABASE_JWT_SECRET").is_ok(); if !has_url { tracing::info!("JWT authentication not configured (SUPABASE_URL not set)"); } else if !has_public_key && !has_secret { tracing::info!("JWT authentication not configured (set SUPABASE_JWT_PUBLIC_KEY for RS256 or SUPABASE_JWT_SECRET for HS256)"); } None 
} }; Self { model_config, ml_models: OnceCell::new(), db_pool: None, file_updates, task_updates, task_output, task_completions, supervisor_questions, merge_results, pr_results, pending_questions: DashMap::new(), question_responses: DashMap::new(), daemon_connections: DashMap::new(), tool_keys: DashMap::new(), jwt_verifier, pending_worktree_info: DashMap::new(), pending_task_diff: DashMap::new(), pending_worktree_diff: DashMap::new(), pending_worktree_commit: DashMap::new(), tts_engine: OnceCell::new(), daemon_reauth_status: DashMap::new(), directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()), } } /// Wake the directive reconciler so it ticks now instead of waiting for /// the next 15-second interval. Cheap and safe to call from any handler. pub fn kick_directive_reconciler(&self) { self.directive_kick.notify_one(); } /// Get or initialize the TTS engine (lazy loading). /// /// The TTS engine is loaded on first Speak connection using the Chatterbox backend. /// Returns a reference to the engine, or an error if loading fails. pub async fn get_tts_engine(&self) -> Result<&dyn TtsEngine, Box> { let tts_dir = self.model_config.as_ref().map(|c| c.chatterbox_model_dir.as_str()); self.tts_engine.get_or_try_init(|| async { tracing::info!( model_dir = ?tts_dir, "Lazy-loading TTS engine (Chatterbox) on first Speak connection..." ); let engine = crate::tts::TtsEngineFactory::create(tts_dir) .map_err(|e| -> Box { Box::new(e) })?; tracing::info!("TTS engine loaded successfully"); Ok(engine) }).await.map(|b| b.as_ref()) } /// Get or initialize ML models (lazy loading). /// /// Models are loaded on first call and cached for subsequent calls. /// Returns None if model config is not set. 
///
/// # Errors
/// Fails with "ML model configuration not set" when the state was built
/// without a model configuration, and propagates any error raised while
/// loading one of the models.
pub async fn get_ml_models(&self) -> Result<&MlModels, Box> {
    // The message is a plain literal, so the eager `ok_or` is enough here.
    let config = self
        .model_config
        .as_ref()
        .ok_or("ML model configuration not set")?;

    self.ml_models
        .get_or_try_init(|| async {
            tracing::info!(
                parakeet = %config.parakeet_model_dir,
                eou = %config.parakeet_eou_dir,
                sortformer = %config.sortformer_model_path,
                "Lazy-loading ML models on first Listen connection..."
            );

            let parakeet = ParakeetTDT::from_pretrained(&config.parakeet_model_dir, None)?;
            let parakeet_eou = ParakeetEOU::from_pretrained(&config.parakeet_eou_dir, None)?;
            let sortformer = Sortformer::with_config(
                &config.sortformer_model_path,
                None,
                DiarizationConfig::callhome(),
            )?;

            tracing::info!("ML models loaded successfully");

            Ok(MlModels {
                parakeet: Mutex::new(parakeet),
                parakeet_eou: Mutex::new(parakeet_eou),
                sortformer: Mutex::new(sortformer),
            })
        })
        .await
}

/// Check if ML models are loaded.
pub fn are_models_loaded(&self) -> bool {
    self.ml_models.initialized()
}

/// Set the database pool.
///
/// Consumes the state and returns it with the pool attached, so it can be
/// chained directly after a constructor.
pub fn with_db_pool(mut self, pool: PgPool) -> Self {
    self.db_pool = Some(pool);
    self
}

/// Broadcast a file update notification to all subscribers.
///
/// This is a no-op if there are no subscribers (ignores send errors).
pub fn broadcast_file_update(&self, notification: FileUpdateNotification) {
    // Ignore send errors - they just mean no one is listening
    let _ = self.file_updates.send(notification);
}

/// Broadcast a task update notification to all subscribers.
///
/// This is a no-op if there are no subscribers (ignores send errors).
pub fn broadcast_task_update(&self, notification: TaskUpdateNotification) {
    let _ = self.task_updates.send(notification);
}

/// Broadcast task output to all subscribers.
///
/// Used for streaming Claude Code container output to frontend clients.
/// Send errors are ignored, as for the other broadcast helpers.
pub fn broadcast_task_output(&self, notification: TaskOutputNotification) {
    let _ = self.task_output.send(notification);
}

/// Broadcast a task completion notification to all subscribers.
///
/// Used to notify supervisor tasks when their child tasks complete.
pub fn broadcast_task_completion(&self, notification: TaskCompletionNotification) {
    // Send result is intentionally discarded.
    let _ = self.task_completions.send(notification);
}

/// Publish a supervisor question notification on the `supervisor_questions` channel.
///
/// Used to notify frontend clients when a supervisor needs user feedback.
pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) {
    // Send result is intentionally discarded.
    let _ = self.supervisor_questions.send(notification);
}

/// Publish a merge result notification on the `merge_results` channel.
///
/// Used to notify waiting handlers when a merge operation completes.
pub fn broadcast_merge_result(&self, notification: MergeResultNotification) {
    // Send result is intentionally discarded.
    let _ = self.merge_results.send(notification);
}

/// Publish a PR creation result notification on the `pr_results` channel.
///
/// Used to notify waiting handlers when a PR creation operation completes.
pub fn broadcast_pr_result(&self, notification: PrResultNotification) {
    // Send result is intentionally discarded.
    let _ = self.pr_results.send(notification);
}

/// Record a pending supervisor question without directive context and broadcast it.
///
/// Thin wrapper over `add_supervisor_question_with_directive` that passes `None`
/// as the directive ID. Returns the ID of the newly created question.
pub fn add_supervisor_question(
    &self,
    task_id: Uuid,
    contract_id: Uuid,
    owner_id: Uuid,
    question: String,
    choices: Vec,
    context: Option,
    multi_select: bool,
    question_type: String,
) -> Uuid {
    // No directive is associated with questions created through this entry point.
    let directive_id = None;

    self.add_supervisor_question_with_directive(
        task_id,
        contract_id,
        directive_id,
        owner_id,
        question,
        choices,
        context,
        multi_select,
        question_type,
    )
}

/// Add a pending supervisor question with optional directive context and broadcast it.
pub fn add_supervisor_question_with_directive( &self, task_id: Uuid, contract_id: Uuid, directive_id: Option, owner_id: Uuid, question: String, choices: Vec, context: Option, multi_select: bool, question_type: String, ) -> Uuid { let question_id = Uuid::new_v4(); let now = chrono::Utc::now(); // Store the pending question self.pending_questions.insert( question_id, PendingSupervisorQuestion { question_id, task_id, contract_id, directive_id, owner_id, question: question.clone(), choices: choices.clone(), context: context.clone(), created_at: now, multi_select, question_type: question_type.clone(), }, ); // Broadcast to subscribers self.broadcast_supervisor_question(SupervisorQuestionNotification { question_id, task_id, contract_id, directive_id, owner_id: Some(owner_id), question, choices, context, pending: true, created_at: now, multi_select, }); tracing::info!( question_id = %question_id, task_id = %task_id, contract_id = %contract_id, directive_id = ?directive_id, question_type = %question_type, "Supervisor question added" ); question_id } /// Remove a pending question (after it's been answered). pub fn remove_pending_question(&self, question_id: Uuid) -> Option { self.pending_questions.remove(&question_id).map(|(_, q)| q) } /// Get all pending questions for an owner. pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec { self.pending_questions .iter() .filter(|entry| entry.value().owner_id == owner_id) .map(|entry| entry.value().clone()) .collect() } /// Get a specific pending question. pub fn get_pending_question(&self, question_id: Uuid) -> Option { self.pending_questions.get(&question_id).map(|entry| entry.value().clone()) } /// Check if a pending question exists (either still pending or has a response ready). pub fn has_pending_question(&self, question_id: Uuid) -> bool { self.pending_questions.contains_key(&question_id) || self.question_responses.contains_key(&question_id) } /// Submit a response to a supervisor question. 
pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool { // Check if the question exists if let Some(question) = self.pending_questions.remove(&question_id) { let now = chrono::Utc::now(); // Store the response self.question_responses.insert( question_id, SupervisorQuestionResponse { question_id, response: response.clone(), responded_at: now, }, ); // Broadcast that the question is no longer pending self.broadcast_supervisor_question(SupervisorQuestionNotification { question_id, task_id: question.1.task_id, contract_id: question.1.contract_id, directive_id: question.1.directive_id, owner_id: Some(question.1.owner_id), question: question.1.question, choices: question.1.choices, context: question.1.context, pending: false, created_at: question.1.created_at, multi_select: question.1.multi_select, }); tracing::info!( question_id = %question_id, "Supervisor question answered" ); true } else { false } } /// Get the response to a question (if answered). pub fn get_question_response(&self, question_id: Uuid) -> Option { self.question_responses.get(&question_id).map(|entry| entry.value().clone()) } /// Clean up a question response after the supervisor has read it. pub fn cleanup_question_response(&self, question_id: Uuid) { self.question_responses.remove(&question_id); } /// Remove all pending questions for a specific task. /// /// This should be called when a task is deleted to clean up orphaned questions. /// Returns the number of questions removed. 
pub fn remove_pending_questions_for_task(&self, task_id: Uuid) -> usize { // Collect question IDs to remove let question_ids: Vec = self .pending_questions .iter() .filter(|entry| entry.value().task_id == task_id) .map(|entry| entry.value().question_id) .collect(); let count = question_ids.len(); // Remove pending questions and their responses for question_id in question_ids { self.pending_questions.remove(&question_id); self.question_responses.remove(&question_id); } if count > 0 { tracing::info!( task_id = %task_id, count = count, "Cleaned up pending questions for deleted task" ); } count } /// Remove all pending questions for a specific contract. /// /// This should be called when a contract is deleted to clean up orphaned questions. /// Returns the number of questions removed. pub fn remove_pending_questions_for_contract(&self, contract_id: Uuid) -> usize { // Collect question IDs to remove let question_ids: Vec = self .pending_questions .iter() .filter(|entry| entry.value().contract_id == contract_id) .map(|entry| entry.value().question_id) .collect(); let count = question_ids.len(); // Remove pending questions and their responses for question_id in question_ids { self.pending_questions.remove(&question_id); self.question_responses.remove(&question_id); } if count > 0 { tracing::info!( contract_id = %contract_id, count = count, "Cleaned up pending questions for deleted contract" ); } count } /// Register a new daemon connection. /// /// Returns the connection_id for later reference. pub fn register_daemon( &self, connection_id: String, daemon_id: Uuid, owner_id: Uuid, hostname: Option, machine_id: Option, command_sender: mpsc::Sender, ) { self.daemon_connections.insert( connection_id.clone(), DaemonConnectionInfo { id: daemon_id, owner_id, connection_id, hostname, machine_id, command_sender, working_directory: None, home_directory: None, worktrees_directory: None, }, ); } /// Update daemon directory information. 
pub fn update_daemon_directories( &self, connection_id: &str, working_directory: String, home_directory: String, worktrees_directory: String, ) { if let Some(mut entry) = self.daemon_connections.get_mut(connection_id) { entry.working_directory = Some(working_directory); entry.home_directory = Some(home_directory); entry.worktrees_directory = Some(worktrees_directory); } } /// Unregister a daemon connection. pub fn unregister_daemon(&self, connection_id: &str) { self.daemon_connections.remove(connection_id); } /// Get a daemon connection by connection_id. pub fn get_daemon(&self, connection_id: &str) -> Option> { self.daemon_connections.get(connection_id) } /// Get a daemon by its database ID. pub fn get_daemon_by_id(&self, daemon_id: Uuid) -> Option> { self.daemon_connections .iter() .find(|entry| entry.value().id == daemon_id) .map(|entry| { // Return a reference to the found entry self.daemon_connections.get(entry.key()).unwrap() }) } /// Send a command to a specific daemon by its database ID. pub async fn send_daemon_command(&self, daemon_id: Uuid, command: DaemonCommand) -> Result<(), String> { if let Some(daemon) = self.daemon_connections .iter() .find(|entry| entry.value().id == daemon_id) { daemon.value().command_sender.send(command).await .map_err(|e| format!("Failed to send command to daemon: {}", e)) } else { Err(format!("Daemon {} not connected", daemon_id)) } } /// Broadcast sibling progress to all running sibling tasks. /// /// This is used for sibling awareness - when a task makes progress, /// its siblings are notified so they can adjust their work if needed. 
pub async fn broadcast_sibling_progress( &self, source_task_id: Uuid, source_task_name: &str, source_task_status: &str, progress_summary: Option, changed_files: Vec, running_sibling_daemon_ids: Vec<(Uuid, Uuid)>, // (task_id, daemon_id) ) { for (sibling_task_id, daemon_id) in running_sibling_daemon_ids { let command = DaemonCommand::InjectSiblingContext { task_id: sibling_task_id, sibling_task_id: source_task_id, sibling_name: source_task_name.to_string(), sibling_status: source_task_status.to_string(), progress_summary: progress_summary.clone(), changed_files: changed_files.clone(), }; // Fire and forget - don't block on sending to all daemons if let Err(e) = self.send_daemon_command(daemon_id, command).await { tracing::warn!( "Failed to inject sibling context to task {}: {}", sibling_task_id, e ); } } } /// Get list of connected daemon IDs. pub fn list_connected_daemon_ids(&self) -> Vec { self.daemon_connections .iter() .map(|entry| entry.value().id) .collect() } /// Find an alternative daemon for a task, excluding specified daemon IDs. /// Returns the daemon ID and connection info if found. pub fn find_alternative_daemon( &self, owner_id: Uuid, exclude_daemon_ids: &[Uuid], ) -> Option { self.daemon_connections .iter() .find(|entry| { let daemon = entry.value(); daemon.owner_id == owner_id && !exclude_daemon_ids.contains(&daemon.id) }) .map(|entry| entry.value().id) } /// Check if a specific daemon is connected. pub fn is_daemon_connected(&self, daemon_id: Uuid) -> bool { self.daemon_connections .iter() .any(|entry| entry.value().id == daemon_id) } // ========================================================================= // Tool Key Management // ========================================================================= /// Register a tool key for a task. /// /// This allows orchestrators to authenticate with the API using /// the `X-Makima-Tool-Key` header. 
pub fn register_tool_key(&self, key: String, task_id: Uuid) {
    // Only the task ID is logged; the key itself is not written to the log.
    tracing::info!(task_id = %task_id, "Registering tool key");
    self.tool_keys.insert(key, task_id);
}

/// Look up a tool key and return the task ID it was registered for, if any.
pub fn validate_tool_key(&self, key: &str) -> Option {
    let entry = self.tool_keys.get(key)?;
    Some(*entry.value())
}

/// Revoke the tool key(s) registered for a task.
///
/// This should be called when a task completes or is terminated.
pub fn revoke_tool_key(&self, task_id: Uuid) {
    // Keys map to task IDs, so revocation keeps every entry that points elsewhere.
    self.tool_keys.retain(|_key, owner_task| *owner_task != task_id);
    tracing::info!(task_id = %task_id, "Revoked tool key");
}

// =========================================================================
// Daemon Reauth Status
// =========================================================================

/// Record a reauth status update, keyed by `(daemon_id, request_id)`.
///
/// An existing status for the same pair is overwritten.
pub fn set_daemon_reauth_status(
    &self,
    daemon_id: Uuid,
    request_id: Uuid,
    status: String,
    login_url: Option,
    error: Option,
) {
    let key = (daemon_id, request_id);
    let record = DaemonReauthStatus {
        request_id,
        status,
        login_url,
        error,
    };

    self.daemon_reauth_status.insert(key, record);
}

/// Fetch a clone of the reauth status stored for `(daemon_id, request_id)`, if any.
pub fn get_daemon_reauth_status(
    &self,
    daemon_id: Uuid,
    request_id: Uuid,
) -> Option {
    let entry = self.daemon_reauth_status.get(&(daemon_id, request_id))?;
    Some(entry.value().clone())
}

// =========================================================================
// Supervisor Notifications
// =========================================================================

/// Notify a contract's supervisor task about an event.
///
/// This sends a message to the supervisor's stdin so it can react to changes
/// in tasks or contract state.
pub async fn notify_supervisor( &self, supervisor_task_id: Uuid, supervisor_daemon_id: Option, message: &str, ) -> Result<(), String> { // Only send if we have a daemon ID let daemon_id = match supervisor_daemon_id { Some(id) => id, None => { tracing::debug!( supervisor_task_id = %supervisor_task_id, "Supervisor has no daemon assigned, skipping notification" ); return Ok(()); } }; let command = DaemonCommand::SendMessage { task_id: supervisor_task_id, message: message.to_string(), }; self.send_daemon_command(daemon_id, command).await } /// Format and send a task completion notification to a supervisor. /// /// If `action_directive` is provided, it will be appended to the message /// as an [ACTION REQUIRED] block to prompt the supervisor to take action. pub async fn notify_supervisor_of_task_completion( &self, supervisor_task_id: Uuid, supervisor_daemon_id: Option, completed_task_id: Uuid, completed_task_name: &str, status: &str, progress_summary: Option<&str>, error_message: Option<&str>, action_directive: Option<&str>, ) { let mut message = format!( "TASK_COMPLETED task_id={} name=\"{}\" status={}", completed_task_id, completed_task_name, status ); if let Some(summary) = progress_summary { // Escape newlines in summary let escaped = summary.replace('\n', "\\n"); message.push_str(&format!(" summary=\"{}\"", escaped)); } if let Some(err) = error_message { let escaped = err.replace('\n', "\\n"); message.push_str(&format!(" error=\"{}\"", escaped)); } // Append action directive if provided if let Some(directive) = action_directive { message.push_str("\n\n"); message.push_str(directive); } if let Err(e) = self.notify_supervisor( supervisor_task_id, supervisor_daemon_id, &message, ).await { tracing::warn!( supervisor_task_id = %supervisor_task_id, completed_task_id = %completed_task_id, "Failed to notify supervisor of task completion: {}", e ); } } /// Format and send a task status change notification to a supervisor. 
pub async fn notify_supervisor_of_task_update( &self, supervisor_task_id: Uuid, supervisor_daemon_id: Option, updated_task_id: Uuid, updated_task_name: &str, new_status: &str, updated_fields: &[String], ) { let message = format!( "TASK_UPDATED task_id={} name=\"{}\" status={} fields={}", updated_task_id, updated_task_name, new_status, updated_fields.join(",") ); if let Err(e) = self.notify_supervisor( supervisor_task_id, supervisor_daemon_id, &message, ).await { tracing::warn!( supervisor_task_id = %supervisor_task_id, updated_task_id = %updated_task_id, "Failed to notify supervisor of task update: {}", e ); } } /// Format and send a contract phase change notification to a supervisor. pub async fn notify_supervisor_of_phase_change( &self, supervisor_task_id: Uuid, supervisor_daemon_id: Option, contract_id: Uuid, new_phase: &str, ) { let message = format!( "PHASE_CHANGED contract_id={} phase={}", contract_id, new_phase ); if let Err(e) = self.notify_supervisor( supervisor_task_id, supervisor_daemon_id, &message, ).await { tracing::warn!( supervisor_task_id = %supervisor_task_id, contract_id = %contract_id, "Failed to notify supervisor of phase change: {}", e ); } } /// Format and send a new task created notification to a supervisor. pub async fn notify_supervisor_of_task_created( &self, supervisor_task_id: Uuid, supervisor_daemon_id: Option, new_task_id: Uuid, new_task_name: &str, ) { let message = format!( "TASK_CREATED task_id={} name=\"{}\"", new_task_id, new_task_name ); if let Err(e) = self.notify_supervisor( supervisor_task_id, supervisor_daemon_id, &message, ).await { tracing::warn!( supervisor_task_id = %supervisor_task_id, new_task_id = %new_task_id, "Failed to notify supervisor of task creation: {}", e ); } } } /// Type alias for the shared application state. pub type SharedState = Arc;