//! WebSocket handler for daemon connections.
//!
//! Daemons connect to report task progress, stream output, and receive commands.
//! Each daemon manages Claude Code containers on its local machine.
//!
//! ## Authentication
//!
//! Daemons authenticate via the `X-Api-Key` header in the WebSocket upgrade request.
//! The API key is validated against the database and the daemon is associated with
//! the corresponding owner_id for data isolation.
use axum::{
extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use base64::Engine;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::db::models::Task;
use crate::db::repository;
use crate::llm::{check_deliverables_met, TaskInfo};
use crate::server::auth::{hash_api_key, API_KEY_HEADER};
use crate::server::messages::ApiError;
use crate::server::state::{
DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification,
};
// =============================================================================
// Claude Code JSON Output Parsing
// =============================================================================
/// Claude Code stream-json message structure.
///
/// One struct covers every message kind: only `type` is required and every other
/// field is optional, so `parse_claude_message` decides which fields matter based
/// on `msg_type`. Unknown JSON fields are ignored (no `deny_unknown_fields`).
#[derive(Debug, Deserialize)]
struct ClaudeMessage {
/// JSON `type` discriminator; `parse_claude_message` handles "system", "assistant",
/// "tool_use", "tool_result", "result" and "error" and drops anything else.
#[serde(rename = "type")]
msg_type: String,
/// Sub-kind of a "system" message ("init" is reported as "Session started").
subtype: Option<String>,
/// Content blocks of an "assistant" message.
message: Option<ClaudeMessageContent>,
/// Tool name of a top-level "tool_use" message.
tool_name: Option<String>,
/// Tool input of a top-level "tool_use" message, forwarded as raw JSON.
tool_input: Option<serde_json::Value>,
/// Payload of a "tool_result" message.
tool_result: Option<ClaudeToolResult>,
/// Final text of a "result" message.
result: Option<String>,
/// Cost reported on a "result" message (forwarded unchanged).
cost_usd: Option<f64>,
/// Duration reported on a "result" message (forwarded unchanged).
duration_ms: Option<u64>,
/// Error text of an "error" message.
error: Option<String>,
}
/// Body of an "assistant" message.
#[derive(Debug, Deserialize)]
struct ClaudeMessageContent {
/// Content blocks; `parse_claude_message` consumes only "text" and "tool_use" blocks.
content: Option<Vec<ClaudeContentBlock>>,
}
/// One block inside an assistant message's `content` array.
#[derive(Debug, Deserialize)]
struct ClaudeContentBlock {
/// Block kind; `parse_claude_message` looks for "text" and "tool_use".
#[serde(rename = "type")]
block_type: String,
/// Text of a "text" block.
text: Option<String>,
/// Tool name of a "tool_use" block.
name: Option<String>,
/// Tool input of a "tool_use" block, kept as raw JSON.
input: Option<serde_json::Value>,
}
/// Payload of a "tool_result" message.
#[derive(Debug, Deserialize)]
struct ClaudeToolResult {
/// Tool output text; truncated by `parse_claude_message` before forwarding.
// NOTE(review): typed as a string, so a non-string JSON `content` (e.g. an array)
// makes the whole line fail to deserialize and it is forwarded as "raw" output.
content: Option<String>,
/// Whether the tool reported an error; forwarded as the notification's `is_error`.
is_error: Option<bool>,
}
/// Turn one line of Claude Code output reported by a daemon into a notification.
///
/// Whitespace-only lines yield `None`. A trimmed line that starts with `{` and
/// deserializes as a [`ClaudeMessage`] is handed to `parse_claude_message` (which
/// may itself return `None`). Every other line is forwarded verbatim (trimmed)
/// with the `"raw"` message type.
fn parse_claude_output(task_id: Uuid, owner_id: Uuid, line: &str, is_partial: bool) -> Option<TaskOutputNotification> {
    let line = line.trim();
    if line.is_empty() {
        return None;
    }

    // Only attempt JSON decoding for lines that look like an object.
    let structured = line
        .starts_with('{')
        .then(|| serde_json::from_str::<ClaudeMessage>(line).ok())
        .flatten();

    match structured {
        Some(msg) => parse_claude_message(task_id, owner_id, msg, is_partial),
        None => Some(TaskOutputNotification {
            task_id,
            owner_id: Some(owner_id),
            message_type: "raw".to_string(),
            content: line.to_string(),
            tool_name: None,
            tool_input: None,
            is_error: None,
            cost_usd: None,
            duration_ms: None,
            is_partial,
        }),
    }
}
fn parse_claude_message(task_id: Uuid, owner_id: Uuid, msg: ClaudeMessage, is_partial: bool) -> Option<TaskOutputNotification> {
match msg.msg_type.as_str() {
"system" => {
// System messages (init, etc.) - include subtype info
let content = match msg.subtype.as_deref() {
Some("init") => "Session started".to_string(),
Some(sub) => format!("System: {}", sub),
None => "System message".to_string(),
};
Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content,
tool_name: None,
tool_input: None,
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial,
})
}
"assistant" => {
// Extract text content from message blocks
if let Some(message) = msg.message {
if let Some(blocks) = message.content {
// Check for text blocks
let text_content: Vec<String> = blocks
.iter()
.filter(|b| b.block_type == "text")
.filter_map(|b| b.text.clone())
.collect();
if !text_content.is_empty() {
return Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "assistant".to_string(),
content: text_content.join("\n"),
tool_name: None,
tool_input: None,
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial,
});
}
// Check for tool_use blocks
if let Some(tool_block) = blocks.iter().find(|b| b.block_type == "tool_use") {
return Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "tool_use".to_string(),
content: format!("Using tool: {}", tool_block.name.as_deref().unwrap_or("unknown")),
tool_name: tool_block.name.clone(),
tool_input: tool_block.input.clone(),
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial,
});
}
}
}
None
}
"tool_use" => {
Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "tool_use".to_string(),
content: format!("Using tool: {}", msg.tool_name.as_deref().unwrap_or("unknown")),
tool_name: msg.tool_name,
tool_input: msg.tool_input,
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial,
})
}
"tool_result" => {
if let Some(result) = msg.tool_result {
let content = result.content.unwrap_or_default();
// Truncate long results
let content = if content.len() > 500 {
format!("{}...", &content[..500])
} else {
content
};
Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "tool_result".to_string(),
content,
tool_name: None,
tool_input: None,
is_error: result.is_error,
cost_usd: None,
duration_ms: None,
is_partial,
})
} else {
None
}
}
"result" => {
Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "result".to_string(),
content: msg.result.unwrap_or_else(|| "Task completed".to_string()),
tool_name: None,
tool_input: None,
is_error: None,
cost_usd: msg.cost_usd,
duration_ms: msg.duration_ms,
is_partial,
})
}
"error" => {
Some(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "error".to_string(),
content: msg.error.unwrap_or_else(|| "An error occurred".to_string()),
tool_name: None,
tool_input: None,
is_error: Some(true),
cost_usd: None,
duration_ms: None,
is_partial,
})
}
_ => None, // Skip unknown message types
}
}
/// Message from daemon to server.
///
/// Deserialized from an internally tagged JSON object: the `type` field holds the
/// variant name in camelCase (e.g. `"taskOutput"`, `"heartbeat"`), unless a variant
/// overrides it with its own `#[serde(rename = ...)]`. `rename_all` only affects
/// variant names, so every multi-word field carries an explicit field rename.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum DaemonMessage {
/// Registration message; must be the first message on a connection.
///
/// The connection is already authenticated by the `X-Api-Key` upgrade header, so
/// `handle_daemon_connection` ignores `api_key` and uses the remaining fields to
/// register the daemon. Any other message before this one is answered with a
/// `NOT_REGISTERED` error.
Authenticate {
#[serde(rename = "apiKey")]
api_key: String,
#[serde(rename = "machineId")]
machine_id: String,
hostname: String,
#[serde(rename = "maxConcurrentTasks")]
max_concurrent_tasks: i32,
},
/// Periodic heartbeat with current status
Heartbeat {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
/// Enhanced supervisor heartbeat with detailed state
SupervisorHeartbeat {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "contractId")]
contract_id: Uuid,
/// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
state: String,
/// Current contract phase
phase: String,
/// Description of current activity
#[serde(rename = "currentActivity")]
current_activity: Option<String>,
/// Progress percentage (0-100)
progress: u8,
/// Task IDs the supervisor is waiting on
#[serde(rename = "pendingTaskIds")]
pending_task_ids: Vec<Uuid>,
/// Timestamp of this heartbeat
timestamp: DateTime<Utc>,
},
/// Task output streaming (stdout/stderr from Claude Code).
/// `output` is parsed by `parse_claude_output` before being broadcast and stored.
TaskOutput {
#[serde(rename = "taskId")]
task_id: Uuid,
output: String,
#[serde(rename = "isPartial")]
is_partial: bool,
},
/// Task status change notification
TaskStatusChange {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "oldStatus")]
old_status: String,
#[serde(rename = "newStatus")]
new_status: String,
},
/// Task progress update with summary
TaskProgress {
#[serde(rename = "taskId")]
task_id: Uuid,
summary: String,
},
/// Task completion notification
TaskComplete {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
error: Option<String>,
},
/// Task recovery detected after daemon restart
TaskRecoveryDetected {
#[serde(rename = "taskId")]
task_id: Uuid,
#[serde(rename = "previousState")]
previous_state: String,
#[serde(rename = "worktreeIntact")]
worktree_intact: bool,
#[serde(rename = "worktreePath")]
worktree_path: Option<String>,
#[serde(rename = "needsPatch")]
needs_patch: bool,
},
/// Register a tool key for orchestrator API access
RegisterToolKey {
#[serde(rename = "taskId")]
task_id: Uuid,
/// The API key for this orchestrator to use when calling mesh endpoints
key: String,
},
/// Revoke a tool key when task completes
RevokeToolKey {
#[serde(rename = "taskId")]
task_id: Uuid,
},
/// Authentication required - OAuth token expired, provides login URL
AuthenticationRequired {
/// Task ID that triggered the auth error (if any)
#[serde(rename = "taskId")]
task_id: Option<Uuid>,
/// OAuth login URL for remote authentication
#[serde(rename = "loginUrl")]
login_url: String,
/// Hostname of the daemon requiring auth
hostname: Option<String>,
},
/// Reauth status update (response to TriggerReauth/SubmitAuthCode commands)
ReauthStatus {
#[serde(rename = "requestId")]
request_id: Uuid,
/// Status: "pending", "url_ready", "completed", "failed"
status: String,
/// OAuth login URL (present when status is "url_ready")
#[serde(rename = "loginUrl")]
login_url: Option<String>,
/// Error message (present when status is "failed")
error: Option<String>,
/// Whether the OAuth token has been saved to disk (defaults to false when absent)
#[serde(rename = "tokenSaved", default)]
token_saved: bool,
},
/// Response to RetryCompletionAction command
CompletionActionResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
message: String,
/// PR URL if action was "pr" and successful
#[serde(rename = "prUrl")]
pr_url: Option<String>,
},
/// Report daemon's available directories for task output
DaemonDirectories {
/// Current working directory of the daemon
#[serde(rename = "workingDirectory")]
working_directory: String,
/// Path to ~/.makima/home directory (for cloning completed work)
#[serde(rename = "homeDirectory")]
home_directory: String,
/// Path to worktrees directory (~/.makima/worktrees)
#[serde(rename = "worktreesDirectory")]
worktrees_directory: String,
},
/// Response to CloneWorktree command
CloneWorktreeResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
message: String,
/// The path where the worktree was cloned
#[serde(rename = "targetDir")]
target_dir: Option<String>,
},
/// Response to CheckTargetExists command
CheckTargetExistsResult {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Whether the target directory exists
exists: bool,
/// The path that was checked
#[serde(rename = "targetDir")]
target_dir: String,
},
/// Response to ReadRepoFile command
RepoFileContent {
/// Request ID from the original command
#[serde(rename = "requestId")]
request_id: Uuid,
/// Path to the file that was read
#[serde(rename = "filePath")]
file_path: String,
/// File content (None if error occurred)
content: Option<String>,
/// Whether the operation succeeded
success: bool,
/// Error message if operation failed
error: Option<String>,
},
/// Notification that a branch was created
BranchCreated {
#[serde(rename = "taskId")]
task_id: Option<Uuid>,
/// Name of the branch that was created
#[serde(rename = "branchName")]
branch_name: String,
/// Whether the operation succeeded
success: bool,
/// Error message if operation failed
error: Option<String>,
},
/// Notification that a checkpoint was created
CheckpointCreated {
#[serde(rename = "taskId")]
task_id: Uuid,
/// Whether the operation succeeded
success: bool,
/// Commit SHA if successful
#[serde(rename = "commitSha")]
commit_sha: Option<String>,
/// Branch name where checkpoint was created
#[serde(rename = "branchName")]
branch_name: Option<String>,
/// Checkpoint number in sequence
#[serde(rename = "checkpointNumber")]
checkpoint_number: Option<i32>,
/// Files changed in this checkpoint
#[serde(rename = "filesChanged")]
files_changed: Option<serde_json::Value>,
/// Lines added
#[serde(rename = "linesAdded")]
lines_added: Option<i32>,
/// Lines removed
#[serde(rename = "linesRemoved")]
lines_removed: Option<i32>,
/// Error message if operation failed
error: Option<String>,
/// User-provided checkpoint message
message: String,
// NOTE: `skip_serializing_if` on the patch fields below has no effect, because
// this enum only derives `Deserialize`. Missing values still deserialize to
// `None` because the fields are `Option`.
/// Base64-encoded gzip-compressed patch data for recovery
#[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
patch_data: Option<String>,
/// Commit SHA to apply patch on top of (for recovery)
#[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
patch_base_sha: Option<String>,
/// Number of files in the patch
#[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
patch_files_count: Option<i32>,
},
/// Notification that git config was inherited
GitConfigInherited {
/// Whether the operation succeeded
success: bool,
/// Git user.email that was inherited
#[serde(rename = "userEmail")]
user_email: Option<String>,
/// Git user.name that was inherited
#[serde(rename = "userName")]
user_name: Option<String>,
/// Error message if operation failed
error: Option<String>,
},
/// Response to CreateExportPatch command
ExportPatchCreated {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
/// The uncompressed, human-readable patch content
#[serde(rename = "patchContent")]
patch_content: Option<String>,
/// Number of files changed
#[serde(rename = "filesCount")]
files_count: Option<usize>,
/// Lines added
#[serde(rename = "linesAdded")]
lines_added: Option<usize>,
/// Lines removed
#[serde(rename = "linesRemoved")]
lines_removed: Option<usize>,
/// The base commit SHA that the patch is diffed against
#[serde(rename = "baseCommitSha")]
base_commit_sha: Option<String>,
/// Error message if failed
error: Option<String>,
},
/// Response to MergeTaskToTarget command
MergeToTargetResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
message: String,
#[serde(rename = "commitSha")]
commit_sha: Option<String>,
conflicts: Option<Vec<String>>,
},
/// Response to CreatePR command
// Explicit rename: the camelCase rule only lowercases the first character,
// which would turn `PRCreated` into "pRCreated".
#[serde(rename = "prCreated")]
PRCreated {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
message: String,
#[serde(rename = "prUrl")]
pr_url: Option<String>,
#[serde(rename = "prNumber")]
pr_number: Option<i32>,
},
/// Response to GetWorktreeDiff command
WorktreeDiffResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
diff: Option<String>,
error: Option<String>,
},
/// Response to GetWorktreeInfo command
WorktreeInfoResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
/// Path to the worktree directory
#[serde(rename = "worktreePath")]
worktree_path: Option<String>,
/// Whether the worktree exists
exists: bool,
/// Number of files changed
#[serde(rename = "filesChanged")]
files_changed: i32,
/// Total lines inserted
insertions: i32,
/// Total lines deleted
deletions: i32,
/// Changed files list
files: Option<serde_json::Value>,
/// Current branch name
branch: Option<String>,
/// Current HEAD commit SHA
#[serde(rename = "headSha")]
head_sha: Option<String>,
/// Error message if failed
error: Option<String>,
},
/// Response to GetTaskDiff command
TaskDiff {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
diff: Option<String>,
error: Option<String>,
},
/// Response to CommitWorktree command
WorktreeCommitResult {
#[serde(rename = "taskId")]
task_id: Uuid,
success: bool,
#[serde(rename = "commitSha")]
commit_sha: Option<String>,
error: Option<String>,
},
/// Request to merge a task's patch to supervisor's worktree (cross-daemon case).
/// Sent when a task completes on a different daemon than its supervisor.
MergePatchToSupervisor {
/// The task that completed.
#[serde(rename = "taskId")]
task_id: Uuid,
/// The supervisor task to merge into.
#[serde(rename = "supervisorTaskId")]
supervisor_task_id: Uuid,
/// Base64-gzipped patch data.
#[serde(rename = "patchData")]
patch_data: String,
/// Base commit SHA for the patch.
#[serde(rename = "baseSha")]
base_sha: String,
},
}
/// Validated daemon authentication result.
///
/// Produced by `validate_daemon_api_key` from the `X-Api-Key` upgrade header and
/// carried into the WebSocket session by `daemon_handler`.
#[derive(Debug, Clone)]
struct DaemonAuthResult {
/// User ID from the API key (`api_keys.user_id`)
user_id: Uuid,
/// Owner ID for data isolation (the user's `users.default_owner_id`)
owner_id: Uuid,
}
/// Compute an informational directive for the supervisor from deliverable status.
///
/// A directive is produced only in the `"execute"` phase, and only when the
/// contract-level preconditions of `auto_create_pr_if_ready` hold:
/// - the contract is not `local_only`;
/// - at least one contract repository has a remote `repository_url`;
/// - there is at least one non-supervisor task, and every such task is `"done"`;
/// - the `"pull-request"` deliverable is not yet complete.
///
/// In that case an `[INFO] ...` message announcing the automatic PR is returned.
///
/// Returns `None` in every other case, including when the contract is not found
/// for `owner_id` or any database lookup fails.
async fn compute_action_directive(
    pool: &sqlx::PgPool,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Option<String> {
    let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
        .await
        .ok()
        .flatten()?;

    // Only the execute phase can yield a directive; skip the task and
    // repository queries for every other phase.
    if contract.phase != "execute" {
        return None;
    }

    // Non-supervisor tasks only.
    let task_infos: Vec<TaskInfo> = repository::list_tasks_by_contract(pool, contract_id, owner_id)
        .await
        .ok()?
        .iter()
        .filter(|t| !t.is_supervisor)
        .map(|t| TaskInfo {
            name: t.name.clone(),
            status: t.status.clone(),
        })
        .collect();

    let repos = repository::list_contract_repositories(pool, contract_id)
        .await
        .ok()?;

    // Completed deliverables for the current phase.
    let completed_deliverables = contract.get_completed_deliverables(&contract.phase);

    // Check deliverables (unused, but kept for future reference)
    let _check = check_deliverables_met(
        &contract.phase,
        &contract.contract_type,
        &completed_deliverables,
        &task_infos,
        !repos.is_empty(),
    );

    // Mirror auto_create_pr_if_ready: it never creates a PR for local-only
    // contracts or when no repository has a remote URL, so the supervisor must
    // not be told a PR is on its way in those cases.
    let auto_pr_possible =
        !contract.local_only && repos.iter().any(|r| r.repository_url.is_some());
    let all_tasks_done =
        !task_infos.is_empty() && task_infos.iter().all(|t| t.status == "done");
    let pr_deliverable_complete = completed_deliverables.iter().any(|d| d == "pull-request");

    if auto_pr_possible && all_tasks_done && !pr_deliverable_complete {
        Some(format!(
            "[INFO] All {} task(s) completed. System is auto-creating PR.",
            task_infos.len()
        ))
    } else {
        None
    }
}
/// Best-effort automatic pull request once a contract's work is finished.
///
/// Sends `DaemonCommand::CreatePR` to the daemon running the contract's supervisor
/// task, but only when every one of these holds:
/// - the contract exists for `owner_id`, is not `local_only`, and is in the
///   `"execute"` phase;
/// - there is at least one non-supervisor task and every one is `"done"`;
/// - the `"pull-request"` deliverable is not yet complete for the current phase;
/// - at least one contract repository has a `repository_url`;
/// - a supervisor task exists and has a `daemon_id`.
///
/// A failed lookup or unmet condition ends the attempt silently. The outcome of
/// sending the command is logged (`info` on success, `warn` on failure), and no
/// error is propagated.
async fn auto_create_pr_if_ready(
    pool: &sqlx::PgPool,
    state: &SharedState,
    contract_id: Uuid,
    owner_id: Uuid,
) {
    let Ok(Some(contract)) =
        repository::get_contract_for_owner(pool, contract_id, owner_id).await
    else {
        return;
    };
    let in_auto_pr_scope = !contract.local_only && contract.phase == "execute";
    if !in_auto_pr_scope {
        return;
    }

    let Ok(tasks) = repository::list_tasks_by_contract(pool, contract_id, owner_id).await else {
        return;
    };
    // Require at least one worker (non-supervisor) task, all of them finished.
    let mut saw_worker = false;
    for task in tasks.iter().filter(|t| !t.is_supervisor) {
        if task.status != "done" {
            return;
        }
        saw_worker = true;
    }
    if !saw_worker {
        return;
    }

    let pr_already_delivered = contract
        .get_completed_deliverables(&contract.phase)
        .iter()
        .any(|d| d == "pull-request");
    if pr_already_delivered {
        return;
    }

    let Ok(repos) = repository::list_contract_repositories(pool, contract_id).await else {
        return;
    };
    if repos.iter().all(|r| r.repository_url.is_none()) {
        return;
    }

    let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await
    else {
        return;
    };
    // The command is routed to the daemon that runs the supervisor.
    let Some(daemon_id) = supervisor.daemon_id else {
        return;
    };

    // Branch name: "makima/<lowercased, sanitized supervisor name>-<first 8 chars of id>".
    let slug = supervisor
        .name
        .chars()
        .map(|c| match c {
            '-' | '_' => c,
            c if c.is_alphanumeric() => c,
            _ => '-',
        })
        .collect::<String>()
        .to_lowercase();
    let supervisor_id_text = supervisor.id.to_string();
    let branch = format!("makima/{}-{}", slug, &supervisor_id_text[..8]);

    let command = DaemonCommand::CreatePR {
        task_id: supervisor.id,
        title: contract.name.clone(),
        body: contract.description.clone(),
        base_branch: supervisor.base_branch.clone(),
        branch,
    };

    if let Err(e) = state.send_daemon_command(daemon_id, command).await {
        tracing::warn!(
            contract_id = %contract_id,
            error = %e,
            "Auto-PR: failed to send CreatePR command"
        );
    } else {
        tracing::info!(
            contract_id = %contract_id,
            supervisor_id = %supervisor.id,
            "Auto-PR: sent CreatePR command to supervisor daemon"
        );
    }
}
/// Validate an API key and return (user_id, owner_id).
async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> {
let key_hash = hash_api_key(key);
// Look up the API key and join with users to get owner_id
let row = sqlx::query(
r#"
SELECT ak.user_id, u.default_owner_id
FROM api_keys ak
JOIN users u ON u.id = ak.user_id
WHERE ak.key_hash = $1 AND ak.revoked_at IS NULL
"#,
)
.bind(&key_hash)
.fetch_optional(pool)
.await
.map_err(|e| format!("Database error: {}", e))?;
match row {
Some(row) => {
let user_id: Uuid = row.try_get("user_id")
.map_err(|e| format!("Failed to get user_id: {}", e))?;
let owner_id: Option<Uuid> = row.try_get("default_owner_id")
.map_err(|e| format!("Failed to get owner_id: {}", e))?;
let owner_id = owner_id.ok_or_else(|| "User has no default owner".to_string())?;
// Update last_used_at asynchronously (fire and forget)
let pool_clone = pool.clone();
let key_hash_clone = key_hash.clone();
tokio::spawn(async move {
let _ = sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE key_hash = $1")
.bind(&key_hash_clone)
.execute(&pool_clone)
.await;
});
Ok(DaemonAuthResult { user_id, owner_id })
}
None => Err("Invalid or revoked API key".to_string()),
}
}
/// WebSocket upgrade handler for daemon connections.
///
/// Daemons must authenticate via the `X-Api-Key` header in the WebSocket upgrade request.
/// The API key is validated against the database and used to determine the owner_id
/// for data isolation.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/daemons/connect",
    params(
        ("X-Api-Key" = String, Header, description = "API key for daemon authentication"),
    ),
    responses(
        (status = 101, description = "WebSocket connection established"),
        (status = 401, description = "Missing or invalid API key"),
        (status = 503, description = "Database not configured"),
    ),
    tag = "Mesh"
)]
pub async fn daemon_handler(
    ws: WebSocketUpgrade,
    State(state): State<SharedState>,
    headers: HeaderMap,
) -> Response {
    // Extract API key from headers
    let api_key = match headers.get(API_KEY_HEADER).or_else(|| headers.get("x-api-key")) {
        Some(value) => match value.to_str() {
            Ok(key) if !key.is_empty() => key.to_string(),
            _ => {
                return (
                    StatusCode::UNAUTHORIZED,
                    axum::Json(ApiError::new("INVALID_API_KEY", "Invalid API key header value")),
                )
                    .into_response();
            }
        },
        None => {
            return (
                StatusCode::UNAUTHORIZED,
                axum::Json(ApiError::new("MISSING_API_KEY", "X-Api-Key header required")),
            )
                .into_response();
        }
    };

    // Validate API key against database
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                axum::Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    let auth_result = match validate_daemon_api_key(pool, &api_key).await {
        Ok(result) => result,
        Err(e) => {
            // The detailed reason can include raw database error text; keep it in
            // the server log and give the unauthenticated caller a generic message.
            tracing::warn!("Daemon authentication failed: {}", e);
            return (
                StatusCode::UNAUTHORIZED,
                axum::Json(ApiError::new("AUTH_FAILED", "API key authentication failed")),
            )
                .into_response();
        }
    };

    tracing::info!(
        user_id = %auth_result.user_id,
        owner_id = %auth_result.owner_id,
        "Daemon authenticated via API key"
    );

    ws.on_upgrade(move |socket| handle_daemon_connection(socket, state, auth_result))
}
async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_result: DaemonAuthResult) {
let (mut sender, mut receiver) = socket.split();
// Generate a unique connection ID (daemon ID will come from database)
let connection_id = Uuid::new_v4().to_string();
let mut daemon_id: Option<Uuid> = None;
let owner_id = auth_result.owner_id;
// Create command channel for sending commands to this daemon
let (cmd_tx, mut cmd_rx) = mpsc::channel::<DaemonCommand>(64);
// Wait for the daemon to send its registration info (hostname, machine_id, etc.)
// The daemon is already authenticated via API key header, but we need metadata
#[allow(unused_assignments)]
let mut registered = false;
// Wait for registration message with metadata
loop {
tokio::select! {
msg = receiver.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<DaemonMessage>(&text) {
Ok(DaemonMessage::Authenticate { api_key: _, machine_id, hostname, max_concurrent_tasks }) => {
// API key was already validated via headers, but we use this message
// for backward compatibility to get the machine_id and hostname
// Register daemon in database first to get the persistent ID
let db_daemon_id = if let Some(ref pool) = state.db_pool {
match repository::register_daemon(
pool,
owner_id,
&connection_id,
Some(&hostname),
Some(&machine_id),
max_concurrent_tasks as i32,
).await {
Ok(db_daemon) => {
tracing::info!(
daemon_id = %db_daemon.id,
owner_id = %owner_id,
hostname = %hostname,
machine_id = %machine_id,
max_concurrent_tasks = max_concurrent_tasks,
"Daemon registered in database"
);
Some(db_daemon.id)
}
Err(e) => {
tracing::error!(
error = %e,
"Failed to register daemon in database"
);
None
}
}
} else {
None
};
// If database registration failed, we can't proceed
let Some(actual_daemon_id) = db_daemon_id else {
tracing::error!("Cannot proceed without database daemon ID");
break;
};
daemon_id = Some(actual_daemon_id);
// Register daemon in state with owner_id using database ID
state.register_daemon(
connection_id.clone(),
actual_daemon_id,
owner_id,
Some(hostname),
Some(machine_id),
cmd_tx.clone(),
);
registered = true;
// Send authentication confirmation with database ID
let response = DaemonCommand::Authenticated { daemon_id: actual_daemon_id };
let json = serde_json::to_string(&response).unwrap();
if sender.send(Message::Text(json.into())).await.is_err() {
break;
}
break; // Exit registration loop, continue to main loop
}
Ok(_) => {
// Non-auth message before registration - still requires registration message
let response = DaemonCommand::Error {
code: "NOT_REGISTERED".into(),
message: "Must send registration message (Authenticate) first".into(),
};
let json = serde_json::to_string(&response).unwrap();
let _ = sender.send(Message::Text(json.into())).await;
}
Err(e) => {
let response = DaemonCommand::Error {
code: "PARSE_ERROR".into(),
message: e.to_string(),
};
let json = serde_json::to_string(&response).unwrap();
let _ = sender.send(Message::Text(json.into())).await;
}
}
}
Some(Ok(Message::Close(_))) | None => {
tracing::debug!("Daemon disconnected during registration");
return;
}
Some(Err(e)) => {
tracing::warn!("Daemon WebSocket error during registration: {}", e);
return;
}
_ => {}
}
}
}
}
if !registered {
return;
}
// daemon_id is guaranteed to be Some here since registered is true
let daemon_uuid = daemon_id.expect("daemon_id should be set when registered is true");
// Main message loop after authentication
loop {
tokio::select! {
// Handle incoming messages from daemon
msg = receiver.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<DaemonMessage>(&text) {
Ok(DaemonMessage::Heartbeat { active_tasks }) => {
tracing::trace!(
"Daemon {} heartbeat: {} active tasks",
daemon_uuid, active_tasks.len()
);
// Update daemon last_heartbeat_at in DB
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let daemon_id = daemon_uuid;
tokio::spawn(async move {
if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
tracing::warn!(
daemon_id = %daemon_id,
error = %e,
"Failed to update daemon heartbeat"
);
}
});
}
}
Ok(DaemonMessage::SupervisorHeartbeat {
task_id,
contract_id,
state: supervisor_state,
phase,
current_activity,
progress,
pending_task_ids,
timestamp: _,
}) => {
tracing::debug!(
task_id = %task_id,
contract_id = %contract_id,
state = %supervisor_state,
phase = %phase,
progress = progress,
"Supervisor heartbeat received"
);
// Store heartbeat in database and update supervisor state (Task 3.3)
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let pending_ids = pending_task_ids.clone();
let activity = current_activity.clone();
let state_str = supervisor_state.clone();
let phase_str = phase.clone();
tokio::spawn(async move {
// Store the heartbeat record
if let Err(e) = repository::create_supervisor_heartbeat(
&pool,
task_id,
contract_id,
&state_str,
&phase_str,
activity.as_deref(),
progress as i32,
&pending_ids,
).await {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
"Failed to store supervisor heartbeat"
);
}
// Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
if let Err(e) = repository::update_supervisor_heartbeat_state(
&pool,
contract_id,
&state_str,
activity.as_deref(),
progress as i32,
&pending_ids,
).await {
tracing::debug!(
contract_id = %contract_id,
error = %e,
"Failed to update supervisor state from heartbeat (may not exist yet)"
);
}
// Also update the daemon heartbeat
if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
if let Some(daemon_id) = task.daemon_id {
if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
tracing::warn!(
daemon_id = %daemon_id,
error = %e,
"Failed to update daemon heartbeat from supervisor"
);
}
}
}
});
}
}
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
// Broadcast to connected clients
state.broadcast_task_output(notification.clone());
// Persist to database (fire and forget)
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let notification = notification.clone();
tokio::spawn(async move {
if let Err(e) = repository::save_task_output(
&pool,
notification.task_id,
¬ification.message_type,
¬ification.content,
notification.tool_name.as_deref(),
notification.tool_input.clone(),
notification.is_error,
notification.cost_usd,
notification.duration_ms,
).await {
tracing::warn!(
task_id = %notification.task_id,
"Failed to persist task output: {}",
e
);
}
});
}
}
}
Ok(DaemonMessage::TaskStatusChange { task_id, old_status, new_status }) => {
tracing::info!(
"Task {} status change: {} -> {}",
task_id, old_status, new_status
);
// Update task status in database and broadcast
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let state = state.clone();
let new_status_owned = new_status.clone();
tokio::spawn(async move {
match repository::update_task_status(
&pool,
task_id,
&new_status_owned,
None,
).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: updated_task.version,
status: new_status_owned.clone(),
updated_fields: vec!["status".into()],
updated_by: "daemon".into(),
});
// Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
if updated_task.is_supervisor && new_status_owned == "running" {
if let Some(contract_id) = updated_task.contract_id {
// Check if supervisor state already exists (restoration scenario)
match repository::get_supervisor_state(&pool, contract_id).await {
Ok(Some(existing_state)) => {
// State exists - this is a restoration
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
existing_state = %existing_state.state,
restoration_count = existing_state.restoration_count,
"Supervisor starting with existing state - restoration in progress"
);
// Mark as restored (increments restoration_count)
match repository::mark_supervisor_restored(
&pool,
contract_id,
"daemon_restart",
).await {
Ok(restored_state) => {
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
restoration_count = restored_state.restoration_count,
"Supervisor restoration marked"
);
// Check for pending questions to re-deliver
if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
restored_state.pending_questions.clone()
) {
if !questions.is_empty() {
tracing::info!(
contract_id = %contract_id,
question_count = questions.len(),
"Pending questions found for re-delivery"
);
// Questions will be re-delivered by the supervisor when it restores
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
"Failed to mark supervisor as restored"
);
}
}
}
Ok(None) => {
// No existing state - fresh start
// Get contract to get its phase
match repository::get_contract_for_owner(
&pool,
contract_id,
updated_task.owner_id,
).await {
Ok(Some(contract)) => {
match repository::upsert_supervisor_state(
&pool,
contract_id,
task_id,
serde_json::json!([]), // Empty conversation
&[], // No pending tasks
&contract.phase,
).await {
Ok(_) => {
tracing::info!(
task_id = %task_id,
contract_id = %contract_id,
phase = %contract.phase,
"Initialized fresh supervisor state"
);
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
"Failed to initialize supervisor state"
);
}
}
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
"Contract not found when initializing supervisor state"
);
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
"Failed to get contract for supervisor state"
);
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
contract_id = %contract_id,
error = %e,
"Failed to check existing supervisor state"
);
}
}
}
}
// Record history event when task starts running
if new_status_owned == "running" {
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
updated_task.contract_id,
Some(task_id),
"task",
Some("started"),
None,
serde_json::json!({
"name": &updated_task.name,
"isSupervisor": updated_task.is_supervisor,
}),
).await;
}
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
"Task not found when updating status"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
"Failed to update task status: {}",
e
);
}
}
});
} else {
// No DB, just broadcast
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: 0,
status: new_status,
updated_fields: vec!["status".into()],
updated_by: "daemon".into(),
});
}
}
Ok(DaemonMessage::TaskProgress { task_id, summary }) => {
tracing::debug!("Task {} progress: {}", task_id, summary);
// TODO: Update task progress_summary in database
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: 0,
status: "running".into(),
updated_fields: vec!["progress_summary".into()],
updated_by: "daemon".into(),
});
}
Ok(DaemonMessage::TaskComplete { task_id, success, error }) => {
let status = if success { "done" } else { "failed" };
tracing::info!(
"Task {} completed: success={}, error={:?}",
task_id, success, error
);
// Revoke any tool keys for this task
state.revoke_tool_key(task_id);
// Update task in database with completion info
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let state = state.clone();
let error_clone = error.clone();
tokio::spawn(async move {
match repository::complete_task(
&pool,
task_id,
success,
error_clone.as_deref(),
).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: updated_task.version,
status: updated_task.status.clone(),
updated_fields: vec![
"status".into(),
"completed_at".into(),
"error_message".into(),
],
updated_by: "daemon".into(),
});
// Notify supervisor if this task belongs to a contract
if let Some(contract_id) = updated_task.contract_id {
// Don't notify for supervisor tasks (they don't report to themselves)
if !updated_task.is_supervisor {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
// Compute action directive if task completed successfully
let action_directive = if updated_task.status == "done" {
compute_action_directive(&pool, contract_id, owner_id).await
} else {
None
};
state.notify_supervisor_of_task_completion(
supervisor.id,
supervisor.daemon_id,
updated_task.id,
&updated_task.name,
&updated_task.status,
updated_task.progress_summary.as_deref(),
updated_task.error_message.as_deref(),
action_directive.as_deref(),
).await;
}
}
}
// Auto-create PR if all tasks are done and repo is remote
if updated_task.status == "done" {
if let Some(contract_id) = updated_task.contract_id {
let pool_c = pool.clone();
let state_c = state.clone();
tokio::spawn(async move {
auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await;
});
}
}
// Record history event for task completion
let subtype = if updated_task.status == "done" { "completed" } else { "failed" };
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
updated_task.contract_id,
Some(task_id),
"task",
Some(subtype),
None,
serde_json::json!({
"name": &updated_task.name,
"status": &updated_task.status,
"error": &updated_task.error_message,
}),
).await;
// Auto-advance directive DAG when a directive step task completes
if let Some(step_id) = updated_task.directive_step_id {
let step_status = if updated_task.status == "done" { "completed" } else { "failed" };
let step_update = crate::db::models::UpdateDirectiveStepRequest {
status: Some(step_status.to_string()),
..Default::default()
};
let _ = repository::update_directive_step(&pool, step_id, step_update).await;
if let Some(directive_id) = updated_task.directive_id {
// Advance newly-ready steps in the DAG
let _ = repository::advance_directive_ready_steps(&pool, directive_id).await;
// Check if all steps are done → set directive to idle
let _ = repository::check_directive_idle(&pool, directive_id).await;
}
} else if let Some(directive_id) = updated_task.directive_id {
// Planning/orchestrator task completed — clear orchestrator_task_id
let _ = repository::clear_orchestrator_task(&pool, directive_id).await;
// Advance DAG — planning task should have created steps
let _ = repository::advance_directive_ready_steps(&pool, directive_id).await;
if updated_task.status != "done" {
// Planning failed — pause directive
let _ = repository::set_directive_status(&pool, updated_task.owner_id, directive_id, "paused").await;
}
}
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
"Task not found when completing"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
"Failed to complete task: {}",
e
);
}
}
});
} else {
// No DB, just broadcast
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: 0,
status: status.into(),
updated_fields: vec!["status".into(), "completed_at".into()],
updated_by: "daemon".into(),
});
}
}
Ok(DaemonMessage::TaskRecoveryDetected {
task_id,
previous_state,
worktree_intact,
worktree_path,
needs_patch,
}) => {
tracing::info!(
task_id = %task_id,
previous_state = %previous_state,
worktree_intact = worktree_intact,
worktree_path = ?worktree_path,
needs_patch = needs_patch,
"Task recovery detected after daemon restart"
);
// Update task in database based on recovery state
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let state = state.clone();
tokio::spawn(async move {
if worktree_intact {
// Worktree exists - task can be resumed on this daemon
// Update task status to 'pending' so it can be picked up
match sqlx::query(
r#"
UPDATE tasks
SET status = 'pending',
daemon_id = NULL,
error_message = 'Daemon restarted - task ready for resumption',
interrupted_at = NOW(),
updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING id
"#,
)
.bind(task_id)
.bind(owner_id)
.fetch_optional(&pool)
.await
{
Ok(Some(_)) => {
tracing::info!(
task_id = %task_id,
"Task marked as pending for resumption"
);
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(owner_id),
version: 0,
status: "pending".into(),
updated_fields: vec![
"status".into(),
"daemon_id".into(),
"interrupted_at".into(),
],
updated_by: "daemon_recovery".into(),
});
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
"Task not found during recovery update"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
error = %e,
"Failed to update task during recovery"
);
}
}
} else {
// Worktree missing - mark for retry with patch restoration
match repository::mark_task_for_retry(
&pool,
task_id,
daemon_uuid, // Mark this daemon as failed
).await {
Ok(Some(_)) => {
tracing::info!(
task_id = %task_id,
"Task marked for retry (worktree missing)"
);
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
"Task not found or exceeded retries"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
error = %e,
"Failed to mark task for retry"
);
}
}
}
});
}
}
Ok(DaemonMessage::Authenticate { .. }) => {
// Already authenticated, ignore
}
Ok(DaemonMessage::RegisterToolKey { task_id, key }) => {
tracing::info!(
task_id = %task_id,
"Registering tool key for orchestrator"
);
state.register_tool_key(key, task_id);
}
Ok(DaemonMessage::RevokeToolKey { task_id }) => {
tracing::info!(
task_id = %task_id,
"Revoking tool key for task"
);
state.revoke_tool_key(task_id);
}
Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => {
tracing::warn!(
task_id = ?task_id,
login_url = %login_url,
hostname = ?hostname,
"Daemon requires authentication - OAuth token expired"
);
// Broadcast as task output with auth_required type so UI can display the login link
let content = format!(
"🔐 Authentication required on daemon{}. Click to login: {}",
hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(),
login_url
);
// Broadcast to task subscribers if we have a task_id
if let Some(tid) = task_id {
tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers");
state.broadcast_task_output(TaskOutputNotification {
task_id: tid,
owner_id: Some(owner_id),
message_type: "auth_required".to_string(),
content: "Authentication required".to_string(), // Constant for dedup
tool_name: None,
tool_input: Some(serde_json::json!({
"loginUrl": login_url,
"hostname": hostname,
"taskId": tid.to_string(),
})),
is_error: Some(true),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
} else {
tracing::warn!("No task_id for auth_required - cannot broadcast to specific task");
}
// Also log the full URL for manual use
tracing::info!(
login_url = %login_url,
"OAuth login URL available - user should open this in browser"
);
}
Ok(DaemonMessage::ReauthStatus { request_id, status, login_url, error, token_saved }) => {
tracing::info!(
daemon_id = %daemon_uuid,
request_id = %request_id,
status = %status,
login_url = ?login_url,
error = ?error,
token_saved = token_saved,
"Daemon reauth status update"
);
// Store the reauth status for polling by the frontend
state.set_daemon_reauth_status(
daemon_uuid,
request_id,
status.clone(),
login_url.clone(),
error.clone(),
);
}
Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => {
tracing::info!(
daemon_id = %daemon_uuid,
working_directory = %working_directory,
home_directory = %home_directory,
worktrees_directory = %worktrees_directory,
"Daemon directories received"
);
state.update_daemon_directories(
&connection_id,
working_directory,
home_directory,
worktrees_directory,
);
}
Ok(DaemonMessage::CompletionActionResult { task_id, success, message, pr_url }) => {
tracing::info!(
task_id = %task_id,
success = success,
message = %message,
pr_url = ?pr_url,
"Completion action result received"
);
// Update task with PR URL if created
if let Some(ref url) = pr_url {
if let Some(ref pool) = state.db_pool {
let update_req = crate::db::models::UpdateTaskRequest {
pr_url: Some(url.clone()),
..Default::default()
};
if let Err(e) = crate::db::repository::update_task(pool, task_id, update_req).await {
tracing::error!("Failed to update task PR URL: {}", e);
}
}
}
// Broadcast as task output so UI can see the result
let output_text = if success {
format!("✓ Completion action succeeded: {}", message)
} else {
format!("✗ Completion action failed: {}", message)
};
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: output_text,
tool_name: None,
tool_input: None,
is_error: Some(!success),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
Ok(DaemonMessage::CloneWorktreeResult { task_id, success, message, target_dir }) => {
tracing::info!(
task_id = %task_id,
success = success,
message = %message,
target_dir = ?target_dir,
"Clone worktree result received"
);
// Broadcast as task output so UI can see the result
let output_text = if success {
format!("✓ Clone succeeded: {}", message)
} else {
format!("✗ Clone failed: {}", message)
};
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: output_text,
tool_name: None,
tool_input: None,
is_error: Some(!success),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
Ok(DaemonMessage::CheckTargetExistsResult { task_id, exists, target_dir }) => {
tracing::debug!(
task_id = %task_id,
exists = exists,
target_dir = %target_dir,
"Check target exists result received"
);
// Broadcast as task output so UI can use the result
let output_text = if exists {
format!("Target directory exists: {}", target_dir)
} else {
format!("Target directory does not exist: {}", target_dir)
};
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: output_text,
tool_name: None,
tool_input: None,
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
Ok(DaemonMessage::RepoFileContent {
request_id,
file_path,
content,
success,
error,
}) => {
tracing::info!(
request_id = %request_id,
file_path = %file_path,
success = success,
content_len = content.as_ref().map(|c| c.len()),
error = ?error,
"Repo file content received from daemon"
);
// The request_id is the file_id we want to update
if success {
if let (Some(pool), Some(content)) = (&state.db_pool, content) {
// Convert markdown to body elements
let body = crate::llm::markdown_to_body(&content);
// Update file in database
let update_req = crate::db::models::UpdateFileRequest {
name: None,
description: None,
transcript: None,
summary: None,
body: Some(body),
version: None,
repo_file_path: None,
};
match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await {
Ok(Some(_file)) => {
tracing::info!(
file_id = %request_id,
"File synced from repository successfully"
);
// Update repo_sync_status to 'synced' and set repo_synced_at
if let Err(e) = sqlx::query(
"UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE id = $1"
)
.bind(request_id)
.execute(pool)
.await
{
tracing::warn!(
file_id = %request_id,
error = %e,
"Failed to update repo sync status"
);
}
// Broadcast file update notification
state.broadcast_file_update(crate::server::state::FileUpdateNotification {
file_id: request_id,
version: 0, // Will be updated by next fetch
updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()],
updated_by: "daemon".to_string(),
});
}
Ok(None) => {
tracing::warn!(
file_id = %request_id,
"File not found when syncing from repository"
);
}
Err(e) => {
tracing::error!(
file_id = %request_id,
error = %e,
"Failed to update file from repository content"
);
}
}
}
} else {
tracing::warn!(
file_id = %request_id,
error = ?error,
"Daemon failed to read repo file"
);
}
}
Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => {
tracing::info!(
task_id = ?task_id,
branch_name = %branch_name,
success = success,
error = ?error,
"Branch created notification received"
);
// Broadcast as task output if we have a task_id
if let Some(tid) = task_id {
let output_text = if success {
format!("✓ Branch '{}' created successfully", branch_name)
} else {
format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default())
};
state.broadcast_task_output(TaskOutputNotification {
task_id: tid,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: output_text,
tool_name: None,
tool_input: None,
is_error: Some(!success),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
}
Ok(DaemonMessage::CheckpointCreated {
task_id,
success,
commit_sha,
branch_name,
checkpoint_number: _, // We'll get from DB
files_changed,
lines_added,
lines_removed,
error,
message,
patch_data,
patch_base_sha,
patch_files_count,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
commit_sha = ?commit_sha,
has_patch = patch_data.is_some(),
"Checkpoint created notification received"
);
if success {
if let (Some(sha), Some(branch)) = (commit_sha.clone(), branch_name.clone()) {
// Store checkpoint in database
if let Some(pool) = state.db_pool.as_ref() {
match repository::create_task_checkpoint(
pool,
task_id,
&sha,
&branch,
&message,
files_changed.clone(),
lines_added,
lines_removed,
).await {
Ok(checkpoint) => {
tracing::info!(
task_id = %task_id,
checkpoint_id = %checkpoint.id,
checkpoint_number = checkpoint.checkpoint_number,
"Checkpoint stored in database"
);
// Store patch if provided (for task recovery)
if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
// Decode base64 patch data
match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
Ok(patch_bytes) => {
let files_count = patch_files_count.unwrap_or(0);
// Default TTL: 7 days (168 hours)
let ttl_hours = 168i64;
match repository::create_checkpoint_patch(
pool,
task_id,
Some(checkpoint.id),
base_sha,
&patch_bytes,
files_count,
ttl_hours,
).await {
Ok(patch) => {
tracing::info!(
task_id = %task_id,
patch_id = %patch.id,
patch_size = patch_bytes.len(),
"Checkpoint patch stored for recovery"
);
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to store checkpoint patch"
);
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to decode patch base64 data"
);
}
}
}
// Broadcast success as task output
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: format!(
"✓ Checkpoint #{} created: {} ({})",
checkpoint.checkpoint_number,
message,
&sha[..7.min(sha.len())]
),
tool_name: None,
tool_input: None,
is_error: Some(false),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
// Record history event for checkpoint
// Get task to get contract_id
if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
let _ = repository::record_history_event(
pool,
task.owner_id,
task.contract_id,
Some(task_id),
"checkpoint",
Some("created"),
None,
serde_json::json!({
"checkpointNumber": checkpoint.checkpoint_number,
"commitSha": &sha,
"message": &message,
"filesChanged": files_changed,
"linesAdded": lines_added,
"linesRemoved": lines_removed,
"hasPatch": patch_data.is_some(),
}),
).await;
}
}
Err(e) => {
tracing::error!(error = %e, "Failed to store checkpoint in database");
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "error".to_string(),
content: format!("Checkpoint commit succeeded but DB storage failed: {}", e),
tool_name: None,
tool_input: None,
is_error: Some(true),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
}
}
} else if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
// Ephemeral patch-only checkpoint (no git commit)
// Store patch directly in checkpoint_patches without a task_checkpoint
if let Some(pool) = state.db_pool.as_ref() {
match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
Ok(patch_bytes) => {
let files_count = patch_files_count.unwrap_or(0);
// Default TTL: 7 days (168 hours)
let ttl_hours = 168i64;
match repository::create_checkpoint_patch(
pool,
task_id,
None, // No checkpoint_id for ephemeral patches
base_sha,
&patch_bytes,
files_count,
ttl_hours,
).await {
Ok(patch) => {
tracing::info!(
task_id = %task_id,
patch_id = %patch.id,
patch_size = patch_bytes.len(),
files_count = files_count,
"Ephemeral patch stored for recovery"
);
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "system".to_string(),
content: format!(
"✓ Patch saved: {} ({} files)",
message,
files_count
),
tool_name: None,
tool_input: None,
is_error: Some(false),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to store ephemeral patch"
);
}
}
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to decode ephemeral patch base64 data"
);
}
}
}
}
} else {
// Broadcast failure
let error_msg = error.unwrap_or_else(|| "Unknown error".to_string());
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "error".to_string(),
content: format!("✗ Checkpoint failed: {}", error_msg),
tool_name: None,
tool_input: None,
is_error: Some(true),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
}
Ok(DaemonMessage::MergeToTargetResult {
task_id,
success,
message,
commit_sha,
conflicts,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
"Merge to target result received"
);
// Broadcast the merge result for waiting handlers
state.broadcast_merge_result(crate::server::state::MergeResultNotification {
task_id,
success,
message: message.clone(),
commit_sha: commit_sha.clone(),
conflicts: conflicts.clone(),
});
// On successful merge, notify supervisor to check if all merges complete
if success {
if let Some(pool) = state.db_pool.as_ref() {
if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
if let Some(contract_id) = task.contract_id {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
let prompt = format!(
"[INFO] Merge completed: {}\n\
Check if all tasks are merged with `makima supervisor tasks`.\n\
If ready, create PR with `makima supervisor pr`.",
message
);
let _ = state.notify_supervisor(
supervisor.id,
supervisor.daemon_id,
&prompt,
).await;
}
}
}
}
}
}
Ok(DaemonMessage::PRCreated {
task_id,
success,
message,
pr_url,
pr_number,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
pr_url = ?pr_url,
pr_number = ?pr_number,
"PR created result received"
);
// Broadcast the PR result for waiting handlers
state.broadcast_pr_result(crate::server::state::PrResultNotification {
task_id,
success,
message: message.clone(),
pr_url: pr_url.clone(),
pr_number,
});
// Notify supervisor of PR result (both success and failure)
if let Some(pool) = state.db_pool.as_ref() {
if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
if let Some(contract_id) = task.contract_id {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
let prompt = if success {
// Get contract to determine next action
let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await {
match (contract.contract_type.as_str(), contract.phase.as_str()) {
("simple", "execute") => {
"Mark contract complete with `makima supervisor complete`".to_string()
}
("specification", "execute") => {
"Advance to review phase with `makima supervisor advance-phase review`".to_string()
}
_ => "Check contract status with `makima supervisor status`".to_string()
}
} else {
"Check contract status with `makima supervisor status`".to_string()
};
format!(
"[ACTION REQUIRED] PR created successfully!\n\
PR: {}\n\n\
Next step: {}",
pr_url.as_deref().unwrap_or(&message),
next_action
)
} else {
format!(
"[ERROR] PR creation failed for task {}:\n\
{}\n\n\
Please fix the issue and retry with `makima supervisor pr`.",
task_id,
message
)
};
let _ = state.notify_supervisor(
supervisor.id,
supervisor.daemon_id,
&prompt,
).await;
}
}
}
}
}
Ok(DaemonMessage::GitConfigInherited {
success,
user_email,
user_name,
error,
}) => {
if success {
tracing::info!(
daemon_id = %daemon_uuid,
user_email = ?user_email,
user_name = ?user_name,
"Daemon inherited git config"
);
} else {
tracing::warn!(
daemon_id = %daemon_uuid,
error = ?error,
"Failed to inherit git config"
);
}
}
Ok(DaemonMessage::ExportPatchCreated {
task_id,
success,
patch_content,
files_count,
lines_added,
lines_removed,
base_commit_sha,
error,
}) => {
if success {
tracing::info!(
task_id = %task_id,
files_count = ?files_count,
lines_added = ?lines_added,
lines_removed = ?lines_removed,
base_commit_sha = ?base_commit_sha,
patch_len = patch_content.as_ref().map(|p| p.len()),
"Export patch created successfully"
);
// Broadcast as task output so UI can access the result
let output_text = format!(
"✓ Export patch created: {} files changed, +{} -{} lines (base: {})",
files_count.unwrap_or(0),
lines_added.unwrap_or(0),
lines_removed.unwrap_or(0),
base_commit_sha.as_deref().unwrap_or("unknown")
);
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "export_patch".to_string(),
content: output_text,
tool_name: None,
tool_input: Some(serde_json::json!({
"patchContent": patch_content,
"filesCount": files_count,
"linesAdded": lines_added,
"linesRemoved": lines_removed,
"baseCommitSha": base_commit_sha,
})),
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial: false,
});
} else {
tracing::warn!(
task_id = %task_id,
error = ?error,
"Failed to create export patch"
);
// Broadcast error
state.broadcast_task_output(TaskOutputNotification {
task_id,
owner_id: Some(owner_id),
message_type: "error".to_string(),
content: format!("✗ Export patch failed: {}", error.unwrap_or_else(|| "Unknown error".to_string())),
tool_name: None,
tool_input: None,
is_error: Some(true),
cost_usd: None,
duration_ms: None,
is_partial: false,
});
}
}
Ok(DaemonMessage::WorktreeInfoResult {
task_id,
success,
worktree_path,
exists,
files_changed,
insertions,
deletions,
files,
branch,
head_sha,
error,
}) => {
tracing::debug!(
task_id = %task_id,
success = success,
exists = exists,
files_changed = files_changed,
"Worktree info result received"
);
// Fulfill pending worktree info request if any
if let Some((_, tx)) = state.pending_worktree_info.remove(&task_id) {
let response = crate::server::state::WorktreeInfoResponse {
task_id,
success,
worktree_path,
exists,
files_changed,
insertions,
deletions,
files,
branch,
head_sha,
error,
};
// Ignore send error - receiver may have timed out
let _ = tx.send(response);
}
}
Ok(DaemonMessage::TaskDiff { task_id, success, diff, error }) => {
tracing::debug!(
task_id = %task_id,
success = success,
"Task diff result received"
);
// Fulfill pending task diff request if any
if let Some((_, tx)) = state.pending_task_diff.remove(&task_id) {
let _ = tx.send(crate::server::state::TaskDiffResult {
task_id,
success,
diff,
error,
});
}
}
Ok(DaemonMessage::WorktreeCommitResult { task_id, success, commit_sha, error }) => {
tracing::debug!(
task_id = %task_id,
success = success,
commit_sha = ?commit_sha,
"Worktree commit result received"
);
// Fulfill pending worktree commit request if any
if let Some((_, tx)) = state.pending_worktree_commit.remove(&task_id) {
let _ = tx.send(crate::server::state::WorktreeCommitResponse {
task_id,
success,
commit_sha,
error,
});
}
}
Ok(DaemonMessage::MergePatchToSupervisor {
task_id,
supervisor_task_id,
patch_data,
base_sha,
}) => {
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
base_sha = %base_sha,
"Received cross-daemon merge patch request"
);
// Look up the supervisor task to find which daemon it's on
if let Some(ref pool) = state.db_pool {
match repository::get_task(pool, supervisor_task_id).await {
Ok(Some(supervisor_task)) => {
if let Some(target_daemon_id) = supervisor_task.daemon_id {
// Send ApplyPatchToWorktree command to the supervisor's daemon
let command = crate::server::state::DaemonCommand::ApplyPatchToWorktree {
target_task_id: supervisor_task_id,
source_task_id: task_id,
patch_data,
base_sha,
};
match state.send_daemon_command(target_daemon_id, command).await {
Ok(()) => {
tracing::info!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
target_daemon_id = %target_daemon_id,
"Routed patch merge to supervisor's daemon"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
target_daemon_id = %target_daemon_id,
error = %e,
"Failed to route patch merge to supervisor's daemon"
);
}
}
} else {
tracing::warn!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
"Supervisor task has no daemon assigned, cannot route patch merge"
);
}
}
Ok(None) => {
tracing::warn!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
"Supervisor task not found, cannot route patch merge"
);
}
Err(e) => {
tracing::error!(
task_id = %task_id,
supervisor_task_id = %supervisor_task_id,
error = %e,
"Failed to look up supervisor task for patch merge"
);
}
}
}
}
Ok(DaemonMessage::WorktreeDiffResult { task_id, success, diff, error }) => {
tracing::debug!(
task_id = %task_id,
success = success,
"Worktree diff result received"
);
// Fulfill pending worktree diff request if any
if let Some((_, tx)) = state.pending_worktree_diff.remove(&task_id) {
let _ = tx.send(crate::server::state::WorktreeDiffResponse {
task_id,
success,
diff: diff.unwrap_or_default(),
error,
});
}
}
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
}
}
Some(Ok(Message::Close(_))) | None => {
tracing::info!("Daemon {} disconnected", daemon_uuid);
break;
}
Some(Err(e)) => {
tracing::warn!("Daemon {} WebSocket error: {}", daemon_uuid, e);
break;
}
_ => {}
}
}
// Handle commands to send to daemon
cmd = cmd_rx.recv() => {
match cmd {
Some(command) => {
let json = serde_json::to_string(&command).unwrap();
if sender.send(Message::Text(json.into())).await.is_err() {
tracing::warn!("Failed to send command to daemon {}", daemon_uuid);
break;
}
}
None => {
// Channel closed
break;
}
}
}
}
}
// Cleanup on disconnect
state.unregister_daemon(&connection_id);
// Delete daemon from database and clear tasks
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
let conn_id = connection_id.clone();
tokio::spawn(async move {
// Delete daemon from database
if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await {
tracing::error!(
connection_id = %conn_id,
error = %e,
"Failed to delete daemon from database"
);
}
// Find tasks assigned to this daemon and mark for retry or fail permanently
if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await {
tracing::error!(
daemon_id = %daemon_uuid,
error = %e,
"Failed to handle daemon disconnect for tasks"
);
}
});
}
}
/// Reconcile the in-flight tasks of a daemon that has just disconnected.
///
/// Looks up every task still assigned to `daemon_id` whose status is
/// `starting`, `running` or `initializing`. For each one:
///
/// * if `retry_count < max_retries`, it is handed to
///   [`repository::mark_task_for_retry`];
/// * otherwise its retry budget is spent and it is handed to
///   [`repository::mark_task_permanently_failed`].
///
/// Failures while updating an individual task are logged and do not stop
/// the remaining tasks from being processed.
///
/// # Errors
///
/// Returns the `sqlx::Error` only when the initial lookup of active tasks
/// fails; per-task update errors are logged, not propagated.
async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
    // Tasks the dead daemon was still working on.
    let stranded: Vec<Task> = sqlx::query_as(
        r#"
SELECT * FROM tasks
WHERE daemon_id = $1
AND status IN ('starting', 'running', 'initializing')
"#,
    )
    .bind(daemon_id)
    .fetch_all(pool)
    .await?;

    if stranded.is_empty() {
        return Ok(());
    }

    tracing::info!(
        daemon_id = %daemon_id,
        task_count = stranded.len(),
        "Processing tasks for disconnected daemon"
    );

    for t in &stranded {
        if t.retry_count >= t.max_retries {
            // No retries left: fail the task for good and move on.
            match repository::mark_task_permanently_failed(pool, t.id, daemon_id).await {
                Err(e) => {
                    tracing::error!(
                        task_id = %t.id,
                        error = %e,
                        "Failed to mark task as permanently failed"
                    );
                }
                Ok(_) => {
                    tracing::warn!(
                        task_id = %t.id,
                        task_name = %t.name,
                        retry_count = t.retry_count + 1,
                        "Task permanently failed: exceeded maximum retries"
                    );
                }
            }
            continue;
        }

        // Retry budget remains: requeue the task so another daemon can pick it up.
        match repository::mark_task_for_retry(pool, t.id, daemon_id).await {
            Ok(Some(retried)) => {
                tracing::info!(
                    task_id = %t.id,
                    task_name = %t.name,
                    retry_count = retried.retry_count,
                    max_retries = retried.max_retries,
                    "Task marked for retry after daemon disconnect"
                );
            }
            Ok(None) => {
                tracing::warn!(
                    task_id = %t.id,
                    "Task not found or already at max retries"
                );
            }
            Err(e) => {
                tracing::error!(
                    task_id = %t.id,
                    error = %e,
                    "Failed to mark task for retry"
                );
            }
        }
    }

    Ok(())
}