summaryrefslogblamecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
blob: ed1b603d014323b7fa5a7d5af3e4f916e92615cc (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                                                     
                   
                            





                                  
                            
                          
                                                   













































































































































































































































                                                                                                                                




















                                                                                                                                       





























                                                              












                                                   











                                                                                










                                                                         



                                                                                
                                                                 





                                                                


                                                          
      









































                                                                         














                                                 











                                               



























                                                    








                                                                                     
      












                                                  






















                                                                





















                                             







                                           


























                                           
















                                          















                                                                                     










                                           












                                                                                                











                                                                                             

                                                                                      



                                         






                                           

                                                                 

                                
                                

                       

      








                                                                                                   
                                
                                                                               







                          



























































































                                                                                                    

























































































































                                                                                                         
                                                                          
                                                   
                                           




















                                                                                                                           

                                                                                             








                                                                      
                                                           
                                                                          



                                                                                            

                                                                               
                                                              





                                                                                       
                                                

                                         










                                                                                                 
 
                                                                                           

                                                          
                                                     







                                                     

                                                                                                            











































                                                                                                             

                                                                                          













                                                                              













                                                                                                                     
                             


















                                                                   
                                                                                                     

























                                                                                                















                                                                                                                         














                                                                                                                             






















































                                                                                                                         
                                                                                     


                                                                                          
 
                                                                                                                                        

                                                                                                                













                                                                                                                                       

                                                                                
                                                                                     
                                                                         
                                                                                           


                                                                                                       

                                                                                                                                 
                                                                          













                                                                                                                                                                 





                                                                                                       
                                                                                                                   




                                                                          

















































                                                                                                                                   





                                                                                               
                                                                                                               





















                                                                                                       










































































                                                                                                    





                                                                                                                                                          






                                                                                                                            







                                                                                                         
                                                                                            



                                                                    
 










                                                                                                                                    















                                                                                                                                 
 














                                                                                                                                         








                                                                                                                                                             

                                                 



























                                                                                                     







































































































                                                                                                                   
















                                                                                    








                                                                                                                    
                                                       

































                                                                                                                       
                                                                                                                      





                                                             
                                                              











                                                                                      














































































































                                                                                                                                





















































































                                                                                                                                        





























                                                                                                                               










                                                                           


                                                  




                                                             
                                                                     
























                                                                                                                  











































                                                                                                                               

















                                                                                                        


















                                                                                                                       
                                                                                                 


                                                                















                                                                                                                                     





























































                                                                                                                      



















                                                                                                         



                                                                   

                                           






                                                                     








                                                                                                            



























                                                                                                                                                 
                                          




                                                       
                                                           


                                                                








                                                                                                      








                                                                                                                                                                              






                                                                                                                                                   

























                                                                                                                           




                                             




















                                                                      




































































                                                                                                                                              







































                                                                                                     


































                                                                                                                





































































                                                                                                                         
















                                                                                                        






































                                                                                               
                                                  

                                           
                                            
                                 








                                                                                            

                                                                                        


                                             
                                                                  





                  



                                                                                                          
           
                           




                                                               
                    

            





















































                                                                                                      



          
//! WebSocket handler for daemon connections.
//!
//! Daemons connect to report task progress, stream output, and receive commands.
//! Each daemon manages Claude Code containers on its local machine.
//!
//! ## Authentication
//!
//! Daemons authenticate via the `X-Api-Key` header in the WebSocket upgrade request.
//! The API key is validated against the database and the daemon is associated with
//! the corresponding owner_id for data isolation.

use axum::{
    extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade},
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use base64::Engine;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::db::models::Task;
use crate::db::repository;
use crate::llm::{check_deliverables_met, TaskInfo};
use crate::server::auth::{hash_api_key, API_KEY_HEADER};
use crate::server::messages::ApiError;
use crate::server::state::{
    DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification,
};

// =============================================================================
// Claude Code JSON Output Parsing
// =============================================================================

/// Claude Code stream-json message structure
#[derive(Debug, Deserialize)]
struct ClaudeMessage {
    #[serde(rename = "type")]
    msg_type: String,
    subtype: Option<String>,
    message: Option<ClaudeMessageContent>,
    tool_name: Option<String>,
    tool_input: Option<serde_json::Value>,
    tool_result: Option<ClaudeToolResult>,
    result: Option<String>,
    cost_usd: Option<f64>,
    duration_ms: Option<u64>,
    error: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ClaudeMessageContent {
    content: Option<Vec<ClaudeContentBlock>>,
}

#[derive(Debug, Deserialize)]
struct ClaudeContentBlock {
    #[serde(rename = "type")]
    block_type: String,
    text: Option<String>,
    name: Option<String>,
    input: Option<serde_json::Value>,
}

#[derive(Debug, Deserialize)]
struct ClaudeToolResult {
    content: Option<String>,
    is_error: Option<bool>,
}

/// Parse a line of Claude Code output into a structured notification
fn parse_claude_output(task_id: Uuid, owner_id: Uuid, line: &str, is_partial: bool) -> Option<TaskOutputNotification> {
    let trimmed = line.trim();
    if trimmed.is_empty() {
        return None;
    }

    // Try to parse as JSON
    if trimmed.starts_with('{') {
        if let Ok(msg) = serde_json::from_str::<ClaudeMessage>(trimmed) {
            return parse_claude_message(task_id, owner_id, msg, is_partial);
        }
    }

    // Not JSON or failed to parse - treat as raw output
    Some(TaskOutputNotification {
        task_id,
        owner_id: Some(owner_id),
        message_type: "raw".to_string(),
        content: trimmed.to_string(),
        tool_name: None,
        tool_input: None,
        is_error: None,
        cost_usd: None,
        duration_ms: None,
        is_partial,
    })
}

fn parse_claude_message(task_id: Uuid, owner_id: Uuid, msg: ClaudeMessage, is_partial: bool) -> Option<TaskOutputNotification> {
    match msg.msg_type.as_str() {
        "system" => {
            // System messages (init, etc.) - include subtype info
            let content = match msg.subtype.as_deref() {
                Some("init") => "Session started".to_string(),
                Some(sub) => format!("System: {}", sub),
                None => "System message".to_string(),
            };
            Some(TaskOutputNotification {
                task_id,
                owner_id: Some(owner_id),
                message_type: "system".to_string(),
                content,
                tool_name: None,
                tool_input: None,
                is_error: None,
                cost_usd: None,
                duration_ms: None,
                is_partial,
            })
        }

        "assistant" => {
            // Extract text content from message blocks
            if let Some(message) = msg.message {
                if let Some(blocks) = message.content {
                    // Check for text blocks
                    let text_content: Vec<String> = blocks
                        .iter()
                        .filter(|b| b.block_type == "text")
                        .filter_map(|b| b.text.clone())
                        .collect();

                    if !text_content.is_empty() {
                        return Some(TaskOutputNotification {
                            task_id,
                            owner_id: Some(owner_id),
                            message_type: "assistant".to_string(),
                            content: text_content.join("\n"),
                            tool_name: None,
                            tool_input: None,
                            is_error: None,
                            cost_usd: None,
                            duration_ms: None,
                            is_partial,
                        });
                    }

                    // Check for tool_use blocks
                    if let Some(tool_block) = blocks.iter().find(|b| b.block_type == "tool_use") {
                        return Some(TaskOutputNotification {
                            task_id,
                            owner_id: Some(owner_id),
                            message_type: "tool_use".to_string(),
                            content: format!("Using tool: {}", tool_block.name.as_deref().unwrap_or("unknown")),
                            tool_name: tool_block.name.clone(),
                            tool_input: tool_block.input.clone(),
                            is_error: None,
                            cost_usd: None,
                            duration_ms: None,
                            is_partial,
                        });
                    }
                }
            }
            None
        }

        "tool_use" => {
            Some(TaskOutputNotification {
                task_id,
                owner_id: Some(owner_id),
                message_type: "tool_use".to_string(),
                content: format!("Using tool: {}", msg.tool_name.as_deref().unwrap_or("unknown")),
                tool_name: msg.tool_name,
                tool_input: msg.tool_input,
                is_error: None,
                cost_usd: None,
                duration_ms: None,
                is_partial,
            })
        }

        "tool_result" => {
            if let Some(result) = msg.tool_result {
                let content = result.content.unwrap_or_default();
                // Truncate long results
                let content = if content.len() > 500 {
                    format!("{}...", &content[..500])
                } else {
                    content
                };
                Some(TaskOutputNotification {
                    task_id,
                    owner_id: Some(owner_id),
                    message_type: "tool_result".to_string(),
                    content,
                    tool_name: None,
                    tool_input: None,
                    is_error: result.is_error,
                    cost_usd: None,
                    duration_ms: None,
                    is_partial,
                })
            } else {
                None
            }
        }

        "result" => {
            Some(TaskOutputNotification {
                task_id,
                owner_id: Some(owner_id),
                message_type: "result".to_string(),
                content: msg.result.unwrap_or_else(|| "Task completed".to_string()),
                tool_name: None,
                tool_input: None,
                is_error: None,
                cost_usd: msg.cost_usd,
                duration_ms: msg.duration_ms,
                is_partial,
            })
        }

        "error" => {
            Some(TaskOutputNotification {
                task_id,
                owner_id: Some(owner_id),
                message_type: "error".to_string(),
                content: msg.error.unwrap_or_else(|| "An error occurred".to_string()),
                tool_name: None,
                tool_input: None,
                is_error: Some(true),
                cost_usd: None,
                duration_ms: None,
                is_partial,
            })
        }

        _ => None, // Skip unknown message types
    }
}

/// Message from daemon to server.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonMessage {
    /// Authentication request (first message required)
    Authenticate {
        #[serde(rename = "apiKey")]
        api_key: String,
        #[serde(rename = "machineId")]
        machine_id: String,
        hostname: String,
        #[serde(rename = "maxConcurrentTasks")]
        max_concurrent_tasks: i32,
    },
    /// Periodic heartbeat with current status
    Heartbeat {
        #[serde(rename = "activeTasks")]
        active_tasks: Vec<Uuid>,
    },
    /// Enhanced supervisor heartbeat with detailed state
    SupervisorHeartbeat {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "contractId")]
        contract_id: Uuid,
        /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
        state: String,
        /// Current contract phase
        phase: String,
        /// Description of current activity
        #[serde(rename = "currentActivity")]
        current_activity: Option<String>,
        /// Progress percentage (0-100)
        progress: u8,
        /// Task IDs the supervisor is waiting on
        #[serde(rename = "pendingTaskIds")]
        pending_task_ids: Vec<Uuid>,
        /// Timestamp of this heartbeat
        timestamp: DateTime<Utc>,
    },
    /// Task output streaming (stdout/stderr from Claude Code)
    TaskOutput {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        output: String,
        #[serde(rename = "isPartial")]
        is_partial: bool,
    },
    /// Task status change notification
    TaskStatusChange {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "oldStatus")]
        old_status: String,
        #[serde(rename = "newStatus")]
        new_status: String,
    },
    /// Task progress update with summary
    TaskProgress {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        summary: String,
    },
    /// Task completion notification
    TaskComplete {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        error: Option<String>,
    },
    /// Task recovery detected after daemon restart
    TaskRecoveryDetected {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "previousState")]
        previous_state: String,
        #[serde(rename = "worktreeIntact")]
        worktree_intact: bool,
        #[serde(rename = "worktreePath")]
        worktree_path: Option<String>,
        #[serde(rename = "needsPatch")]
        needs_patch: bool,
    },
    /// Register a tool key for orchestrator API access
    RegisterToolKey {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// The API key for this orchestrator to use when calling mesh endpoints
        key: String,
    },
    /// Revoke a tool key when task completes
    RevokeToolKey {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Authentication required - OAuth token expired, provides login URL
    AuthenticationRequired {
        /// Task ID that triggered the auth error (if any)
        #[serde(rename = "taskId")]
        task_id: Option<Uuid>,
        /// OAuth login URL for remote authentication
        #[serde(rename = "loginUrl")]
        login_url: String,
        /// Hostname of the daemon requiring auth
        hostname: Option<String>,
    },
    /// Reauth status update (response to TriggerReauth/SubmitAuthCode commands)
    ReauthStatus {
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Status: "pending", "url_ready", "completed", "failed"
        status: String,
        /// OAuth login URL (present when status is "url_ready")
        #[serde(rename = "loginUrl")]
        login_url: Option<String>,
        /// Error message (present when status is "failed")
        error: Option<String>,
        /// Whether the OAuth token has been saved to disk
        #[serde(rename = "tokenSaved", default)]
        token_saved: bool,
    },
    /// Response to RetryCompletionAction command
    CompletionActionResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        /// PR URL if action was "pr" and successful
        #[serde(rename = "prUrl")]
        pr_url: Option<String>,
    },
    /// Report daemon's available directories for task output
    DaemonDirectories {
        /// Current working directory of the daemon
        #[serde(rename = "workingDirectory")]
        working_directory: String,
        /// Path to ~/.makima/home directory (for cloning completed work)
        #[serde(rename = "homeDirectory")]
        home_directory: String,
        /// Path to worktrees directory (~/.makima/worktrees)
        #[serde(rename = "worktreesDirectory")]
        worktrees_directory: String,
    },
    /// Response to CloneWorktree command
    CloneWorktreeResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        /// The path where the worktree was cloned
        #[serde(rename = "targetDir")]
        target_dir: Option<String>,
    },
    /// Response to CheckTargetExists command
    CheckTargetExistsResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether the target directory exists
        exists: bool,
        /// The path that was checked
        #[serde(rename = "targetDir")]
        target_dir: String,
    },
    /// Response to ReadRepoFile command
    RepoFileContent {
        /// Request ID from the original command
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Path to the file that was read
        #[serde(rename = "filePath")]
        file_path: String,
        /// File content (None if error occurred)
        content: Option<String>,
        /// Whether the operation succeeded
        success: bool,
        /// Error message if operation failed
        error: Option<String>,
    },
    /// Notification that a branch was created
    BranchCreated {
        #[serde(rename = "taskId")]
        task_id: Option<Uuid>,
        /// Name of the branch that was created
        #[serde(rename = "branchName")]
        branch_name: String,
        /// Whether the operation succeeded
        success: bool,
        /// Error message if operation failed
        error: Option<String>,
    },
    /// Notification that a checkpoint was created
    CheckpointCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether the operation succeeded
        success: bool,
        /// Commit SHA if successful
        #[serde(rename = "commitSha")]
        commit_sha: Option<String>,
        /// Branch name where checkpoint was created
        #[serde(rename = "branchName")]
        branch_name: Option<String>,
        /// Checkpoint number in sequence
        #[serde(rename = "checkpointNumber")]
        checkpoint_number: Option<i32>,
        /// Files changed in this checkpoint
        #[serde(rename = "filesChanged")]
        files_changed: Option<serde_json::Value>,
        /// Lines added
        #[serde(rename = "linesAdded")]
        lines_added: Option<i32>,
        /// Lines removed
        #[serde(rename = "linesRemoved")]
        lines_removed: Option<i32>,
        /// Error message if operation failed
        error: Option<String>,
        /// User-provided checkpoint message
        message: String,
        /// Base64-encoded gzip-compressed patch data for recovery
        #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
        patch_data: Option<String>,
        /// Commit SHA to apply patch on top of (for recovery)
        #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
        patch_base_sha: Option<String>,
        /// Number of files in the patch
        #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
        patch_files_count: Option<i32>,
    },
    /// Notification that git config was inherited
    GitConfigInherited {
        /// Whether the operation succeeded
        success: bool,
        /// Git user.email that was inherited
        #[serde(rename = "userEmail")]
        user_email: Option<String>,
        /// Git user.name that was inherited
        #[serde(rename = "userName")]
        user_name: Option<String>,
        /// Error message if operation failed
        error: Option<String>,
    },
    /// Response to CreateExportPatch command
    ExportPatchCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        /// The uncompressed, human-readable patch content
        #[serde(rename = "patchContent")]
        patch_content: Option<String>,
        /// Number of files changed
        #[serde(rename = "filesCount")]
        files_count: Option<usize>,
        /// Lines added
        #[serde(rename = "linesAdded")]
        lines_added: Option<usize>,
        /// Lines removed
        #[serde(rename = "linesRemoved")]
        lines_removed: Option<usize>,
        /// The base commit SHA that the patch is diffed against
        #[serde(rename = "baseCommitSha")]
        base_commit_sha: Option<String>,
        /// Error message if failed
        error: Option<String>,
    },
    /// Response to MergeTaskToTarget command
    MergeToTargetResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        #[serde(rename = "commitSha")]
        commit_sha: Option<String>,
        conflicts: Option<Vec<String>>,
    },
    /// Response to CreatePR command
    #[serde(rename = "prCreated")]
    PRCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        #[serde(rename = "prUrl")]
        pr_url: Option<String>,
        #[serde(rename = "prNumber")]
        pr_number: Option<i32>,
    },
    /// Response to GetWorktreeDiff command
    WorktreeDiffResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        diff: Option<String>,
        error: Option<String>,
    },
    /// Response to GetWorktreeInfo command
    WorktreeInfoResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        /// Path to the worktree directory
        #[serde(rename = "worktreePath")]
        worktree_path: Option<String>,
        /// Whether the worktree exists
        exists: bool,
        /// Number of files changed
        #[serde(rename = "filesChanged")]
        files_changed: i32,
        /// Total lines inserted
        insertions: i32,
        /// Total lines deleted
        deletions: i32,
        /// Changed files list
        files: Option<serde_json::Value>,
        /// Current branch name
        branch: Option<String>,
        /// Current HEAD commit SHA
        #[serde(rename = "headSha")]
        head_sha: Option<String>,
        /// Error message if failed
        error: Option<String>,
    },
    /// Response to GetTaskDiff command
    TaskDiff {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        diff: Option<String>,
        error: Option<String>,
    },
    /// Response to CommitWorktree command
    WorktreeCommitResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        #[serde(rename = "commitSha")]
        commit_sha: Option<String>,
        error: Option<String>,
    },
    /// Request to merge a task's patch to supervisor's worktree (cross-daemon case).
    /// Sent when a task completes on a different daemon than its supervisor.
    MergePatchToSupervisor {
        /// The task that completed.
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// The supervisor task to merge into.
        #[serde(rename = "supervisorTaskId")]
        supervisor_task_id: Uuid,
        /// Base64-gzipped patch data.
        #[serde(rename = "patchData")]
        patch_data: String,
        /// Base commit SHA for the patch.
        #[serde(rename = "baseSha")]
        base_sha: String,
    },
}

