summaryrefslogblamecommitdiff
path: root/makima/src/server/state.rs
blob: 9e06b4c25d954aa14aa39e209b12bba8562daf3b (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                                 

                   
                     
                 
                                                             
               
 
                                                                             
                                                   
                          
 












                                                                               




















































                                                                                                   



























                                                                              














                                                                  














                                               





                                                                  
                                    
                      


                                                                     














                                                                         


                                                






                                              

                                                                     




                                                  

                                                

                                                                        









                                                    









                                         
















                                         



















                                                             











































                                                                                               


                                                    





                                                                           





                                                                                           
                                                                           

                                               


                                                                                          


                                                                                          



































































































































                                                                                   



















































                                                                                
                                                                                 
                                       
                                    

                                                         







                                         





                                                                  






                                         







                                                                     








                                                                            






                                                                                      

                                            


                                  
 












                                                      















                                                                                      
























                                                                               








                                                                    





                                                              
                                     



                            
                                     
                                         
                                      









                                                                        

                                         

                                                                



                                                                

                                                                             

                                                                                

                                                                  

                                                              



                                                                                  





                                                                         

                                                                                    

                                                                              

                                                                                    

                                                                                        

                                                                        

                                                                        




                                                                                


               







                                                                           
                                                                     



                                                                          
       
                                                                         

                                 
                               
                                    
                                   
               









                                                                             
                                                                 
                                                        

                                                                                              
                                                                                              
                                                                                                     
                                                                                        
                                                                                 



























                                                                                                                                                 
 
              
                         
                                       
                          
                         

                         
                             
                                 
                          
                       

                                               


                                               
                                                  
                                              
                                                  
                                                    
                                        
                                                 
                                                                            


         





                                                                             

                                                        
                                                                                        

                                                                                                            
                                                                                          
                                                  

                                     
                                                                                   
              



                                                                          




                                                             




































                                                                                                      
     





                                                         







                                                                               














                                                                               






                                                                                       






                                                                                               






                                                                                 






                                                                               


                                                                  


                                   







                                   


                                         




                                       
                             




                                           
                             
                                                     


              


                                                                           
                         





                                     
                         




                                       
                                         
                                           
                            























                                                                                                     




                                                                                                               



















                                                                                         
                                                      





                                                    
                                                      






















                                                                                                  































                                                                                    






















































































































                                                                                                                            






















                                                                                       

























                                                                                

                                                                                

































                                                                                 



                                                
//! Application state holding shared ML models and database pool.

use std::sync::Arc;
use dashmap::DashMap;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, OnceCell};
use uuid::Uuid;

use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer};
use crate::server::auth::{AuthConfig, JwtVerifier};
use crate::tts::TtsEngine;

/// Notification payload for file updates (broadcast to WebSocket subscribers).
///
/// Values of this type travel over the `file_updates` broadcast channel held by
/// `AppState`. The type is `Clone` because a `tokio::sync::broadcast` channel
/// requires cloneable messages (each receiver gets its own copy).
#[derive(Debug, Clone)]
pub struct FileUpdateNotification {
    /// ID of the updated file
    pub file_id: Uuid,
    /// New version number after update
    pub version: i32,
    /// List of fields that were updated
    pub updated_fields: Vec<String>,
    /// Source of the update: "user", "llm", or "system"
    /// (plain `String`; this type does not validate the value)
    pub updated_by: String,
}

// =============================================================================
// Task/Mesh Notifications
// =============================================================================

/// Notification payload for task updates (broadcast to WebSocket subscribers).
///
/// Values of this type travel over the `task_updates` broadcast channel held by
/// `AppState`. No serde derives are present, so this is an in-process message
/// only; any wire representation is built elsewhere.
#[derive(Debug, Clone)]
pub struct TaskUpdateNotification {
    /// ID of the updated task
    pub task_id: Uuid,
    /// Owner ID for data isolation (notifications are scoped to owner).
    /// `None` is representable; how subscribers treat it is not visible here.
    pub owner_id: Option<Uuid>,
    /// New version number after update
    pub version: i32,
    /// Current task status
    pub status: String,
    /// List of fields that were updated
    pub updated_fields: Vec<String>,
    /// Source of the update: "user", "daemon", or "system"
    /// (plain `String`; this type does not validate the value)
    pub updated_by: String,
}

/// Notification for streaming task output from Claude Code containers.
///
/// Serializes with camelCase keys (`taskId`, `messageType`, ...). Optional
/// fields are omitted from the output entirely when `None`, and `owner_id` is
/// never serialized. Carried by the `task_output` broadcast channel on
/// `AppState`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskOutputNotification {
    /// ID of the task producing output
    pub task_id: Uuid,
    /// Owner ID for data isolation (notifications are scoped to owner).
    /// Excluded from serialized output via `#[serde(skip)]`.
    #[serde(skip)]
    pub owner_id: Option<Uuid>,
    /// Type of message: "assistant", "tool_use", "tool_result", "result", "system", "error", "raw"
    pub message_type: String,
    /// Main text content of the message
    pub content: String,
    /// Tool name if this is a tool_use message
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_name: Option<String>,
    /// Tool input (JSON) if this is a tool_use message
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_input: Option<serde_json::Value>,
    /// Whether tool result was an error
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_error: Option<bool>,
    /// Cost in USD if this is a result message
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Duration in milliseconds if this is a result message
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,
    /// Whether this is a partial line (more coming) or complete.
    /// Always serialized (as `isPartial`).
    pub is_partial: bool,
}

/// Notification for task completion events (for supervisor tasks to monitor).
///
/// Serializes with camelCase keys; `None` optionals are omitted and `owner_id`
/// is never serialized. Carried by the `task_completions` broadcast channel on
/// `AppState`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskCompletionNotification {
    /// ID of the completed task
    pub task_id: Uuid,
    /// Owner ID for data isolation.
    /// Excluded from serialized output via `#[serde(skip)]`.
    #[serde(skip)]
    pub owner_id: Option<Uuid>,
    /// Contract ID if task belongs to a contract
    #[serde(skip_serializing_if = "Option::is_none")]
    pub contract_id: Option<Uuid>,
    /// Parent task ID (to notify parent/supervisor)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_task_id: Option<Uuid>,
    /// Final status: "done", "failed", etc.
    pub status: String,
    /// Summary of task output/results
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_summary: Option<String>,
    /// Path to the task's worktree (for reading files)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worktree_path: Option<String>,
    /// Error message if task failed
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_message: Option<String>,
}

