//! HTTP handlers for task and daemon mesh operations.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use uuid::Uuid;
use crate::db::models::{
CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse,
SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry,
TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
// =============================================================================
// Authentication Types
// =============================================================================
/// Source of authentication for mesh endpoints.
///
/// Produced by [`extract_auth`] from the request headers.
#[derive(Debug, Clone)]
pub enum AuthSource {
/// Authenticated via tool key (orchestrator accessing API).
/// Contains the task ID that owns this key.
ToolKey(Uuid),
/// Authenticated via user token (web client).
/// Contains the user ID. (Not implemented yet)
// `extract_auth` does not construct this variant yet, hence the dead-code allowance.
#[allow(dead_code)]
UserToken(Uuid),
/// No authentication provided (anonymous access).
Anonymous,
}
/// Header name for tool key authentication.
///
/// [`extract_auth`] reads this header and passes its value to
/// `state.validate_tool_key` to resolve the owning task ID.
pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key";
/// Extract authentication source from request headers.
///
/// Checks, in order:
/// 1. `X-Makima-Tool-Key` header (see [`TOOL_KEY_HEADER`]) for orchestrator tool
///    access. A key accepted by `state.validate_tool_key` yields
///    [`AuthSource::ToolKey`] carrying the task ID returned by the validator.
/// 2. `Authorization: Bearer` header for user access. This is recognized but not
///    validated yet; it only produces a debug log line.
/// 3. Falls back to [`AuthSource::Anonymous`].
///
/// A tool key that is present but rejected does not short-circuit: the function
/// logs a warning and continues to the remaining checks. Header values that are
/// not valid visible ASCII (i.e. `to_str()` fails) are ignored silently.
pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
    // Check for tool key header first
    if let Some(key_str) = headers
        .get(TOOL_KEY_HEADER)
        .and_then(|value| value.to_str().ok())
    {
        if let Some(task_id) = state.validate_tool_key(key_str) {
            return AuthSource::ToolKey(task_id);
        }
        // Never write the submitted key to the log: it is credential material
        // (possibly a stale or mistyped real key) and attacker-controlled text.
        tracing::warn!(key_len = key_str.len(), "Invalid tool key provided");
    }

    // Check for Authorization header (future user auth)
    let bearer_presented = matches!(
        headers.get("authorization").and_then(|value| value.to_str().ok()),
        Some(auth_str) if auth_str.starts_with("Bearer ")
    );
    if bearer_presented {
        // Future: validate JWT and extract user ID
        tracing::debug!("Bearer token auth not yet implemented");
    }

    // Default to anonymous
    AuthSource::Anonymous
}
// =============================================================================
// Task Handlers
// =============================================================================
/// List all tasks for the current owner.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks",
responses(
(status = 200, description = "List of tasks", body = TaskListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_tasks(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::list_tasks_for_owner(pool, auth.owner_id).await {
Ok(tasks) => {
let total = tasks.len() as i64;
Json(TaskListResponse { tasks, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list tasks: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single task by ID with its subtasks (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task details with subtasks", body = TaskWithSubtasks),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(task)) => {
// Get subtasks for this task (also scoped by owner)
match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(),
Err(e) => {
tracing::error!("Failed to get subtasks for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Create a new task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks",
request_body = CreateTaskRequest,
responses(
(status = 201, description = "Task created", body = Task),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn create_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Json(req): Json<CreateTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::create_task_for_owner(pool, auth.owner_id, req).await {
Ok(task) => {
// Notify supervisor of new task creation if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor {
let pool = pool.clone();
let state_clone = state.clone();
let task_clone = task.clone();
tokio::spawn(async move {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
state_clone.notify_supervisor_of_task_created(
supervisor.id,
supervisor.daemon_id,
task_clone.id,
&task_clone.name,
).await;
}
});
}
}
(StatusCode::CREATED, Json(task)).into_response()
}
Err(e) => {
tracing::error!("Failed to create task: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Update an existing task (scoped by owner).
#[utoipa::path(
put,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = UpdateTaskRequest,
responses(
(status = 200, description = "Task updated", body = Task),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 409, description = "Version conflict", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn update_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<UpdateTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Check if trying to set a supervisor task to a terminal status
if let Some(ref new_status) = req.status {
let terminal_statuses = ["done", "failed", "merged"];
if terminal_statuses.contains(&new_status.as_str()) {
// Get the task to check if it's a supervisor
if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
if task.is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"SUPERVISOR_CANNOT_COMPLETE",
"Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
)),
)
.into_response();
}
}
}
}
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
updated_fields.push("name".to_string());
}
if req.description.is_some() {
updated_fields.push("description".to_string());
}
if req.status.is_some() {
updated_fields.push("status".to_string());
}
if req.priority.is_some() {
updated_fields.push("priority".to_string());
}
if req.plan.is_some() {
updated_fields.push("plan".to_string());
}
if req.progress_summary.is_some() {
updated_fields.push("progress_summary".to_string());
}
if req.error_message.is_some() {
updated_fields.push("error_message".to_string());
}
match repository::update_task_for_owner(pool, id, auth.owner_id, req).await {
Ok(Some(task)) => {
let updated_fields_clone = updated_fields.clone();
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
owner_id: Some(auth.owner_id),
version: task.version,
status: task.status.clone(),
updated_fields,
updated_by: "user".to_string(),
});
// Notify supervisor of status change if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
let pool = pool.clone();
let state_clone = state.clone();
let task_clone = task.clone();
tokio::spawn(async move {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
state_clone.notify_supervisor_of_task_update(
supervisor.id,
supervisor.daemon_id,
task_clone.id,
&task_clone.name,
&task_clone.status,
&updated_fields_clone,
).await;
}
});
}
}
Json(task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(RepositoryError::VersionConflict { expected, actual }) => {
tracing::info!(
"Version conflict on task {}: expected {}, actual {}",
id,
expected,
actual
);
(
StatusCode::CONFLICT,
Json(serde_json::json!({
"code": "VERSION_CONFLICT",
"message": format!(
"Task was modified by another user. Expected version {}, actual version {}",
expected, actual
),
"expectedVersion": expected,
"actualVersion": actual,
})),
)
.into_response()
}
Err(RepositoryError::Database(e)) => {
tracing::error!("Failed to update task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Delete a task (scoped by owner).
#[utoipa::path(
delete,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 204, description = "Task deleted"),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn delete_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task first to check if it's running and needs to be stopped
if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
// If task is active and has a daemon, send interrupt command
if is_active {
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::warn!(
task_id = %id,
daemon_id = %daemon_id,
"Failed to send InterruptTask before delete: {}",
e
);
} else {
tracing::info!(
task_id = %id,
daemon_id = %daemon_id,
"Sent InterruptTask before delete"
);
}
}
}
}
match repository::delete_task_for_owner(pool, id, auth.owner_id).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to delete task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Start a task by sending it to an available daemon (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/start",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task started", body = Task),
(status = 400, description = "Task cannot be started", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or no daemons available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn start_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
// Extract authentication to log who is starting the task
let legacy_auth = extract_auth(&state, &headers);
match &legacy_auth {
AuthSource::ToolKey(orchestrator_id) => {
tracing::info!(
task_id = %id,
orchestrator_task_id = %orchestrator_id,
owner_id = %auth.owner_id,
"Orchestrator starting subtask via tool key"
);
}
AuthSource::Anonymous => {
tracing::info!(
task_id = %id,
owner_id = %auth.owner_id,
"Starting task (user request)"
);
}
AuthSource::UserToken(user_id) => {
tracing::info!(
task_id = %id,
user_id = %user_id,
owner_id = %auth.owner_id,
"Starting task via user token"
);
}
}
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task can be started (allow pending, failed, interrupted, done, or merged)
let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"];
if !startable_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be started from status: {}", task.status),
)),
)
.into_response();
}
// Find an available daemon belonging to this owner
let target_daemon_id = match state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot start task.",
)),
)
.into_response();
}
};
// Check if this is an orchestrator (depth 0 with subtasks)
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => {
tracing::info!(
task_id = %id,
subtask_count = subtasks.len(),
subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::<Vec<_>>(),
"Counted subtasks for orchestrator check"
);
subtasks.len()
},
Err(e) => {
tracing::warn!("Failed to check subtasks for {}: {}", id, e);
0
}
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
tracing::info!(
task_id = %id,
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
is_supervisor = task.is_supervisor,
"Starting task with orchestrator/supervisor determination"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
};
tracing::info!(
task_id = %id,
is_supervisor = task.is_supervisor,
is_orchestrator = is_orchestrator,
daemon_id = %target_daemon_id,
"Sending SpawnTask command to daemon"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command: {}", e);
// Rollback: clear daemon_id and reset status since command failed
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true, // Explicitly clear daemon_id
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
Json(updated_task).into_response()
}
/// Stop a running task (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/stop",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task stopped", body = Task),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn stop_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running/active
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
if !is_active {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be stopped from status: {}", task.status),
)),
)
.into_response();
}
// Find the daemon running this task
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else {
// No daemon assigned, just update status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
};
// Send InterruptTask command to daemon
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::warn!("Failed to send InterruptTask command: {}", e);
// Daemon might be disconnected - update task status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user (daemon unavailable)".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
}
// Update task status to "failed" (stopped)
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Send a message to a running task's stdin (scoped by owner).
///
/// This can be used to provide input to Claude Code when it's waiting for user input,
/// or to inject context/instructions into a running task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/message",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = SendMessageRequest,
responses(
(status = 200, description = "Message sent successfully"),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn send_message(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<SendMessageRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running (except for AUTH_CODE messages and supervisor tasks)
// Supervisor tasks can receive messages even when not running - daemon will respawn Claude
let is_auth_code = req.message.starts_with("AUTH_CODE:");
let is_supervisor = task.is_supervisor;
if task.status != "running" && !is_auth_code && !is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Cannot send message to task in status: {}. Task must be running.",
task.status
),
)),
)
.into_response();
}
// Find the daemon running this task
// For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else if is_supervisor {
// Supervisor without daemon - find one
match state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon available. Please start a daemon.",
)),
)
.into_response();
}
}
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"Task has no assigned daemon. Cannot send message.",
)),
)
.into_response();
};
// Send SendMessage command to daemon
let command = DaemonCommand::SendMessage {
task_id: id,
message: req.message.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SendMessage command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task");
// Return success
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"messageLength": req.message.len()
})),
)
.into_response()
}
/// Get task output history (scoped by owner).
///
/// Retrieves all recorded output from a task's Claude Code process.
/// This allows the frontend to fetch missed output when subscribing late
/// or reconnecting after a disconnect.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/output",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task output history", body = TaskOutputResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task_output(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify task exists and belongs to owner
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Get output history (task already verified to belong to owner)
match repository::get_task_output(pool, id, None).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let total = entries.len();
Json(TaskOutputResponse {
entries,
total,
task_id: id,
})
.into_response()
}
Err(e) => {
tracing::error!("Failed to get task output: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// List subtasks for a parent task (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/subtasks",
params(
("id" = Uuid, Path, description = "Parent task ID")
),
responses(
(status = 200, description = "List of subtasks", body = TaskListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_subtasks(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(tasks) => {
let total = tasks.len() as i64;
Json(TaskListResponse { tasks, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list subtasks for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// List events for a task (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/events",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "List of task events", body = TaskEventListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_task_events(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify task exists and belongs to owner
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
match repository::list_task_events(pool, id, None).await {
Ok(events) => {
let total = events.len() as i64;
Json(TaskEventListResponse { events, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list events for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Retry completion action for a completed task (scoped by owner).
///
/// This allows retrying a completion action (push branch, merge, create PR)
/// after filling in the target_repo_path if it wasn't set when the task completed.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/retry-completion",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Completion action initiated"),
(status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn retry_completion_action(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a terminal state
let terminal_statuses = ["done", "failed", "merged"];
if !terminal_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Task must be completed to retry completion action. Current status: {}",
task.status
),
)),
)
.into_response();
}
// Check if completion action is set
let action = match &task.completion_action {
Some(action) if action != "none" => action.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_COMPLETION_ACTION",
"Task has no completion action configured (or is set to 'none')",
)),
)
.into_response();
}
};
// Check if target_repo_path is set
let target_repo_path = match &task.target_repo_path {
Some(path) if !path.is_empty() => path.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_TARGET_REPO",
"Target repository path must be set before retrying completion action",
)),
)
.into_response();
}
};
// Note: We don't check overlay_path here because the server may not have it
// The daemon will scan its worktrees directory to find the worktree by task ID
// Find a daemon to execute the action (must belong to this owner)
// Prefer the daemon that ran the task, but fall back to any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if this daemon is still connected and belongs to this owner
if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) {
daemon_id
} else {
// Fall back to any connected daemon for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
};
// Send RetryCompletionAction command to daemon
let command = DaemonCommand::RetryCompletionAction {
task_id: id,
task_name: task.name.clone(),
action: action.clone(),
target_repo_path: target_repo_path.clone(),
target_branch: task.target_branch.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send RetryCompletionAction command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %id,
action = %action,
target_repo = %target_repo_path,
"Retry completion action initiated"
);
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"action": action,
"targetRepoPath": target_repo_path,
"message": "Completion action initiated. Check task output for results."
})),
)
.into_response()
}
// =============================================================================
// Daemon Handlers
// =============================================================================
/// List all connected daemons (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons",
responses(
(status = 200, description = "List of daemons", body = DaemonListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_daemons(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only list daemons belonging to this owner
match repository::list_daemons_for_owner(pool, auth.owner_id).await {
Ok(daemons) => {
let total = daemons.len() as i64;
Json(DaemonListResponse { daemons, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list daemons: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single daemon by ID (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/{id}",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Daemon details", body = crate::db::models::Daemon),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only get daemon if it belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(daemon)) => Json(daemon).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get suggested directories from connected daemons (requires authentication).
///
/// Returns directories that can be used as target_repo_path for completion actions.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/daemons/directories",
    responses(
        (status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn get_daemon_directories(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Built purely from the in-memory connection registry; no database access.
    let mut directories = Vec::new();

    // Visit only the connections owned by the caller.
    let owned_connections = state
        .daemon_connections
        .iter()
        .filter(|entry| entry.value().owner_id == auth.owner_id);

    for entry in owned_connections {
        let daemon = entry.value();

        // Each daemon contributes up to two suggestions, working directory
        // first and home directory second. `exists` is left unset here.
        directories.extend(daemon.working_directory.as_ref().map(|dir| DaemonDirectory {
            path: dir.clone(),
            label: "Working Directory".to_string(),
            directory_type: "working".to_string(),
            hostname: daemon.hostname.clone(),
            exists: None,
        }));
        directories.extend(daemon.home_directory.as_ref().map(|dir| DaemonDirectory {
            path: dir.clone(),
            label: "Makima Home".to_string(),
            directory_type: "home".to_string(),
            hostname: daemon.hostname.clone(),
            exists: None,
        }));
    }

    Json(DaemonDirectoriesResponse { directories })
}
// JSON body of `POST /api/v1/mesh/tasks/{id}/clone` (extracted by `clone_worktree`).
// `rename_all = "camelCase"` means the field is read from the key `targetDir`.
/// Request to clone a worktree to a target directory.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CloneWorktreeRequest {
// `clone_worktree` forwards this string as-is in `DaemonCommand::CloneWorktree`;
// the handler itself does not validate or normalize it.
/// Path to the target directory.
pub target_dir: String,
}
/// Clone a task's worktree to a target directory (scoped by owner).
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/clone",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CloneWorktreeRequest,
    responses(
        (status = 200, description = "Clone command sent"),
        (status = 400, description = "Invalid request or task not completed", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn clone_worktree(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<CloneWorktreeRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task (scoped by owner)
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Verify task is in a completed state
    let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged");
    if !is_completed {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new(
                "INVALID_STATE",
                format!("Task must be completed to clone (current status: {})", task.status),
            )),
        )
            .into_response();
    }

    // Pick the daemon that receives the command. Same policy as
    // `retry_completion_action`: prefer the daemon recorded on the task (the one
    // that ran it) when it is still connected and belongs to this owner, and
    // only then fall back to any connected daemon of the owner. Previously the
    // task's own daemon was ignored, so with several daemons connected the
    // command could be sent to one that never ran this task.
    let preferred_daemon = task.daemon_id.filter(|wanted| {
        state
            .daemon_connections
            .iter()
            .any(|d| d.value().id == *wanted && d.value().owner_id == auth.owner_id)
    });
    let selected_daemon = preferred_daemon.or_else(|| {
        state
            .daemon_connections
            .iter()
            .find(|d| d.value().owner_id == auth.owner_id)
            .map(|d| d.value().id)
    });
    let daemon_id = match selected_daemon {
        Some(daemon_id) => daemon_id,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
            )
                .into_response();
        }
    };

    // Send CloneWorktree command to daemon
    let command = DaemonCommand::CloneWorktree {
        task_id: id,
        target_dir: body.target_dir.clone(),
    };
    if let Err(e) = state.send_daemon_command(daemon_id, command).await {
        tracing::error!("Failed to send CloneWorktree command: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // The response only acknowledges that the command was dispatched.
    Json(serde_json::json!({
        "status": "cloning",
        "taskId": id.to_string(),
        "targetDir": body.target_dir,
    }))
    .into_response()
}
// JSON body of `POST /api/v1/mesh/tasks/{id}/check-target` (extracted by
// `check_target_exists`). With `rename_all = "camelCase"` the key is `targetDir`.
/// Request to check if a target directory exists.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsRequest {
// Forwarded unchanged to the daemon inside `DaemonCommand::CheckTargetExists`.
/// Path to check.
pub target_dir: String,
}
// Declared as the 200 body of `check_target_exists` in its OpenAPI annotation.
// NOTE(review): the handler in this file currently replies with an ad-hoc JSON
// object (`status`, `taskId`, `targetDir`) and never constructs this struct —
// confirm whether the annotation or the handler should change.
/// Response for check target exists.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsResponse {
/// Whether the target directory exists.
pub exists: bool,
// Serialized as `targetDir` because of `rename_all = "camelCase"`.
/// The path that was checked (expanded).
pub target_dir: String,
}
/// Check if a target directory exists (for clone validation, requires authentication).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/check-target",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = CheckTargetExistsRequest,
responses(
(status = 200, description = "Check result", body = CheckTargetExistsResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "No daemon connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn check_target_exists(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<CheckTargetExistsRequest>,
) -> impl IntoResponse {
// Find a connected daemon belonging to this owner to send the command
let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
let daemon_id = match daemon_entry {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
};
// Send CheckTargetExists command to daemon
let command = DaemonCommand::CheckTargetExists {
task_id: id,
target_dir: body.target_dir.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::error!("Failed to send CheckTargetExists command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// The actual result will be sent back via WebSocket
// For now, just acknowledge the request was sent
Json(serde_json::json!({
"status": "checking",
"taskId": id.to_string(),
"targetDir": body.target_dir,
}))
.into_response()
}
// =============================================================================
// Task Reassignment (Daemon Failover)
// =============================================================================
// JSON body of `POST /api/v1/mesh/tasks/{id}/reassign` (extracted by `reassign_task`).
// Keys are camelCase on the wire: `targetDaemonId`, `includeContext`.
/// Request to reassign a task to a new daemon after daemon disconnect.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskRequest {
// When set, `reassign_task` requires this daemon to be connected and owned by
// the caller; when absent it takes the first connected daemon of the owner.
/// Target daemon ID to reassign to. If not provided, will select any available daemon.
pub target_daemon_id: Option<Uuid>,
// Omitting the key yields `true` via `default_include_context`.
/// Whether to include conversation context from previous run.
#[serde(default = "default_include_context")]
pub include_context: bool,
}
/// Serde default for `ReassignTaskRequest::include_context`.
///
/// Always returns `true`, so a request that omits the field still carries the
/// previous conversation context.
fn default_include_context() -> bool {
    true
}
// Success body of `reassign_task`; serialized with camelCase keys
// (`daemonId`, `oldTaskId`, `contextIncluded`, `contextEntries`).
/// Response from task reassignment.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskResponse {
// The replacement task after it was moved to "starting" and given a daemon.
/// The new task that was created.
pub task: Task,
/// The new daemon ID.
pub daemon_id: Uuid,
// NOTE: `reassign_task` only logs a warning if deleting the old task fails,
// so the old task can still exist even though this ID is reported.
/// The ID of the old task that was deleted.
pub old_task_id: Uuid,
// True only when a non-empty context string was actually built.
/// Whether conversation context was included.
pub context_included: bool,
// Count of output entries converted for the context (0 when context was not
// requested or the output could not be loaded).
/// Number of context entries from previous conversation.
pub context_entries: usize,
}
/// Return the longest prefix of `text` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
fn truncate_on_char_boundary(text: &str, max_bytes: usize) -> &str {
    if text.len() <= max_bytes {
        return text;
    }
    let mut end = max_bytes;
    // Index 0 is always a character boundary, so this loop terminates.
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}

/// Build a conversation context summary from task output entries.
///
/// Returns a formatted string that can be prepended to the task plan, or an
/// empty string when `entries` is empty.
///
/// Per message type:
/// - `"assistant"`: content prefixed with `Assistant: `, cut to at most 500 bytes.
/// - `"tool_use"`: a `[Used tool: ...]` line when a tool name is present.
/// - `"tool_result"`: content cut to at most 200 bytes; empty results are skipped.
/// - `"user"`: content prefixed with `User: `, never truncated.
/// - anything else is ignored.
///
/// Truncation always lands on a UTF-8 character boundary. Slicing at a fixed
/// byte offset would panic whenever the offset falls inside a multi-byte
/// character.
fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
    // Byte budgets for the two truncated message kinds.
    const ASSISTANT_LIMIT: usize = 500;
    const TOOL_RESULT_LIMIT: usize = 200;

    if entries.is_empty() {
        return String::new();
    }

    let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
    context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n");

    for entry in entries {
        match entry.message_type.as_str() {
            "assistant" => {
                context.push_str("Assistant: ");
                // Truncate long messages
                if entry.content.len() > ASSISTANT_LIMIT {
                    context.push_str(truncate_on_char_boundary(&entry.content, ASSISTANT_LIMIT));
                    context.push_str("... [truncated]");
                } else {
                    context.push_str(&entry.content);
                }
                context.push_str("\n\n");
            }
            "tool_use" => {
                if let Some(ref tool_name) = entry.tool_name {
                    context.push_str(&format!("[Used tool: {}]\n", tool_name));
                }
            }
            "tool_result" => {
                // Summarize tool results briefly
                if entry.content.len() > TOOL_RESULT_LIMIT {
                    context.push_str("[Tool result: ");
                    context.push_str(truncate_on_char_boundary(&entry.content, TOOL_RESULT_LIMIT));
                    context.push_str("... truncated]\n");
                } else if !entry.content.is_empty() {
                    context.push_str("[Tool result: ");
                    context.push_str(&entry.content);
                    context.push_str("]\n");
                }
            }
            "user" => {
                context.push_str("User: ");
                context.push_str(&entry.content);
                context.push_str("\n\n");
            }
            _ => {}
        }
    }

    context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
    context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
    context
}
/// Reassign a task to a new daemon after the original daemon disconnected.
///
/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
/// the task can be reassigned to a new daemon with the conversation context preserved.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/reassign",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = ReassignTaskRequest,
    responses(
        (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
        (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "No daemon available", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn reassign_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<ReassignTaskRequest>,
) -> impl IntoResponse {
    // Overall flow:
    //   1. load the task (owner-scoped) and check it may be reassigned,
    //   2. choose a target daemon,
    //   3. optionally build conversation context from the old task's output,
    //   4. create a NEW task, mark it "starting" and spawn it on the daemon,
    //   5. delete the old task, notify the contract supervisor, broadcast.
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Check if task is in a state that can be reassigned
    // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected

    // Helper closure to check if a daemon is connected by its UUID
    // (matches on daemon ID only; ownership is not considered here).
    let is_daemon_connected = |daemon_id: Uuid| {
        state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
    };

    let can_reassign = matches!(
        task.status.as_str(),
        "failed" | "interrupted" | "pending" | "starting"
    ) || {
        // Also allow if daemon is not connected
        if let Some(daemon_id) = task.daemon_id {
            !is_daemon_connected(daemon_id)
        } else {
            true
        }
    };

    // NOTE(review): only a "running" task on a connected daemon is rejected.
    // Any other status outside the list above (with a connected daemon) falls
    // through and is reassigned, although the 400 description says "not in
    // failed/interrupted state". Confirm whether that is intended.
    if !can_reassign && task.status == "running" {
        // Running task - check if its daemon is still connected
        if let Some(daemon_id) = task.daemon_id {
            if is_daemon_connected(daemon_id) {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new(
                        "TASK_RUNNING",
                        "Task is running on a connected daemon. Stop it first to reassign.",
                    )),
                )
                    .into_response();
            }
        }
    }

    // Find a target daemon
    let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
        // Verify the requested daemon is connected and belongs to the owner
        let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
        if let Some(daemon) = daemon {
            if daemon.owner_id != auth.owner_id {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
                )
                    .into_response();
            }
            requested_daemon_id
        } else {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
            )
                .into_response();
        }
    } else {
        // Find any available daemon for this owner
        match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
            Some(entry) => entry.value().id,
            None => {
                return (
                    StatusCode::SERVICE_UNAVAILABLE,
                    Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
                )
                    .into_response();
            }
        }
    };

    // Build conversation context if requested.
    // Failing to load the output is not fatal: the task is reassigned without context.
    let (context_str, context_entries) = if body.include_context {
        match repository::get_task_output(pool, id, Some(500)).await {
            Ok(events) => {
                let entries: Vec<TaskOutputEntry> = events
                    .into_iter()
                    .filter_map(TaskOutputEntry::from_task_event)
                    .collect();
                let context = build_conversation_context(&entries);
                let count = entries.len();
                (context, count)
            }
            Err(e) => {
                tracing::warn!("Failed to get task output for context: {}", e);
                (String::new(), 0)
            }
        }
    } else {
        (String::new(), 0)
    };

    // Build updated plan with context prepended
    let updated_plan = if !context_str.is_empty() {
        format!("{}{}", context_str, task.plan)
    } else {
        task.plan.clone()
    };

    // Create a NEW task with the conversation context
    let create_req = CreateTaskRequest {
        // A task without a contract is recreated with the nil UUID as contract ID.
        contract_id: task.contract_id.unwrap_or(Uuid::nil()),
        name: format!("{} (resumed)", task.name),
        description: task.description.clone(),
        plan: updated_plan.clone(),
        parent_task_id: task.parent_task_id,
        is_supervisor: task.is_supervisor,
        priority: task.priority,
        repository_url: task.repository_url.clone(),
        base_branch: task.base_branch.clone(),
        target_branch: task.target_branch.clone(),
        merge_mode: task.merge_mode.clone(),
        target_repo_path: task.target_repo_path.clone(),
        completion_action: task.completion_action.clone(),
        continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
        copy_files: None,
        checkpoint_sha: task.last_checkpoint_sha.clone(),
    };

    let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
        Ok(t) => t,
        Err(e) => {
            tracing::error!("Failed to create new task for reassignment: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Update new task to starting and assign daemon
    let start_update = UpdateTaskRequest {
        status: Some("starting".to_string()),
        daemon_id: Some(target_daemon_id),
        version: Some(new_task.version),
        ..Default::default()
    };

    let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "New task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to update new task daemon assignment: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Send SpawnTask command to daemon for the new task
    let command = DaemonCommand::SpawnTask {
        task_id: new_task.id,
        task_name: final_task.name.clone(),
        plan: updated_plan,
        repo_url: task.repository_url.clone(),
        base_branch: task.base_branch.clone(),
        target_branch: task.target_branch.clone(),
        parent_task_id: task.parent_task_id,
        depth: task.depth,
        is_orchestrator: false, // New task starts fresh
        target_repo_path: task.target_repo_path.clone(),
        completion_action: task.completion_action.clone(),
        continue_from_task_id: Some(id), // Continue from old task's worktree
        copy_files: None,
        contract_id: task.contract_id,
        is_supervisor: task.is_supervisor,
    };

    tracing::info!(
        old_task_id = %id,
        new_task_id = %new_task.id,
        new_daemon_id = %target_daemon_id,
        context_entries = context_entries,
        "Reassigning task: creating new task and deleting old one"
    );

    if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
        tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
        // Rollback: delete the new task we created (best effort, result ignored)
        let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // Delete the old task now that the new one is spawned
    let old_task_id = id;
    if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
        tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
        // Don't fail the request, the new task is already running
    }

    // Notify the contract's supervisor about the reassignment (if applicable).
    // Every step here is best effort: lookup failures silently skip the notification.
    if let Some(contract_id) = task.contract_id {
        if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
            if let Some(supervisor_task_id) = contract.supervisor_task_id {
                // Don't notify if we're reassigning the supervisor itself
                if supervisor_task_id != old_task_id {
                    // Find the supervisor's daemon and send a message
                    if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
                        if supervisor_task.status == "running" {
                            if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
                                // Check connectivity with a short-lived lookup instead of
                                // binding the registry entry: the entry would otherwise stay
                                // alive across the `.await` below, keeping a reference into
                                // the connection map while the send runs.
                                let supervisor_daemon_connected = state
                                    .daemon_connections
                                    .iter()
                                    .any(|d| d.value().id == supervisor_daemon_id);
                                if supervisor_daemon_connected {
                                    let notification_msg = format!(
                                        "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
                                        A new task '{}' (ID: {}) has been created to continue the work. \
                                        The new task has {} context entries from the previous conversation.\n\n",
                                        task.name,
                                        old_task_id,
                                        final_task.name,
                                        new_task.id,
                                        context_entries
                                    );
                                    let notify_cmd = DaemonCommand::SendMessage {
                                        task_id: supervisor_task_id,
                                        message: notification_msg,
                                    };
                                    if let Err(e) = state.send_daemon_command(supervisor_daemon_id, notify_cmd).await {
                                        tracing::warn!(
                                            supervisor_id = %supervisor_task_id,
                                            error = %e,
                                            "Failed to notify supervisor about task reassignment"
                                        );
                                    } else {
                                        tracing::info!(
                                            supervisor_id = %supervisor_task_id,
                                            old_task_id = %old_task_id,
                                            new_task_id = %new_task.id,
                                            "Notified supervisor about task reassignment"
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    // Broadcast task update for the new task
    state.broadcast_task_update(TaskUpdateNotification {
        task_id: new_task.id,
        owner_id: Some(auth.owner_id),
        version: final_task.version,
        status: "starting".to_string(),
        updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
        updated_by: "reassignment".to_string(),
    });

    Json(ReassignTaskResponse {
        task: final_task,
        daemon_id: target_daemon_id,
        old_task_id,
        context_included: !context_str.is_empty(),
        context_entries,
    })
    .into_response()
}
// =============================================================================
// Task Continue (Restart with Context)
// =============================================================================
// JSON body of `POST /api/v1/mesh/tasks/{id}/continue` (extracted by `continue_task`).
// With `rename_all = "camelCase"` the key is `targetDaemonId`.
/// Request to continue a task after daemon disconnect (restart in-place with context).
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskRequest {
// When set, `continue_task` requires the daemon to be connected and owned by
// the caller; when absent it takes the first connected daemon of the owner.
/// Target daemon ID to continue on. If not provided, will select any available daemon.
pub target_daemon_id: Option<Uuid>,
}
// Declared as the 200 body of `continue_task`; serialized with camelCase keys
// (`daemonId`, `contextEntries`).
/// Response from continuing a task.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskResponse {
/// The continued task (same ID, updated plan with context).
pub task: Task,
/// The daemon ID running the task.
pub daemon_id: Uuid,
// Presumably the count of output entries converted for the context, as in
// `ReassignTaskResponse` — confirm against the end of `continue_task`.
/// Number of context entries from previous conversation.
pub context_entries: usize,
}
/// Continue a task after daemon disconnect by restarting it with conversation context.
///
/// Unlike reassign, this keeps the same task ID and just restarts it with the
/// previous conversation context prepended to the plan. Useful for supervisors.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/continue",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = ContinueTaskRequest,
responses(
(status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
(status = 400, description = "Task cannot be continued", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "No daemon available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn continue_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<ContinueTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Helper closure to check if a daemon is connected by its UUID
let is_daemon_connected = |daemon_id: Uuid| {
state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
};
// Check if task can be continued (not currently running on a connected daemon)
let can_continue = matches!(
task.status.as_str(),
"failed" | "interrupted" | "pending" | "starting" | "completed"
) || {
if let Some(daemon_id) = task.daemon_id {
!is_daemon_connected(daemon_id)
} else {
true
}
};
if !can_continue && task.status == "running" {
if let Some(daemon_id) = task.daemon_id {
if is_daemon_connected(daemon_id) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"TASK_RUNNING",
"Task is running on a connected daemon. Stop it first to continue.",
)),
)
.into_response();
}
}
}
// Find a target daemon
let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
if let Some(daemon) = daemon {
if daemon.owner_id != auth.owner_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
)
.into_response();
}
requested_daemon_id
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
)
.into_response();
}
} else {
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
}
};
// Build conversation context from task output
let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let context = build_conversation_context(&entries);
let count = entries.len();
(context, count)
}
Err(e) => {
tracing::warn!("Failed to get task output for context: {}", e);
(String::new(), 0)
}
};
// Build updated plan with context prepended
let updated_plan = if !context_str.is_empty() {
format!("{}{}", context_str, task.plan)
} else {
task.plan.clone()
};
// Update task in database: reset status, update plan with context, assign daemon
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
plan: Some(updated_plan.clone()),
daemon_id: Some(target_daemon_id),
error_message: None,
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task for continuation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if this is an orchestrator
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => subtasks.len(),
Err(_) => 0,
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: updated_plan,
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
};
tracing::info!(
task_id = %id,
daemon_id = %target_daemon_id,
context_entries = context_entries,
is_supervisor = task.is_supervisor,
"Continuing task with conversation context"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
// Rollback
let rollback_req = UpdateTaskRequest {
status: Some("failed".to_string()),
clear_daemon_id: true,
error_message: Some(format!("Continuation failed: {}", e)),
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Broadcast task update
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()],
updated_by: "continuation".to_string(),
});
Json(ContinueTaskResponse {
task: updated_task,
daemon_id: target_daemon_id,
context_entries,
})
.into_response()
}