/// Validated daemon authentication result.
#[derive(Debug, Clone)]
struct DaemonAuthResult {
    /// User ID from the API key
    user_id: Uuid,
    /// Owner ID for data isolation
    owner_id: Uuid,
}

/// Compute an action directive for the supervisor based on deliverable status.
/// Returns an [ACTION REQUIRED] message if all deliverables are met.
async fn compute_action_directive(
    pool: &sqlx::PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Option<String> {
    // Get contract
    let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
        Ok(Some(c)) => c,
        _ => return None,
    };

    // Get tasks (non-supervisor only)
    let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
        Ok(t) => t.into_iter().filter(|t| !t.is_supervisor).collect::<Vec<_>>(),
        _ => return None,
    };

    // Get repositories
    let repos = match repository::list_contract_repositories(pool, contract_id).await {
        Ok(r) => r,
        _ => return None,
    };

    // Get completed deliverables for the current phase
    let completed_deliverables = contract.get_completed_deliverables(&contract.phase);

    let task_infos: Vec<TaskInfo> = tasks
        .iter()
        .map(|t| TaskInfo {
            name: t.name.clone(),
            status: t.status.clone(),
        })
        .collect();

    let has_repository = !repos.is_empty();

    // Check deliverables (unused, but kept for future reference)
    let _check = check_deliverables_met(
        &contract.phase,
        &contract.contract_type,
        &completed_deliverables,
        &task_infos,
        has_repository,
    );

    // Generate directive based on deliverable status
    if contract.phase == "execute" {
        // Check if all tasks are done but PR deliverable is not marked complete
        let all_tasks_done = !task_infos.is_empty()
            && task_infos.iter().all(|t| t.status == "done");
        let pr_deliverable_complete = completed_deliverables.contains(&"pull-request".to_string());

        if all_tasks_done && !pr_deliverable_complete {
            let done_count = task_infos.len();
            return Some(format!(
                "[INFO] All {} task(s) completed. System is auto-creating PR.",
                done_count
            ));
        }
    }

    None
}