/// Notification for merge operation results.
///
/// In-process message only (no serde derives). Carried by the `merge_results`
/// broadcast channel on `AppState`.
#[derive(Debug, Clone)]
pub struct MergeResultNotification {
    /// ID of the task that was merged
    pub task_id: Uuid,
    /// Whether the merge succeeded
    pub success: bool,
    /// Message describing the result
    pub message: String,
    /// Commit SHA if merge succeeded
    pub commit_sha: Option<String>,
    /// List of conflicting files if merge failed due to conflicts
    pub conflicts: Option<Vec<String>>,
}

/// Notification for PR creation results.
///
/// In-process message only (no serde derives).
#[derive(Debug, Clone)]
pub struct PrResultNotification {
    /// ID of the task for which PR was created
    pub task_id: Uuid,
    /// Whether the PR creation succeeded
    pub success: bool,
    /// Message describing the result
    pub message: String,
    /// PR URL if creation succeeded
    pub pr_url: Option<String>,
    /// PR number if creation succeeded
    pub pr_number: Option<i32>,
}

/// Notification for supervisor questions requiring user feedback.
///
/// Round-trips through serde with camelCase keys. `owner_id` is skipped in
/// both directions, so a deserialized value always has `owner_id == None`.
/// Carried by the `supervisor_questions` broadcast channel on `AppState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SupervisorQuestionNotification {
    /// Unique ID for this question
    pub question_id: Uuid,
    /// Task that asked the question
    pub task_id: Uuid,
    /// Directive this question relates to (if from a directive task)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub directive_id: Option<Uuid>,
    /// Owner ID for data isolation (never serialized or deserialized)
    #[serde(skip)]
    pub owner_id: Option<Uuid>,
    /// The question text
    pub question: String,
    /// Optional choices for the user (if empty, free-form text response).
    /// Omitted from output when empty; a missing key deserializes to an empty list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub choices: Vec<String>,
    /// Context about what phase/action this relates to
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option<String>,
    /// Whether this question is still pending
    pub pending: bool,
    /// When the question was asked
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Whether multiple choices can be selected.
    /// A missing key deserializes to `false`.
    #[serde(default)]
    pub multi_select: bool,
}

/// Stored supervisor question for persistence
///
/// Mirrors most fields of `SupervisorQuestionNotification`, but `owner_id` is
/// mandatory here, there is no `pending` flag, and a `question_type` is added.
#[derive(Debug, Clone)]
pub struct PendingSupervisorQuestion {
    /// Unique ID for this question
    pub question_id: Uuid,
    /// Task that asked the question
    pub task_id: Uuid,
    /// Directive this question relates to (if from a directive task)
    pub directive_id: Option<Uuid>,
    /// Owner of the question (required, unlike the notification type)
    pub owner_id: Uuid,
    /// The question text
    pub question: String,
    /// Choices offered to the user
    pub choices: Vec<String>,
    /// Optional free-form context attached to the question
    pub context: Option<String>,
    /// When the question was asked (UTC)
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Whether multiple choices can be selected
    pub multi_select: bool,
    /// Question type: general, phase_confirmation, or contract_complete
    /// (plain `String`; this type does not validate the value)
    pub question_type: String,
}

/// Response to a supervisor question
#[derive(Debug, Clone)]
pub struct SupervisorQuestionResponse {
    /// ID of the question being answered
    pub question_id: Uuid,
    /// The response text
    pub response: String,
    /// When the response was given (UTC)
    pub responded_at: chrono::DateTime<chrono::Utc>,
}

/// Worktree diff response from daemon
///
/// Serialize-only, camelCase keys. `error` has no `skip_serializing_if`, so a
/// `None` value is emitted as JSON `null` rather than omitted.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeDiffResponse {
    /// Task whose worktree was diffed
    pub task_id: Uuid,
    /// Whether the diff operation succeeded
    pub success: bool,
    /// Diff text (always present; presumably empty on failure — TODO confirm)
    pub diff: String,
    /// Error description, if any
    pub error: Option<String>,
}

/// Worktree info response from daemon
///
/// Serialize-only, camelCase keys. Optional fields have no
/// `skip_serializing_if`, so `None` values are emitted as JSON `null`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeInfoResponse {
    /// Task the worktree belongs to
    pub task_id: Uuid,
    /// Whether the info request succeeded
    pub success: bool,
    /// Filesystem path of the worktree, if known
    pub worktree_path: Option<String>,
    /// Whether the worktree exists
    pub exists: bool,
    /// Number of changed files
    pub files_changed: i32,
    /// Number of inserted lines
    pub insertions: i32,
    /// Number of deleted lines
    pub deletions: i32,
    /// Per-file details as untyped JSON (schema not defined in this file)
    pub files: Option<serde_json::Value>,
    /// Branch name, if known
    pub branch: Option<String>,
    /// HEAD commit SHA, if known
    pub head_sha: Option<String>,
    /// Error description, if any
    pub error: Option<String>,
}

/// Task diff result from daemon
///
/// Round-trips through serde with camelCase keys. Unlike
/// `WorktreeDiffResponse`, the `diff` here is optional.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResult {
    /// Task the diff belongs to
    pub task_id: Uuid,
    /// Whether the diff operation succeeded
    pub success: bool,
    /// Diff text, if any
    pub diff: Option<String>,
    /// Error description, if any
    pub error: Option<String>,
}

/// Worktree commit response from daemon
///
/// Round-trips through serde with camelCase keys.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeCommitResponse {
    /// Task whose worktree was committed
    pub task_id: Uuid,
    /// Whether the commit succeeded
    pub success: bool,
    /// SHA of the created commit, if any
    pub commit_sha: Option<String>,
    /// Error description, if any
    pub error: Option<String>,
}

