From 8b17a175c3e7e27b789812eba4e3cd760beadb10 Mon Sep 17 00:00:00 2001 From: soryu Date: Tue, 6 Jan 2026 04:08:11 +0000 Subject: Initial Control system --- makima/src/server/handlers/mesh.rs | 1679 ++++++++++++++++++++++++++++++++++++ 1 file changed, 1679 insertions(+) create mode 100644 makima/src/server/handlers/mesh.rs (limited to 'makima/src/server/handlers/mesh.rs') diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs new file mode 100644 index 0000000..760740c --- /dev/null +++ b/makima/src/server/handlers/mesh.rs @@ -0,0 +1,1679 @@ +//! HTTP handlers for task and daemon mesh operations. + +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::db::models::{ + CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse, + SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry, + TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest, +}; +use crate::db::repository::{self, RepositoryError}; +use crate::server::auth::Authenticated; +use crate::server::messages::ApiError; +use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; + +// ============================================================================= +// Authentication Types +// ============================================================================= + +/// Source of authentication for mesh endpoints. +#[derive(Debug, Clone)] +pub enum AuthSource { + /// Authenticated via tool key (orchestrator accessing API). + /// Contains the task ID that owns this key. + ToolKey(Uuid), + /// Authenticated via user token (web client). + /// Contains the user ID. (Not implemented yet) + #[allow(dead_code)] + UserToken(Uuid), + /// No authentication provided (anonymous access). + Anonymous, +} + +/// Header name for tool key authentication. +pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key"; + +/// Extract authentication source from request headers. +/// +/// Checks for: +/// 1. `X-Makima-Tool-Key` header for orchestrator tool access +/// 2. `Authorization: Bearer` header for user access (future) +/// 3. Falls back to Anonymous if no auth provided +pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource { + // Check for tool key header first + if let Some(tool_key) = headers.get(TOOL_KEY_HEADER) { + if let Ok(key_str) = tool_key.to_str() { + if let Some(task_id) = state.validate_tool_key(key_str) { + return AuthSource::ToolKey(task_id); + } + tracing::warn!("Invalid tool key provided"); + } + } + + // Check for Authorization header (future user auth) + if let Some(auth_header) = headers.get("authorization") { + if let Ok(auth_str) = auth_header.to_str() { + if auth_str.starts_with("Bearer ") { + // Future: validate JWT and extract user ID + tracing::debug!("Bearer token auth not yet implemented"); + } + } + } + + // Default to anonymous + AuthSource::Anonymous +} + +// ============================================================================= +// Task Handlers +// ============================================================================= + +/// List all tasks for the current owner. +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks", + responses( + (status = 200, description = "List of tasks", body = TaskListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_tasks( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_tasks_for_owner(pool, auth.owner_id).await { + Ok(tasks) => { + let total = tasks.len() as i64; + Json(TaskListResponse { tasks, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list tasks: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get a single task by ID with its subtasks (scoped by owner). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task details with subtasks", body = TaskWithSubtasks), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(task)) => { + // Get subtasks for this task (also scoped by owner) + match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(), + Err(e) => { + tracing::error!("Failed to get subtasks for task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Create a new task. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks", + request_body = CreateTaskRequest, + responses( + (status = 201, description = "Task created", body = Task), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn create_task( + State(state): State, + Authenticated(auth): Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::create_task_for_owner(pool, auth.owner_id, req).await { + Ok(task) => (StatusCode::CREATED, Json(task)).into_response(), + Err(e) => { + tracing::error!("Failed to create task: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Update an existing task (scoped by owner). +#[utoipa::path( + put, + path = "/api/v1/mesh/tasks/{id}", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = UpdateTaskRequest, + responses( + (status = 200, description = "Task updated", body = Task), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 409, description = "Version conflict", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn update_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Track which fields are being updated for the notification + let mut updated_fields = Vec::new(); + if req.name.is_some() { + updated_fields.push("name".to_string()); + } + if req.description.is_some() { + updated_fields.push("description".to_string()); + } + if req.status.is_some() { + updated_fields.push("status".to_string()); + } + if req.priority.is_some() { + updated_fields.push("priority".to_string()); + } + if req.plan.is_some() { + updated_fields.push("plan".to_string()); + } + if req.progress_summary.is_some() { + updated_fields.push("progress_summary".to_string()); + } + if req.error_message.is_some() { + updated_fields.push("error_message".to_string()); + } + + match repository::update_task_for_owner(pool, id, auth.owner_id, req).await { + Ok(Some(task)) => { + // Broadcast task update notification + state.broadcast_task_update(TaskUpdateNotification { + task_id: task.id, + owner_id: Some(auth.owner_id), + version: task.version, + status: task.status.clone(), + updated_fields, + updated_by: "user".to_string(), + }); + Json(task).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(RepositoryError::VersionConflict { expected, actual }) => { + tracing::info!( + "Version conflict on task {}: expected {}, actual {}", + id, + expected, + actual + ); + ( + StatusCode::CONFLICT, + Json(serde_json::json!({ + "code": "VERSION_CONFLICT", + "message": format!( + "Task was modified by another user. Expected version {}, actual version {}", + expected, actual + ), + "expectedVersion": expected, + "actualVersion": actual, + })), + ) + .into_response() + } + Err(RepositoryError::Database(e)) => { + tracing::error!("Failed to update task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Delete a task (scoped by owner). +#[utoipa::path( + delete, + path = "/api/v1/mesh/tasks/{id}", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 204, description = "Task deleted"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn delete_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task first to check if it's running and needs to be stopped + if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { + let is_active = matches!( + task.status.as_str(), + "running" | "starting" | "initializing" | "paused" + ); + + // If task is active and has a daemon, send interrupt command + if is_active { + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { + task_id: id, + graceful: false, + }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::warn!( + task_id = %id, + daemon_id = %daemon_id, + "Failed to send InterruptTask before delete: {}", + e + ); + } else { + tracing::info!( + task_id = %id, + daemon_id = %daemon_id, + "Sent InterruptTask before delete" + ); + } + } + } + } + + match repository::delete_task_for_owner(pool, id, auth.owner_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to delete task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Start a task by sending it to an available daemon (scoped by owner). +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/start", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task started", body = Task), + (status = 400, description = "Task cannot be started", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured or no daemons available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn start_task( + State(state): State, + Authenticated(auth): Authenticated, + headers: HeaderMap, + Path(id): Path, +) -> impl IntoResponse { + // Extract authentication to log who is starting the task + let legacy_auth = extract_auth(&state, &headers); + match &legacy_auth { + AuthSource::ToolKey(orchestrator_id) => { + tracing::info!( + task_id = %id, + orchestrator_task_id = %orchestrator_id, + owner_id = %auth.owner_id, + "Orchestrator starting subtask via tool key" + ); + } + AuthSource::Anonymous => { + tracing::info!( + task_id = %id, + owner_id = %auth.owner_id, + "Starting task (user request)" + ); + } + AuthSource::UserToken(user_id) => { + tracing::info!( + task_id = %id, + user_id = %user_id, + owner_id = %auth.owner_id, + "Starting task via user token" + ); + } + } + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task can be started (allow pending, failed, interrupted, done, or merged) + let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"]; + if !startable_statuses.contains(&task.status.as_str()) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_STATE", + format!("Task cannot be started from status: {}", task.status), + )), + ) + .into_response(); + } + + // Find an available daemon belonging to this owner + let target_daemon_id = match state.daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot start task.", + )), + ) + .into_response(); + } + }; + + // Check if this is an orchestrator (depth 0 with subtasks) + let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(subtasks) => { + tracing::info!( + task_id = %id, + subtask_count = subtasks.len(), + subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::>(), + "Counted subtasks for orchestrator check" + ); + subtasks.len() + }, + Err(e) => { + tracing::warn!("Failed to check subtasks for {}: {}", id, e); + 0 + } + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + tracing::info!( + task_id = %id, + task_depth = task.depth, + subtask_count = subtask_count, + is_orchestrator = is_orchestrator, + "Starting task with orchestrator determination" + ); + + // IMPORTANT: Update database FIRST to assign daemon_id before sending command + // This prevents race conditions where the task starts but daemon_id is not set + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(task.version), + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id: id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command: {}", e); + // Rollback: clear daemon_id and reset status since command failed + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, // Explicitly clear daemon_id + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Broadcast task update notification + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "system".to_string(), + }); + + Json(updated_task).into_response() +} + +/// Stop a running task (scoped by owner). +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/stop", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task stopped", body = Task), + (status = 400, description = "Task is not running", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured or daemon not connected", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn stop_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is running/active + let is_active = matches!( + task.status.as_str(), + "running" | "starting" | "initializing" | "paused" + ); + if !is_active { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_STATE", + format!("Task cannot be stopped from status: {}", task.status), + )), + ) + .into_response(); + } + + // Find the daemon running this task + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + daemon_id + } else { + // No daemon assigned, just update status directly + let update_req = UpdateTaskRequest { + status: Some("failed".to_string()), + error_message: Some("Task stopped by user".to_string()), + version: Some(task.version), + ..Default::default() + }; + + return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(updated_task)) => { + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "failed".to_string(), + updated_fields: vec!["status".to_string(), "error_message".to_string()], + updated_by: "user".to_string(), + }); + Json(updated_task).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update task status: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + }; + }; + + // Send InterruptTask command to daemon + let command = DaemonCommand::InterruptTask { + task_id: id, + graceful: false, + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::warn!("Failed to send InterruptTask command: {}", e); + // Daemon might be disconnected - update task status directly + let update_req = UpdateTaskRequest { + status: Some("failed".to_string()), + error_message: Some("Task stopped by user (daemon unavailable)".to_string()), + version: Some(task.version), + ..Default::default() + }; + + return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(updated_task)) => { + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "failed".to_string(), + updated_fields: vec!["status".to_string(), "error_message".to_string()], + updated_by: "user".to_string(), + }); + Json(updated_task).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update task status: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + }; + } + + // Update task status to "failed" (stopped) + let update_req = UpdateTaskRequest { + status: Some("failed".to_string()), + error_message: Some("Task stopped by user".to_string()), + version: Some(task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(updated_task)) => { + // Broadcast task update notification + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "failed".to_string(), + updated_fields: vec!["status".to_string(), "error_message".to_string()], + updated_by: "user".to_string(), + }); + + Json(updated_task).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update task status: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Send a message to a running task's stdin (scoped by owner). +/// +/// This can be used to provide input to Claude Code when it's waiting for user input, +/// or to inject context/instructions into a running task. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/message", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = SendMessageRequest, + responses( + (status = 200, description = "Message sent successfully"), + (status = 400, description = "Task is not running", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured or daemon not connected", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn send_message( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is running + if task.status != "running" { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_STATE", + format!( + "Cannot send message to task in status: {}. Task must be running.", + task.status + ), + )), + ) + .into_response(); + } + + // Find the daemon running this task + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "Task has no assigned daemon. Cannot send message.", + )), + ) + .into_response(); + }; + + // Send SendMessage command to daemon + let command = DaemonCommand::SendMessage { + task_id: id, + message: req.message.clone(), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SendMessage command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task"); + + // Return success + ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "taskId": id, + "messageLength": req.message.len() + })), + ) + .into_response() +} + +/// Get task output history (scoped by owner). +/// +/// Retrieves all recorded output from a task's Claude Code process. +/// This allows the frontend to fetch missed output when subscribing late +/// or reconnecting after a disconnect. +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/output", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task output history", body = TaskOutputResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_task_output( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify task exists and belongs to owner + match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Get output history (task already verified to belong to owner) + match repository::get_task_output(pool, id, None).await { + Ok(events) => { + let entries: Vec = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let total = entries.len(); + + Json(TaskOutputResponse { + entries, + total, + task_id: id, + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get task output: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// List subtasks for a parent task (scoped by owner). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/subtasks", + params( + ("id" = Uuid, Path, description = "Parent task ID") + ), + responses( + (status = 200, description = "List of subtasks", body = TaskListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_subtasks( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(tasks) => { + let total = tasks.len() as i64; + Json(TaskListResponse { tasks, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list subtasks for task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// List events for a task (scoped by owner). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/events", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "List of task events", body = TaskEventListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_task_events( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify task exists and belongs to owner + match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::list_task_events(pool, id, None).await { + Ok(events) => { + let total = events.len() as i64; + Json(TaskEventListResponse { events, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list events for task {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Retry completion action for a completed task (scoped by owner). +/// +/// This allows retrying a completion action (push branch, merge, create PR) +/// after filling in the target_repo_path if it wasn't set when the task completed. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/retry-completion", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Completion action initiated"), + (status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured or daemon not connected", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn retry_completion_action( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is in a terminal state + let terminal_statuses = ["done", "failed", "merged"]; + if !terminal_statuses.contains(&task.status.as_str()) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_STATE", + format!( + "Task must be completed to retry completion action. Current status: {}", + task.status + ), + )), + ) + .into_response(); + } + + // Check if completion action is set + let action = match &task.completion_action { + Some(action) if action != "none" => action.clone(), + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_COMPLETION_ACTION", + "Task has no completion action configured (or is set to 'none')", + )), + ) + .into_response(); + } + }; + + // Check if target_repo_path is set + let target_repo_path = match &task.target_repo_path { + Some(path) if !path.is_empty() => path.clone(), + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_TARGET_REPO", + "Target repository path must be set before retrying completion action", + )), + ) + .into_response(); + } + }; + + // Note: We don't check overlay_path here because the server may not have it + // The daemon will scan its worktrees directory to find the worktree by task ID + + // Find a daemon to execute the action (must belong to this owner) + // Prefer the daemon that ran the task, but fall back to any available daemon for this owner + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + // Check if this daemon is still connected and belongs to this owner + if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) { + daemon_id + } else { + // Fall back to any connected daemon for this owner + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot execute completion action.", + )), + ) + .into_response(); + } + } + } + } else { + // No daemon assigned - use any available for this owner + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemons connected for your account. Cannot execute completion action.", + )), + ) + .into_response(); + } + } + }; + + // Send RetryCompletionAction command to daemon + let command = DaemonCommand::RetryCompletionAction { + task_id: id, + task_name: task.name.clone(), + action: action.clone(), + target_repo_path: target_repo_path.clone(), + target_branch: task.target_branch.clone(), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send RetryCompletionAction command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + task_id = %id, + action = %action, + target_repo = %target_repo_path, + "Retry completion action initiated" + ); + + ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "taskId": id, + "action": action, + "targetRepoPath": target_repo_path, + "message": "Completion action initiated. Check task output for results." + })), + ) + .into_response() +} + +// ============================================================================= +// Daemon Handlers +// ============================================================================= + +/// List all connected daemons (requires authentication). +#[utoipa::path( + get, + path = "/api/v1/mesh/daemons", + responses( + (status = 200, description = "List of daemons", body = DaemonListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_daemons( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Only list daemons belonging to this owner + match repository::list_daemons_for_owner(pool, auth.owner_id).await { + Ok(daemons) => { + let total = daemons.len() as i64; + Json(DaemonListResponse { daemons, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list daemons: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get a single daemon by ID (requires authentication). +#[utoipa::path( + get, + path = "/api/v1/mesh/daemons/{id}", + params( + ("id" = Uuid, Path, description = "Daemon ID") + ), + responses( + (status = 200, description = "Daemon details", body = crate::db::models::Daemon), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Daemon not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_daemon( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Only get daemon if it belongs to this owner + match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { + Ok(Some(daemon)) => Json(daemon).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Daemon not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get daemon {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get suggested directories from connected daemons (requires authentication). +/// +/// Returns directories that can be used as target_repo_path for completion actions. +#[utoipa::path( + get, + path = "/api/v1/mesh/daemons/directories", + responses( + (status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse), + (status = 401, description = "Unauthorized", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_daemon_directories( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let mut directories = Vec::new(); + + // Iterate over connected daemons belonging to this owner and collect their directories + for entry in state.daemon_connections.iter() { + let daemon = entry.value(); + + // Only include daemons belonging to this owner + if daemon.owner_id != auth.owner_id { + continue; + } + + // Add working directory if available + if let Some(ref working_dir) = daemon.working_directory { + directories.push(DaemonDirectory { + path: working_dir.clone(), + label: "Working Directory".to_string(), + directory_type: "working".to_string(), + hostname: daemon.hostname.clone(), + exists: None, + }); + } + + // Add home directory if available (for cloning completed work) + if let Some(ref home_dir) = daemon.home_directory { + directories.push(DaemonDirectory { + path: home_dir.clone(), + label: "Makima Home".to_string(), + directory_type: "home".to_string(), + hostname: daemon.hostname.clone(), + exists: None, + }); + } + } + + Json(DaemonDirectoriesResponse { directories }) +} + +/// Request to clone a worktree to a target directory. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CloneWorktreeRequest { + /// Path to the target directory. + pub target_dir: String, +} + +/// Clone a task's worktree to a target directory (scoped by owner). +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/clone", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = CloneWorktreeRequest, + responses( + (status = 200, description = "Clone command sent"), + (status = 400, description = "Invalid request or task not completed", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured or daemon not connected", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn clone_worktree( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(body): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify task is in a completed state + let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged"); + if !is_completed { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_STATE", + format!("Task must be completed to clone (current status: {})", task.status), + )), + ) + .into_response(); + } + + // Find a connected daemon belonging to this owner to send the command + let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id); + let daemon_id = match daemon_entry { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + }; + + // Send CloneWorktree command to daemon + let command = DaemonCommand::CloneWorktree { + task_id: id, + target_dir: body.target_dir.clone(), + }; + + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::error!("Failed to send CloneWorktree command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + Json(serde_json::json!({ + "status": "cloning", + "taskId": id.to_string(), + "targetDir": body.target_dir, + })) + .into_response() +} + +/// Request to check if a target directory exists. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckTargetExistsRequest { + /// Path to check. + pub target_dir: String, +} + +/// Response for check target exists. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckTargetExistsResponse { + /// Whether the target directory exists. + pub exists: bool, + /// The path that was checked (expanded). + pub target_dir: String, +} + +/// Check if a target directory exists (for clone validation, requires authentication). +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/check-target", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = CheckTargetExistsRequest, + responses( + (status = 200, description = "Check result", body = CheckTargetExistsResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "No daemon connected", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn check_target_exists( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(body): Json, +) -> impl IntoResponse { + // Find a connected daemon belonging to this owner to send the command + let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id); + let daemon_id = match daemon_entry { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + }; + + // Send CheckTargetExists command to daemon + let command = DaemonCommand::CheckTargetExists { + task_id: id, + target_dir: body.target_dir.clone(), + }; + + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::error!("Failed to send CheckTargetExists command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // The actual result will be sent back via WebSocket + // For now, just acknowledge the request was sent + Json(serde_json::json!({ + "status": "checking", + "taskId": id.to_string(), + "targetDir": body.target_dir, + })) + .into_response() +} -- cgit v1.2.3