/// Automatically create a PR when all non-supervisor tasks for a contract are done.
/// Only applies to remote-repo contracts in the "execute" phase.
/// Fires as a best-effort operation — errors are logged but not propagated.
async fn auto_create_pr_if_ready(
    pool: &sqlx::PgPool,
    state: &SharedState,
    contract_id: Uuid,
    owner_id: Uuid,
) {
    // 1. Load contract — must be remote (not local_only) and in execute phase
    let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
        Ok(Some(c)) => c,
        _ => return,
    };
    if contract.local_only || contract.phase != "execute" {
        return;
    }

    // 2. Load non-supervisor tasks — all must be done
    let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
        Ok(t) => t,
        _ => return,
    };
    let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect();
    if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") {
        return;
    }

    // 3. Check pull-request deliverable not already complete
    let completed_deliverables = contract.get_completed_deliverables(&contract.phase);
    if completed_deliverables.contains(&"pull-request".to_string()) {
        return;
    }

    // 4. Check at least one repository has a remote URL
    let repos = match repository::list_contract_repositories(pool, contract_id).await {
        Ok(r) => r,
        _ => return,
    };
    if !repos.iter().any(|r| r.repository_url.is_some()) {
        return;
    }

    // 5. Load supervisor task
    let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await {
        Ok(Some(s)) => s,
        _ => return,
    };

    // Need supervisor's daemon_id to send command
    let daemon_id = match supervisor.daemon_id {
        Some(id) => id,
        None => return,
    };

    // 6. Construct branch name
    let sanitized_name: String = supervisor
        .name
        .chars()
        .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' })
        .collect::<String>()
        .to_lowercase();
    let short_id = &supervisor.id.to_string()[..8];
    let branch = format!("makima/{}-{}", sanitized_name, short_id);

    // 7. Send CreatePR command to supervisor's daemon
    let command = DaemonCommand::CreatePR {
        task_id: supervisor.id,
        title: contract.name.clone(),
        body: contract.description.clone(),
        base_branch: supervisor.base_branch.clone(),
        branch,
    };

    match state.send_daemon_command(daemon_id, command).await {
        Ok(()) => {
            tracing::info!(
                contract_id = %contract_id,
                supervisor_id = %supervisor.id,
                "Auto-PR: sent CreatePR command to supervisor daemon"
            );
        }
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                error = %e,
                "Auto-PR: failed to send CreatePR command"
            );
        }
    }
}

/// Validate an API key and return (user_id, owner_id).
async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> {
    let key_hash = hash_api_key(key);

    // Look up the API key and join with users to get owner_id
    let row = sqlx::query(
        r#"
        SELECT ak.user_id, u.default_owner_id
        FROM api_keys ak
        JOIN users u ON u.id = ak.user_id
        WHERE ak.key_hash = $1 AND ak.revoked_at IS NULL
        "#,
    )
    .bind(&key_hash)
    .fetch_optional(pool)
    .await
    .map_err(|e| format!("Database error: {}", e))?;

    match row {
        Some(row) => {
            let user_id: Uuid = row.try_get("user_id")
                .map_err(|e| format!("Failed to get user_id: {}", e))?;
            let owner_id: Option<Uuid> = row.try_get("default_owner_id")
                .map_err(|e| format!("Failed to get owner_id: {}", e))?;
            let owner_id = owner_id.ok_or_else(|| "User has no default owner".to_string())?;

            // Update last_used_at asynchronously (fire and forget)
            let pool_clone = pool.clone();
            let key_hash_clone = key_hash.clone();
            tokio::spawn(async move {
                let _ = sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE key_hash = $1")
                    .bind(&key_hash_clone)
                    .execute(&pool_clone)
                    .await;
            });

            Ok(DaemonAuthResult { user_id, owner_id })
        }
        None => Err("Invalid or revoked API key".to_string()),
    }
}

/// WebSocket upgrade handler for daemon connections.
///
/// Daemons must authenticate via the `X-Api-Key` header in the WebSocket upgrade request.
/// The API key is validated against the database and used to determine the owner_id
/// for data isolation.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/daemons/connect",
    params(
        ("X-Api-Key" = String, Header, description = "API key for daemon authentication"),
    ),
    responses(
        (status = 101, description = "WebSocket connection established"),
        (status = 401, description = "Missing or invalid API key"),
        (status = 503, description = "Database not configured"),
    ),
    tag = "Mesh"
)]
pub async fn daemon_handler(
    ws: WebSocketUpgrade,
    State(state): State<SharedState>,
    headers: HeaderMap,
) -> Response {
    // Extract API key from headers
    let api_key = match headers.get(API_KEY_HEADER).or_else(|| headers.get("x-api-key")) {
        Some(value) => match value.to_str() {
            Ok(key) if !key.is_empty() => key.to_string(),
            _ => {
                return (
                    StatusCode::UNAUTHORIZED,
                    axum::Json(ApiError::new("INVALID_API_KEY", "Invalid API key header value")),
                )
                    .into_response();
            }
        },
        None => {
            return (
                StatusCode::UNAUTHORIZED,
                axum::Json(ApiError::new("MISSING_API_KEY", "X-Api-Key header required")),
            )
                .into_response();
        }
    };

    // Validate API key against database
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                axum::Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    let auth_result = match validate_daemon_api_key(pool, &api_key).await {
        Ok(result) => result,
        Err(e) => {
            tracing::warn!("Daemon authentication failed: {}", e);
            return (
                StatusCode::UNAUTHORIZED,
                axum::Json(ApiError::new("AUTH_FAILED", e)),
            )
                .into_response();
        }
    };

    tracing::info!(
        user_id = %auth_result.user_id,
        owner_id = %auth_result.owner_id,
        "Daemon authenticated via API key"
    );

    ws.on_upgrade(move |socket| handle_daemon_connection(socket, state, auth_result))
}