/// Command sent from server to daemon.
///
/// Wire format notes:
/// - The enum is internally tagged: each value serializes as a JSON object
///   whose `type` key holds the variant name.
/// - The container-level `rename_all = "camelCase"` renames the *variant*
///   names (e.g. `SpawnTask` -> `spawnTask`); it does not touch the fields
///   inside struct variants, which is why multi-word fields carry an explicit
///   `#[serde(rename = "...")]`.
/// - Only `Serialize` is derived here, so the `default` attributes on some
///   fields have no effect on this type (serde consults `default` only when
///   deserializing).
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonCommand {
    /// Confirm successful authentication
    Authenticated {
        #[serde(rename = "daemonId")]
        daemon_id: Uuid,
    },
    /// Spawn a new task in a container
    SpawnTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Human-readable task name (used for commit messages)
        #[serde(rename = "taskName")]
        task_name: String,
        plan: String,
        #[serde(rename = "repoUrl")]
        repo_url: Option<String>,
        #[serde(rename = "baseBranch")]
        base_branch: Option<String>,
        /// Target branch to merge into (used for completion actions)
        #[serde(rename = "targetBranch")]
        target_branch: Option<String>,
        /// Parent task ID if this is a subtask
        #[serde(rename = "parentTaskId")]
        parent_task_id: Option<Uuid>,
        /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask)
        depth: i32,
        /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks)
        #[serde(rename = "isOrchestrator")]
        is_orchestrator: bool,
        /// Path to user's local repository (outside ~/.makima) for completion actions
        #[serde(rename = "targetRepoPath")]
        target_repo_path: Option<String>,
        /// Action on completion: "none", "branch", "merge", "pr"
        #[serde(rename = "completionAction")]
        completion_action: Option<String>,
        /// Task ID to continue from (copy worktree from this task)
        #[serde(rename = "continueFromTaskId")]
        continue_from_task_id: Option<Uuid>,
        /// Files to copy from parent task's worktree
        #[serde(rename = "copyFiles")]
        copy_files: Option<Vec<String>>,
        /// Whether to run in autonomous loop mode
        #[serde(rename = "autonomousLoop", default)]
        autonomous_loop: bool,
        /// Whether to resume from a previous session using --continue flag
        #[serde(rename = "resumeSession", default)]
        resume_session: bool,
        /// Conversation history for fallback when worktree doesn't exist
        #[serde(rename = "conversationHistory", default)]
        conversation_history: Option<serde_json::Value>,
        /// Base64-encoded gzip-compressed patch for worktree recovery
        /// (omitted from the message when `None`)
        #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")]
        patch_data: Option<String>,
        /// Commit SHA to apply the patch on top of
        /// (omitted from the message when `None`)
        #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")]
        patch_base_sha: Option<String>,
        /// Whether to skip automatic completion actions (local-only mode).
        #[serde(rename = "localOnly", default)]
        local_only: bool,
        /// Whether to auto-merge to target branch locally when local_only mode is enabled
        #[serde(rename = "autoMergeLocal", default)]
        auto_merge_local: bool,
        /// Directive ID if this task is associated with a directive
        /// (omitted from the message when `None`)
        #[serde(rename = "directiveId", default, skip_serializing_if = "Option::is_none")]
        directive_id: Option<Uuid>,
    },
    /// Pause a running task
    PauseTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Resume a paused task
    ResumeTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Interrupt a task (gracefully or forced)
    InterruptTask {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        graceful: bool,
    },
    /// Send a message to a running task
    SendMessage {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: String,
    },
    /// Inject context about sibling task progress
    InjectSiblingContext {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "siblingTaskId")]
        sibling_task_id: Uuid,
        #[serde(rename = "siblingName")]
        sibling_name: String,
        #[serde(rename = "siblingStatus")]
        sibling_status: String,
        #[serde(rename = "progressSummary")]
        progress_summary: Option<String>,
        #[serde(rename = "changedFiles")]
        changed_files: Vec<String>,
    },

    // =========================================================================
    // Merge Commands (for orchestrators to merge subtask branches)
    // =========================================================================

    /// List all subtask branches for a task
    ListBranches {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Start merging a subtask branch
    MergeStart {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "sourceBranch")]
        source_branch: String,
    },
    /// Get current merge status
    MergeStatus {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Resolve a merge conflict
    MergeResolve {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        file: String,
        /// "ours" or "theirs"
        strategy: String,
    },
    /// Commit the current merge
    MergeCommit {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: String,
    },
    /// Abort the current merge
    MergeAbort {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Skip merging a subtask branch (mark as intentionally not merged)
    MergeSkip {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "subtaskId")]
        subtask_id: Uuid,
        reason: String,
    },
    /// Check if all subtask branches have been merged or skipped (completion gate)
    CheckMergeComplete {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    // =========================================================================
    // Completion Action Commands
    // =========================================================================

    /// Retry a completion action for a completed task
    RetryCompletionAction {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Human-readable task name (used for commit messages)
        #[serde(rename = "taskName")]
        task_name: String,
        /// The action to execute: "branch", "merge", or "pr"
        action: String,
        /// Path to the target repository
        #[serde(rename = "targetRepoPath")]
        target_repo_path: String,
        /// Target branch to merge into (for merge/pr actions)
        #[serde(rename = "targetBranch")]
        target_branch: Option<String>,
    },

    /// Clone worktree to a target directory
    CloneWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Path to the target directory
        #[serde(rename = "targetDir")]
        target_dir: String,
    },

    /// Check if a target directory exists
    CheckTargetExists {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Path to check
        #[serde(rename = "targetDir")]
        target_dir: String,
    },

    // =========================================================================
    // Contract File Commands
    // =========================================================================

    /// Read a file from a repository linked to a contract
    ReadRepoFile {
        /// Request ID for correlating response
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Contract ID (used for logging/context)
        #[serde(rename = "contractId")]
        contract_id: Uuid,
        /// Path to the file within the repository
        #[serde(rename = "filePath")]
        file_path: String,
        /// Full repository path on daemon's filesystem
        #[serde(rename = "repoPath")]
        repo_path: String,
    },

    // =========================================================================
    // Supervisor Git Commands
    // =========================================================================

    /// Create a new branch in a task's worktree
    CreateBranch {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "branchName")]
        branch_name: String,
        /// Optional reference to create branch from (task_id or SHA)
        #[serde(rename = "fromRef")]
        from_ref: Option<String>,
    },

    /// Merge a task's changes to a target branch
    MergeTaskToTarget {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Target branch to merge into (default: task's base branch)
        #[serde(rename = "targetBranch")]
        target_branch: Option<String>,
        /// Whether to squash commits
        squash: bool,
    },

    /// Create a pull request for a task's changes
    CreatePR {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        title: String,
        body: Option<String>,
        /// Base branch for the PR. If None, will be auto-detected from the repo.
        #[serde(rename = "baseBranch")]
        base_branch: Option<String>,
        /// Source branch name to push and create PR from
        branch: String,
    },

    /// Get the diff for a task's changes
    GetTaskDiff {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Get worktree information (files, stats, branch) for a task
    GetWorktreeInfo {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },

    /// Commit changes in a task worktree
    CommitWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        message: Option<String>,
    },

    /// Create a git checkpoint (stage changes, commit, record stats)
    CreateCheckpoint {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Commit message for the checkpoint
        message: String,
    },

    /// Clean up a task's worktree (used when contract is completed/deleted)
    CleanupWorktree {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether to delete the associated branch
        #[serde(rename = "deleteBranch")]
        delete_branch: bool,
    },

    /// Inherit git config (user.email, user.name) from a directory
    InheritGitConfig {
        /// Directory to read git config from (defaults to daemon's working directory)
        #[serde(rename = "sourceDir")]
        source_dir: Option<String>,
    },

    /// Error response
    Error { code: String, message: String },

    /// Restart the daemon process
    RestartDaemon,

    /// Trigger OAuth re-authentication on this daemon
    TriggerReauth {
        #[serde(rename = "requestId")]
        request_id: Uuid,
    },

    /// Submit auth code for pending reauth
    SubmitAuthCode {
        #[serde(rename = "requestId")]
        request_id: Uuid,
        code: String,
    },

    /// Apply a patch to a task's worktree (for cross-daemon merge).
    /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon.
    ApplyPatchToWorktree {
        /// Target task whose worktree should be patched.
        #[serde(rename = "targetTaskId")]
        target_task_id: Uuid,
        /// Source task that generated the patch (for logging).
        #[serde(rename = "sourceTaskId")]
        source_task_id: Uuid,
        /// Base64-gzipped patch data.
        #[serde(rename = "patchData")]
        patch_data: String,
        /// Base commit SHA for the patch.
        #[serde(rename = "baseSha")]
        base_sha: String,
    },
}

