//! Application state holding shared ML models and database pool.
use std::sync::Arc;
use dashmap::DashMap;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, OnceCell};
use uuid::Uuid;
use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer};
use crate::server::auth::{AuthConfig, JwtVerifier};
use crate::tts::TtsEngine;
/// Notification payload for file updates (broadcast to WebSocket subscribers).
#[derive(Debug, Clone)]
pub struct FileUpdateNotification {
/// ID of the updated file
pub file_id: Uuid,
/// New version number after update
pub version: i32,
/// List of fields that were updated
pub updated_fields: Vec<String>,
/// Source of the update: "user", "llm", or "system"
pub updated_by: String,
}
// =============================================================================
// Task/Mesh Notifications
// =============================================================================
/// Notification payload for task updates (broadcast to WebSocket subscribers).
#[derive(Debug, Clone)]
pub struct TaskUpdateNotification {
/// ID of the updated task
pub task_id: Uuid,
/// Owner ID for data isolation (notifications are scoped to owner)
pub owner_id: Option<Uuid>,
/// New version number after update
pub version: i32,
/// Current task status
pub status: String,
/// List of fields that were updated
pub updated_fields: Vec<String>,
/// Source of the update: "user", "daemon", or "system"
pub updated_by: String,
}
/// Notification for streaming task output from Claude Code containers.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskOutputNotification {
/// ID of the task producing output
pub task_id: Uuid,
/// Owner ID for data isolation (notifications are scoped to owner)
#[serde(skip)]
pub owner_id: Option<Uuid>,
/// Type of message: "assistant", "tool_use", "tool_result", "result", "system", "error", "raw"
pub message_type: String,
/// Main text content of the message
pub content: String,
/// Tool name if this is a tool_use message
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
/// Tool input (JSON) if this is a tool_use message
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_input: Option<serde_json::Value>,
/// Whether tool result was an error
#[serde(skip_serializing_if = "Option::is_none")]
pub is_error: Option<bool>,
/// Cost in USD if this is a result message
#[serde(skip_serializing_if = "Option::is_none")]
pub cost_usd: Option<f64>,
/// Duration in milliseconds if this is a result message
#[serde(skip_serializing_if = "Option::is_none")]
pub duration_ms: Option<u64>,
/// Whether this is a partial line (more coming) or complete
pub is_partial: bool,
}
/// Notification for task completion events (for supervisor tasks to monitor).
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskCompletionNotification {
/// ID of the completed task
pub task_id: Uuid,
/// Owner ID for data isolation
#[serde(skip)]
pub owner_id: Option<Uuid>,
/// Contract ID if task belongs to a contract
#[serde(skip_serializing_if = "Option::is_none")]
pub contract_id: Option<Uuid>,
/// Parent task ID (to notify parent/supervisor)
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_task_id: Option<Uuid>,
/// Final status: "done", "failed", etc.
pub status: String,
/// Summary of task output/results
#[serde(skip_serializing_if = "Option::is_none")]
pub output_summary: Option<String>,
/// Path to the task's worktree (for reading files)
#[serde(skip_serializing_if = "Option::is_none")]
pub worktree_path: Option<String>,
/// Error message if task failed
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
}
/// Notification for merge operation results.
#[derive(Debug, Clone)]
pub struct MergeResultNotification {
/// ID of the task that was merged
pub task_id: Uuid,
/// Whether the merge succeeded
pub success: bool,
/// Message describing the result
pub message: String,
/// Commit SHA if merge succeeded
pub commit_sha: Option<String>,
/// List of conflicting files if merge failed due to conflicts
pub conflicts: Option<Vec<String>>,
}
/// Notification for PR creation results.
#[derive(Debug, Clone)]
pub struct PrResultNotification {
/// ID of the task for which PR was created
pub task_id: Uuid,
/// Whether the PR creation succeeded
pub success: bool,
/// Message describing the result
pub message: String,
/// PR URL if creation succeeded
pub pr_url: Option<String>,
/// PR number if creation succeeded
pub pr_number: Option<i32>,
}
/// Notification for supervisor questions requiring user feedback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SupervisorQuestionNotification {
/// Unique ID for this question
pub question_id: Uuid,
/// Task that asked the question
pub task_id: Uuid,
/// Directive this question relates to (if from a directive task)
#[serde(skip_serializing_if = "Option::is_none")]
pub directive_id: Option<Uuid>,
/// Owner ID for data isolation
#[serde(skip)]
pub owner_id: Option<Uuid>,
/// The question text
pub question: String,
/// Optional choices for the user (if empty, free-form text response)
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub choices: Vec<String>,
/// Context about what phase/action this relates to
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<String>,
/// Whether this question is still pending
pub pending: bool,
/// When the question was asked
pub created_at: chrono::DateTime<chrono::Utc>,
/// Whether multiple choices can be selected
#[serde(default)]
pub multi_select: bool,
}
/// Stored supervisor question for persistence
#[derive(Debug, Clone)]
pub struct PendingSupervisorQuestion {
pub question_id: Uuid,
pub task_id: Uuid,
/// Directive this question relates to (if from a directive task)
pub directive_id: Option<Uuid>,
pub owner_id: Uuid,
pub question: String,
pub choices: Vec<String>,
pub context: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
/// Whether multiple choices can be selected
pub multi_select: bool,
/// Question type: general, phase_confirmation, or contract_complete
pub question_type: String,
}
/// Response to a supervisor question
#[derive(Debug, Clone)]
pub struct SupervisorQuestionResponse {
pub question_id: Uuid,
pub response: String,
pub responded_at: chrono::DateTime<chrono::Utc>,
}
/// Worktree diff response from daemon
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeDiffResponse {
pub task_id: Uuid,
pub success: bool,
pub diff: String,
pub error: Option<String>,
}
/// Worktree info response from daemon
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeInfoResponse {
pub task_id: Uuid,
pub success: bool,
pub worktree_path: Option<String>,
pub exists: bool,
pub files_changed: i32,
pub insertions: i32,
pub deletions: i32,
pub files: Option<serde_json::Value>,
pub branch: Option<String>,
pub head_sha: Option<String>,
pub error: Option<String>,
}
/// Task diff result from daemon
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResult {
pub task_id: Uuid,
pub success: bool,
pub diff: Option<String>,
pub error: Option<String>,
}
/// Worktree commit response from daemon
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeCommitResponse {
pub task_id: Uuid,
pub success: bool,
pub commit_sha: Option<String>,
pub error: Option<String>,
}
/// Command sent from server to daemon.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonCommand {
/// Confirm successful authentication
Authenticated {
#[serde(rename = "daemonId")]
daemon_id: Uuid,
},
/// Spawn a new task in a container
SpawnTask {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Human-readable task name (used for commit messages)
#[serde(rename = "taskName")]
task_name: String,
plan: String,
#[serde(rename = "repoUrl")]
repo_url: Option<String>,
#[serde(rename = "baseBranch")]
base_branch: Option<String>,
/// Target branch to merge into (used for completion actions)
#[serde(rename = "targetBranch")]
target_branch: Option<String>,
/// Parent task ID if this is a subtask
#[serde(rename = "parentTaskId")]
parent_task_id: Option<Uuid>,
/// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask)
depth: i32,
/// Whether this task should run as an orchestrator (true if depth==0 and has subtasks)
#[serde(rename = "isOrchestrator")]
is_orchestrator: bool,
/// Path to user's local repository (outside ~/.makima) for completion actions
#[serde(rename = "targetRepoPath")]
target_repo_path: Option<String>,
/// Action on completion: "none", "branch", "merge", "pr"
#[serde(rename = "completionAction")]
completion_action: Option<String>,
/// Task ID to continue from (copy worktree from this task)
#[serde(rename = "continueFromTaskId")]
continue_from_task_id: Option<Uuid>,
/// Files to copy from parent task's worktree
#[serde(rename = "copyFiles")]
copy_files: Option<Vec<String>>,
/// Whether to run in autonomous loop mode
#[serde(rename = "autonomousLoop", default)]
autonomous_loop: bool,
/// Whether to resume from a previous session using --continue flag
#[serde(rename = "resumeSession", default)]
resume_session: bool,
/// Conversation history for fallback when worktree doesn't exist
#[serde(rename = "conversationHistory", default)]
conversation_history: Option<serde_json::Value>,
/// Base64-encoded gzip-compressed patch for worktree recovery
#[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")]
patch_data: Option<String>,
/// Commit SHA to apply the patch on top of
#[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
patch_base_sha: Option<String>,
/// Whether to skip automatic completion actions (local-only mode).
#[serde(rename = "localOnly", default)]
local_only: bool,
/// Whether to auto-merge to target branch locally when local_only mode is enabled
#[serde(rename = "autoMergeLocal", default)]
auto_merge_local: bool,
/// Directive ID if this task is associated with a directive
#[serde(rename = "directiveId", default, skip_serializing_if = "Option::is_none")]
directive_id: Option<Uuid>,
},
/// Pause a running task
PauseTask {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Resume a paused task
ResumeTask {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Interrupt a task (gracefully or forced)
InterruptTask {
#[serde(rename = "taskId")]
task_id: Uuid,
graceful: bool,
},
/// Send a message to a running task
SendMessage {
#[serde(rename = "taskId")]
task_id: Uuid,
message: String,
},
/// Inject context about sibling task progress
InjectSiblingContext {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "siblingTaskId")]
sibling_task_id: Uuid,
#[serde(rename = "siblingName")]
sibling_name: String,
#[serde(rename = "siblingStatus")]
sibling_status: String,
#[serde(rename = "progressSummary")]
progress_summary: Option<String>,
#[serde(rename = "changedFiles")]
changed_files: Vec<String>,
},
// =========================================================================
// Merge Commands (for orchestrators to merge subtask branches)
// =========================================================================
/// List all subtask branches for a task
ListBranches {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Start merging a subtask branch
MergeStart {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "sourceBranch")]
source_branch: String,
},
/// Get current merge status
MergeStatus {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Resolve a merge conflict
MergeResolve {
#[serde(rename = "taskId")]
task_id: Uuid,
file: String,
/// "ours" or "theirs"
strategy: String,
},
/// Commit the current merge
MergeCommit {
#[serde(rename = "taskId")]
task_id: Uuid,
message: String,
},
/// Abort the current merge
MergeAbort {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Skip merging a subtask branch (mark as intentionally not merged)
MergeSkip {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "subtaskId")]
subtask_id: Uuid,
reason: String,
},
/// Check if all subtask branches have been merged or skipped (completion gate)
CheckMergeComplete {
#[serde(rename = "taskId")]
task_id: Uuid,
},
// =========================================================================
// Completion Action Commands
// =========================================================================
/// Retry a completion action for a completed task
RetryCompletionAction {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Human-readable task name (used for commit messages)
#[serde(rename = "taskName")]
task_name: String,
/// The action to execute: "branch", "merge", or "pr"
action: String,
/// Path to the target repository
#[serde(rename = "targetRepoPath")]
target_repo_path: String,
/// Target branch to merge into (for merge/pr actions)
#[serde(rename = "targetBranch")]
target_branch: Option<String>,
},
/// Clone worktree to a target directory
CloneWorktree {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Path to the target directory
#[serde(rename = "targetDir")]
target_dir: String,
},
/// Check if a target directory exists
CheckTargetExists {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Path to check
#[serde(rename = "targetDir")]
target_dir: String,
},
// =========================================================================
// Contract File Commands
// =========================================================================
/// Read a file from a repository linked to a contract
ReadRepoFile {
/// Request ID for correlating response
#[serde(rename = "requestId")]
request_id: Uuid,
/// Contract ID (used for logging/context)
#[serde(rename = "contractId")]
contract_id: Uuid,
/// Path to the file within the repository
#[serde(rename = "filePath")]
file_path: String,
/// Full repository path on daemon's filesystem
#[serde(rename = "repoPath")]
repo_path: String,
},
// =========================================================================
// Supervisor Git Commands
// =========================================================================
/// Create a new branch in a task's worktree
CreateBranch {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "branchName")]
branch_name: String,
/// Optional reference to create branch from (task_id or SHA)
#[serde(rename = "fromRef")]
from_ref: Option<String>,
},
/// Merge a task's changes to a target branch
MergeTaskToTarget {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Target branch to merge into (default: task's base branch)
#[serde(rename = "targetBranch")]
target_branch: Option<String>,
/// Whether to squash commits
squash: bool,
},
/// Create a pull request for a task's changes
CreatePR {
#[serde(rename = "taskId")]
task_id: Uuid,
title: String,
body: Option<String>,
/// Base branch for the PR. If None, will be auto-detected from the repo.
#[serde(rename = "baseBranch")]
base_branch: Option<String>,
/// Source branch name to push and create PR from
branch: String,
},
/// Get the diff for a task's changes
GetTaskDiff {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Get worktree information (files, stats, branch) for a task
GetWorktreeInfo {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Commit changes in a task worktree
CommitWorktree {
#[serde(rename = "taskId")]
task_id: Uuid,
message: Option<String>,
},
/// Create a git checkpoint (stage changes, commit, record stats)
CreateCheckpoint {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Commit message for the checkpoint
message: String,
},
/// Clean up a task's worktree (used when contract is completed/deleted)
CleanupWorktree {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Whether to delete the associated branch
#[serde(rename = "deleteBranch")]
delete_branch: bool,
},
/// Inherit git config (user.email, user.name) from a directory
InheritGitConfig {
/// Directory to read git config from (defaults to daemon's working directory)
#[serde(rename = "sourceDir")]
source_dir: Option<String>,
},
/// Error response
Error { code: String, message: String },
/// Restart the daemon process
RestartDaemon,
/// Trigger OAuth re-authentication on this daemon
TriggerReauth {
#[serde(rename = "requestId")]
request_id: Uuid,
},
/// Submit auth code for pending reauth
SubmitAuthCode {
#[serde(rename = "requestId")]
request_id: Uuid,
code: String,
},
/// Apply a patch to a task's worktree (for cross-daemon merge).
/// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon.
ApplyPatchToWorktree {
/// Target task whose worktree should be patched.
#[serde(rename = "targetTaskId")]
target_task_id: Uuid,
/// Source task that generated the patch (for logging).
#[serde(rename = "sourceTaskId")]
source_task_id: Uuid,
/// Base64-gzipped patch data.
#[serde(rename = "patchData")]
patch_data: String,
/// Base commit SHA for the patch.
#[serde(rename = "baseSha")]
base_sha: String,
},
}
/// Active daemon connection info stored in state.
#[derive(Debug)]
pub struct DaemonConnectionInfo {
/// Database ID of the daemon
pub id: Uuid,
/// Owner ID for data isolation (from API key authentication)
pub owner_id: Uuid,
/// WebSocket connection identifier
pub connection_id: String,
/// Daemon hostname
pub hostname: Option<String>,
/// Machine identifier
pub machine_id: Option<String>,
/// Channel to send commands to this daemon
pub command_sender: mpsc::Sender<DaemonCommand>,
/// Current working directory of the daemon
pub working_directory: Option<String>,
/// Path to ~/.makima/home directory on daemon (for cloning completed work)
pub home_directory: Option<String>,
/// Path to worktrees directory (~/.makima/worktrees) on daemon
pub worktrees_directory: Option<String>,
}
/// Status of a daemon reauth request (stored in state for polling).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DaemonReauthStatus {
pub request_id: Uuid,
pub status: String,
pub login_url: Option<String>,
pub error: Option<String>,
}
/// Configuration paths for ML models (used for lazy loading).
#[derive(Clone)]
pub struct ModelConfig {
pub parakeet_model_dir: String,
pub parakeet_eou_dir: String,
pub sortformer_model_path: String,
pub chatterbox_model_dir: String,
}
/// Lazily-loaded ML models.
pub struct MlModels {
pub parakeet: Mutex<ParakeetTDT>,
pub parakeet_eou: Mutex<ParakeetEOU>,
pub sortformer: Mutex<Sortformer>,
}
/// Shared application state containing ML models and database pool.
///
/// Models are lazily loaded on first use to speed up server startup.
pub struct AppState {
/// ML model configuration (paths for lazy loading)
pub model_config: Option<ModelConfig>,
/// Lazily-loaded ML models (initialized on first Listen connection)
pub ml_models: OnceCell<MlModels>,
/// Optional database connection pool
pub db_pool: Option<PgPool>,
/// Broadcast channel for file update notifications
pub file_updates: broadcast::Sender<FileUpdateNotification>,
/// Broadcast channel for task update notifications
pub task_updates: broadcast::Sender<TaskUpdateNotification>,
/// Broadcast channel for task output streaming
pub task_output: broadcast::Sender<TaskOutputNotification>,
/// Broadcast channel for task completion notifications (for supervisors)
pub task_completions: broadcast::Sender<TaskCompletionNotification>,
/// Broadcast channel for supervisor question notifications
pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
/// Broadcast channel for merge result notifications
pub merge_results: broadcast::Sender<MergeResultNotification>,
/// Broadcast channel for PR creation result notifications
pub pr_results: broadcast::Sender<PrResultNotification>,
/// Pending supervisor questions awaiting user response (keyed by question_id)
pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
/// Responses to supervisor questions (keyed by question_id)
pub question_responses: DashMap<Uuid, SupervisorQuestionResponse>,
/// Active daemon connections (keyed by connection_id)
pub daemon_connections: DashMap<String, DaemonConnectionInfo>,
/// Tool keys for orchestrator API access (key -> task_id)
pub tool_keys: DashMap<String, Uuid>,
/// JWT verifier for Supabase authentication (None if not configured)
pub jwt_verifier: Option<JwtVerifier>,
/// Pending worktree info requests awaiting daemon response (keyed by task_id)
pub pending_worktree_info: DashMap<Uuid, oneshot::Sender<WorktreeInfoResponse>>,
/// Pending task diff requests awaiting daemon response (keyed by task_id)
pub pending_task_diff: DashMap<Uuid, oneshot::Sender<TaskDiffResult>>,
/// Pending worktree diff requests awaiting daemon response (keyed by task_id)
pub pending_worktree_diff: DashMap<Uuid, oneshot::Sender<WorktreeDiffResponse>>,
/// Pending worktree commit requests awaiting daemon response (keyed by task_id)
pub pending_worktree_commit: DashMap<Uuid, oneshot::Sender<WorktreeCommitResponse>>,
/// Lazily-loaded TTS engine (initialized on first Speak connection)
pub tts_engine: OnceCell<Box<dyn TtsEngine>>,
/// Daemon reauth status storage (keyed by (daemon_id, request_id))
pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>,
/// Signal used to nudge the directive reconciler to run a tick immediately
/// (e.g. after a goal update) rather than waiting up to 15s for the next
/// interval. The reconciler loop in `server::mod` awaits `notified()` in
/// parallel with its interval; handlers call `kick_directive_reconciler()`.
pub directive_kick: std::sync::Arc<tokio::sync::Notify>,
}
impl AppState {
/// Create AppState WITHOUT ML model configuration. Listen and Speak
/// endpoints will return "not configured" errors if used; everything
/// else (mesh, directives, files, contracts-free CRUD) works normally.
/// This is the constructor used by the slim Dockerfile.
pub fn new_slim() -> Self {
Self::new_inner(None)
}
/// Create AppState with ML model configuration for lazy loading.
/// Pass None to disable a specific model family — Listen needs all
/// three of parakeet/parakeet_eou/sortformer; Speak needs chatterbox.
/// If `parakeet_model_dir` is None we skip the whole ModelConfig and
/// behave like `new_slim()`.
///
/// Models are NOT loaded at startup — they're loaded on first use.
pub fn new(
parakeet_model_dir: &str,
parakeet_eou_dir: &str,
sortformer_model_path: &str,
chatterbox_model_dir: &str,
) -> Self {
Self::new_inner(Some(ModelConfig {
parakeet_model_dir: parakeet_model_dir.to_string(),
parakeet_eou_dir: parakeet_eou_dir.to_string(),
sortformer_model_path: sortformer_model_path.to_string(),
chatterbox_model_dir: chatterbox_model_dir.to_string(),
}))
}
/// Internal constructor — model_config can be None for the slim build.
fn new_inner(model_config: Option<ModelConfig>) -> Self {
// Create broadcast channels with buffer for 256 messages
let (file_updates, _) = broadcast::channel(256);
let (task_updates, _) = broadcast::channel(256);
let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming
let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
let (merge_results, _) = broadcast::channel(256); // For merge operation results
let (pr_results, _) = broadcast::channel(256); // For PR creation results
// Initialize JWT verifier from environment (optional)
// Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
let jwt_verifier = match AuthConfig::from_env() {
Some(config) => match JwtVerifier::new(config) {
Ok(verifier) => {
tracing::info!("JWT authentication configured");
Some(verifier)
}
Err(e) => {
tracing::error!("Failed to initialize JWT verifier: {}", e);
None
}
},
None => {
// Log which env vars are missing
let has_url = std::env::var("SUPABASE_URL").is_ok();
let has_public_key = std::env::var("SUPABASE_JWT_PUBLIC_KEY").is_ok();
let has_secret = std::env::var("SUPABASE_JWT_SECRET").is_ok();
if !has_url {
tracing::info!("JWT authentication not configured (SUPABASE_URL not set)");
} else if !has_public_key && !has_secret {
tracing::info!("JWT authentication not configured (set SUPABASE_JWT_PUBLIC_KEY for RS256 or SUPABASE_JWT_SECRET for HS256)");
}
None
}
};
Self {
model_config,
ml_models: OnceCell::new(),
db_pool: None,
file_updates,
task_updates,
task_output,
task_completions,
supervisor_questions,
merge_results,
pr_results,
pending_questions: DashMap::new(),
question_responses: DashMap::new(),
daemon_connections: DashMap::new(),
tool_keys: DashMap::new(),
jwt_verifier,
pending_worktree_info: DashMap::new(),
pending_task_diff: DashMap::new(),
pending_worktree_diff: DashMap::new(),
pending_worktree_commit: DashMap::new(),
tts_engine: OnceCell::new(),
daemon_reauth_status: DashMap::new(),
directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()),
}
}
/// Wake the directive reconciler so it ticks now instead of waiting for
/// the next 15-second interval. Cheap and safe to call from any handler.
pub fn kick_directive_reconciler(&self) {
self.directive_kick.notify_one();
}
/// Get or initialize the TTS engine (lazy loading).
///
/// The TTS engine is loaded on first Speak connection using the Chatterbox backend.
/// Returns a reference to the engine, or an error if loading fails.
pub async fn get_tts_engine(&self) -> Result<&dyn TtsEngine, Box<dyn std::error::Error + Send + Sync>> {
let tts_dir = self.model_config.as_ref().map(|c| c.chatterbox_model_dir.as_str());
self.tts_engine.get_or_try_init(|| async {
tracing::info!(
model_dir = ?tts_dir,
"Lazy-loading TTS engine (Chatterbox) on first Speak connection..."
);
let engine = crate::tts::TtsEngineFactory::create(tts_dir)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
Box::new(e)
})?;
tracing::info!("TTS engine loaded successfully");
Ok(engine)
}).await.map(|b| b.as_ref())
}
/// Get or initialize ML models (lazy loading).
///
/// Models are loaded on first call and cached for subsequent calls.
/// Returns None if model config is not set.
pub async fn get_ml_models(&self) -> Result<&MlModels, Box<dyn std::error::Error + Send + Sync>> {
let config = self.model_config.as_ref()
.ok_or_else(|| "ML model configuration not set")?;
self.ml_models.get_or_try_init(|| async {
tracing::info!(
parakeet = %config.parakeet_model_dir,
eou = %config.parakeet_eou_dir,
sortformer = %config.sortformer_model_path,
"Lazy-loading ML models on first Listen connection..."
);
let parakeet = ParakeetTDT::from_pretrained(&config.parakeet_model_dir, None)?;
let parakeet_eou = ParakeetEOU::from_pretrained(&config.parakeet_eou_dir, None)?;
let sortformer = Sortformer::with_config(
&config.sortformer_model_path,
None,
DiarizationConfig::callhome(),
)?;
tracing::info!("ML models loaded successfully");
Ok(MlModels {
parakeet: Mutex::new(parakeet),
parakeet_eou: Mutex::new(parakeet_eou),
sortformer: Mutex::new(sortformer),
})
}).await
}
/// Check if ML models are loaded.
pub fn are_models_loaded(&self) -> bool {
self.ml_models.initialized()
}
/// Set the database pool.
pub fn with_db_pool(mut self, pool: PgPool) -> Self {
self.db_pool = Some(pool);
self
}
/// Broadcast a file update notification to all subscribers.
///
/// This is a no-op if there are no subscribers (ignores send errors).
pub fn broadcast_file_update(&self, notification: FileUpdateNotification) {
// Ignore send errors - they just mean no one is listening
let _ = self.file_updates.send(notification);
}
/// Broadcast a task update notification to all subscribers.
///
/// This is a no-op if there are no subscribers (ignores send errors).
pub fn broadcast_task_update(&self, notification: TaskUpdateNotification) {
let _ = self.task_updates.send(notification);
}
/// Broadcast task output to all subscribers.
///
/// Used for streaming Claude Code container output to frontend clients.
pub fn broadcast_task_output(&self, notification: TaskOutputNotification) {
let _ = self.task_output.send(notification);
}
/// Broadcast a task completion notification to all subscribers.
///
/// Used to notify supervisor tasks when their child tasks complete.
pub fn broadcast_task_completion(&self, notification: TaskCompletionNotification) {
let _ = self.task_completions.send(notification);
}
/// Broadcast a supervisor question notification to all subscribers.
///
/// Used to notify frontend clients when a supervisor needs user feedback.
pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) {
let _ = self.supervisor_questions.send(notification);
}
/// Broadcast a merge result notification to all subscribers.
///
/// Used to notify waiting handlers when a merge operation completes.
pub fn broadcast_merge_result(&self, notification: MergeResultNotification) {
let _ = self.merge_results.send(notification);
}
/// Broadcast a PR creation result notification to all subscribers.
///
/// Used to notify waiting handlers when a PR creation operation completes.
pub fn broadcast_pr_result(&self, notification: PrResultNotification) {
let _ = self.pr_results.send(notification);
}
/// Add a pending question and broadcast it. Questions live in
/// memory only; they're a back-channel for directive tasks to
/// pause for clarification (used by `makima directive ask`).
pub fn add_supervisor_question(
&self,
task_id: Uuid,
directive_id: Option<Uuid>,
owner_id: Uuid,
question: String,
choices: Vec<String>,
context: Option<String>,
multi_select: bool,
question_type: String,
) -> Uuid {
let question_id = Uuid::new_v4();
let now = chrono::Utc::now();
self.pending_questions.insert(
question_id,
PendingSupervisorQuestion {
question_id,
task_id,
directive_id,
owner_id,
question: question.clone(),
choices: choices.clone(),
context: context.clone(),
created_at: now,
multi_select,
question_type: question_type.clone(),
},
);
self.broadcast_supervisor_question(SupervisorQuestionNotification {
question_id,
task_id,
directive_id,
owner_id: Some(owner_id),
question,
choices,
context,
pending: true,
created_at: now,
multi_select,
});
tracing::info!(
question_id = %question_id,
task_id = %task_id,
directive_id = ?directive_id,
question_type = %question_type,
"Question added"
);
question_id
}
/// Remove a pending question (after it's been answered).
pub fn remove_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
self.pending_questions.remove(&question_id).map(|(_, q)| q)
}
/// Get all pending questions for an owner.
pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec<PendingSupervisorQuestion> {
self.pending_questions
.iter()
.filter(|entry| entry.value().owner_id == owner_id)
.map(|entry| entry.value().clone())
.collect()
}
/// Get a specific pending question.
pub fn get_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
self.pending_questions.get(&question_id).map(|entry| entry.value().clone())
}
/// Check if a pending question exists (either still pending or has a response ready).
pub fn has_pending_question(&self, question_id: Uuid) -> bool {
self.pending_questions.contains_key(&question_id) || self.question_responses.contains_key(&question_id)
}
/// Submit a response to a supervisor question.
pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool {
// Check if the question exists
if let Some(question) = self.pending_questions.remove(&question_id) {
let now = chrono::Utc::now();
// Store the response
self.question_responses.insert(
question_id,
SupervisorQuestionResponse {
question_id,
response: response.clone(),
responded_at: now,
},
);
// Broadcast that the question is no longer pending
self.broadcast_supervisor_question(SupervisorQuestionNotification {
question_id,
task_id: question.1.task_id,
directive_id: question.1.directive_id,
owner_id: Some(question.1.owner_id),
question: question.1.question,
choices: question.1.choices,
context: question.1.context,
pending: false,
created_at: question.1.created_at,
multi_select: question.1.multi_select,
});
tracing::info!(
question_id = %question_id,
"Supervisor question answered"
);
true
} else {
false
}
}
/// Get the response to a question (if answered).
pub fn get_question_response(&self, question_id: Uuid) -> Option<SupervisorQuestionResponse> {
self.question_responses.get(&question_id).map(|entry| entry.value().clone())
}
/// Clean up a question response after the supervisor has read it.
pub fn cleanup_question_response(&self, question_id: Uuid) {
self.question_responses.remove(&question_id);
}
/// Remove all pending questions for a specific task.
///
/// This should be called when a task is deleted to clean up orphaned questions.
/// Returns the number of questions removed.
pub fn remove_pending_questions_for_task(&self, task_id: Uuid) -> usize {
// Collect question IDs to remove
let question_ids: Vec<Uuid> = self
.pending_questions
.iter()
.filter(|entry| entry.value().task_id == task_id)
.map(|entry| entry.value().question_id)
.collect();
let count = question_ids.len();
// Remove pending questions and their responses
for question_id in question_ids {
self.pending_questions.remove(&question_id);
self.question_responses.remove(&question_id);
}
if count > 0 {
tracing::info!(
task_id = %task_id,
count = count,
"Cleaned up pending questions for deleted task"
);
}
count
}
/// Register a new daemon connection.
///
/// Returns the connection_id for later reference.
pub fn register_daemon(
&self,
connection_id: String,
daemon_id: Uuid,
owner_id: Uuid,
hostname: Option<String>,
machine_id: Option<String>,
command_sender: mpsc::Sender<DaemonCommand>,
) {
self.daemon_connections.insert(
connection_id.clone(),
DaemonConnectionInfo {
id: daemon_id,
owner_id,
connection_id,
hostname,
machine_id,
command_sender,
working_directory: None,
home_directory: None,
worktrees_directory: None,
},
);
}
/// Update daemon directory information.
pub fn update_daemon_directories(
&self,
connection_id: &str,
working_directory: String,
home_directory: String,
worktrees_directory: String,
) {
if let Some(mut entry) = self.daemon_connections.get_mut(connection_id) {
entry.working_directory = Some(working_directory);
entry.home_directory = Some(home_directory);
entry.worktrees_directory = Some(worktrees_directory);
}
}
/// Unregister a daemon connection.
pub fn unregister_daemon(&self, connection_id: &str) {
self.daemon_connections.remove(connection_id);
}
/// Get a daemon connection by connection_id.
pub fn get_daemon(&self, connection_id: &str) -> Option<dashmap::mapref::one::Ref<'_, String, DaemonConnectionInfo>> {
self.daemon_connections.get(connection_id)
}
/// Get a daemon by its database ID.
pub fn get_daemon_by_id(&self, daemon_id: Uuid) -> Option<dashmap::mapref::one::Ref<'_, String, DaemonConnectionInfo>> {
self.daemon_connections
.iter()
.find(|entry| entry.value().id == daemon_id)
.map(|entry| {
// Return a reference to the found entry
self.daemon_connections.get(entry.key()).unwrap()
})
}
/// Send a command to a specific daemon by its database ID.
pub async fn send_daemon_command(&self, daemon_id: Uuid, command: DaemonCommand) -> Result<(), String> {
if let Some(daemon) = self.daemon_connections
.iter()
.find(|entry| entry.value().id == daemon_id)
{
daemon.value().command_sender.send(command).await
.map_err(|e| format!("Failed to send command to daemon: {}", e))
} else {
Err(format!("Daemon {} not connected", daemon_id))
}
}
/// Broadcast sibling progress to all running sibling tasks.
///
/// This is used for sibling awareness - when a task makes progress,
/// its siblings are notified so they can adjust their work if needed.
pub async fn broadcast_sibling_progress(
&self,
source_task_id: Uuid,
source_task_name: &str,
source_task_status: &str,
progress_summary: Option<String>,
changed_files: Vec<String>,
running_sibling_daemon_ids: Vec<(Uuid, Uuid)>, // (task_id, daemon_id)
) {
for (sibling_task_id, daemon_id) in running_sibling_daemon_ids {
let command = DaemonCommand::InjectSiblingContext {
task_id: sibling_task_id,
sibling_task_id: source_task_id,
sibling_name: source_task_name.to_string(),
sibling_status: source_task_status.to_string(),
progress_summary: progress_summary.clone(),
changed_files: changed_files.clone(),
};
// Fire and forget - don't block on sending to all daemons
if let Err(e) = self.send_daemon_command(daemon_id, command).await {
tracing::warn!(
"Failed to inject sibling context to task {}: {}",
sibling_task_id,
e
);
}
}
}
/// Get list of connected daemon IDs.
pub fn list_connected_daemon_ids(&self) -> Vec<Uuid> {
self.daemon_connections
.iter()
.map(|entry| entry.value().id)
.collect()
}
/// Find an alternative daemon for a task, excluding specified daemon IDs.
/// Returns the daemon ID and connection info if found.
pub fn find_alternative_daemon(
&self,
owner_id: Uuid,
exclude_daemon_ids: &[Uuid],
) -> Option<Uuid> {
self.daemon_connections
.iter()
.find(|entry| {
let daemon = entry.value();
daemon.owner_id == owner_id && !exclude_daemon_ids.contains(&daemon.id)
})
.map(|entry| entry.value().id)
}
/// Check if a specific daemon is connected.
pub fn is_daemon_connected(&self, daemon_id: Uuid) -> bool {
self.daemon_connections
.iter()
.any(|entry| entry.value().id == daemon_id)
}
// =========================================================================
// Tool Key Management
// =========================================================================
/// Register a tool key for a task.
///
/// This allows orchestrators to authenticate with the API using
/// the `X-Makima-Tool-Key` header.
pub fn register_tool_key(&self, key: String, task_id: Uuid) {
tracing::info!(task_id = %task_id, "Registering tool key");
self.tool_keys.insert(key, task_id);
}
/// Validate a tool key and return the associated task ID.
pub fn validate_tool_key(&self, key: &str) -> Option<Uuid> {
self.tool_keys.get(key).map(|entry| *entry.value())
}
/// Revoke a tool key for a task.
///
/// This should be called when a task completes or is terminated.
pub fn revoke_tool_key(&self, task_id: Uuid) {
// Find and remove the key for this task
self.tool_keys.retain(|_, v| *v != task_id);
tracing::info!(task_id = %task_id, "Revoked tool key");
}
// =========================================================================
// Daemon Reauth Status
// =========================================================================
/// Store a daemon reauth status update (from daemon's ReauthStatus message).
pub fn set_daemon_reauth_status(
&self,
daemon_id: Uuid,
request_id: Uuid,
status: String,
login_url: Option<String>,
error: Option<String>,
) {
self.daemon_reauth_status.insert(
(daemon_id, request_id),
DaemonReauthStatus {
request_id,
status,
login_url,
error,
},
);
}
/// Get a daemon reauth status (for frontend polling).
pub fn get_daemon_reauth_status(
&self,
daemon_id: Uuid,
request_id: Uuid,
) -> Option<DaemonReauthStatus> {
self.daemon_reauth_status
.get(&(daemon_id, request_id))
.map(|entry| entry.value().clone())
}
}
/// Type alias for the shared application state.
pub type SharedState = Arc<AppState>;