//! HTTP handlers for task and daemon mesh operations.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use uuid::Uuid;
use crate::db::models::{
CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse,
SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry,
TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
// =============================================================================
// Authentication Types
// =============================================================================
/// Identifies how a request to a mesh endpoint authenticated itself.
///
/// Produced by [`extract_auth`] from the incoming request headers.
#[derive(Clone, Debug)]
pub enum AuthSource {
    /// The request carried a valid tool key (an orchestrator calling the API).
    ///
    /// The payload is the ID of the task that owns the key.
    ToolKey(Uuid),
    /// The request carried a user token (web client); the payload is the user ID.
    ///
    /// User-token authentication is not implemented yet, so nothing constructs
    /// this variant at the moment — hence the `dead_code` allowance.
    #[allow(dead_code)]
    UserToken(Uuid),
    /// The request carried no usable credentials.
    Anonymous,
}
/// Name of the HTTP request header that carries an orchestrator's tool key.
///
/// Written in lowercase; [`extract_auth`] looks this header up on every call.
pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key";
/// Determine the [`AuthSource`] of a request from its headers.
///
/// Resolution order:
/// 1. `X-Makima-Tool-Key`: if present, readable as a string, and accepted by
///    `state.validate_tool_key`, the result is [`AuthSource::ToolKey`].
/// 2. `Authorization: Bearer ...`: recognised but not validated yet; only a
///    debug message is logged.
/// 3. Otherwise the request is treated as [`AuthSource::Anonymous`].
///
/// A tool key that is present but rejected is logged as a warning and the
/// request then falls through to the remaining steps.
pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
    // Step 1: tool key. A header value that is not valid text is ignored silently.
    let presented_key = headers
        .get(TOOL_KEY_HEADER)
        .and_then(|raw| raw.to_str().ok());
    if let Some(key) = presented_key {
        match state.validate_tool_key(key) {
            Some(owning_task) => return AuthSource::ToolKey(owning_task),
            None => tracing::warn!("Invalid tool key provided"),
        }
    }

    // Step 2: bearer token (placeholder for future user auth).
    let has_bearer = headers
        .get("authorization")
        .and_then(|raw| raw.to_str().ok())
        .map_or(false, |value| value.starts_with("Bearer "));
    if has_bearer {
        // Future: validate JWT and extract user ID
        tracing::debug!("Bearer token auth not yet implemented");
    }

    // Step 3: nothing usable was supplied.
    AuthSource::Anonymous
}
// =============================================================================
// Task Handlers
// =============================================================================
/// Return every task belonging to the authenticated owner.
///
/// Responds with a `TaskListResponse` whose `total` is the number of tasks
/// returned, `503` when no database pool is configured, or `500` when the
/// repository query fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks",
    responses(
        (status = 200, description = "List of tasks", body = TaskListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_tasks(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    let tasks = match repository::list_tasks_for_owner(pool, auth.owner_id).await {
        Ok(found) => found,
        Err(e) => {
            tracing::error!("Failed to list tasks: {}", e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    let total = tasks.len() as i64;
    Json(TaskListResponse { tasks, total }).into_response()
}
/// Fetch one task, together with its subtasks, for the authenticated owner.
///
/// Both the task lookup and the subtask listing are scoped by `auth.owner_id`.
/// Responds with `404` when the task lookup yields nothing, `503` when no
/// database pool is configured, and `500` when either query fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "Task details with subtasks", body = TaskWithSubtasks),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn get_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    // Resolve the task itself first; bail out early on miss or failure.
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            let body = Json(ApiError::new("NOT_FOUND", "Task not found"));
            return (StatusCode::NOT_FOUND, body).into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    // Then attach its subtasks (also scoped by owner).
    match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
        Err(e) => {
            tracing::error!("Failed to get subtasks for task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
        }
        Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(),
    }
}
/// Create a task owned by the authenticated caller.
///
/// The request body is handed to the repository unchanged. Responds with
/// `201` and the created task, `503` when no database pool is configured, or
/// `500` when the repository call fails.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks",
    request_body = CreateTaskRequest,
    responses(
        (status = 201, description = "Task created", body = Task),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn create_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Json(req): Json<CreateTaskRequest>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    let outcome = repository::create_task_for_owner(pool, auth.owner_id, req).await;
    match outcome {
        Err(e) => {
            tracing::error!("Failed to create task: {}", e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
        }
        Ok(task) => (StatusCode::CREATED, Json(task)).into_response(),
    }
}
/// Apply an update to a task owned by the authenticated caller.
///
/// On success the updated task is returned and a `TaskUpdateNotification`
/// listing the fields that were present in the request is broadcast with
/// `updated_by = "user"`. Responds with `404` when the repository finds no
/// matching task, `409` (with expected/actual versions) on a version
/// conflict, `503` when no database pool is configured, and `500` on a
/// database error.
#[utoipa::path(
    put,
    path = "/api/v1/mesh/tasks/{id}",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = UpdateTaskRequest,
    responses(
        (status = 200, description = "Task updated", body = Task),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 409, description = "Version conflict", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn update_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(req): Json<UpdateTaskRequest>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    // Record which fields the caller supplied, before `req` is consumed by the
    // repository call. The order of this table is the order reported in the
    // notification.
    let field_presence = [
        (req.name.is_some(), "name"),
        (req.description.is_some(), "description"),
        (req.status.is_some(), "status"),
        (req.priority.is_some(), "priority"),
        (req.plan.is_some(), "plan"),
        (req.progress_summary.is_some(), "progress_summary"),
        (req.error_message.is_some(), "error_message"),
    ];
    let updated_fields: Vec<String> = field_presence
        .iter()
        .filter_map(|&(present, field)| present.then(|| field.to_string()))
        .collect();

    let outcome = repository::update_task_for_owner(pool, id, auth.owner_id, req).await;
    match outcome {
        Ok(None) => {
            let body = Json(ApiError::new("NOT_FOUND", "Task not found"));
            (StatusCode::NOT_FOUND, body).into_response()
        }
        Ok(Some(task)) => {
            // Let subscribers know the task changed.
            let notification = TaskUpdateNotification {
                task_id: task.id,
                owner_id: Some(auth.owner_id),
                version: task.version,
                status: task.status.clone(),
                updated_fields,
                updated_by: "user".to_string(),
            };
            state.broadcast_task_update(notification);
            Json(task).into_response()
        }
        Err(RepositoryError::Database(e)) => {
            tracing::error!("Failed to update task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
        }
        Err(RepositoryError::VersionConflict { expected, actual }) => {
            tracing::info!(
                "Version conflict on task {}: expected {}, actual {}",
                id,
                expected,
                actual
            );
            let message = format!(
                "Task was modified by another user. Expected version {}, actual version {}",
                expected, actual
            );
            let body = Json(serde_json::json!({
                "code": "VERSION_CONFLICT",
                "message": message,
                "expectedVersion": expected,
                "actualVersion": actual,
            }));
            (StatusCode::CONFLICT, body).into_response()
        }
    }
}
/// Delete a task owned by the authenticated caller.
///
/// Before deleting, the task is looked up; if its status is one of
/// `running`, `starting`, `initializing` or `paused` and it has a daemon
/// assigned, a non-graceful `InterruptTask` command is sent to that daemon.
/// The interrupt is best-effort: a failed lookup or a failed send does not
/// prevent the delete from being attempted.
///
/// Responds with `204` when a row was deleted, `404` when nothing matched,
/// `503` when no database pool is configured, and `500` on a repository error.
#[utoipa::path(
    delete,
    path = "/api/v1/mesh/tasks/{id}",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 204, description = "Task deleted"),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn delete_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    // Best-effort interrupt: only acts when the lookup succeeds and finds a task.
    let lookup = repository::get_task_for_owner(pool, id, auth.owner_id).await;
    if let Ok(Some(task)) = lookup {
        let still_active =
            ["running", "starting", "initializing", "paused"].contains(&task.status.as_str());
        if let (true, Some(daemon_id)) = (still_active, task.daemon_id) {
            let interrupt = DaemonCommand::InterruptTask {
                task_id: id,
                graceful: false,
            };
            match state.send_daemon_command(daemon_id, interrupt).await {
                Err(e) => {
                    tracing::warn!(
                        task_id = %id,
                        daemon_id = %daemon_id,
                        "Failed to send InterruptTask before delete: {}",
                        e
                    );
                }
                Ok(_) => {
                    tracing::info!(
                        task_id = %id,
                        daemon_id = %daemon_id,
                        "Sent InterruptTask before delete"
                    );
                }
            }
        }
    }

    let deleted = match repository::delete_task_for_owner(pool, id, auth.owner_id).await {
        Ok(flag) => flag,
        Err(e) => {
            tracing::error!("Failed to delete task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    if deleted {
        StatusCode::NO_CONTENT.into_response()
    } else {
        let body = Json(ApiError::new("NOT_FOUND", "Task not found"));
        (StatusCode::NOT_FOUND, body).into_response()
    }
}
/// Start a task by sending it to an available daemon (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/start",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task started", body = Task),
(status = 400, description = "Task cannot be started", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or no daemons available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn start_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
// Extract authentication to log who is starting the task
let legacy_auth = extract_auth(&state, &headers);
match &legacy_auth {
AuthSource::ToolKey(orchestrator_id) => {
tracing::info!(
task_id = %id,
orchestrator_task_id = %orchestrator_id,
owner_id = %auth.owner_id,
"Orchestrator starting subtask via tool key"
);
}
AuthSource::Anonymous => {
tracing::info!(
task_id = %id,
owner_id = %auth.owner_id,
"Starting task (user request)"
);
}
AuthSource::UserToken(user_id) => {
tracing::info!(
task_id = %id,
user_id = %user_id,
owner_id = %auth.owner_id,
"Starting task via user token"
);
}
}
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task can be started (allow pending, failed, interrupted, done, or merged)
let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"];
if !startable_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be started from status: {}", task.status),
)),
)
.into_response();
}
// Find an available daemon belonging to this owner
let target_daemon_id = match state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot start task.",
)),
)
.into_response();
}
};
// Check if this is an orchestrator (depth 0 with subtasks)
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => {
tracing::info!(
task_id = %id,
subtask_count = subtasks.len(),
subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::<Vec<_>>(),
"Counted subtasks for orchestrator check"
);
subtasks.len()
},
Err(e) => {
tracing::warn!("Failed to check subtasks for {}: {}", id, e);
0
}
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
tracing::info!(
task_id = %id,
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
"Starting task with orchestrator determination"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command: {}", e);
// Rollback: clear daemon_id and reset status since command failed
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true, // Explicitly clear daemon_id
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
Json(updated_task).into_response()
}
/// Stop a running task (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/stop",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task stopped", body = Task),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn stop_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running/active
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
if !is_active {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be stopped from status: {}", task.status),
)),
)
.into_response();
}
// Find the daemon running this task
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else {
// No daemon assigned, just update status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
};
// Send InterruptTask command to daemon
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::warn!("Failed to send InterruptTask command: {}", e);
// Daemon might be disconnected - update task status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user (daemon unavailable)".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
}
// Update task status to "failed" (stopped)
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Send a message to a running task's stdin (scoped by owner).
///
/// This can be used to provide input to Claude Code when it's waiting for user input,
/// or to inject context/instructions into a running task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/message",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = SendMessageRequest,
responses(
(status = 200, description = "Message sent successfully"),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn send_message(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<SendMessageRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running
if task.status != "running" {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Cannot send message to task in status: {}. Task must be running.",
task.status
),
)),
)
.into_response();
}
// Find the daemon running this task
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"Task has no assigned daemon. Cannot send message.",
)),
)
.into_response();
};
// Send SendMessage command to daemon
let command = DaemonCommand::SendMessage {
task_id: id,
message: req.message.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SendMessage command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task");
// Return success
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"messageLength": req.message.len()
})),
)
.into_response()
}
/// Return the recorded output history of a task (scoped by owner).
///
/// Lets a frontend catch up on output it missed, for example when it
/// subscribes late or reconnects after a disconnect.
///
/// The task is first looked up for the authenticated owner; only then is its
/// output fetched. Events for which `TaskOutputEntry::from_task_event`
/// returns `None` are left out, and `total` counts the remaining entries.
///
/// Responses: `404` when the task is not found; `503` when no database pool
/// is configured; `500` when either query fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/output",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "Task output history", body = TaskOutputResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn get_task_output(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    // Ownership gate: the output query below is not owner-scoped itself.
    let ownership = repository::get_task_for_owner(pool, id, auth.owner_id).await;
    match ownership {
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
        Ok(None) => {
            let body = Json(ApiError::new("NOT_FOUND", "Task not found"));
            return (StatusCode::NOT_FOUND, body).into_response();
        }
        Ok(Some(_)) => {}
    }

    let events = match repository::get_task_output(pool, id, None).await {
        Ok(events) => events,
        Err(e) => {
            tracing::error!("Failed to get task output: {}", e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    // Keep only the events that convert into an output entry.
    let mut entries: Vec<TaskOutputEntry> = Vec::new();
    for event in events {
        if let Some(entry) = TaskOutputEntry::from_task_event(event) {
            entries.push(entry);
        }
    }
    let total = entries.len();

    let payload = TaskOutputResponse {
        entries,
        total,
        task_id: id,
    };
    Json(payload).into_response()
}
/// Return the subtasks of a parent task (scoped by owner).
///
/// The parent itself is not looked up separately; the result is whatever the
/// owner-scoped subtask query returns. `total` is the number of subtasks in
/// the response.
///
/// Responses: `503` when no database pool is configured; `500` when the
/// query fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/subtasks",
    params(
        ("id" = Uuid, Path, description = "Parent task ID")
    ),
    responses(
        (status = 200, description = "List of subtasks", body = TaskListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_subtasks(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    let tasks = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
        Ok(children) => children,
        Err(e) => {
            tracing::error!("Failed to list subtasks for task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    let total = tasks.len() as i64;
    Json(TaskListResponse { tasks, total }).into_response()
}
/// Return the events recorded for a task (scoped by owner).
///
/// The task is first looked up for the authenticated owner; only then are its
/// events listed. `total` is the number of events in the response.
///
/// Responses: `404` when the task is not found; `503` when no database pool
/// is configured; `500` when either query fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/events",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "List of task events", body = TaskEventListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_task_events(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match &state.db_pool {
        Some(pool) => pool,
        None => {
            let body = Json(ApiError::new("DB_UNAVAILABLE", "Database not configured"));
            return (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
        }
    };

    // Ownership gate: the event query below is not owner-scoped itself.
    let ownership = repository::get_task_for_owner(pool, id, auth.owner_id).await;
    match ownership {
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
        Ok(None) => {
            let body = Json(ApiError::new("NOT_FOUND", "Task not found"));
            return (StatusCode::NOT_FOUND, body).into_response();
        }
        Ok(Some(_)) => {}
    }

    let events = match repository::list_task_events(pool, id, None).await {
        Ok(events) => events,
        Err(e) => {
            tracing::error!("Failed to list events for task {}: {}", id, e);
            let body = Json(ApiError::new("DB_ERROR", e.to_string()));
            return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
        }
    };

    let total = events.len() as i64;
    Json(TaskEventListResponse { events, total }).into_response()
}
/// Retry completion action for a completed task (scoped by owner).
///
/// This allows retrying a completion action (push branch, merge, create PR)
/// after filling in the target_repo_path if it wasn't set when the task completed.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/retry-completion",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Completion action initiated"),
(status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn retry_completion_action(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a terminal state
let terminal_statuses = ["done", "failed", "merged"];
if !terminal_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Task must be completed to retry completion action. Current status: {}",
task.status
),
)),
)
.into_response();
}
// Check if completion action is set
let action = match &task.completion_action {
Some(action) if action != "none" => action.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_COMPLETION_ACTION",
"Task has no completion action configured (or is set to 'none')",
)),
)
.into_response();
}
};
// Check if target_repo_path is set
let target_repo_path = match &task.target_repo_path {
Some(path) if !path.is_empty() => path.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_TARGET_REPO",
"Target repository path must be set before retrying completion action",
)),
)
.into_response();
}
};
// Note: We don't check overlay_path here because the server may not have it
// The daemon will scan its worktrees directory to find the worktree by task ID
// Find a daemon to execute the action (must belong to this owner)
// Prefer the daemon that ran the task, but fall back to any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if this daemon is still connected and belongs to this owner
if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) {
daemon_id
} else {
// Fall back to any connected daemon for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
};
// Send RetryCompletionAction command to daemon
let command = DaemonCommand::RetryCompletionAction {
task_id: id,
task_name: task.name.clone(),
action: action.clone(),
target_repo_path: target_repo_path.clone(),
target_branch: task.target_branch.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send RetryCompletionAction command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %id,
action = %action,
target_repo = %target_repo_path,
"Retry completion action initiated"
);
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"action": action,
"targetRepoPath": target_repo_path,
"message": "Completion action initiated. Check task output for results."
})),
)
.into_response()
}
// =============================================================================
// Daemon Handlers
// =============================================================================
/// List all connected daemons (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons",
responses(
(status = 200, description = "List of daemons", body = DaemonListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_daemons(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only list daemons belonging to this owner
match repository::list_daemons_for_owner(pool, auth.owner_id).await {
Ok(daemons) => {
let total = daemons.len() as i64;
Json(DaemonListResponse { daemons, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list daemons: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single daemon by ID (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/{id}",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Daemon details", body = crate::db::models::Daemon),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only get daemon if it belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(daemon)) => Json(daemon).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get suggested directories from connected daemons (requires authentication).
///
/// Returns directories that can be used as target_repo_path for completion actions.
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/directories",
responses(
(status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse),
(status = 401, description = "Unauthorized", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon_directories(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let mut directories = Vec::new();
// Iterate over connected daemons belonging to this owner and collect their directories
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
// Only include daemons belonging to this owner
if daemon.owner_id != auth.owner_id {
continue;
}
// Add working directory if available
if let Some(ref working_dir) = daemon.working_directory {
directories.push(DaemonDirectory {
path: working_dir.clone(),
label: "Working Directory".to_string(),
directory_type: "working".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
// Add home directory if available (for cloning completed work)
if let Some(ref home_dir) = daemon.home_directory {
directories.push(DaemonDirectory {
path: home_dir.clone(),
label: "Makima Home".to_string(),
directory_type: "home".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
}
Json(DaemonDirectoriesResponse { directories })
}
/// Request to clone a worktree to a target directory.
///
/// JSON body of `POST /api/v1/mesh/tasks/{id}/clone`. Fields are camelCase on
/// the wire, e.g. `{"targetDir": "..."}`.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CloneWorktreeRequest {
/// Path to the target directory.
// The `clone_worktree` handler forwards this value unchanged inside
// `DaemonCommand::CloneWorktree` and echoes it back in its response; it does
// not validate or expand the path itself.
pub target_dir: String,
}
/// Clone a task's worktree to a target directory (scoped by owner).
///
/// The clone is carried out by a daemon, not by the server. The daemon that ran
/// the task is preferred when it is still connected; otherwise any connected
/// daemon belonging to the caller is used. A 200 response only confirms that
/// the command was sent.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/clone",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CloneWorktreeRequest,
    responses(
        (status = 200, description = "Clone command sent"),
        (status = 400, description = "Invalid request or task not completed", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn clone_worktree(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<CloneWorktreeRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task (scoped by owner): someone else's task looks like a missing one.
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Verify task is in a completed (terminal) state.
    let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged");
    if !is_completed {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new(
                "INVALID_STATE",
                format!("Task must be completed to clone (current status: {})", task.status),
            )),
        )
            .into_response();
    }

    // Prefer the daemon that ran the task: that is the one expected to hold the
    // task's worktree. It must still be connected and belong to this owner.
    // This mirrors the daemon selection in `retry_completion_action`.
    let task_daemon = task.daemon_id.filter(|wanted| {
        state.daemon_connections.iter().any(|conn| {
            let info = conn.value();
            info.id == *wanted && info.owner_id == auth.owner_id
        })
    });

    // Fall back to any connected daemon for this owner. Only the id is copied
    // out, so no reference into the connection map outlives this statement.
    let selected_daemon = task_daemon.or_else(|| {
        state
            .daemon_connections
            .iter()
            .find(|conn| conn.value().owner_id == auth.owner_id)
            .map(|conn| conn.value().id)
    });

    let Some(daemon_id) = selected_daemon else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
        )
            .into_response();
    };

    // Send CloneWorktree command to daemon
    let command = DaemonCommand::CloneWorktree {
        task_id: id,
        target_dir: body.target_dir.clone(),
    };
    if let Err(e) = state.send_daemon_command(daemon_id, command).await {
        tracing::error!("Failed to send CloneWorktree command: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // Acknowledge only; the clone itself happens asynchronously on the daemon.
    Json(serde_json::json!({
        "status": "cloning",
        "taskId": id.to_string(),
        "targetDir": body.target_dir,
    }))
    .into_response()
}
/// Request to check if a target directory exists.
///
/// JSON body of `POST /api/v1/mesh/tasks/{id}/check-target`. Fields are
/// camelCase on the wire, e.g. `{"targetDir": "..."}`.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsRequest {
/// Path to check.
// The `check_target_exists` handler forwards this value unchanged inside
// `DaemonCommand::CheckTargetExists`; the server does not touch the
// filesystem itself.
pub target_dir: String,
}
/// Response for check target exists.
// NOTE(review): the `check_target_exists` handler lists this type as its 200
// body in the OpenAPI annotation but never constructs it; it returns an ad-hoc
// `{status, taskId, targetDir}` acknowledgement instead. Presumably this shape
// is what the daemon's eventual answer looks like. Confirm against the
// WebSocket side.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsResponse {
/// Whether the target directory exists.
pub exists: bool,
/// The path that was checked (expanded).
// NOTE(review): no expansion happens in the HTTP handler; if the path is
// expanded, that presumably happens on the daemon. TODO confirm.
pub target_dir: String,
}
/// Check if a target directory exists (for clone validation, requires authentication).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/check-target",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = CheckTargetExistsRequest,
responses(
(status = 200, description = "Check result", body = CheckTargetExistsResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "No daemon connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn check_target_exists(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<CheckTargetExistsRequest>,
) -> impl IntoResponse {
// Find a connected daemon belonging to this owner to send the command
let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
let daemon_id = match daemon_entry {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
};
// Send CheckTargetExists command to daemon
let command = DaemonCommand::CheckTargetExists {
task_id: id,
target_dir: body.target_dir.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::error!("Failed to send CheckTargetExists command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// The actual result will be sent back via WebSocket
// For now, just acknowledge the request was sent
Json(serde_json::json!({
"status": "checking",
"taskId": id.to_string(),
"targetDir": body.target_dir,
}))
.into_response()
}