async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_result: DaemonAuthResult) {
    let (mut sender, mut receiver) = socket.split();

    // Generate a unique connection ID (daemon ID will come from database)
    let connection_id = Uuid::new_v4().to_string();
    let mut daemon_id: Option<Uuid> = None;
    let owner_id = auth_result.owner_id;

    // Create command channel for sending commands to this daemon
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<DaemonCommand>(64);

    // Wait for the daemon to send its registration info (hostname, machine_id, etc.)
    // The daemon is already authenticated via API key header, but we need metadata
    #[allow(unused_assignments)]
    let mut registered = false;

    // Wait for registration message with metadata
    loop {
        tokio::select! {
            msg = receiver.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        match serde_json::from_str::<DaemonMessage>(&text) {
                            Ok(DaemonMessage::Authenticate { api_key: _, machine_id, hostname, max_concurrent_tasks }) => {
                                // API key was already validated via headers, but we use this message
                                // for backward compatibility to get the machine_id and hostname

                                // Register daemon in database first to get the persistent ID
                                let db_daemon_id = if let Some(ref pool) = state.db_pool {
                                    match repository::register_daemon(
                                        pool,
                                        owner_id,
                                        &connection_id,
                                        Some(&hostname),
                                        Some(&machine_id),
                                        max_concurrent_tasks as i32,
                                    ).await {
                                        Ok(db_daemon) => {
                                            tracing::info!(
                                                daemon_id = %db_daemon.id,
                                                owner_id = %owner_id,
                                                hostname = %hostname,
                                                machine_id = %machine_id,
                                                max_concurrent_tasks = max_concurrent_tasks,
                                                "Daemon registered in database"
                                            );
                                            Some(db_daemon.id)
                                        }
                                        Err(e) => {
                                            tracing::error!(
                                                error = %e,
                                                "Failed to register daemon in database"
                                            );
                                            None
                                        }
                                    }
                                } else {
                                    None
                                };

                                // If database registration failed, we can't proceed
                                let Some(actual_daemon_id) = db_daemon_id else {
                                    tracing::error!("Cannot proceed without database daemon ID");
                                    break;
                                };

                                daemon_id = Some(actual_daemon_id);

                                // Register daemon in state with owner_id using database ID
                                state.register_daemon(
                                    connection_id.clone(),
                                    actual_daemon_id,
                                    owner_id,
                                    Some(hostname),
                                    Some(machine_id),
                                    cmd_tx.clone(),
                                );

                                registered = true;

                                // Send authentication confirmation with database ID
                                let response = DaemonCommand::Authenticated { daemon_id: actual_daemon_id };
                                let json = serde_json::to_string(&response).unwrap();
                                if sender.send(Message::Text(json.into())).await.is_err() {
                                    break;
                                }

                                break; // Exit registration loop, continue to main loop
                            }
                            Ok(_) => {
                                // Non-auth message before registration - still requires registration message
                                let response = DaemonCommand::Error {
                                    code: "NOT_REGISTERED".into(),
                                    message: "Must send registration message (Authenticate) first".into(),
                                };
                                let json = serde_json::to_string(&response).unwrap();
                                let _ = sender.send(Message::Text(json.into())).await;
                            }
                            Err(e) => {
                                let response = DaemonCommand::Error {
                                    code: "PARSE_ERROR".into(),
                                    message: e.to_string(),
                                };
                                let json = serde_json::to_string(&response).unwrap();
                                let _ = sender.send(Message::Text(json.into())).await;
                            }
                        }
                    }
                    Some(Ok(Message::Close(_))) | None => {
                        tracing::debug!("Daemon disconnected during registration");
                        return;
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Daemon WebSocket error during registration: {}", e);
                        return;
                    }
                    _ => {}
                }
            }
        }
    }

    if !registered {
        return;
    }

    // daemon_id is guaranteed to be Some here since registered is true
    let daemon_uuid = daemon_id.expect("daemon_id should be set when registered is true");

    // Main message loop after authentication
    loop {
        tokio::select! {
            // Handle incoming messages from daemon
            msg = receiver.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        match serde_json::from_str::<DaemonMessage>(&text) {
                            Ok(DaemonMessage::Heartbeat { active_tasks }) => {
                                tracing::trace!(
                                    "Daemon {} heartbeat: {} active tasks",
                                    daemon_uuid, active_tasks.len()
                                );
                                // Update daemon last_heartbeat_at in DB
                                if let Some(ref pool) = state.db_pool {
                                    let pool = pool.clone();
                                    let daemon_id = daemon_uuid;
                                    tokio::spawn(async move {
                                        if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
                                            tracing::warn!(
                                                daemon_id = %daemon_id,
                                                error = %e,
                                                "Failed to update daemon heartbeat"
                                            );
                                        }
                                    });
                                }
                            }
                            Ok(DaemonMessage::SupervisorHeartbeat {
                                task_id,
                                contract_id,
                                state: supervisor_state,
                                phase,
                                current_activity,
                                progress,
                                pending_task_ids,
                                timestamp: _,
                            }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    contract_id = %contract_id,
                                    state = %supervisor_state,
                                    phase = %phase,
                                    progress = progress,
                                    "Supervisor heartbeat received"
                                );

                                // Store heartbeat in database and update supervisor state (Task 3.3)
                                if let Some(ref pool) = state.db_pool {
                                    let pool = pool.clone();
                                    let pending_ids = pending_task_ids.clone();
                                    let activity = current_activity.clone();
                                    let state_str = supervisor_state.clone();
                                    let phase_str = phase.clone();
                                    tokio::spawn(async move {
                                        // Store the heartbeat record
                                        if let Err(e) = repository::create_supervisor_heartbeat(
                                            &pool,
                                            task_id,
                                            contract_id,
                                            &state_str,
                                            &phase_str,
                                            activity.as_deref(),
                                            progress as i32,
                                            &pending_ids,
                                        ).await {
                                            tracing::warn!(
                                                task_id = %task_id,
                                                contract_id = %contract_id,
                                                error = %e,
                                                "Failed to store supervisor heartbeat"
                                            );
                                        }

                                        // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
                                        if let Err(e) = repository::update_supervisor_heartbeat_state(
                                            &pool,
                                            contract_id,
                                            &state_str,
                                            activity.as_deref(),
                                            progress as i32,
                                            &pending_ids,
                                        ).await {
                                            tracing::debug!(
                                                contract_id = %contract_id,
                                                error = %e,
                                                "Failed to update supervisor state from heartbeat (may not exist yet)"
                                            );
                                        }

                                        // Also update the daemon heartbeat
                                        if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
                                            if let Some(daemon_id) = task.daemon_id {
                                                if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
                                                    tracing::warn!(
                                                        daemon_id = %daemon_id,
                                                        error = %e,
                                                        "Failed to update daemon heartbeat from supervisor"
                                                    );
                                                }
                                            }
                                        }
                                    });
                                }
                            }
                            Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
                                // Parse the output line and broadcast structured data
                                if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
                                    // Broadcast to connected clients
                                    state.broadcast_task_output(notification.clone());

                                    // Persist to database (fire and forget)
                                    if let Some(ref pool) = state.db_pool {
                                        let pool = pool.clone();
                                        let notification = notification.clone();
                                        tokio::spawn(async move {
                                            if let Err(e) = repository::save_task_output(
                                                &pool,
                                                notification.task_id,
                                                &notification.message_type,
                                                &notification.content,
                                                notification.tool_name.as_deref(),
                                                notification.tool_input.clone(),
                                                notification.is_error,
                                                notification.cost_usd,
                                                notification.duration_ms,
                                            ).await {
                                                tracing::warn!(
                                                    task_id = %notification.task_id,
                                                    "Failed to persist task output: {}",
                                                    e
                                                );
                                            }
                                        });
                                    }
                                }
                            }
                            Ok(DaemonMessage::TaskStatusChange { task_id, old_status, new_status }) => {
                                tracing::info!(
                                    "Task {} status change: {} -> {}",
                                    task_id, old_status, new_status
                                );

                                // Update task status in database and broadcast
                                if let Some(ref pool) = state.db_pool {
                                    let pool = pool.clone();
                                    let state = state.clone();
                                    let new_status_owned = new_status.clone();
                                    tokio::spawn(async move {
                                        match repository::update_task_status(
                                            &pool,
                                            task_id,
                                            &new_status_owned,
                                            None,
                                        ).await {
                                            Ok(Some(updated_task)) => {
                                                state.broadcast_task_update(TaskUpdateNotification {
                                                    task_id,
                                                    owner_id: Some(owner_id),
                                                    version: updated_task.version,
                                                    status: new_status_owned.clone(),
                                                    updated_fields: vec!["status".into()],
                                                    updated_by: "daemon".into(),
                                                });

                                                // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
                                                if updated_task.is_supervisor && new_status_owned == "running" {
                                                    if let Some(contract_id) = updated_task.contract_id {
                                                        // Check if supervisor state already exists (restoration scenario)
                                                        match repository::get_supervisor_state(&pool, contract_id).await {
                                                            Ok(Some(existing_state)) => {
                                                                // State exists - this is a restoration
                                                                tracing::info!(
                                                                    task_id = %task_id,
                                                                    contract_id = %contract_id,
                                                                    existing_state = %existing_state.state,
                                                                    restoration_count = existing_state.restoration_count,
                                                                    "Supervisor starting with existing state - restoration in progress"
                                                                );

                                                                // Mark as restored (increments restoration_count)
                                                                match repository::mark_supervisor_restored(
                                                                    &pool,
                                                                    contract_id,
                                                                    "daemon_restart",
                                                                ).await {
                                                                    Ok(restored_state) => {
                                                                        tracing::info!(
                                                                            task_id = %task_id,
                                                                            contract_id = %contract_id,
                                                                            restoration_count = restored_state.restoration_count,
                                                                            "Supervisor restoration marked"
                                                                        );

                                                                        // Check for pending questions to re-deliver
                                                                        if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
                                                                            restored_state.pending_questions.clone()
                                                                        ) {
                                                                            if !questions.is_empty() {
                                                                                tracing::info!(
                                                                                    contract_id = %contract_id,
                                                                                    question_count = questions.len(),
                                                                                    "Pending questions found for re-delivery"
                                                                                );
                                                                                // Questions will be re-delivered by the supervisor when it restores
                                                                            }
                                                                        }
                                                                    }
                                                                    Err(e) => {
                                                                        tracing::warn!(
                                                                            task_id = %task_id,
                                                                            contract_id = %contract_id,
                                                                            error = %e,
                                                                            "Failed to mark supervisor as restored"
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Ok(None) => {
                                                                // No existing state - fresh start
                                                                // Get contract to get its phase
                                                                match repository::get_contract_for_owner(
                                                                    &pool,
                                                                    contract_id,
                                                                    updated_task.owner_id,
                                                                ).await {
                                                                    Ok(Some(contract)) => {
                                                                        match repository::upsert_supervisor_state(
                                                                            &pool,
                                                                            contract_id,
                                                                            task_id,
                                                                            serde_json::json!([]),  // Empty conversation
                                                                            &[],                     // No pending tasks
                                                                            &contract.phase,
                                                                        ).await {
                                                                            Ok(_) => {
                                                                                tracing::info!(
                                                                                    task_id = %task_id,
                                                                                    contract_id = %contract_id,
                                                                                    phase = %contract.phase,
                                                                                    "Initialized fresh supervisor state"
                                                                                );
                                                                            }
                                                                            Err(e) => {
                                                                                tracing::warn!(
                                                                                    task_id = %task_id,
                                                                                    contract_id = %contract_id,
                                                                                    error = %e,
                                                                                    "Failed to initialize supervisor state"
                                                                                );
                                                                            }
                                                                        }
                                                                    }
                                                                    Ok(None) => {
                                                                        tracing::warn!(
                                                                            task_id = %task_id,
                                                                            contract_id = %contract_id,
                                                                            "Contract not found when initializing supervisor state"
                                                                        );
                                                                    }
                                                                    Err(e) => {
                                                                        tracing::warn!(
                                                                            task_id = %task_id,
                                                                            contract_id = %contract_id,
                                                                            error = %e,
                                                                            "Failed to get contract for supervisor state"
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Err(e) => {
                                                                tracing::warn!(
                                                                    task_id = %task_id,
                                                                    contract_id = %contract_id,
                                                                    error = %e,
                                                                    "Failed to check existing supervisor state"
                                                                );
                                                            }
                                                        }
                                                    }
                                                }

                                                // Record history event when task starts running
                                                if new_status_owned == "running" {
                                                    let _ = repository::record_history_event(
                                                        &pool,
                                                        updated_task.owner_id,
                                                        updated_task.contract_id,
                                                        Some(task_id),
                                                        "task",
                                                        Some("started"),
                                                        None,
                                                        serde_json::json!({
                                                            "name": &updated_task.name,
                                                            "isSupervisor": updated_task.is_supervisor,
                                                        }),
                                                    ).await;
                                                }
                                            }
                                            Ok(None) => {
                                                tracing::warn!(
                                                    task_id = %task_id,
                                                    "Task not found when updating status"
                                                );
                                            }
                                            Err(e) => {
                                                tracing::error!(
                                                    task_id = %task_id,
                                                    "Failed to update task status: {}",
                                                    e
                                                );
                                            }
                                        }
                                    });
                                } else {
                                    // No DB, just broadcast
                                    state.broadcast_task_update(TaskUpdateNotification {
                                        task_id,
                                        owner_id: Some(owner_id),
                                        version: 0,
                                        status: new_status,
                                        updated_fields: vec!["status".into()],
                                        updated_by: "daemon".into(),
                                    });
                                }
                            }
                            Ok(DaemonMessage::TaskProgress { task_id, summary }) => {
                                tracing::debug!("Task {} progress: {}", task_id, summary);
                                // TODO: Update task progress_summary in database
                                state.broadcast_task_update(TaskUpdateNotification {
                                    task_id,
                                    owner_id: Some(owner_id),
                                    version: 0,
                                    status: "running".into(),
                                    updated_fields: vec!["progress_summary".into()],
                                    updated_by: "daemon".into(),
                                });
                            }
                            Ok(DaemonMessage::TaskComplete { task_id, success, error }) => {
                                let status = if success { "done" } else { "failed" };
                                tracing::info!(
                                    "Task {} completed: success={}, error={:?}",
                                    task_id, success, error
                                );

                                // Revoke any tool keys for this task
                                state.revoke_tool_key(task_id);

                                // Update task in database with completion info
                                if let Some(ref pool) = state.db_pool {
                                    let pool = pool.clone();
                                    let state = state.clone();
                                    let error_clone = error.clone();
                                    tokio::spawn(async move {
                                        match repository::complete_task(
                                            &pool,
                                            task_id,
                                            success,
                                            error_clone.as_deref(),
                                        ).await {
                                            Ok(Some(updated_task)) => {
                                                state.broadcast_task_update(TaskUpdateNotification {
                                                    task_id,
                                                    owner_id: Some(owner_id),
                                                    version: updated_task.version,
                                                    status: updated_task.status.clone(),
                                                    updated_fields: vec![
                                                        "status".into(),
                                                        "completed_at".into(),
                                                        "error_message".into(),
                                                    ],
                                                    updated_by: "daemon".into(),
                                                });

                                                // Notify supervisor if this task belongs to a contract
                                                if let Some(contract_id) = updated_task.contract_id {
                                                    // Don't notify for supervisor tasks (they don't report to themselves)
                                                    if !updated_task.is_supervisor {
                                                        if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
                                                            // Compute action directive if task completed successfully
                                                            let action_directive = if updated_task.status == "done" {
                                                                compute_action_directive(&pool, contract_id, owner_id).await
                                                            } else {
                                                                None
                                                            };

                                                            state.notify_supervisor_of_task_completion(
                                                                supervisor.id,
                                                                supervisor.daemon_id,
                                                                updated_task.id,
                                                                &updated_task.name,
                                                                &updated_task.status,
                                                                updated_task.progress_summary.as_deref(),
                                                                updated_task.error_message.as_deref(),
                                                                action_directive.as_deref(),
                                                            ).await;
                                                        }
                                                    }
                                                }

                                                // Auto-create PR if all tasks are done and repo is remote
                                                if updated_task.status == "done" {
                                                    if let Some(contract_id) = updated_task.contract_id {
                                                        let pool_c = pool.clone();
                                                        let state_c = state.clone();
                                                        tokio::spawn(async move {
                                                            auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await;
                                                        });
                                                    }
                                                }

                                                // Record history event for task completion
                                                let subtype = if updated_task.status == "done" { "completed" } else { "failed" };
                                                let _ = repository::record_history_event(
                                                    &pool,
                                                    updated_task.owner_id,
                                                    updated_task.contract_id,
                                                    Some(task_id),
                                                    "task",
                                                    Some(subtype),
                                                    None,
                                                    serde_json::json!({
                                                        "name": &updated_task.name,
                                                        "status": &updated_task.status,
                                                        "error": &updated_task.error_message,
                                                    }),
                                                ).await;

                                                // Auto-advance directive DAG when a directive step task completes
                                                if let Some(step_id) = updated_task.directive_step_id {
                                                    let step_status = if updated_task.status == "done" { "completed" } else { "failed" };
                                                    let step_update = crate::db::models::UpdateDirectiveStepRequest {
                                                        status: Some(step_status.to_string()),
                                                        ..Default::default()
                                                    };
                                                    let _ = repository::update_directive_step(&pool, step_id, step_update).await;

                                                    if let Some(directive_id) = updated_task.directive_id {
                                                        // Advance newly-ready steps in the DAG
                                                        let _ = repository::advance_directive_ready_steps(&pool, directive_id).await;
                                                        // Check if all steps are done → set directive to idle
                                                        let _ = repository::check_directive_idle(&pool, directive_id).await;
                                                    }
                                                } else if let Some(directive_id) = updated_task.directive_id {
                                                    // Planning/orchestrator task completed — clear orchestrator_task_id
                                                    let _ = repository::clear_orchestrator_task(&pool, directive_id).await;
                                                    // Advance DAG — planning task should have created steps
                                                    let _ = repository::advance_directive_ready_steps(&pool, directive_id).await;
                                                    if updated_task.status != "done" {
                                                        // Planning failed — pause directive
                                                        let _ = repository::set_directive_status(&pool, updated_task.owner_id, directive_id, "paused").await;
                                                    }
                                                }

                                            }
                                            Ok(None) => {
                                                tracing::warn!(
                                                    task_id = %task_id,
                                                    "Task not found when completing"
                                                );
                                            }
                                            Err(e) => {
                                                tracing::error!(
                                                    task_id = %task_id,
                                                    "Failed to complete task: {}",
                                                    e
                                                );
                                            }
                                        }
                                    });
                                } else {
                                    // No DB, just broadcast
                                    state.broadcast_task_update(TaskUpdateNotification {
                                        task_id,
                                        owner_id: Some(owner_id),
                                        version: 0,
                                        status: status.into(),
                                        updated_fields: vec!["status".into(), "completed_at".into()],
                                        updated_by: "daemon".into(),
                                    });
                                }
                            }
                            Ok(DaemonMessage::TaskRecoveryDetected {
                                task_id,
                                previous_state,
                                worktree_intact,
                                worktree_path,
                                needs_patch,
                            }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    previous_state = %previous_state,
                                    worktree_intact = worktree_intact,
                                    worktree_path = ?worktree_path,
                                    needs_patch = needs_patch,
                                    "Task recovery detected after daemon restart"
                                );

                                // Update task in database based on recovery state
                                if let Some(ref pool) = state.db_pool {
                                    let pool = pool.clone();
                                    let state = state.clone();
                                    tokio::spawn(async move {
                                        if worktree_intact {
                                            // Worktree exists - task can be resumed on this daemon
                                            // Update task status to 'pending' so it can be picked up
                                            match sqlx::query(
                                                r#"
                                                UPDATE tasks
                                                SET status = 'pending',
                                                    daemon_id = NULL,
                                                    error_message = 'Daemon restarted - task ready for resumption',
                                                    interrupted_at = NOW(),
                                                    updated_at = NOW()
                                                WHERE id = $1 AND owner_id = $2
                                                RETURNING id
                                                "#,
                                            )
                                            .bind(task_id)
                                            .bind(owner_id)
                                            .fetch_optional(&pool)
                                            .await
                                            {
                                                Ok(Some(_)) => {
                                                    tracing::info!(
                                                        task_id = %task_id,
                                                        "Task marked as pending for resumption"
                                                    );
                                                    state.broadcast_task_update(TaskUpdateNotification {
                                                        task_id,
                                                        owner_id: Some(owner_id),
                                                        version: 0,
                                                        status: "pending".into(),
                                                        updated_fields: vec![
                                                            "status".into(),
                                                            "daemon_id".into(),
                                                            "interrupted_at".into(),
                                                        ],
                                                        updated_by: "daemon_recovery".into(),
                                                    });
                                                }
                                                Ok(None) => {
                                                    tracing::warn!(
                                                        task_id = %task_id,
                                                        "Task not found during recovery update"
                                                    );
                                                }
                                                Err(e) => {
                                                    tracing::error!(
                                                        task_id = %task_id,
                                                        error = %e,
                                                        "Failed to update task during recovery"
                                                    );
                                                }
                                            }
                                        } else {
                                            // Worktree missing - mark for retry with patch restoration
                                            match repository::mark_task_for_retry(
                                                &pool,
                                                task_id,
                                                daemon_uuid, // Mark this daemon as failed
                                            ).await {
                                                Ok(Some(_)) => {
                                                    tracing::info!(
                                                        task_id = %task_id,
                                                        "Task marked for retry (worktree missing)"
                                                    );
                                                }
                                                Ok(None) => {
                                                    tracing::warn!(
                                                        task_id = %task_id,
                                                        "Task not found or exceeded retries"
                                                    );
                                                }
                                                Err(e) => {
                                                    tracing::error!(
                                                        task_id = %task_id,
                                                        error = %e,
                                                        "Failed to mark task for retry"
                                                    );
                                                }
                                            }
                                        }
                                    });
                                }
                            }
                            Ok(DaemonMessage::Authenticate { .. }) => {
                                // Already authenticated, ignore
                            }
                            Ok(DaemonMessage::RegisterToolKey { task_id, key }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    "Registering tool key for orchestrator"
                                );
                                state.register_tool_key(key, task_id);
                            }
                            Ok(DaemonMessage::RevokeToolKey { task_id }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    "Revoking tool key for task"
                                );
                                state.revoke_tool_key(task_id);
                            }
                            Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => {
                                tracing::warn!(
                                    task_id = ?task_id,
                                    login_url = %login_url,
                                    hostname = ?hostname,
                                    "Daemon requires authentication - OAuth token expired"
                                );

                                // Broadcast as task output with auth_required type so UI can display the login link
                                let _content = format!(
                                    "🔐 Authentication required on daemon{}. Click to login: {}",
                                    hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(),
                                    login_url
                                );

                                // Broadcast to task subscribers if we have a task_id
                                if let Some(tid) = task_id {
                                    tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers");
                                    state.broadcast_task_output(TaskOutputNotification {
                                        task_id: tid,
                                        owner_id: Some(owner_id),
                                        message_type: "auth_required".to_string(),
                                        content: "Authentication required".to_string(), // Constant for dedup
                                        tool_name: None,
                                        tool_input: Some(serde_json::json!({
                                            "loginUrl": login_url,
                                            "hostname": hostname,
                                            "taskId": tid.to_string(),
                                        })),
                                        is_error: Some(true),
                                        cost_usd: None,
                                        duration_ms: None,
                                        is_partial: false,
                                    });
                                } else {
                                    tracing::warn!("No task_id for auth_required - cannot broadcast to specific task");
                                }

                                // Also log the full URL for manual use
                                tracing::info!(
                                    login_url = %login_url,
                                    "OAuth login URL available - user should open this in browser"
                                );
                            }
                            Ok(DaemonMessage::ReauthStatus { request_id, status, login_url, error, token_saved }) => {
                                tracing::info!(
                                    daemon_id = %daemon_uuid,
                                    request_id = %request_id,
                                    status = %status,
                                    login_url = ?login_url,
                                    error = ?error,
                                    token_saved = token_saved,
                                    "Daemon reauth status update"
                                );

                                // Store the reauth status for polling by the frontend
                                state.set_daemon_reauth_status(
                                    daemon_uuid,
                                    request_id,
                                    status.clone(),
                                    login_url.clone(),
                                    error.clone(),
                                );
                            }
                            Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => {
                                tracing::info!(
                                    daemon_id = %daemon_uuid,
                                    working_directory = %working_directory,
                                    home_directory = %home_directory,
                                    worktrees_directory = %worktrees_directory,
                                    "Daemon directories received"
                                );
                                state.update_daemon_directories(
                                    &connection_id,
                                    working_directory,
                                    home_directory,
                                    worktrees_directory,
                                );
                            }
                            Ok(DaemonMessage::CompletionActionResult { task_id, success, message, pr_url }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    success = success,
                                    message = %message,
                                    pr_url = ?pr_url,
                                    "Completion action result received"
                                );

                                // Update task with PR URL if created
                                if let Some(ref url) = pr_url {
                                    if let Some(ref pool) = state.db_pool {
                                        let update_req = crate::db::models::UpdateTaskRequest {
                                            pr_url: Some(url.clone()),
                                            ..Default::default()
                                        };
                                        if let Err(e) = crate::db::repository::update_task(pool, task_id, update_req).await {
                                            tracing::error!("Failed to update task PR URL: {}", e);
                                        }
                                    }
                                }

                                // Broadcast as task output so UI can see the result
                                let output_text = if success {
                                    format!("✓ Completion action succeeded: {}", message)
                                } else {
                                    format!("✗ Completion action failed: {}", message)
                                };
                                state.broadcast_task_output(TaskOutputNotification {
                                    task_id,
                                    owner_id: Some(owner_id),
                                    message_type: "system".to_string(),
                                    content: output_text,
                                    tool_name: None,
                                    tool_input: None,
                                    is_error: Some(!success),
                                    cost_usd: None,
                                    duration_ms: None,
                                    is_partial: false,
                                });
                            }
                            Ok(DaemonMessage::CloneWorktreeResult { task_id, success, message, target_dir }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    success = success,
                                    message = %message,
                                    target_dir = ?target_dir,
                                    "Clone worktree result received"
                                );

                                // Broadcast as task output so UI can see the result
                                let output_text = if success {
                                    format!("✓ Clone succeeded: {}", message)
                                } else {
                                    format!("✗ Clone failed: {}", message)
                                };
                                state.broadcast_task_output(TaskOutputNotification {
                                    task_id,
                                    owner_id: Some(owner_id),
                                    message_type: "system".to_string(),
                                    content: output_text,
                                    tool_name: None,
                                    tool_input: None,
                                    is_error: Some(!success),
                                    cost_usd: None,
                                    duration_ms: None,
                                    is_partial: false,
                                });
                            }
                            Ok(DaemonMessage::CheckTargetExistsResult { task_id, exists, target_dir }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    exists = exists,
                                    target_dir = %target_dir,
                                    "Check target exists result received"
                                );

                                // Broadcast as task output so UI can use the result
                                let output_text = if exists {
                                    format!("Target directory exists: {}", target_dir)
                                } else {
                                    format!("Target directory does not exist: {}", target_dir)
                                };
                                state.broadcast_task_output(TaskOutputNotification {
                                    task_id,
                                    owner_id: Some(owner_id),
                                    message_type: "system".to_string(),
                                    content: output_text,
                                    tool_name: None,
                                    tool_input: None,
                                    is_error: None,
                                    cost_usd: None,
                                    duration_ms: None,
                                    is_partial: false,
                                });
                            }
                            Ok(DaemonMessage::RepoFileContent {
                                request_id,
                                file_path,
                                content,
                                success,
                                error,
                            }) => {
                                tracing::info!(
                                    request_id = %request_id,
                                    file_path = %file_path,
                                    success = success,
                                    content_len = content.as_ref().map(|c| c.len()),
                                    error = ?error,
                                    "Repo file content received from daemon"
                                );

                                // The request_id is the file_id we want to update
                                if success {
                                    if let (Some(pool), Some(content)) = (&state.db_pool, content) {
                                        // Convert markdown to body elements
                                        let body = crate::llm::markdown_to_body(&content);

                                        // Update file in database
                                        let update_req = crate::db::models::UpdateFileRequest {
                                            name: None,
                                            description: None,
                                            transcript: None,
                                            summary: None,
                                            body: Some(body),
                                            version: None,
                                            repo_file_path: None,
                                        };

                                        match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await {
                                            Ok(Some(_file)) => {
                                                tracing::info!(
                                                    file_id = %request_id,
                                                    "File synced from repository successfully"
                                                );

                                                // Update repo_sync_status to 'synced' and set repo_synced_at
                                                if let Err(e) = sqlx::query(
                                                    "UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE id = $1"
                                                )
                                                .bind(request_id)
                                                .execute(pool)
                                                .await
                                                {
                                                    tracing::warn!(
                                                        file_id = %request_id,
                                                        error = %e,
                                                        "Failed to update repo sync status"
                                                    );
                                                }

                                                // Broadcast file update notification
                                                state.broadcast_file_update(crate::server::state::FileUpdateNotification {
                                                    file_id: request_id,
                                                    version: 0, // Will be updated by next fetch
                                                    updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()],
                                                    updated_by: "daemon".to_string(),
                                                });
                                            }
                                            Ok(None) => {
                                                tracing::warn!(
                                                    file_id = %request_id,
                                                    "File not found when syncing from repository"
                                                );
                                            }
                                            Err(e) => {
                                                tracing::error!(
                                                    file_id = %request_id,
                                                    error = %e,
                                                    "Failed to update file from repository content"
                                                );
                                            }
                                        }
                                    }
                                } else {
                                    tracing::warn!(
                                        file_id = %request_id,
                                        error = ?error,
                                        "Daemon failed to read repo file"
                                    );
                                }
                            }
                            Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => {
                                tracing::info!(
                                    task_id = ?task_id,
                                    branch_name = %branch_name,
                                    success = success,
                                    error = ?error,
                                    "Branch created notification received"
                                );

                                // Broadcast as task output if we have a task_id
                                if let Some(tid) = task_id {
                                    let output_text = if success {
                                        format!("✓ Branch '{}' created successfully", branch_name)
                                    } else {
                                        format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default())
                                    };
                                    state.broadcast_task_output(TaskOutputNotification {
                                        task_id: tid,
                                        owner_id: Some(owner_id),
                                        message_type: "system".to_string(),
                                        content: output_text,
                                        tool_name: None,
                                        tool_input: None,
                                        is_error: Some(!success),
                                        cost_usd: None,
                                        duration_ms: None,
                                        is_partial: false,
                                    });
                                }
                            }
                            Ok(DaemonMessage::CheckpointCreated {
                                task_id,
                                success,
                                commit_sha,
                                branch_name,
                                checkpoint_number: _,  // We'll get from DB
                                files_changed,
                                lines_added,
                                lines_removed,
                                error,
                                message,
                                patch_data,
                                patch_base_sha,
                                patch_files_count,
                            }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    success = success,
                                    commit_sha = ?commit_sha,
                                    has_patch = patch_data.is_some(),
                                    "Checkpoint created notification received"
                                );

                                if success {
                                    if let (Some(sha), Some(branch)) = (commit_sha.clone(), branch_name.clone()) {
                                        // Store checkpoint in database
                                        if let Some(pool) = state.db_pool.as_ref() {
                                            match repository::create_task_checkpoint(
                                                pool,
                                                task_id,
                                                &sha,
                                                &branch,
                                                &message,
                                                files_changed.clone(),
                                                lines_added,
                                                lines_removed,
                                            ).await {
                                                Ok(checkpoint) => {
                                                    tracing::info!(
                                                        task_id = %task_id,
                                                        checkpoint_id = %checkpoint.id,
                                                        checkpoint_number = checkpoint.checkpoint_number,
                                                        "Checkpoint stored in database"
                                                    );

                                                    // Store patch if provided (for task recovery)
                                                    if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
                                                        // Decode base64 patch data
                                                        match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
                                                            Ok(patch_bytes) => {
                                                                let files_count = patch_files_count.unwrap_or(0);
                                                                // Default TTL: 7 days (168 hours)
                                                                let ttl_hours = 168i64;
                                                                match repository::create_checkpoint_patch(
                                                                    pool,
                                                                    task_id,
                                                                    Some(checkpoint.id),
                                                                    base_sha,
                                                                    &patch_bytes,
                                                                    files_count,
                                                                    ttl_hours,
                                                                ).await {
                                                                    Ok(patch) => {
                                                                        tracing::info!(
                                                                            task_id = %task_id,
                                                                            patch_id = %patch.id,
                                                                            patch_size = patch_bytes.len(),
                                                                            "Checkpoint patch stored for recovery"
                                                                        );
                                                                    }
                                                                    Err(e) => {
                                                                        tracing::warn!(
                                                                            task_id = %task_id,
                                                                            error = %e,
                                                                            "Failed to store checkpoint patch"
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Err(e) => {
                                                                tracing::warn!(
                                                                    task_id = %task_id,
                                                                    error = %e,
                                                                    "Failed to decode patch base64 data"
                                                                );
                                                            }
                                                        }
                                                    }

                                                    // Broadcast success as task output
                                                    state.broadcast_task_output(TaskOutputNotification {
                                                        task_id,
                                                        owner_id: Some(owner_id),
                                                        message_type: "system".to_string(),
                                                        content: format!(
                                                            "✓ Checkpoint #{} created: {} ({})",
                                                            checkpoint.checkpoint_number,
                                                            message,
                                                            &sha[..7.min(sha.len())]
                                                        ),
                                                        tool_name: None,
                                                        tool_input: None,
                                                        is_error: Some(false),
                                                        cost_usd: None,
                                                        duration_ms: None,
                                                        is_partial: false,
                                                    });

                                                    // Record history event for checkpoint
                                                    // Get task to get contract_id
                                                    if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
                                                        let _ = repository::record_history_event(
                                                            pool,
                                                            task.owner_id,
                                                            task.contract_id,
                                                            Some(task_id),
                                                            "checkpoint",
                                                            Some("created"),
                                                            None,
                                                            serde_json::json!({
                                                                "checkpointNumber": checkpoint.checkpoint_number,
                                                                "commitSha": &sha,
                                                                "message": &message,
                                                                "filesChanged": files_changed,
                                                                "linesAdded": lines_added,
                                                                "linesRemoved": lines_removed,
                                                                "hasPatch": patch_data.is_some(),
                                                            }),
                                                        ).await;
                                                    }
                                                }
                                                Err(e) => {
                                                    tracing::error!(error = %e, "Failed to store checkpoint in database");
                                                    state.broadcast_task_output(TaskOutputNotification {
                                                        task_id,
                                                        owner_id: Some(owner_id),
                                                        message_type: "error".to_string(),
                                                        content: format!("Checkpoint commit succeeded but DB storage failed: {}", e),
                                                        tool_name: None,
                                                        tool_input: None,
                                                        is_error: Some(true),
                                                        cost_usd: None,
                                                        duration_ms: None,
                                                        is_partial: false,
                                                    });
                                                }
                                            }
                                        }
                                    } else if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
                                        // Ephemeral patch-only checkpoint (no git commit)
                                        // Store patch directly in checkpoint_patches without a task_checkpoint
                                        if let Some(pool) = state.db_pool.as_ref() {
                                            match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
                                                Ok(patch_bytes) => {
                                                    let files_count = patch_files_count.unwrap_or(0);
                                                    // Default TTL: 7 days (168 hours)
                                                    let ttl_hours = 168i64;
                                                    match repository::create_checkpoint_patch(
                                                        pool,
                                                        task_id,
                                                        None, // No checkpoint_id for ephemeral patches
                                                        base_sha,
                                                        &patch_bytes,
                                                        files_count,
                                                        ttl_hours,
                                                    ).await {
                                                        Ok(patch) => {
                                                            tracing::info!(
                                                                task_id = %task_id,
                                                                patch_id = %patch.id,
                                                                patch_size = patch_bytes.len(),
                                                                files_count = files_count,
                                                                "Ephemeral patch stored for recovery"
                                                            );

                                                            state.broadcast_task_output(TaskOutputNotification {
                                                                task_id,
                                                                owner_id: Some(owner_id),
                                                                message_type: "system".to_string(),
                                                                content: format!(
                                                                    "✓ Patch saved: {} ({} files)",
                                                                    message,
                                                                    files_count
                                                                ),
                                                                tool_name: None,
                                                                tool_input: None,
                                                                is_error: Some(false),
                                                                cost_usd: None,
                                                                duration_ms: None,
                                                                is_partial: false,
                                                            });
                                                        }
                                                        Err(e) => {
                                                            tracing::warn!(
                                                                task_id = %task_id,
                                                                error = %e,
                                                                "Failed to store ephemeral patch"
                                                            );
                                                        }
                                                    }
                                                }
                                                Err(e) => {
                                                    tracing::warn!(
                                                        task_id = %task_id,
                                                        error = %e,
                                                        "Failed to decode ephemeral patch base64 data"
                                                    );
                                                }
                                            }
                                        }
                                    }
                                } else {
                                    // Broadcast failure
                                    let error_msg = error.unwrap_or_else(|| "Unknown error".to_string());
                                    state.broadcast_task_output(TaskOutputNotification {
                                        task_id,
                                        owner_id: Some(owner_id),
                                        message_type: "error".to_string(),
                                        content: format!("✗ Checkpoint failed: {}", error_msg),
                                        tool_name: None,
                                        tool_input: None,
                                        is_error: Some(true),
                                        cost_usd: None,
                                        duration_ms: None,
                                        is_partial: false,
                                    });
                                }
                            }
                            Ok(DaemonMessage::MergeToTargetResult {
                                task_id,
                                success,
                                message,
                                commit_sha,
                                conflicts,
                            }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    success = success,
                                    "Merge to target result received"
                                );

                                // Broadcast the merge result for waiting handlers
                                state.broadcast_merge_result(crate::server::state::MergeResultNotification {
                                    task_id,
                                    success,
                                    message: message.clone(),
                                    commit_sha: commit_sha.clone(),
                                    conflicts: conflicts.clone(),
                                });

                                // On successful merge, notify supervisor to check if all merges complete
                                if success {
                                    if let Some(pool) = state.db_pool.as_ref() {
                                        if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
                                            if let Some(contract_id) = task.contract_id {
                                                if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
                                                    let prompt = format!(
                                                        "[INFO] Merge completed: {}\n\
                                                        Check if all tasks are merged with `makima supervisor tasks`.\n\
                                                        If ready, create PR with `makima supervisor pr`.",
                                                        message
                                                    );
                                                    let _ = state.notify_supervisor(
                                                        supervisor.id,
                                                        supervisor.daemon_id,
                                                        &prompt,
                                                    ).await;
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            Ok(DaemonMessage::PRCreated {
                                task_id,
                                success,
                                message,
                                pr_url,
                                pr_number,
                            }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    success = success,
                                    pr_url = ?pr_url,
                                    pr_number = ?pr_number,
                                    "PR created result received"
                                );

                                // Broadcast the PR result for waiting handlers
                                state.broadcast_pr_result(crate::server::state::PrResultNotification {
                                    task_id,
                                    success,
                                    message: message.clone(),
                                    pr_url: pr_url.clone(),
                                    pr_number,
                                });

                                // Notify supervisor of PR result (both success and failure)
                                if let Some(pool) = state.db_pool.as_ref() {
                                    if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
                                        if let Some(contract_id) = task.contract_id {
                                            if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
                                                let prompt = if success {
                                                    // Get contract to determine next action
                                                    let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await {
                                                        match (contract.contract_type.as_str(), contract.phase.as_str()) {
                                                            ("simple", "execute") => {
                                                                "Mark contract complete with `makima supervisor complete`".to_string()
                                                            }
                                                            ("specification", "execute") => {
                                                                "Advance to review phase with `makima supervisor advance-phase review`".to_string()
                                                            }
                                                            _ => "Check contract status with `makima supervisor status`".to_string()
                                                        }
                                                    } else {
                                                        "Check contract status with `makima supervisor status`".to_string()
                                                    };

                                                    format!(
                                                        "[ACTION REQUIRED] PR created successfully!\n\
                                                        PR: {}\n\n\
                                                        Next step: {}",
                                                        pr_url.as_deref().unwrap_or(&message),
                                                        next_action
                                                    )
                                                } else {
                                                    format!(
                                                        "[ERROR] PR creation failed for task {}:\n\
                                                        {}\n\n\
                                                        Please fix the issue and retry with `makima supervisor pr`.",
                                                        task_id,
                                                        message
                                                    )
                                                };
                                                let _ = state.notify_supervisor(
                                                    supervisor.id,
                                                    supervisor.daemon_id,
                                                    &prompt,
                                                ).await;
                                            }
                                        }
                                    }
                                }
                            }
                            Ok(DaemonMessage::GitConfigInherited {
                                success,
                                user_email,
                                user_name,
                                error,
                            }) => {
                                if success {
                                    tracing::info!(
                                        daemon_id = %daemon_uuid,
                                        user_email = ?user_email,
                                        user_name = ?user_name,
                                        "Daemon inherited git config"
                                    );
                                } else {
                                    tracing::warn!(
                                        daemon_id = %daemon_uuid,
                                        error = ?error,
                                        "Failed to inherit git config"
                                    );
                                }
                            }
                            Ok(DaemonMessage::ExportPatchCreated {
                                task_id,
                                success,
                                patch_content,
                                files_count,
                                lines_added,
                                lines_removed,
                                base_commit_sha,
                                error,
                            }) => {
                                if success {
                                    tracing::info!(
                                        task_id = %task_id,
                                        files_count = ?files_count,
                                        lines_added = ?lines_added,
                                        lines_removed = ?lines_removed,
                                        base_commit_sha = ?base_commit_sha,
                                        patch_len = patch_content.as_ref().map(|p| p.len()),
                                        "Export patch created successfully"
                                    );

                                    // Broadcast as task output so UI can access the result
                                    let output_text = format!(
                                        "✓ Export patch created: {} files changed, +{} -{} lines (base: {})",
                                        files_count.unwrap_or(0),
                                        lines_added.unwrap_or(0),
                                        lines_removed.unwrap_or(0),
                                        base_commit_sha.as_deref().unwrap_or("unknown")
                                    );
                                    state.broadcast_task_output(TaskOutputNotification {
                                        task_id,
                                        owner_id: Some(owner_id),
                                        message_type: "export_patch".to_string(),
                                        content: output_text,
                                        tool_name: None,
                                        tool_input: Some(serde_json::json!({
                                            "patchContent": patch_content,
                                            "filesCount": files_count,
                                            "linesAdded": lines_added,
                                            "linesRemoved": lines_removed,
                                            "baseCommitSha": base_commit_sha,
                                        })),
                                        is_error: None,
                                        cost_usd: None,
                                        duration_ms: None,
                                        is_partial: false,
                                    });
                                } else {
                                    tracing::warn!(
                                        task_id = %task_id,
                                        error = ?error,
                                        "Failed to create export patch"
                                    );

                                    // Broadcast error
                                    state.broadcast_task_output(TaskOutputNotification {
                                        task_id,
                                        owner_id: Some(owner_id),
                                        message_type: "error".to_string(),
                                        content: format!("✗ Export patch failed: {}", error.unwrap_or_else(|| "Unknown error".to_string())),
                                        tool_name: None,
                                        tool_input: None,
                                        is_error: Some(true),
                                        cost_usd: None,
                                        duration_ms: None,
                                        is_partial: false,
                                    });
                                }
                            }
                            Ok(DaemonMessage::WorktreeInfoResult {
                                task_id,
                                success,
                                worktree_path,
                                exists,
                                files_changed,
                                insertions,
                                deletions,
                                files,
                                branch,
                                head_sha,
                                error,
                            }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    success = success,
                                    exists = exists,
                                    files_changed = files_changed,
                                    "Worktree info result received"
                                );

                                // Fulfill pending worktree info request if any
                                if let Some((_, tx)) = state.pending_worktree_info.remove(&task_id) {
                                    let response = crate::server::state::WorktreeInfoResponse {
                                        task_id,
                                        success,
                                        worktree_path,
                                        exists,
                                        files_changed,
                                        insertions,
                                        deletions,
                                        files,
                                        branch,
                                        head_sha,
                                        error,
                                    };
                                    // Ignore send error - receiver may have timed out
                                    let _ = tx.send(response);
                                }
                            }
                            Ok(DaemonMessage::TaskDiff { task_id, success, diff, error }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    success = success,
                                    "Task diff result received"
                                );

                                // Fulfill pending task diff request if any
                                if let Some((_, tx)) = state.pending_task_diff.remove(&task_id) {
                                    let _ = tx.send(crate::server::state::TaskDiffResult {
                                        task_id,
                                        success,
                                        diff,
                                        error,
                                    });
                                }
                            }
                            Ok(DaemonMessage::WorktreeCommitResult { task_id, success, commit_sha, error }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    success = success,
                                    commit_sha = ?commit_sha,
                                    "Worktree commit result received"
                                );

                                // Fulfill pending worktree commit request if any
                                if let Some((_, tx)) = state.pending_worktree_commit.remove(&task_id) {
                                    let _ = tx.send(crate::server::state::WorktreeCommitResponse {
                                        task_id,
                                        success,
                                        commit_sha,
                                        error,
                                    });
                                }
                            }
                            Ok(DaemonMessage::MergePatchToSupervisor {
                                task_id,
                                supervisor_task_id,
                                patch_data,
                                base_sha,
                            }) => {
                                tracing::info!(
                                    task_id = %task_id,
                                    supervisor_task_id = %supervisor_task_id,
                                    base_sha = %base_sha,
                                    "Received cross-daemon merge patch request"
                                );

                                // Look up the supervisor task to find which daemon it's on
                                if let Some(ref pool) = state.db_pool {
                                    match repository::get_task(pool, supervisor_task_id).await {
                                        Ok(Some(supervisor_task)) => {
                                            if let Some(target_daemon_id) = supervisor_task.daemon_id {
                                                // Send ApplyPatchToWorktree command to the supervisor's daemon
                                                let command = crate::server::state::DaemonCommand::ApplyPatchToWorktree {
                                                    target_task_id: supervisor_task_id,
                                                    source_task_id: task_id,
                                                    patch_data,
                                                    base_sha,
                                                };
                                                match state.send_daemon_command(target_daemon_id, command).await {
                                                    Ok(()) => {
                                                        tracing::info!(
                                                            task_id = %task_id,
                                                            supervisor_task_id = %supervisor_task_id,
                                                            target_daemon_id = %target_daemon_id,
                                                            "Routed patch merge to supervisor's daemon"
                                                        );
                                                    }
                                                    Err(e) => {
                                                        tracing::error!(
                                                            task_id = %task_id,
                                                            supervisor_task_id = %supervisor_task_id,
                                                            target_daemon_id = %target_daemon_id,
                                                            error = %e,
                                                            "Failed to route patch merge to supervisor's daemon"
                                                        );
                                                    }
                                                }
                                            } else {
                                                tracing::warn!(
                                                    task_id = %task_id,
                                                    supervisor_task_id = %supervisor_task_id,
                                                    "Supervisor task has no daemon assigned, cannot route patch merge"
                                                );
                                            }
                                        }
                                        Ok(None) => {
                                            tracing::warn!(
                                                task_id = %task_id,
                                                supervisor_task_id = %supervisor_task_id,
                                                "Supervisor task not found, cannot route patch merge"
                                            );
                                        }
                                        Err(e) => {
                                            tracing::error!(
                                                task_id = %task_id,
                                                supervisor_task_id = %supervisor_task_id,
                                                error = %e,
                                                "Failed to look up supervisor task for patch merge"
                                            );
                                        }
                                    }
                                }
                            }
                            Ok(DaemonMessage::WorktreeDiffResult { task_id, success, diff, error }) => {
                                tracing::debug!(
                                    task_id = %task_id,
                                    success = success,
                                    "Worktree diff result received"
                                );

                                // Fulfill pending worktree diff request if any
                                if let Some((_, tx)) = state.pending_worktree_diff.remove(&task_id) {
                                    let _ = tx.send(crate::server::state::WorktreeDiffResponse {
                                        task_id,
                                        success,
                                        diff: diff.unwrap_or_default(),
                                        error,
                                    });
                                }
                            }
                            Err(e) => {
                                tracing::warn!("Failed to parse daemon message: {}", e);
                            }
                        }
                    }
                    Some(Ok(Message::Close(_))) | None => {
                        tracing::info!("Daemon {} disconnected", daemon_uuid);
                        break;
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Daemon {} WebSocket error: {}", daemon_uuid, e);
                        break;
                    }
                    _ => {}
                }
            }

            // Handle commands to send to daemon
            cmd = cmd_rx.recv() => {
                match cmd {
                    Some(command) => {
                        let json = serde_json::to_string(&command).unwrap();
                        if sender.send(Message::Text(json.into())).await.is_err() {
                            tracing::warn!("Failed to send command to daemon {}", daemon_uuid);
                            break;
                        }
                    }
                    None => {
                        // Channel closed
                        break;
                    }
                }
            }
        }
    }

    // Cleanup on disconnect
    state.unregister_daemon(&connection_id);

    // Delete daemon from database and clear tasks
    if let Some(ref pool) = state.db_pool {
        let pool = pool.clone();
        let conn_id = connection_id.clone();
        tokio::spawn(async move {
            // Delete daemon from database
            if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await {
                tracing::error!(
                    connection_id = %conn_id,
                    error = %e,
                    "Failed to delete daemon from database"
                );
            }

            // Find tasks assigned to this daemon and mark for retry or fail permanently
            if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await {
                tracing::error!(
                    daemon_id = %daemon_uuid,
                    error = %e,
                    "Failed to handle daemon disconnect for tasks"
                );
            }
        });
    }
}