/// Active daemon connection info stored in state.
///
/// Derives only `Debug` (not `Clone`), so one value exists per stored
/// connection. Commands reach the daemon by sending `DaemonCommand` values
/// through `command_sender`.
#[derive(Debug)]
pub struct DaemonConnectionInfo {
    /// Database ID of the daemon
    pub id: Uuid,
    /// Owner ID for data isolation (from API key authentication)
    pub owner_id: Uuid,
    /// WebSocket connection identifier
    pub connection_id: String,
    /// Daemon hostname
    pub hostname: Option<String>,
    /// Machine identifier
    pub machine_id: Option<String>,
    /// Channel to send commands to this daemon
    /// (sending half of a bounded `tokio::sync::mpsc` channel)
    pub command_sender: mpsc::Sender<DaemonCommand>,
    /// Current working directory of the daemon
    pub working_directory: Option<String>,
    /// Path to ~/.makima/home directory on daemon (for cloning completed work)
    pub home_directory: Option<String>,
    /// Path to worktrees directory (~/.makima/worktrees) on daemon
    pub worktrees_directory: Option<String>,
}

/// Status of a daemon reauth request (stored in state for polling).
///
/// Values are written by `AppState::set_daemon_reauth_status` and read back,
/// cloned, by `AppState::get_daemon_reauth_status`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DaemonReauthStatus {
    /// Identifier of the reauth request this status belongs to.
    pub request_id: Uuid,
    /// Status string as supplied by the caller of `set_daemon_reauth_status`.
    // NOTE(review): the set of valid status values is not visible here — confirm against the daemon protocol.
    pub status: String,
    /// Login URL, when one was supplied with the status update.
    pub login_url: Option<String>,
    /// Error text, when one was supplied with the status update.
    pub error: Option<String>,
}

/// Configuration paths for ML models (used for lazy loading).
///
/// Nothing is loaded when this struct is created; the paths are only read
/// when `AppState::get_ml_models` / `AppState::get_tts_engine` first run.
#[derive(Clone)]
pub struct ModelConfig {
    /// Passed to `ParakeetTDT::from_pretrained`.
    pub parakeet_model_dir: String,
    /// Passed to `ParakeetEOU::from_pretrained`.
    pub parakeet_eou_dir: String,
    /// Passed to `Sortformer::with_config`.
    pub sortformer_model_path: String,
    /// Passed to the TTS engine factory when the TTS engine is first requested.
    pub chatterbox_model_dir: String,
}

/// Lazily-loaded ML models.
///
/// Built in one go by `AppState::get_ml_models`; each model sits behind its
/// own `Mutex`.
pub struct MlModels {
    /// Model loaded from `ModelConfig::parakeet_model_dir`.
    pub parakeet: Mutex<ParakeetTDT>,
    /// Model loaded from `ModelConfig::parakeet_eou_dir`.
    pub parakeet_eou: Mutex<ParakeetEOU>,
    /// Model loaded from `ModelConfig::sortformer_model_path`.
    pub sortformer: Mutex<Sortformer>,
}

/// Shared application state: ML model handles, the optional database pool,
/// broadcast channels, and the in-memory maps used to track daemons,
/// questions, tool keys and pending daemon requests.
///
/// Models are lazily loaded on first use to speed up server startup.
/// All maps are `DashMap`s, so the methods on this type take `&self`.
pub struct AppState {
    /// ML model configuration (paths for lazy loading); `None` for the slim build
    pub model_config: Option<ModelConfig>,
    /// Lazily-loaded ML models (initialized on first Listen connection)
    pub ml_models: OnceCell<MlModels>,
    /// Optional database connection pool (set via `with_db_pool`)
    pub db_pool: Option<PgPool>,
    /// Broadcast channel for file update notifications
    pub file_updates: broadcast::Sender<FileUpdateNotification>,
    /// Broadcast channel for task update notifications
    pub task_updates: broadcast::Sender<TaskUpdateNotification>,
    /// Broadcast channel for task output streaming
    pub task_output: broadcast::Sender<TaskOutputNotification>,
    /// Broadcast channel for task completion notifications (for supervisors)
    pub task_completions: broadcast::Sender<TaskCompletionNotification>,
    /// Broadcast channel for supervisor question notifications
    pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
    /// Broadcast channel for merge result notifications
    pub merge_results: broadcast::Sender<MergeResultNotification>,
    /// Broadcast channel for PR creation result notifications
    pub pr_results: broadcast::Sender<PrResultNotification>,
    /// Pending supervisor questions awaiting user response (keyed by question_id)
    pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
    /// Responses to supervisor questions (keyed by question_id)
    pub question_responses: DashMap<Uuid, SupervisorQuestionResponse>,
    /// Active daemon connections (keyed by connection_id)
    pub daemon_connections: DashMap<String, DaemonConnectionInfo>,
    /// Tool keys for orchestrator API access (key -> task_id)
    pub tool_keys: DashMap<String, Uuid>,
    /// JWT verifier for Supabase authentication (None if not configured)
    pub jwt_verifier: Option<JwtVerifier>,
    /// Pending worktree info requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_info: DashMap<Uuid, oneshot::Sender<WorktreeInfoResponse>>,
    /// Pending task diff requests awaiting daemon response (keyed by task_id)
    pub pending_task_diff: DashMap<Uuid, oneshot::Sender<TaskDiffResult>>,
    /// Pending worktree diff requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_diff: DashMap<Uuid, oneshot::Sender<WorktreeDiffResponse>>,
    /// Pending worktree commit requests awaiting daemon response (keyed by task_id)
    pub pending_worktree_commit: DashMap<Uuid, oneshot::Sender<WorktreeCommitResponse>>,
    /// Lazily-loaded TTS engine (initialized on first Speak connection)
    pub tts_engine: OnceCell<Box<dyn TtsEngine>>,
    /// Daemon reauth status storage (keyed by (daemon_id, request_id))
    pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>,
    /// Signal used to nudge the directive reconciler to run a tick immediately
    /// (e.g. after a goal update) rather than waiting up to 15s for the next
    /// interval. The reconciler loop in `server::mod` awaits `notified()` in
    /// parallel with its interval; handlers call `kick_directive_reconciler()`.
    pub directive_kick: std::sync::Arc<tokio::sync::Notify>,
}

