//! WebSocket handler for daemon connections. //! //! Daemons connect to report task progress, stream output, and receive commands. //! Each daemon manages Claude Code containers on its local machine. //! //! ## Authentication //! //! Daemons authenticate via the `X-Api-Key` header in the WebSocket upgrade request. //! The API key is validated against the database and the daemon is associated with //! the corresponding owner_id for data isolation. use axum::{ extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade}, http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; use base64::Engine; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; use tokio::sync::mpsc; use uuid::Uuid; use crate::db::models::Task; use crate::db::repository; use crate::llm::{check_deliverables_met, TaskInfo}; use crate::server::auth::{hash_api_key, API_KEY_HEADER}; use crate::server::messages::ApiError; use crate::server::state::{ DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification, }; // ============================================================================= // Claude Code JSON Output Parsing // ============================================================================= /// Claude Code stream-json message structure #[derive(Debug, Deserialize)] struct ClaudeMessage { #[serde(rename = "type")] msg_type: String, subtype: Option, message: Option, tool_name: Option, tool_input: Option, tool_result: Option, result: Option, cost_usd: Option, duration_ms: Option, error: Option, } #[derive(Debug, Deserialize)] struct ClaudeMessageContent { content: Option>, } #[derive(Debug, Deserialize)] struct ClaudeContentBlock { #[serde(rename = "type")] block_type: String, text: Option, name: Option, input: Option, } #[derive(Debug, Deserialize)] struct ClaudeToolResult { content: Option, is_error: Option, } /// Parse a line of Claude Code output into a structured notification fn 
parse_claude_output(task_id: Uuid, owner_id: Uuid, line: &str, is_partial: bool) -> Option { let trimmed = line.trim(); if trimmed.is_empty() { return None; } // Try to parse as JSON if trimmed.starts_with('{') { if let Ok(msg) = serde_json::from_str::(trimmed) { return parse_claude_message(task_id, owner_id, msg, is_partial); } } // Not JSON or failed to parse - treat as raw output Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "raw".to_string(), content: trimmed.to_string(), tool_name: None, tool_input: None, is_error: None, cost_usd: None, duration_ms: None, is_partial, }) } fn parse_claude_message(task_id: Uuid, owner_id: Uuid, msg: ClaudeMessage, is_partial: bool) -> Option { match msg.msg_type.as_str() { "system" => { // System messages (init, etc.) - include subtype info let content = match msg.subtype.as_deref() { Some("init") => "Session started".to_string(), Some(sub) => format!("System: {}", sub), None => "System message".to_string(), }; Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content, tool_name: None, tool_input: None, is_error: None, cost_usd: None, duration_ms: None, is_partial, }) } "assistant" => { // Extract text content from message blocks if let Some(message) = msg.message { if let Some(blocks) = message.content { // Check for text blocks let text_content: Vec = blocks .iter() .filter(|b| b.block_type == "text") .filter_map(|b| b.text.clone()) .collect(); if !text_content.is_empty() { return Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "assistant".to_string(), content: text_content.join("\n"), tool_name: None, tool_input: None, is_error: None, cost_usd: None, duration_ms: None, is_partial, }); } // Check for tool_use blocks if let Some(tool_block) = blocks.iter().find(|b| b.block_type == "tool_use") { return Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "tool_use".to_string(), content: 
format!("Using tool: {}", tool_block.name.as_deref().unwrap_or("unknown")), tool_name: tool_block.name.clone(), tool_input: tool_block.input.clone(), is_error: None, cost_usd: None, duration_ms: None, is_partial, }); } } } None } "tool_use" => { Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "tool_use".to_string(), content: format!("Using tool: {}", msg.tool_name.as_deref().unwrap_or("unknown")), tool_name: msg.tool_name, tool_input: msg.tool_input, is_error: None, cost_usd: None, duration_ms: None, is_partial, }) } "tool_result" => { if let Some(result) = msg.tool_result { let content = result.content.unwrap_or_default(); // Truncate long results let content = if content.len() > 500 { format!("{}...", &content[..500]) } else { content }; Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "tool_result".to_string(), content, tool_name: None, tool_input: None, is_error: result.is_error, cost_usd: None, duration_ms: None, is_partial, }) } else { None } } "result" => { Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "result".to_string(), content: msg.result.unwrap_or_else(|| "Task completed".to_string()), tool_name: None, tool_input: None, is_error: None, cost_usd: msg.cost_usd, duration_ms: msg.duration_ms, is_partial, }) } "error" => { Some(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "error".to_string(), content: msg.error.unwrap_or_else(|| "An error occurred".to_string()), tool_name: None, tool_input: None, is_error: Some(true), cost_usd: None, duration_ms: None, is_partial, }) } _ => None, // Skip unknown message types } } /// Message from daemon to server. 
// Wire format: `tag = "type"` selects the variant from the JSON `type` field.
// `rename_all = "camelCase"` on an enum renames variant names only, which is why
// every multi-word field below carries its own explicit `rename`.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonMessage {
    /// Authentication request (first message required)
    ///
    /// The socket is already authenticated by the `X-Api-Key` upgrade header; the
    /// connection handler ignores `api_key` and uses this message to register the
    /// daemon's metadata (machine ID, hostname, max concurrent tasks).
    Authenticate {
        #[serde(rename = "apiKey")]
        api_key: String,
        #[serde(rename = "machineId")]
        machine_id: String,
        hostname: String,
        #[serde(rename = "maxConcurrentTasks")]
        max_concurrent_tasks: i32,
    },
    /// Periodic heartbeat with current status
    Heartbeat {
        #[serde(rename = "activeTasks")]
        active_tasks: Vec,
    },
    /// Enhanced supervisor heartbeat with detailed state
    SupervisorHeartbeat {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "contractId")]
        contract_id: Uuid,
        /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
        state: String,
        /// Current contract phase
        phase: String,
        /// Description of current activity
        #[serde(rename = "currentActivity")]
        current_activity: Option,
        /// Progress percentage (0-100)
        ///
        /// Not range-checked on deserialization: `u8` accepts values up to 255.
        progress: u8,
        /// Task IDs the supervisor is waiting on
        #[serde(rename = "pendingTaskIds")]
        pending_task_ids: Vec,
        /// Timestamp of this heartbeat
        timestamp: DateTime,
    },
    /// Task output streaming (stdout/stderr from Claude Code)
    ///
    /// Each `output` line is parsed as Claude Code stream-json by the handler.
    TaskOutput {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        output: String,
        #[serde(rename = "isPartial")]
        is_partial: bool,
    },
    /// Task status change notification
    TaskStatusChange {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "oldStatus")]
        old_status: String,
        #[serde(rename = "newStatus")]
        new_status: String,
    },
    /// Task progress update with summary
    ///
    /// The connection handler currently only broadcasts this; `summary` is not
    /// persisted yet.
    TaskProgress {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        summary: String,
    },
    /// Task completion notification
    TaskComplete {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        error: Option,
    },
    /// Task recovery detected after daemon restart
    TaskRecoveryDetected {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        #[serde(rename = "previousState")]
        previous_state: String,
        #[serde(rename = "worktreeIntact")]
        worktree_intact: bool,
        #[serde(rename = "worktreePath")]
        worktree_path: Option,
        #[serde(rename = "needsPatch")]
        needs_patch: bool,
    },
    /// Register a tool key for orchestrator API access
    RegisterToolKey {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// The API key for this orchestrator to use when calling mesh endpoints
        key: String,
    },
    /// Revoke a tool key when task completes
    RevokeToolKey {
        #[serde(rename = "taskId")]
        task_id: Uuid,
    },
    /// Authentication required - OAuth token expired, provides login URL
    AuthenticationRequired {
        /// Task ID that triggered the auth error (if any)
        #[serde(rename = "taskId")]
        task_id: Option,
        /// OAuth login URL for remote authentication
        #[serde(rename = "loginUrl")]
        login_url: String,
        /// Hostname of the daemon requiring auth
        hostname: Option,
    },
    /// Reauth status update (response to TriggerReauth/SubmitAuthCode commands)
    ReauthStatus {
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Status: "pending", "url_ready", "completed", "failed"
        status: String,
        /// OAuth login URL (present when status is "url_ready")
        #[serde(rename = "loginUrl")]
        login_url: Option,
        /// Error message (present when status is "failed")
        error: Option,
        /// Whether the OAuth token has been saved to disk
        ///
        /// `default`: treated as `false` when the daemon omits the field.
        #[serde(rename = "tokenSaved", default)]
        token_saved: bool,
    },
    /// Response to RetryCompletionAction command
    CompletionActionResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        /// PR URL if action was "pr" and successful
        #[serde(rename = "prUrl")]
        pr_url: Option,
    },
    /// Report daemon's available directories for task output
    DaemonDirectories {
        /// Current working directory of the daemon
        #[serde(rename = "workingDirectory")]
        working_directory: String,
        /// Path to ~/.makima/home directory (for cloning completed work)
        #[serde(rename = "homeDirectory")]
        home_directory: String,
        /// Path to worktrees directory (~/.makima/worktrees)
        #[serde(rename = "worktreesDirectory")]
        worktrees_directory: String,
    },
    /// Response to CloneWorktree command
    CloneWorktreeResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        /// The path where the worktree was cloned
        #[serde(rename = "targetDir")]
        target_dir: Option,
    },
    /// Response to CheckTargetExists command
    CheckTargetExistsResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether the target directory exists
        exists: bool,
        /// The path that was checked
        #[serde(rename = "targetDir")]
        target_dir: String,
    },
    /// Response to ReadRepoFile command
    RepoFileContent {
        /// Request ID from the original command
        #[serde(rename = "requestId")]
        request_id: Uuid,
        /// Path to the file that was read
        #[serde(rename = "filePath")]
        file_path: String,
        /// File content (None if error occurred)
        content: Option,
        /// Whether the operation succeeded
        success: bool,
        /// Error message if operation failed
        error: Option,
    },
    /// Notification that a branch was created
    BranchCreated {
        #[serde(rename = "taskId")]
        task_id: Option,
        /// Name of the branch that was created
        #[serde(rename = "branchName")]
        branch_name: String,
        /// Whether the operation succeeded
        success: bool,
        /// Error message if operation failed
        error: Option,
    },
    /// Notification that a checkpoint was created
    CheckpointCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// Whether the operation succeeded
        success: bool,
        /// Commit SHA if successful
        #[serde(rename = "commitSha")]
        commit_sha: Option,
        /// Branch name where checkpoint was created
        #[serde(rename = "branchName")]
        branch_name: Option,
        /// Checkpoint number in sequence
        #[serde(rename = "checkpointNumber")]
        checkpoint_number: Option,
        /// Files changed in this checkpoint
        #[serde(rename = "filesChanged")]
        files_changed: Option,
        /// Lines added
        #[serde(rename = "linesAdded")]
        lines_added: Option,
        /// Lines removed
        #[serde(rename = "linesRemoved")]
        lines_removed: Option,
        /// Error message if operation failed
        error: Option,
        /// User-provided checkpoint message
        message: String,
        // NOTE(review): `skip_serializing_if` on the three patch fields below has no
        // effect, because this enum only derives `Deserialize`.
        /// Base64-encoded gzip-compressed patch data for recovery
        #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
        patch_data: Option,
        /// Commit SHA to apply patch on top of (for recovery)
        #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
        patch_base_sha: Option,
        /// Number of files in the patch
        #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
        patch_files_count: Option,
    },
    /// Notification that git config was inherited
    GitConfigInherited {
        /// Whether the operation succeeded
        success: bool,
        /// Git user.email that was inherited
        #[serde(rename = "userEmail")]
        user_email: Option,
        /// Git user.name that was inherited
        #[serde(rename = "userName")]
        user_name: Option,
        /// Error message if operation failed
        error: Option,
    },
    /// Response to CreateExportPatch command
    ExportPatchCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        /// The uncompressed, human-readable patch content
        #[serde(rename = "patchContent")]
        patch_content: Option,
        /// Number of files changed
        #[serde(rename = "filesCount")]
        files_count: Option,
        /// Lines added
        #[serde(rename = "linesAdded")]
        lines_added: Option,
        /// Lines removed
        #[serde(rename = "linesRemoved")]
        lines_removed: Option,
        /// The base commit SHA that the patch is diffed against
        #[serde(rename = "baseCommitSha")]
        base_commit_sha: Option,
        /// Error message if failed
        error: Option,
    },
    /// Response to MergeTaskToTarget command
    MergeToTargetResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        #[serde(rename = "commitSha")]
        commit_sha: Option,
        /// Conflicting paths reported by the merge, if any
        conflicts: Option>,
    },
    /// Response to CreatePR command
    // Explicit rename: `rename_all = "camelCase"` only lowercases the first
    // character, which would produce `pRCreated`.
    #[serde(rename = "prCreated")]
    PRCreated {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        message: String,
        #[serde(rename = "prUrl")]
        pr_url: Option,
        #[serde(rename = "prNumber")]
        pr_number: Option,
    },
    /// Response to GetWorktreeDiff command
    WorktreeDiffResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        diff: Option,
        error: Option,
    },
    /// Response to GetWorktreeInfo command
    WorktreeInfoResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        /// Path to the worktree directory
        #[serde(rename = "worktreePath")]
        worktree_path: Option,
        /// Whether the worktree exists
        exists: bool,
        /// Number of files changed
        #[serde(rename = "filesChanged")]
        files_changed: i32,
        /// Total lines inserted
        insertions: i32,
        /// Total lines deleted
        deletions: i32,
        /// Changed files list
        files: Option,
        /// Current branch name
        branch: Option,
        /// Current HEAD commit SHA
        #[serde(rename = "headSha")]
        head_sha: Option,
        /// Error message if failed
        error: Option,
    },
    /// Response to GetTaskDiff command
    TaskDiff {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        diff: Option,
        error: Option,
    },
    /// Response to CommitWorktree command
    WorktreeCommitResult {
        #[serde(rename = "taskId")]
        task_id: Uuid,
        success: bool,
        #[serde(rename = "commitSha")]
        commit_sha: Option,
        error: Option,
    },
    /// Request to merge a task's patch to supervisor's worktree (cross-daemon case).
    /// Sent when a task completes on a different daemon than its supervisor.
    MergePatchToSupervisor {
        /// The task that completed.
        #[serde(rename = "taskId")]
        task_id: Uuid,
        /// The supervisor task to merge into.
        #[serde(rename = "supervisorTaskId")]
        supervisor_task_id: Uuid,
        /// Base64-gzipped patch data.
        #[serde(rename = "patchData")]
        patch_data: String,
        /// Base commit SHA for the patch.
        #[serde(rename = "baseSha")]
        base_sha: String,
    },
}