/// Handle tasks when daemon disconnects - mark for retry or fail permanently.
async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
    // Get all active tasks on this daemon
    let active_tasks: Vec<Task> = sqlx::query_as(
        r#"
        SELECT * FROM tasks
        WHERE daemon_id = $1
          AND status IN ('starting', 'running', 'initializing')
        "#,
    )
    .bind(daemon_id)
    .fetch_all(pool)
    .await?;

    if active_tasks.is_empty() {
        return Ok(());
    }

    tracing::info!(
        daemon_id = %daemon_id,
        task_count = active_tasks.len(),
        "Processing tasks for disconnected daemon"
    );

    for task in active_tasks {
        if task.retry_count < task.max_retries {
            // Mark for retry
            match repository::mark_task_for_retry(pool, task.id, daemon_id).await {
                Ok(Some(updated_task)) => {
                    tracing::info!(
                        task_id = %task.id,
                        task_name = %task.name,
                        retry_count = updated_task.retry_count,
                        max_retries = updated_task.max_retries,
                        "Task marked for retry after daemon disconnect"
                    );
                }
                Ok(None) => {
                    tracing::warn!(
                        task_id = %task.id,
                        "Task not found or already at max retries"
                    );
                }
                Err(e) => {
                    tracing::error!(
                        task_id = %task.id,
                        error = %e,
                        "Failed to mark task for retry"
                    );
                }
            }
        } else {
            // Exceeded retries, mark as permanently failed
            if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await {
                tracing::error!(
                    task_id = %task.id,
                    error = %e,
                    "Failed to mark task as permanently failed"
                );
            } else {
                tracing::warn!(
                    task_id = %task.id,
                    task_name = %task.name,
                    retry_count = task.retry_count + 1,
                    "Task permanently failed: exceeded maximum retries"
                );
            }
        }
    }

    Ok(())
}