impl AppState {
    /// Create AppState WITHOUT ML model configuration. Listen and Speak
    /// endpoints will return "not configured" errors if used; everything
    /// else (mesh, directives, files, contracts-free CRUD) works normally.
    /// This is the constructor used by the slim Dockerfile.
    pub fn new_slim() -> Self {
        Self::new_inner(None)
    }

    /// Build an `AppState` that remembers where the ML models live.
    ///
    /// All four paths are required and are copied into a [`ModelConfig`];
    /// this constructor has no way to switch off an individual model family.
    /// Use [`AppState::new_slim`] for a state without any ML configuration.
    ///
    /// Nothing is loaded here — models are loaded on first use.
    pub fn new(
        parakeet_model_dir: &str,
        parakeet_eou_dir: &str,
        sortformer_model_path: &str,
        chatterbox_model_dir: &str,
    ) -> Self {
        let paths = ModelConfig {
            parakeet_model_dir: String::from(parakeet_model_dir),
            parakeet_eou_dir: String::from(parakeet_eou_dir),
            sortformer_model_path: String::from(sortformer_model_path),
            chatterbox_model_dir: String::from(chatterbox_model_dir),
        };

        Self::new_inner(Some(paths))
    }

    /// Internal constructor — model_config can be None for the slim build.
    fn new_inner(model_config: Option<ModelConfig>) -> Self {
        // Create broadcast channels with buffer for 256 messages
        let (file_updates, _) = broadcast::channel(256);
        let (task_updates, _) = broadcast::channel(256);
        let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming
        let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
        let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
        let (merge_results, _) = broadcast::channel(256); // For merge operation results
        let (pr_results, _) = broadcast::channel(256); // For PR creation results

        // Initialize JWT verifier from environment (optional)
        // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
        let jwt_verifier = match AuthConfig::from_env() {
            Some(config) => match JwtVerifier::new(config) {
                Ok(verifier) => {
                    tracing::info!("JWT authentication configured");
                    Some(verifier)
                }
                Err(e) => {
                    tracing::error!("Failed to initialize JWT verifier: {}", e);
                    None
                }
            },
            None => {
                // Log which env vars are missing
                let has_url = std::env::var("SUPABASE_URL").is_ok();
                let has_public_key = std::env::var("SUPABASE_JWT_PUBLIC_KEY").is_ok();
                let has_secret = std::env::var("SUPABASE_JWT_SECRET").is_ok();

                if !has_url {
                    tracing::info!("JWT authentication not configured (SUPABASE_URL not set)");
                } else if !has_public_key && !has_secret {
                    tracing::info!("JWT authentication not configured (set SUPABASE_JWT_PUBLIC_KEY for RS256 or SUPABASE_JWT_SECRET for HS256)");
                }
                None
            }
        };

        Self {
            model_config,
            ml_models: OnceCell::new(),
            db_pool: None,
            file_updates,
            task_updates,
            task_output,
            task_completions,
            supervisor_questions,
            merge_results,
            pr_results,
            pending_questions: DashMap::new(),
            question_responses: DashMap::new(),
            daemon_connections: DashMap::new(),
            tool_keys: DashMap::new(),
            jwt_verifier,
            pending_worktree_info: DashMap::new(),
            pending_task_diff: DashMap::new(),
            pending_worktree_diff: DashMap::new(),
            pending_worktree_commit: DashMap::new(),
            tts_engine: OnceCell::new(),
            daemon_reauth_status: DashMap::new(),
            directive_kick: std::sync::Arc::new(tokio::sync::Notify::new()),
        }
    }

    /// Nudge the directive reconciler to tick right away rather than waiting
    /// for its next 15-second interval.
    ///
    /// Does nothing more than signal `directive_kick`, so it is cheap and
    /// safe to call from any handler.
    pub fn kick_directive_reconciler(&self) {
        let reconciler_signal = &self.directive_kick;
        reconciler_signal.notify_one();
    }

    /// Return the TTS engine, creating it on the first call (lazy loading).
    ///
    /// The engine is built by `TtsEngineFactory::create` from the configured
    /// Chatterbox model directory (or `None` when there is no model config)
    /// and cached in `tts_engine`.
    ///
    /// # Errors
    ///
    /// Returns the boxed factory error if the engine cannot be created.
    pub async fn get_tts_engine(&self) -> Result<&dyn TtsEngine, Box<dyn std::error::Error + Send + Sync>> {
        let chatterbox_dir = self
            .model_config
            .as_ref()
            .map(|cfg| cfg.chatterbox_model_dir.as_str());

        let cached = self
            .tts_engine
            .get_or_try_init(|| async {
                tracing::info!(
                    model_dir = ?chatterbox_dir,
                    "Lazy-loading TTS engine (Chatterbox) on first Speak connection..."
                );
                match crate::tts::TtsEngineFactory::create(chatterbox_dir) {
                    Ok(engine) => {
                        tracing::info!("TTS engine loaded successfully");
                        Ok(engine)
                    }
                    // Box the factory error into the function's error type.
                    Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
                }
            })
            .await?;

        Ok(cached.as_ref())
    }