/// Validated daemon authentication result.
///
/// Produced by the API-key check during the WebSocket upgrade and handed to the
/// connection handler, which attaches `owner_id` to the daemon's registration and
/// to the notifications the daemon triggers.
#[derive(Debug, Clone)]
struct DaemonAuthResult {
    /// User ID from the API key
    user_id: Uuid,
    /// Owner ID for data isolation
    owner_id: Uuid,
}

/// Compute an optional status notice for the supervisor after a task completes.
///
/// Despite the name, this never produces an `[ACTION REQUIRED]` directive: the only
/// message it returns is an `[INFO]` notice that all tasks are complete and a PR is
/// being created automatically. A notice requires at least that the contract is in
/// the `execute` phase, every non-supervisor task is `done`, and the `pull-request`
/// deliverable is not yet complete. Returns `None` otherwise, including when any
/// lookup fails.
async fn compute_action_directive( pool: &sqlx::PgPool, contract_id: Uuid, owner_id: Uuid, ) -> Option { // Get contract let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { Ok(Some(c)) => c, _ => return None, }; // Get tasks (non-supervisor only) let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { Ok(t) => t.into_iter().filter(|t| !t.is_supervisor).collect::>(), _ => return None, }; // Get repositories let repos = match repository::list_contract_repositories(pool, contract_id).await { Ok(r) => r, _ => return None, }; // Get completed deliverables for the current phase let completed_deliverables = contract.get_completed_deliverables(&contract.phase); let task_infos: Vec = tasks .iter() .map(|t| TaskInfo { name: t.name.clone(), status: t.status.clone(), }) .collect(); let has_repository = !repos.is_empty(); // Check deliverables (unused, but kept for future reference) let _check = check_deliverables_met( &contract.phase, &contract.contract_type, &completed_deliverables, &task_infos, has_repository, ); // Generate directive based on deliverable status if contract.phase == "execute" { // Check if all tasks are done but PR deliverable is not marked complete let all_tasks_done = !task_infos.is_empty() && task_infos.iter().all(|t| t.status == "done"); let pr_deliverable_complete = completed_deliverables.contains(&"pull-request".to_string()); if all_tasks_done && !pr_deliverable_complete { let done_count = task_infos.len(); return Some(format!( "[INFO] All {} task(s) completed. System is auto-creating PR.", done_count )); } } None } /// Automatically create a PR when all non-supervisor tasks for a contract are done. /// Only applies to remote-repo contracts in the "execute" phase. /// Fires as a best-effort operation — errors are logged but not propagated. async fn auto_create_pr_if_ready( pool: &sqlx::PgPool, state: &SharedState, contract_id: Uuid, owner_id: Uuid, ) { // 1. 
Load contract — must be remote (not local_only) and in execute phase let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { Ok(Some(c)) => c, _ => return, }; if contract.local_only || contract.phase != "execute" { return; } // 2. Load non-supervisor tasks — all must be done let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { Ok(t) => t, _ => return, }; let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect(); if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") { return; } // 3. Check pull-request deliverable not already complete let completed_deliverables = contract.get_completed_deliverables(&contract.phase); if completed_deliverables.contains(&"pull-request".to_string()) { return; } // 4. Check at least one repository has a remote URL let repos = match repository::list_contract_repositories(pool, contract_id).await { Ok(r) => r, _ => return, }; if !repos.iter().any(|r| r.repository_url.is_some()) { return; } // 5. Load supervisor task let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await { Ok(Some(s)) => s, _ => return, }; // Need supervisor's daemon_id to send command let daemon_id = match supervisor.daemon_id { Some(id) => id, None => return, }; // 6. Construct branch name let sanitized_name: String = supervisor .name .chars() .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' }) .collect::() .to_lowercase(); let short_id = &supervisor.id.to_string()[..8]; let branch = format!("makima/{}-{}", sanitized_name, short_id); // 7. 
Send CreatePR command to supervisor's daemon let command = DaemonCommand::CreatePR { task_id: supervisor.id, title: contract.name.clone(), body: contract.description.clone(), base_branch: supervisor.base_branch.clone(), branch, }; match state.send_daemon_command(daemon_id, command).await { Ok(()) => { tracing::info!( contract_id = %contract_id, supervisor_id = %supervisor.id, "Auto-PR: sent CreatePR command to supervisor daemon" ); } Err(e) => { tracing::warn!( contract_id = %contract_id, error = %e, "Auto-PR: failed to send CreatePR command" ); } } } /// Validate an API key and return (user_id, owner_id). async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result { let key_hash = hash_api_key(key); // Look up the API key and join with users to get owner_id let row = sqlx::query( r#" SELECT ak.user_id, u.default_owner_id FROM api_keys ak JOIN users u ON u.id = ak.user_id WHERE ak.key_hash = $1 AND ak.revoked_at IS NULL "#, ) .bind(&key_hash) .fetch_optional(pool) .await .map_err(|e| format!("Database error: {}", e))?; match row { Some(row) => { let user_id: Uuid = row.try_get("user_id") .map_err(|e| format!("Failed to get user_id: {}", e))?; let owner_id: Option = row.try_get("default_owner_id") .map_err(|e| format!("Failed to get owner_id: {}", e))?; let owner_id = owner_id.ok_or_else(|| "User has no default owner".to_string())?; // Update last_used_at asynchronously (fire and forget) let pool_clone = pool.clone(); let key_hash_clone = key_hash.clone(); tokio::spawn(async move { let _ = sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE key_hash = $1") .bind(&key_hash_clone) .execute(&pool_clone) .await; }); Ok(DaemonAuthResult { user_id, owner_id }) } None => Err("Invalid or revoked API key".to_string()), } } /// WebSocket upgrade handler for daemon connections. /// /// Daemons must authenticate via the `X-Api-Key` header in the WebSocket upgrade request. 
/// The API key is validated against the database and used to determine the owner_id /// for data isolation. #[utoipa::path( get, path = "/api/v1/mesh/daemons/connect", params( ("X-Api-Key" = String, Header, description = "API key for daemon authentication"), ), responses( (status = 101, description = "WebSocket connection established"), (status = 401, description = "Missing or invalid API key"), (status = 503, description = "Database not configured"), ), tag = "Mesh" )] pub async fn daemon_handler( ws: WebSocketUpgrade, State(state): State, headers: HeaderMap, ) -> Response { // Extract API key from headers let api_key = match headers.get(API_KEY_HEADER).or_else(|| headers.get("x-api-key")) { Some(value) => match value.to_str() { Ok(key) if !key.is_empty() => key.to_string(), _ => { return ( StatusCode::UNAUTHORIZED, axum::Json(ApiError::new("INVALID_API_KEY", "Invalid API key header value")), ) .into_response(); } }, None => { return ( StatusCode::UNAUTHORIZED, axum::Json(ApiError::new("MISSING_API_KEY", "X-Api-Key header required")), ) .into_response(); } }; // Validate API key against database let pool = match state.db_pool.as_ref() { Some(pool) => pool, None => { return ( StatusCode::SERVICE_UNAVAILABLE, axum::Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); } }; let auth_result = match validate_daemon_api_key(pool, &api_key).await { Ok(result) => result, Err(e) => { tracing::warn!("Daemon authentication failed: {}", e); return ( StatusCode::UNAUTHORIZED, axum::Json(ApiError::new("AUTH_FAILED", e)), ) .into_response(); } }; tracing::info!( user_id = %auth_result.user_id, owner_id = %auth_result.owner_id, "Daemon authenticated via API key" ); ws.on_upgrade(move |socket| handle_daemon_connection(socket, state, auth_result)) } async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_result: DaemonAuthResult) { let (mut sender, mut receiver) = socket.split(); // Generate a unique connection ID (daemon ID 
will come from database) let connection_id = Uuid::new_v4().to_string(); let mut daemon_id: Option = None; let owner_id = auth_result.owner_id; // Create command channel for sending commands to this daemon let (cmd_tx, mut cmd_rx) = mpsc::channel::(64); // Wait for the daemon to send its registration info (hostname, machine_id, etc.) // The daemon is already authenticated via API key header, but we need metadata #[allow(unused_assignments)] let mut registered = false; // Wait for registration message with metadata loop { tokio::select! { msg = receiver.next() => { match msg { Some(Ok(Message::Text(text))) => { match serde_json::from_str::(&text) { Ok(DaemonMessage::Authenticate { api_key: _, machine_id, hostname, max_concurrent_tasks }) => { // API key was already validated via headers, but we use this message // for backward compatibility to get the machine_id and hostname // Register daemon in database first to get the persistent ID let db_daemon_id = if let Some(ref pool) = state.db_pool { match repository::register_daemon( pool, owner_id, &connection_id, Some(&hostname), Some(&machine_id), max_concurrent_tasks as i32, ).await { Ok(db_daemon) => { tracing::info!( daemon_id = %db_daemon.id, owner_id = %owner_id, hostname = %hostname, machine_id = %machine_id, max_concurrent_tasks = max_concurrent_tasks, "Daemon registered in database" ); Some(db_daemon.id) } Err(e) => { tracing::error!( error = %e, "Failed to register daemon in database" ); None } } } else { None }; // If database registration failed, we can't proceed let Some(actual_daemon_id) = db_daemon_id else { tracing::error!("Cannot proceed without database daemon ID"); break; }; daemon_id = Some(actual_daemon_id); // Register daemon in state with owner_id using database ID state.register_daemon( connection_id.clone(), actual_daemon_id, owner_id, Some(hostname), Some(machine_id), cmd_tx.clone(), ); registered = true; // Send authentication confirmation with database ID let response = 
DaemonCommand::Authenticated { daemon_id: actual_daemon_id }; let json = serde_json::to_string(&response).unwrap(); if sender.send(Message::Text(json.into())).await.is_err() { break; } break; // Exit registration loop, continue to main loop } Ok(_) => { // Non-auth message before registration - still requires registration message let response = DaemonCommand::Error { code: "NOT_REGISTERED".into(), message: "Must send registration message (Authenticate) first".into(), }; let json = serde_json::to_string(&response).unwrap(); let _ = sender.send(Message::Text(json.into())).await; } Err(e) => { let response = DaemonCommand::Error { code: "PARSE_ERROR".into(), message: e.to_string(), }; let json = serde_json::to_string(&response).unwrap(); let _ = sender.send(Message::Text(json.into())).await; } } } Some(Ok(Message::Close(_))) | None => { tracing::debug!("Daemon disconnected during registration"); return; } Some(Err(e)) => { tracing::warn!("Daemon WebSocket error during registration: {}", e); return; } _ => {} } } } } if !registered { return; } // daemon_id is guaranteed to be Some here since registered is true let daemon_uuid = daemon_id.expect("daemon_id should be set when registered is true"); // Main message loop after authentication loop { tokio::select! 
{ // Handle incoming messages from daemon msg = receiver.next() => { match msg { Some(Ok(Message::Text(text))) => { match serde_json::from_str::(&text) { Ok(DaemonMessage::Heartbeat { active_tasks }) => { tracing::trace!( "Daemon {} heartbeat: {} active tasks", daemon_uuid, active_tasks.len() ); // Update daemon last_heartbeat_at in DB if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let daemon_id = daemon_uuid; tokio::spawn(async move { if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { tracing::warn!( daemon_id = %daemon_id, error = %e, "Failed to update daemon heartbeat" ); } }); } } Ok(DaemonMessage::SupervisorHeartbeat { task_id, contract_id, state: supervisor_state, phase, current_activity, progress, pending_task_ids, timestamp: _, }) => { tracing::debug!( task_id = %task_id, contract_id = %contract_id, state = %supervisor_state, phase = %phase, progress = progress, "Supervisor heartbeat received" ); // Store heartbeat in database and update supervisor state (Task 3.3) if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let pending_ids = pending_task_ids.clone(); let activity = current_activity.clone(); let state_str = supervisor_state.clone(); let phase_str = phase.clone(); tokio::spawn(async move { // Store the heartbeat record if let Err(e) = repository::create_supervisor_heartbeat( &pool, task_id, contract_id, &state_str, &phase_str, activity.as_deref(), progress as i32, &pending_ids, ).await { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, "Failed to store supervisor heartbeat" ); } // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) if let Err(e) = repository::update_supervisor_heartbeat_state( &pool, contract_id, &state_str, activity.as_deref(), progress as i32, &pending_ids, ).await { tracing::debug!( contract_id = %contract_id, error = %e, "Failed to update supervisor state from heartbeat (may not exist yet)" ); } // Also update the 
daemon heartbeat if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { if let Some(daemon_id) = task.daemon_id { if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { tracing::warn!( daemon_id = %daemon_id, error = %e, "Failed to update daemon heartbeat from supervisor" ); } } } }); } } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { // Broadcast to connected clients state.broadcast_task_output(notification.clone()); // Persist to database (fire and forget) if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let notification = notification.clone(); tokio::spawn(async move { if let Err(e) = repository::save_task_output( &pool, notification.task_id, ¬ification.message_type, ¬ification.content, notification.tool_name.as_deref(), notification.tool_input.clone(), notification.is_error, notification.cost_usd, notification.duration_ms, ).await { tracing::warn!( task_id = %notification.task_id, "Failed to persist task output: {}", e ); } }); } } } Ok(DaemonMessage::TaskStatusChange { task_id, old_status, new_status }) => { tracing::info!( "Task {} status change: {} -> {}", task_id, old_status, new_status ); // Update task status in database and broadcast if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let state = state.clone(); let new_status_owned = new_status.clone(); tokio::spawn(async move { match repository::update_task_status( &pool, task_id, &new_status_owned, None, ).await { Ok(Some(updated_task)) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: updated_task.version, status: new_status_owned.clone(), updated_fields: vec!["status".into()], updated_by: "daemon".into(), }); // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) if 
updated_task.is_supervisor && new_status_owned == "running" { if let Some(contract_id) = updated_task.contract_id { // Check if supervisor state already exists (restoration scenario) match repository::get_supervisor_state(&pool, contract_id).await { Ok(Some(existing_state)) => { // State exists - this is a restoration tracing::info!( task_id = %task_id, contract_id = %contract_id, existing_state = %existing_state.state, restoration_count = existing_state.restoration_count, "Supervisor starting with existing state - restoration in progress" ); // Mark as restored (increments restoration_count) match repository::mark_supervisor_restored( &pool, contract_id, "daemon_restart", ).await { Ok(restored_state) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, restoration_count = restored_state.restoration_count, "Supervisor restoration marked" ); // Check for pending questions to re-deliver if let Ok(questions) = serde_json::from_value::>( restored_state.pending_questions.clone() ) { if !questions.is_empty() { tracing::info!( contract_id = %contract_id, question_count = questions.len(), "Pending questions found for re-delivery" ); // Questions will be re-delivered by the supervisor when it restores } } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, "Failed to mark supervisor as restored" ); } } } Ok(None) => { // No existing state - fresh start // Get contract to get its phase match repository::get_contract_for_owner( &pool, contract_id, updated_task.owner_id, ).await { Ok(Some(contract)) => { match repository::upsert_supervisor_state( &pool, contract_id, task_id, serde_json::json!([]), // Empty conversation &[], // No pending tasks &contract.phase, ).await { Ok(_) => { tracing::info!( task_id = %task_id, contract_id = %contract_id, phase = %contract.phase, "Initialized fresh supervisor state" ); } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, "Failed to initialize 
supervisor state" ); } } } Ok(None) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, "Contract not found when initializing supervisor state" ); } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, "Failed to get contract for supervisor state" ); } } } Err(e) => { tracing::warn!( task_id = %task_id, contract_id = %contract_id, error = %e, "Failed to check existing supervisor state" ); } } } } // Record history event when task starts running if new_status_owned == "running" { let _ = repository::record_history_event( &pool, updated_task.owner_id, updated_task.contract_id, Some(task_id), "task", Some("started"), None, serde_json::json!({ "name": &updated_task.name, "isSupervisor": updated_task.is_supervisor, }), ).await; } } Ok(None) => { tracing::warn!( task_id = %task_id, "Task not found when updating status" ); } Err(e) => { tracing::error!( task_id = %task_id, "Failed to update task status: {}", e ); } } }); } else { // No DB, just broadcast state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: 0, status: new_status, updated_fields: vec!["status".into()], updated_by: "daemon".into(), }); } } Ok(DaemonMessage::TaskProgress { task_id, summary }) => { tracing::debug!("Task {} progress: {}", task_id, summary); // TODO: Update task progress_summary in database state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: 0, status: "running".into(), updated_fields: vec!["progress_summary".into()], updated_by: "daemon".into(), }); } Ok(DaemonMessage::TaskComplete { task_id, success, error }) => { let status = if success { "done" } else { "failed" }; tracing::info!( "Task {} completed: success={}, error={:?}", task_id, success, error ); // Revoke any tool keys for this task state.revoke_tool_key(task_id); // Update task in database with completion info if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let state = 
state.clone(); let error_clone = error.clone(); tokio::spawn(async move { match repository::complete_task( &pool, task_id, success, error_clone.as_deref(), ).await { Ok(Some(updated_task)) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: updated_task.version, status: updated_task.status.clone(), updated_fields: vec![ "status".into(), "completed_at".into(), "error_message".into(), ], updated_by: "daemon".into(), }); // Notify supervisor if this task belongs to a contract if let Some(contract_id) = updated_task.contract_id { // Don't notify for supervisor tasks (they don't report to themselves) if !updated_task.is_supervisor { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { // Compute action directive if task completed successfully let action_directive = if updated_task.status == "done" { compute_action_directive(&pool, contract_id, owner_id).await } else { None }; state.notify_supervisor_of_task_completion( supervisor.id, supervisor.daemon_id, updated_task.id, &updated_task.name, &updated_task.status, updated_task.progress_summary.as_deref(), updated_task.error_message.as_deref(), action_directive.as_deref(), ).await; } } } // Auto-create PR if all tasks are done and repo is remote if updated_task.status == "done" { if let Some(contract_id) = updated_task.contract_id { let pool_c = pool.clone(); let state_c = state.clone(); tokio::spawn(async move { auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await; }); } } // Record history event for task completion let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; let _ = repository::record_history_event( &pool, updated_task.owner_id, updated_task.contract_id, Some(task_id), "task", Some(subtype), None, serde_json::json!({ "name": &updated_task.name, "status": &updated_task.status, "error": &updated_task.error_message, }), ).await; // Auto-advance directive DAG when a 
directive step task completes if let Some(step_id) = updated_task.directive_step_id { let step_status = if updated_task.status == "done" { "completed" } else { "failed" }; let step_update = crate::db::models::UpdateDirectiveStepRequest { status: Some(step_status.to_string()), ..Default::default() }; let _ = repository::update_directive_step(&pool, step_id, step_update).await; if let Some(directive_id) = updated_task.directive_id { // Advance newly-ready steps in the DAG let _ = repository::advance_directive_ready_steps(&pool, directive_id).await; // Check if all steps are done → set directive to idle let _ = repository::check_directive_idle(&pool, directive_id).await; } } else if let Some(directive_id) = updated_task.directive_id { // Planning/orchestrator task completed — clear orchestrator_task_id let _ = repository::clear_orchestrator_task(&pool, directive_id).await; // Advance DAG — planning task should have created steps let _ = repository::advance_directive_ready_steps(&pool, directive_id).await; if updated_task.status != "done" { // Planning failed — pause directive let _ = repository::set_directive_status(&pool, updated_task.owner_id, directive_id, "paused").await; } } } Ok(None) => { tracing::warn!( task_id = %task_id, "Task not found when completing" ); } Err(e) => { tracing::error!( task_id = %task_id, "Failed to complete task: {}", e ); } } }); } else { // No DB, just broadcast state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: 0, status: status.into(), updated_fields: vec!["status".into(), "completed_at".into()], updated_by: "daemon".into(), }); } } Ok(DaemonMessage::TaskRecoveryDetected { task_id, previous_state, worktree_intact, worktree_path, needs_patch, }) => { tracing::info!( task_id = %task_id, previous_state = %previous_state, worktree_intact = worktree_intact, worktree_path = ?worktree_path, needs_patch = needs_patch, "Task recovery detected after daemon restart" ); // Update task in database based 
on recovery state if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let state = state.clone(); tokio::spawn(async move { if worktree_intact { // Worktree exists - task can be resumed on this daemon // Update task status to 'pending' so it can be picked up match sqlx::query( r#" UPDATE tasks SET status = 'pending', daemon_id = NULL, error_message = 'Daemon restarted - task ready for resumption', interrupted_at = NOW(), updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING id "#, ) .bind(task_id) .bind(owner_id) .fetch_optional(&pool) .await { Ok(Some(_)) => { tracing::info!( task_id = %task_id, "Task marked as pending for resumption" ); state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(owner_id), version: 0, status: "pending".into(), updated_fields: vec![ "status".into(), "daemon_id".into(), "interrupted_at".into(), ], updated_by: "daemon_recovery".into(), }); } Ok(None) => { tracing::warn!( task_id = %task_id, "Task not found during recovery update" ); } Err(e) => { tracing::error!( task_id = %task_id, error = %e, "Failed to update task during recovery" ); } } } else { // Worktree missing - mark for retry with patch restoration match repository::mark_task_for_retry( &pool, task_id, daemon_uuid, // Mark this daemon as failed ).await { Ok(Some(_)) => { tracing::info!( task_id = %task_id, "Task marked for retry (worktree missing)" ); } Ok(None) => { tracing::warn!( task_id = %task_id, "Task not found or exceeded retries" ); } Err(e) => { tracing::error!( task_id = %task_id, error = %e, "Failed to mark task for retry" ); } } } }); } } Ok(DaemonMessage::Authenticate { .. 
}) => { // Already authenticated, ignore } Ok(DaemonMessage::RegisterToolKey { task_id, key }) => { tracing::info!( task_id = %task_id, "Registering tool key for orchestrator" ); state.register_tool_key(key, task_id); } Ok(DaemonMessage::RevokeToolKey { task_id }) => { tracing::info!( task_id = %task_id, "Revoking tool key for task" ); state.revoke_tool_key(task_id); } Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => { tracing::warn!( task_id = ?task_id, login_url = %login_url, hostname = ?hostname, "Daemon requires authentication - OAuth token expired" ); // Broadcast as task output with auth_required type so UI can display the login link let _content = format!( "🔐 Authentication required on daemon{}. Click to login: {}", hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(), login_url ); // Broadcast to task subscribers if we have a task_id if let Some(tid) = task_id { tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers"); state.broadcast_task_output(TaskOutputNotification { task_id: tid, owner_id: Some(owner_id), message_type: "auth_required".to_string(), content: "Authentication required".to_string(), // Constant for dedup tool_name: None, tool_input: Some(serde_json::json!({ "loginUrl": login_url, "hostname": hostname, "taskId": tid.to_string(), })), is_error: Some(true), cost_usd: None, duration_ms: None, is_partial: false, }); } else { tracing::warn!("No task_id for auth_required - cannot broadcast to specific task"); } // Also log the full URL for manual use tracing::info!( login_url = %login_url, "OAuth login URL available - user should open this in browser" ); } Ok(DaemonMessage::ReauthStatus { request_id, status, login_url, error, token_saved }) => { tracing::info!( daemon_id = %daemon_uuid, request_id = %request_id, status = %status, login_url = ?login_url, error = ?error, token_saved = token_saved, "Daemon reauth status update" ); // Store the reauth status for polling by the frontend 
state.set_daemon_reauth_status( daemon_uuid, request_id, status.clone(), login_url.clone(), error.clone(), ); } Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => { tracing::info!( daemon_id = %daemon_uuid, working_directory = %working_directory, home_directory = %home_directory, worktrees_directory = %worktrees_directory, "Daemon directories received" ); state.update_daemon_directories( &connection_id, working_directory, home_directory, worktrees_directory, ); } Ok(DaemonMessage::CompletionActionResult { task_id, success, message, pr_url }) => { tracing::info!( task_id = %task_id, success = success, message = %message, pr_url = ?pr_url, "Completion action result received" ); // Update task with PR URL if created if let Some(ref url) = pr_url { if let Some(ref pool) = state.db_pool { let update_req = crate::db::models::UpdateTaskRequest { pr_url: Some(url.clone()), ..Default::default() }; if let Err(e) = crate::db::repository::update_task(pool, task_id, update_req).await { tracing::error!("Failed to update task PR URL: {}", e); } } } // Broadcast as task output so UI can see the result let output_text = if success { format!("✓ Completion action succeeded: {}", message) } else { format!("✗ Completion action failed: {}", message) }; state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content: output_text, tool_name: None, tool_input: None, is_error: Some(!success), cost_usd: None, duration_ms: None, is_partial: false, }); } Ok(DaemonMessage::CloneWorktreeResult { task_id, success, message, target_dir }) => { tracing::info!( task_id = %task_id, success = success, message = %message, target_dir = ?target_dir, "Clone worktree result received" ); // Broadcast as task output so UI can see the result let output_text = if success { format!("✓ Clone succeeded: {}", message) } else { format!("✗ Clone failed: {}", message) }; 
state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content: output_text, tool_name: None, tool_input: None, is_error: Some(!success), cost_usd: None, duration_ms: None, is_partial: false, }); } Ok(DaemonMessage::CheckTargetExistsResult { task_id, exists, target_dir }) => { tracing::debug!( task_id = %task_id, exists = exists, target_dir = %target_dir, "Check target exists result received" ); // Broadcast as task output so UI can use the result let output_text = if exists { format!("Target directory exists: {}", target_dir) } else { format!("Target directory does not exist: {}", target_dir) }; state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content: output_text, tool_name: None, tool_input: None, is_error: None, cost_usd: None, duration_ms: None, is_partial: false, }); } Ok(DaemonMessage::RepoFileContent { request_id, file_path, content, success, error, }) => { tracing::info!( request_id = %request_id, file_path = %file_path, success = success, content_len = content.as_ref().map(|c| c.len()), error = ?error, "Repo file content received from daemon" ); // The request_id is the file_id we want to update if success { if let (Some(pool), Some(content)) = (&state.db_pool, content) { // Convert markdown to body elements let body = crate::llm::markdown_to_body(&content); // Update file in database let update_req = crate::db::models::UpdateFileRequest { name: None, description: None, transcript: None, summary: None, body: Some(body), version: None, repo_file_path: None, }; match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await { Ok(Some(_file)) => { tracing::info!( file_id = %request_id, "File synced from repository successfully" ); // Update repo_sync_status to 'synced' and set repo_synced_at if let Err(e) = sqlx::query( "UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE 
id = $1" ) .bind(request_id) .execute(pool) .await { tracing::warn!( file_id = %request_id, error = %e, "Failed to update repo sync status" ); } // Broadcast file update notification state.broadcast_file_update(crate::server::state::FileUpdateNotification { file_id: request_id, version: 0, // Will be updated by next fetch updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()], updated_by: "daemon".to_string(), }); } Ok(None) => { tracing::warn!( file_id = %request_id, "File not found when syncing from repository" ); } Err(e) => { tracing::error!( file_id = %request_id, error = %e, "Failed to update file from repository content" ); } } } } else { tracing::warn!( file_id = %request_id, error = ?error, "Daemon failed to read repo file" ); } } Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => { tracing::info!( task_id = ?task_id, branch_name = %branch_name, success = success, error = ?error, "Branch created notification received" ); // Broadcast as task output if we have a task_id if let Some(tid) = task_id { let output_text = if success { format!("✓ Branch '{}' created successfully", branch_name) } else { format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default()) }; state.broadcast_task_output(TaskOutputNotification { task_id: tid, owner_id: Some(owner_id), message_type: "system".to_string(), content: output_text, tool_name: None, tool_input: None, is_error: Some(!success), cost_usd: None, duration_ms: None, is_partial: false, }); } } Ok(DaemonMessage::CheckpointCreated { task_id, success, commit_sha, branch_name, checkpoint_number: _, // We'll get from DB files_changed, lines_added, lines_removed, error, message, patch_data, patch_base_sha, patch_files_count, }) => { tracing::info!( task_id = %task_id, success = success, commit_sha = ?commit_sha, has_patch = patch_data.is_some(), "Checkpoint created notification received" ); if success { if let (Some(sha), Some(branch)) = (commit_sha.clone(), 
branch_name.clone()) { // Store checkpoint in database if let Some(pool) = state.db_pool.as_ref() { match repository::create_task_checkpoint( pool, task_id, &sha, &branch, &message, files_changed.clone(), lines_added, lines_removed, ).await { Ok(checkpoint) => { tracing::info!( task_id = %task_id, checkpoint_id = %checkpoint.id, checkpoint_number = checkpoint.checkpoint_number, "Checkpoint stored in database" ); // Store patch if provided (for task recovery) if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { // Decode base64 patch data match base64::engine::general_purpose::STANDARD.decode(patch_b64) { Ok(patch_bytes) => { let files_count = patch_files_count.unwrap_or(0); // Default TTL: 7 days (168 hours) let ttl_hours = 168i64; match repository::create_checkpoint_patch( pool, task_id, Some(checkpoint.id), base_sha, &patch_bytes, files_count, ttl_hours, ).await { Ok(patch) => { tracing::info!( task_id = %task_id, patch_id = %patch.id, patch_size = patch_bytes.len(), "Checkpoint patch stored for recovery" ); } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to store checkpoint patch" ); } } } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to decode patch base64 data" ); } } } // Broadcast success as task output state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content: format!( "✓ Checkpoint #{} created: {} ({})", checkpoint.checkpoint_number, message, &sha[..7.min(sha.len())] ), tool_name: None, tool_input: None, is_error: Some(false), cost_usd: None, duration_ms: None, is_partial: false, }); // Record history event for checkpoint // Get task to get contract_id if let Ok(Some(task)) = repository::get_task(pool, task_id).await { let _ = repository::record_history_event( pool, task.owner_id, task.contract_id, Some(task_id), "checkpoint", Some("created"), None, serde_json::json!({ "checkpointNumber": checkpoint.checkpoint_number, 
"commitSha": &sha, "message": &message, "filesChanged": files_changed, "linesAdded": lines_added, "linesRemoved": lines_removed, "hasPatch": patch_data.is_some(), }), ).await; } } Err(e) => { tracing::error!(error = %e, "Failed to store checkpoint in database"); state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "error".to_string(), content: format!("Checkpoint commit succeeded but DB storage failed: {}", e), tool_name: None, tool_input: None, is_error: Some(true), cost_usd: None, duration_ms: None, is_partial: false, }); } } } } else if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { // Ephemeral patch-only checkpoint (no git commit) // Store patch directly in checkpoint_patches without a task_checkpoint if let Some(pool) = state.db_pool.as_ref() { match base64::engine::general_purpose::STANDARD.decode(patch_b64) { Ok(patch_bytes) => { let files_count = patch_files_count.unwrap_or(0); // Default TTL: 7 days (168 hours) let ttl_hours = 168i64; match repository::create_checkpoint_patch( pool, task_id, None, // No checkpoint_id for ephemeral patches base_sha, &patch_bytes, files_count, ttl_hours, ).await { Ok(patch) => { tracing::info!( task_id = %task_id, patch_id = %patch.id, patch_size = patch_bytes.len(), files_count = files_count, "Ephemeral patch stored for recovery" ); state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "system".to_string(), content: format!( "✓ Patch saved: {} ({} files)", message, files_count ), tool_name: None, tool_input: None, is_error: Some(false), cost_usd: None, duration_ms: None, is_partial: false, }); } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to store ephemeral patch" ); } } } Err(e) => { tracing::warn!( task_id = %task_id, error = %e, "Failed to decode ephemeral patch base64 data" ); } } } } } else { // Broadcast failure let error_msg = error.unwrap_or_else(|| "Unknown 
error".to_string()); state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "error".to_string(), content: format!("✗ Checkpoint failed: {}", error_msg), tool_name: None, tool_input: None, is_error: Some(true), cost_usd: None, duration_ms: None, is_partial: false, }); } } Ok(DaemonMessage::MergeToTargetResult { task_id, success, message, commit_sha, conflicts, }) => { tracing::info!( task_id = %task_id, success = success, "Merge to target result received" ); // Broadcast the merge result for waiting handlers state.broadcast_merge_result(crate::server::state::MergeResultNotification { task_id, success, message: message.clone(), commit_sha: commit_sha.clone(), conflicts: conflicts.clone(), }); // On successful merge, notify supervisor to check if all merges complete if success { if let Some(pool) = state.db_pool.as_ref() { if let Ok(Some(task)) = repository::get_task(pool, task_id).await { if let Some(contract_id) = task.contract_id { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { let prompt = format!( "[INFO] Merge completed: {}\n\ Check if all tasks are merged with `makima supervisor tasks`.\n\ If ready, create PR with `makima supervisor pr`.", message ); let _ = state.notify_supervisor( supervisor.id, supervisor.daemon_id, &prompt, ).await; } } } } } } Ok(DaemonMessage::PRCreated { task_id, success, message, pr_url, pr_number, }) => { tracing::info!( task_id = %task_id, success = success, pr_url = ?pr_url, pr_number = ?pr_number, "PR created result received" ); // Broadcast the PR result for waiting handlers state.broadcast_pr_result(crate::server::state::PrResultNotification { task_id, success, message: message.clone(), pr_url: pr_url.clone(), pr_number, }); // Notify supervisor of PR result (both success and failure) if let Some(pool) = state.db_pool.as_ref() { if let Ok(Some(task)) = repository::get_task(pool, task_id).await { if let Some(contract_id) = 
task.contract_id { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { let prompt = if success { // Get contract to determine next action let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { match (contract.contract_type.as_str(), contract.phase.as_str()) { ("simple", "execute") => { "Mark contract complete with `makima supervisor complete`".to_string() } ("specification", "execute") => { "Advance to review phase with `makima supervisor advance-phase review`".to_string() } _ => "Check contract status with `makima supervisor status`".to_string() } } else { "Check contract status with `makima supervisor status`".to_string() }; format!( "[ACTION REQUIRED] PR created successfully!\n\ PR: {}\n\n\ Next step: {}", pr_url.as_deref().unwrap_or(&message), next_action ) } else { format!( "[ERROR] PR creation failed for task {}:\n\ {}\n\n\ Please fix the issue and retry with `makima supervisor pr`.", task_id, message ) }; let _ = state.notify_supervisor( supervisor.id, supervisor.daemon_id, &prompt, ).await; } } } } } Ok(DaemonMessage::GitConfigInherited { success, user_email, user_name, error, }) => { if success { tracing::info!( daemon_id = %daemon_uuid, user_email = ?user_email, user_name = ?user_name, "Daemon inherited git config" ); } else { tracing::warn!( daemon_id = %daemon_uuid, error = ?error, "Failed to inherit git config" ); } } Ok(DaemonMessage::ExportPatchCreated { task_id, success, patch_content, files_count, lines_added, lines_removed, base_commit_sha, error, }) => { if success { tracing::info!( task_id = %task_id, files_count = ?files_count, lines_added = ?lines_added, lines_removed = ?lines_removed, base_commit_sha = ?base_commit_sha, patch_len = patch_content.as_ref().map(|p| p.len()), "Export patch created successfully" ); // Broadcast as task output so UI can access the result let output_text = format!( "✓ Export patch created: {} files 
changed, +{} -{} lines (base: {})", files_count.unwrap_or(0), lines_added.unwrap_or(0), lines_removed.unwrap_or(0), base_commit_sha.as_deref().unwrap_or("unknown") ); state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "export_patch".to_string(), content: output_text, tool_name: None, tool_input: Some(serde_json::json!({ "patchContent": patch_content, "filesCount": files_count, "linesAdded": lines_added, "linesRemoved": lines_removed, "baseCommitSha": base_commit_sha, })), is_error: None, cost_usd: None, duration_ms: None, is_partial: false, }); } else { tracing::warn!( task_id = %task_id, error = ?error, "Failed to create export patch" ); // Broadcast error state.broadcast_task_output(TaskOutputNotification { task_id, owner_id: Some(owner_id), message_type: "error".to_string(), content: format!("✗ Export patch failed: {}", error.unwrap_or_else(|| "Unknown error".to_string())), tool_name: None, tool_input: None, is_error: Some(true), cost_usd: None, duration_ms: None, is_partial: false, }); } } Ok(DaemonMessage::WorktreeInfoResult { task_id, success, worktree_path, exists, files_changed, insertions, deletions, files, branch, head_sha, error, }) => { tracing::debug!( task_id = %task_id, success = success, exists = exists, files_changed = files_changed, "Worktree info result received" ); // Fulfill pending worktree info request if any if let Some((_, tx)) = state.pending_worktree_info.remove(&task_id) { let response = crate::server::state::WorktreeInfoResponse { task_id, success, worktree_path, exists, files_changed, insertions, deletions, files, branch, head_sha, error, }; // Ignore send error - receiver may have timed out let _ = tx.send(response); } } Ok(DaemonMessage::TaskDiff { task_id, success, diff, error }) => { tracing::debug!( task_id = %task_id, success = success, "Task diff result received" ); // Fulfill pending task diff request if any if let Some((_, tx)) = state.pending_task_diff.remove(&task_id) { let 
_ = tx.send(crate::server::state::TaskDiffResult { task_id, success, diff, error, }); } } Ok(DaemonMessage::WorktreeCommitResult { task_id, success, commit_sha, error }) => { tracing::debug!( task_id = %task_id, success = success, commit_sha = ?commit_sha, "Worktree commit result received" ); // Fulfill pending worktree commit request if any if let Some((_, tx)) = state.pending_worktree_commit.remove(&task_id) { let _ = tx.send(crate::server::state::WorktreeCommitResponse { task_id, success, commit_sha, error, }); } } Ok(DaemonMessage::MergePatchToSupervisor { task_id, supervisor_task_id, patch_data, base_sha, }) => { tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, base_sha = %base_sha, "Received cross-daemon merge patch request" ); // Look up the supervisor task to find which daemon it's on if let Some(ref pool) = state.db_pool { match repository::get_task(pool, supervisor_task_id).await { Ok(Some(supervisor_task)) => { if let Some(target_daemon_id) = supervisor_task.daemon_id { // Send ApplyPatchToWorktree command to the supervisor's daemon let command = crate::server::state::DaemonCommand::ApplyPatchToWorktree { target_task_id: supervisor_task_id, source_task_id: task_id, patch_data, base_sha, }; match state.send_daemon_command(target_daemon_id, command).await { Ok(()) => { tracing::info!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, target_daemon_id = %target_daemon_id, "Routed patch merge to supervisor's daemon" ); } Err(e) => { tracing::error!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, target_daemon_id = %target_daemon_id, error = %e, "Failed to route patch merge to supervisor's daemon" ); } } } else { tracing::warn!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, "Supervisor task has no daemon assigned, cannot route patch merge" ); } } Ok(None) => { tracing::warn!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, "Supervisor task not found, cannot route patch 
merge" ); } Err(e) => { tracing::error!( task_id = %task_id, supervisor_task_id = %supervisor_task_id, error = %e, "Failed to look up supervisor task for patch merge" ); } } } } Ok(DaemonMessage::WorktreeDiffResult { task_id, success, diff, error }) => { tracing::debug!( task_id = %task_id, success = success, "Worktree diff result received" ); // Fulfill pending worktree diff request if any if let Some((_, tx)) = state.pending_worktree_diff.remove(&task_id) { let _ = tx.send(crate::server::state::WorktreeDiffResponse { task_id, success, diff: diff.unwrap_or_default(), error, }); } } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } } } Some(Ok(Message::Close(_))) | None => { tracing::info!("Daemon {} disconnected", daemon_uuid); break; } Some(Err(e)) => { tracing::warn!("Daemon {} WebSocket error: {}", daemon_uuid, e); break; } _ => {} } } // Handle commands to send to daemon cmd = cmd_rx.recv() => { match cmd { Some(command) => { let json = serde_json::to_string(&command).unwrap(); if sender.send(Message::Text(json.into())).await.is_err() { tracing::warn!("Failed to send command to daemon {}", daemon_uuid); break; } } None => { // Channel closed break; } } } } } // Cleanup on disconnect state.unregister_daemon(&connection_id); // Delete daemon from database and clear tasks if let Some(ref pool) = state.db_pool { let pool = pool.clone(); let conn_id = connection_id.clone(); tokio::spawn(async move { // Delete daemon from database if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await { tracing::error!( connection_id = %conn_id, error = %e, "Failed to delete daemon from database" ); } // Find tasks assigned to this daemon and mark for retry or fail permanently if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await { tracing::error!( daemon_id = %daemon_uuid, error = %e, "Failed to handle daemon disconnect for tasks" ); } }); } } /// Handle tasks when daemon disconnects - mark for retry or fail 
permanently. async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { // Get all active tasks on this daemon let active_tasks: Vec = sqlx::query_as( r#" SELECT * FROM tasks WHERE daemon_id = $1 AND status IN ('starting', 'running', 'initializing') "#, ) .bind(daemon_id) .fetch_all(pool) .await?; if active_tasks.is_empty() { return Ok(()); } tracing::info!( daemon_id = %daemon_id, task_count = active_tasks.len(), "Processing tasks for disconnected daemon" ); for task in active_tasks { if task.retry_count < task.max_retries { // Mark for retry match repository::mark_task_for_retry(pool, task.id, daemon_id).await { Ok(Some(updated_task)) => { tracing::info!( task_id = %task.id, task_name = %task.name, retry_count = updated_task.retry_count, max_retries = updated_task.max_retries, "Task marked for retry after daemon disconnect" ); } Ok(None) => { tracing::warn!( task_id = %task.id, "Task not found or already at max retries" ); } Err(e) => { tracing::error!( task_id = %task.id, error = %e, "Failed to mark task for retry" ); } } } else { // Exceeded retries, mark as permanently failed if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await { tracing::error!( task_id = %task.id, error = %e, "Failed to mark task as permanently failed" ); } else { tracing::warn!( task_id = %task.id, task_name = %task.name, retry_count = task.retry_count + 1, "Task permanently failed: exceeded maximum retries" ); } } } Ok(()) }