    /// Get or initialize ML models (lazy loading).
    ///
    /// The three models are loaded on the first successful call and cached in
    /// `ml_models`; later calls return the cached set.
    ///
    /// # Errors
    ///
    /// * `"ML model configuration not set"` when the state was built without
    ///   a [`ModelConfig`] (e.g. via `new_slim`).
    /// * The underlying loader error if any of the three models fails to load.
    pub async fn get_ml_models(&self) -> Result<&MlModels, Box<dyn std::error::Error + Send + Sync>> {
        // A constant message needs no closure; `?` boxes the &str into the error type.
        let config = self
            .model_config
            .as_ref()
            .ok_or("ML model configuration not set")?;

        self.ml_models.get_or_try_init(|| async {
            tracing::info!(
                parakeet = %config.parakeet_model_dir,
                eou = %config.parakeet_eou_dir,
                sortformer = %config.sortformer_model_path,
                "Lazy-loading ML models on first Listen connection..."
            );

            // NOTE(review): these loaders look synchronous; if they are slow they
            // will occupy the async worker thread while loading — confirm.
            let parakeet = ParakeetTDT::from_pretrained(&config.parakeet_model_dir, None)?;
            let parakeet_eou = ParakeetEOU::from_pretrained(&config.parakeet_eou_dir, None)?;
            let sortformer = Sortformer::with_config(
                &config.sortformer_model_path,
                None,
                DiarizationConfig::callhome(),
            )?;

            tracing::info!("ML models loaded successfully");

            Ok(MlModels {
                parakeet: Mutex::new(parakeet),
                parakeet_eou: Mutex::new(parakeet_eou),
                sortformer: Mutex::new(sortformer),
            })
        }).await
    }

    /// Report whether the lazily-loaded ML models have been initialized yet.
    ///
    /// Never triggers a load.
    pub fn are_models_loaded(&self) -> bool {
        self.ml_models.get().is_some()
    }

    /// Builder-style setter: consumes the state and returns it with
    /// `db_pool` set to `pool`, replacing any pool that was there before.
    pub fn with_db_pool(self, pool: PgPool) -> Self {
        let mut state = self;
        state.db_pool = Some(pool);
        state
    }

    /// Publish a file update on the `file_updates` broadcast channel.
    ///
    /// A send error only means nobody is subscribed right now, so it is
    /// deliberately discarded and the call becomes a no-op.
    pub fn broadcast_file_update(&self, notification: FileUpdateNotification) {
        let _send_outcome = self.file_updates.send(notification);
    }

    /// Publish a task update on the `task_updates` broadcast channel.
    ///
    /// The send result is discarded, so this is a no-op when there are no
    /// subscribers.
    pub fn broadcast_task_update(&self, notification: TaskUpdateNotification) {
        let _send_outcome = self.task_updates.send(notification);
    }

    /// Publish a chunk of task output on the `task_output` broadcast channel.
    ///
    /// Used for streaming Claude Code container output to frontend clients.
    /// The send result is discarded, so nothing happens without subscribers.
    pub fn broadcast_task_output(&self, notification: TaskOutputNotification) {
        let _send_outcome = self.task_output.send(notification);
    }

    /// Publish a task completion on the `task_completions` broadcast channel.
    ///
    /// Used to notify supervisor tasks when their child tasks complete.
    /// The send result is discarded, so nothing happens without subscribers.
    pub fn broadcast_task_completion(&self, notification: TaskCompletionNotification) {
        let _send_outcome = self.task_completions.send(notification);
    }

    /// Publish a supervisor question event on the `supervisor_questions`
    /// broadcast channel.
    ///
    /// Used to notify frontend clients when a supervisor needs user feedback.
    /// The send result is discarded, so nothing happens without subscribers.
    pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) {
        let _send_outcome = self.supervisor_questions.send(notification);
    }

    /// Publish a merge result on the `merge_results` broadcast channel.
    ///
    /// Used to notify waiting handlers when a merge operation completes.
    /// The send result is discarded, so nothing happens without subscribers.
    pub fn broadcast_merge_result(&self, notification: MergeResultNotification) {
        let _send_outcome = self.merge_results.send(notification);
    }

    /// Publish a PR creation result on the `pr_results` broadcast channel.
    ///
    /// Used to notify waiting handlers when a PR creation operation completes.
    /// The send result is discarded, so nothing happens without subscribers.
    pub fn broadcast_pr_result(&self, notification: PrResultNotification) {
        let _send_outcome = self.pr_results.send(notification);
    }

    /// Record a new pending question and announce it to subscribers.
    ///
    /// Questions live in memory only; they're a back-channel for directive
    /// tasks to pause for clarification (used by `makima directive ask`).
    ///
    /// A fresh v4 UUID is generated for the question, the question is stored
    /// in `pending_questions` under that id, a notification with
    /// `pending: true` is broadcast, and the id is returned so the caller can
    /// look up the answer later.
    pub fn add_supervisor_question(
        &self,
        task_id: Uuid,
        directive_id: Option<Uuid>,
        owner_id: Uuid,
        question: String,
        choices: Vec<String>,
        context: Option<String>,
        multi_select: bool,
        question_type: String,
    ) -> Uuid {
        let question_id = Uuid::new_v4();
        let created_at = chrono::Utc::now();

        // The notification takes copies; the originals move into the stored record.
        let announcement = SupervisorQuestionNotification {
            question_id,
            task_id,
            directive_id,
            owner_id: Some(owner_id),
            question: question.clone(),
            choices: choices.clone(),
            context: context.clone(),
            pending: true,
            created_at,
            multi_select,
        };
        let stored = PendingSupervisorQuestion {
            question_id,
            task_id,
            directive_id,
            owner_id,
            question,
            choices,
            context,
            created_at,
            multi_select,
            // Cloned because the type is still needed for the log line below.
            question_type: question_type.clone(),
        };

        self.pending_questions.insert(question_id, stored);
        self.broadcast_supervisor_question(announcement);

        tracing::info!(
            question_id = %question_id,
            task_id = %task_id,
            directive_id = ?directive_id,
            question_type = %question_type,
            "Question added"
        );

        question_id
    }

    /// Take a question out of the pending map (e.g. once it has been answered).
    ///
    /// Returns the removed question, or `None` if no question with that id
    /// was pending.
    pub fn remove_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
        let (_, removed) = self.pending_questions.remove(&question_id)?;
        Some(removed)
    }

    /// Collect clones of every pending question whose `owner_id` matches.
    ///
    /// The result is empty when the owner has no pending questions.
    pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec<PendingSupervisorQuestion> {
        let mut owned = Vec::new();
        for entry in self.pending_questions.iter() {
            let pending = entry.value();
            if pending.owner_id == owner_id {
                owned.push(pending.clone());
            }
        }
        owned
    }

    /// Look up one pending question by id and return a clone of it.
    ///
    /// Returns `None` if the id is not in the pending map.
    pub fn get_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
        let entry = self.pending_questions.get(&question_id)?;
        Some(entry.value().clone())
    }

    /// Tell whether a question id is known at all: either it is still
    /// pending, or a response for it is waiting to be read.
    pub fn has_pending_question(&self, question_id: Uuid) -> bool {
        if self.pending_questions.contains_key(&question_id) {
            return true;
        }
        self.question_responses.contains_key(&question_id)
    }

    /// Submit a response to a supervisor question.
    ///
    /// Moves the question out of `pending_questions`, stores the response in
    /// `question_responses`, and broadcasts the question again with
    /// `pending: false`.
    ///
    /// Returns `true` if the question was pending and the response was
    /// recorded, `false` if no pending question has that id (unknown, or
    /// already answered).
    pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool {
        // Removing first makes the pending entry the single claim on the
        // question: only one caller can get `Some` back for a given id.
        let (_, question) = match self.pending_questions.remove(&question_id) {
            Some(removed) => removed,
            None => return false,
        };

        // `response` is owned and not needed afterwards, so it moves in as-is.
        self.question_responses.insert(
            question_id,
            SupervisorQuestionResponse {
                question_id,
                response,
                responded_at: chrono::Utc::now(),
            },
        );

        // Broadcast that the question is no longer pending
        self.broadcast_supervisor_question(SupervisorQuestionNotification {
            question_id,
            task_id: question.task_id,
            directive_id: question.directive_id,
            owner_id: Some(question.owner_id),
            question: question.question,
            choices: question.choices,
            context: question.context,
            pending: false,
            created_at: question.created_at,
            multi_select: question.multi_select,
        });

        tracing::info!(
            question_id = %question_id,
            "Supervisor question answered"
        );

        true
    }

    /// Return a clone of the stored response for a question, or `None` if
    /// no response is stored under that id.
    pub fn get_question_response(&self, question_id: Uuid) -> Option<SupervisorQuestionResponse> {
        let entry = self.question_responses.get(&question_id)?;
        Some(entry.value().clone())
    }

    /// Discard the stored response for a question once the supervisor has
    /// read it. Does nothing if no response is stored under that id.
    pub fn cleanup_question_response(&self, question_id: Uuid) {
        let _discarded = self.question_responses.remove(&question_id);
    }

    /// Drop every pending question that belongs to `task_id`.
    ///
    /// Call this when a task is deleted so its questions are not left
    /// orphaned. Returns how many pending questions matched the task.
    ///
    /// NOTE: only `pending_questions` is scanned, so a response whose
    /// question was already answered (and therefore already moved out of the
    /// pending map) is not found by this cleanup.
    pub fn remove_pending_questions_for_task(&self, task_id: Uuid) -> usize {
        // Gather the ids first; the removals happen after the scan has finished.
        let mut doomed: Vec<Uuid> = Vec::new();
        for entry in self.pending_questions.iter() {
            let pending = entry.value();
            if pending.task_id == task_id {
                doomed.push(pending.question_id);
            }
        }

        let count = doomed.len();

        for question_id in &doomed {
            self.pending_questions.remove(question_id);
            self.question_responses.remove(question_id);
        }

        if count > 0 {
            tracing::info!(
                task_id = %task_id,
                count = count,
                "Cleaned up pending questions for deleted task"
            );
        }

        count
    }

    /// Record a newly connected daemon under its `connection_id`.
    ///
    /// The entry starts without any directory information; that arrives
    /// later through `update_daemon_directories`. Nothing is returned — the
    /// caller already owns the `connection_id` it passed in.
    pub fn register_daemon(
        &self,
        connection_id: String,
        daemon_id: Uuid,
        owner_id: Uuid,
        hostname: Option<String>,
        machine_id: Option<String>,
        command_sender: mpsc::Sender<DaemonCommand>,
    ) {
        let info = DaemonConnectionInfo {
            id: daemon_id,
            owner_id,
            // The id is stored both as the map key and inside the record.
            connection_id: connection_id.clone(),
            hostname,
            machine_id,
            command_sender,
            working_directory: None,
            home_directory: None,
            worktrees_directory: None,
        };

        self.daemon_connections.insert(connection_id, info);
    }

    /// Store the directories a daemon reported for itself.
    ///
    /// All three fields are overwritten together. If `connection_id` is not
    /// registered, the call silently does nothing.
    pub fn update_daemon_directories(
        &self,
        connection_id: &str,
        working_directory: String,
        home_directory: String,
        worktrees_directory: String,
    ) {
        let mut daemon = match self.daemon_connections.get_mut(connection_id) {
            Some(found) => found,
            None => return,
        };

        daemon.working_directory = Some(working_directory);
        daemon.home_directory = Some(home_directory);
        daemon.worktrees_directory = Some(worktrees_directory);
    }

    /// Forget a daemon connection. Does nothing if `connection_id` is not
    /// registered.
    pub fn unregister_daemon(&self, connection_id: &str) {
        let _removed = self.daemon_connections.remove(connection_id);
    }

    /// Look up a daemon connection by its `connection_id`.
    ///
    /// Returns a DashMap guard borrowing the entry, or `None` if that
    /// connection is not registered.
    pub fn get_daemon(&self, connection_id: &str) -> Option<dashmap::mapref::one::Ref<'_, String, DaemonConnectionInfo>> {
        let connections = &self.daemon_connections;
        connections.get(connection_id)
    }

    /// Get a daemon by its database ID.
    ///
    /// The map is keyed by connection id, so this scans the connections for
    /// the first one whose `id` matches and then returns a guard for that
    /// entry. Returns `None` if no connected daemon has this id (including
    /// the case where it disconnects between the scan and the lookup).
    pub fn get_daemon_by_id(&self, daemon_id: Uuid) -> Option<dashmap::mapref::one::Ref<'_, String, DaemonConnectionInfo>> {
        // Copy the key out so the iterator's guard is released at the end of
        // this statement. Doing the keyed lookup while the iterator still
        // holds its shard would lock the same shard re-entrantly.
        let connection_id = self
            .daemon_connections
            .iter()
            .find(|entry| entry.value().id == daemon_id)
            .map(|entry| entry.key().clone())?;

        // Re-check the id: the entry may have been replaced in the meantime.
        self.daemon_connections
            .get(&connection_id)
            .filter(|daemon| daemon.value().id == daemon_id)
    }

    /// Send a command to a specific daemon by its database ID.
    ///
    /// Waits for room in the daemon's command channel if it is full.
    ///
    /// # Errors
    ///
    /// * `"Daemon {id} not connected"` if no registered connection has this id.
    /// * `"Failed to send command to daemon: …"` if the channel send fails.
    pub async fn send_daemon_command(&self, daemon_id: Uuid, command: DaemonCommand) -> Result<(), String> {
        // Clone the sender out of the map so no map guard is alive across the
        // `.await` below. Holding it while parked on a full channel would
        // block every writer that needs the same shard (register/unregister).
        let sender = self
            .daemon_connections
            .iter()
            .find(|entry| entry.value().id == daemon_id)
            .map(|entry| entry.value().command_sender.clone());

        match sender {
            Some(sender) => sender
                .send(command)
                .await
                .map_err(|e| format!("Failed to send command to daemon: {}", e)),
            None => Err(format!("Daemon {} not connected", daemon_id)),
        }
    }

    /// Tell every running sibling task about progress made by `source_task_id`.
    ///
    /// This is used for sibling awareness: when a task makes progress, its
    /// siblings are notified so they can adjust their work if needed.
    ///
    /// For each `(task_id, daemon_id)` pair an `InjectSiblingContext` command
    /// is sent to that daemon. Delivery is best effort: a failed send is
    /// logged as a warning and the loop moves on. Sends are awaited one
    /// after another.
    pub async fn broadcast_sibling_progress(
        &self,
        source_task_id: Uuid,
        source_task_name: &str,
        source_task_status: &str,
        progress_summary: Option<String>,
        changed_files: Vec<String>,
        running_sibling_daemon_ids: Vec<(Uuid, Uuid)>, // (task_id, daemon_id)
    ) {
        for (sibling_task_id, daemon_id) in running_sibling_daemon_ids {
            // From the receiving task's point of view the source task is the sibling.
            let injection = DaemonCommand::InjectSiblingContext {
                task_id: sibling_task_id,
                sibling_task_id: source_task_id,
                sibling_name: source_task_name.to_string(),
                sibling_status: source_task_status.to_string(),
                progress_summary: progress_summary.clone(),
                changed_files: changed_files.clone(),
            };

            let outcome = self.send_daemon_command(daemon_id, injection).await;
            if let Err(reason) = outcome {
                tracing::warn!(
                    "Failed to inject sibling context to task {}: {}",
                    sibling_task_id,
                    reason
                );
            }
        }
    }

    /// Return the database id of every registered daemon connection, in map
    /// iteration order.
    pub fn list_connected_daemon_ids(&self) -> Vec<Uuid> {
        let mut ids = Vec::new();
        for entry in self.daemon_connections.iter() {
            ids.push(entry.value().id);
        }
        ids
    }

    /// Pick another connected daemon of the same owner, skipping the ones
    /// listed in `exclude_daemon_ids`.
    ///
    /// Returns the database id of the first match in map iteration order,
    /// or `None` if the owner has no other connected daemon.
    pub fn find_alternative_daemon(
        &self,
        owner_id: Uuid,
        exclude_daemon_ids: &[Uuid],
    ) -> Option<Uuid> {
        for entry in self.daemon_connections.iter() {
            let candidate = entry.value();
            if candidate.owner_id != owner_id {
                continue;
            }
            if exclude_daemon_ids.contains(&candidate.id) {
                continue;
            }
            return Some(candidate.id);
        }
        None
    }

    /// Report whether any registered connection belongs to the daemon with
    /// this database id.
    pub fn is_daemon_connected(&self, daemon_id: Uuid) -> bool {
        for entry in self.daemon_connections.iter() {
            if entry.value().id == daemon_id {
                return true;
            }
        }
        false
    }

    // =========================================================================
    // Tool Key Management
    // =========================================================================

    /// Associate a tool key with a task.
    ///
    /// Orchestrators present the key in the `X-Makima-Tool-Key` header to
    /// authenticate with the API. Registering a key that already exists
    /// rebinds it to the new task.
    pub fn register_tool_key(&self, key: String, task_id: Uuid) {
        tracing::info!(task_id = %task_id, "Registering tool key");
        let _previous_owner = self.tool_keys.insert(key, task_id);
    }

    /// Resolve a tool key to the task it was registered for.
    ///
    /// Returns `None` when the key is unknown.
    pub fn validate_tool_key(&self, key: &str) -> Option<Uuid> {
        let entry = self.tool_keys.get(key)?;
        Some(*entry.value())
    }

    /// Remove every tool key registered for `task_id`.
    ///
    /// This should be called when a task completes or is terminated. The
    /// log line is written whether or not any key was actually removed.
    pub fn revoke_tool_key(&self, task_id: Uuid) {
        // Keys are indexed by key string, so revoking by task means scanning
        // the map and keeping only the keys bound to other tasks.
        self.tool_keys
            .retain(|_key, bound_task| *bound_task != task_id);
        tracing::info!(task_id = %task_id, "Revoked tool key");
    }

    // =========================================================================
    // Daemon Reauth Status
    // =========================================================================

    /// Record the latest reauth status reported for a request (from the
    /// daemon's ReauthStatus message).
    ///
    /// The status is stored under `(daemon_id, request_id)` and replaces
    /// any earlier status for the same pair.
    pub fn set_daemon_reauth_status(
        &self,
        daemon_id: Uuid,
        request_id: Uuid,
        status: String,
        login_url: Option<String>,
        error: Option<String>,
    ) {
        let latest = DaemonReauthStatus {
            request_id,
            status,
            login_url,
            error,
        };

        self.daemon_reauth_status
            .insert((daemon_id, request_id), latest);
    }

    /// Fetch a clone of the stored reauth status for `(daemon_id, request_id)`
    /// (used for frontend polling).
    ///
    /// Returns `None` if nothing has been stored for that pair.
    pub fn get_daemon_reauth_status(
        &self,
        daemon_id: Uuid,
        request_id: Uuid,
    ) -> Option<DaemonReauthStatus> {
        let lookup_key = (daemon_id, request_id);
        let entry = self.daemon_reauth_status.get(&lookup_key)?;
        Some(entry.value().clone())
    }

}

/// Type alias for the shared application state: an [`AppState`] behind an
/// `Arc`, so cloning a `SharedState` clones the pointer rather than the state.
pub type SharedState = Arc<AppState>;