//! HTTP handlers for task and daemon mesh operations.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use base64::Engine;
use std::time::Duration;
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::db::models::{
BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory,
DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task,
TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse,
TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, DaemonReauthStatus, SharedState, TaskUpdateNotification};
// =============================================================================
// Authentication Types
// =============================================================================
/// Source of authentication for mesh endpoints.
#[derive(Debug, Clone)]
pub enum AuthSource {
/// Authenticated via tool key (orchestrator accessing API).
/// Contains the task ID that owns this key.
ToolKey(Uuid),
/// Authenticated via user token (web client).
/// Contains the user ID. (Not implemented yet)
#[allow(dead_code)]
UserToken(Uuid),
/// No authentication provided (anonymous access).
Anonymous,
}
/// Header name for tool key authentication.
pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key";
/// Extract authentication source from request headers.
///
/// Checks for:
/// 1. `X-Makima-Tool-Key` header for orchestrator tool access
/// 2. `Authorization: Bearer` header for user access (future)
/// 3. Falls back to Anonymous if no auth provided
pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
// Check for tool key header first
if let Some(tool_key) = headers.get(TOOL_KEY_HEADER) {
if let Ok(key_str) = tool_key.to_str() {
if let Some(task_id) = state.validate_tool_key(key_str) {
return AuthSource::ToolKey(task_id);
}
tracing::warn!("Invalid tool key provided: {}", key_str);
}
}
// Check for Authorization header (future user auth)
if let Some(auth_header) = headers.get("authorization") {
if let Ok(auth_str) = auth_header.to_str() {
if auth_str.starts_with("Bearer ") {
// Future: validate JWT and extract user ID
tracing::debug!("Bearer token auth not yet implemented");
}
}
}
// Default to anonymous
AuthSource::Anonymous
}
// =============================================================================
// Task Handlers
// =============================================================================
/// List all tasks for the current owner.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks",
responses(
(status = 200, description = "List of tasks", body = TaskListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_tasks(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::list_tasks_for_owner(pool, auth.owner_id).await {
Ok(tasks) => {
let total = tasks.len() as i64;
Json(TaskListResponse { tasks, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list tasks: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single task by ID with its subtasks (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task details with subtasks", body = TaskWithSubtasks),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(task)) => {
// Get subtasks for this task (also scoped by owner)
match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(),
Err(e) => {
tracing::error!("Failed to get subtasks for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Create a new task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks",
request_body = CreateTaskRequest,
responses(
(status = 201, description = "Task created", body = Task),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn create_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Json(req): Json<CreateTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::create_task_for_owner(pool, auth.owner_id, req).await {
Ok(task) => {
// Record history event for task creation
let _ = repository::record_history_event(
pool,
auth.owner_id,
task.contract_id,
Some(task.id),
"task",
Some("created"),
None,
serde_json::json!({
"name": &task.name,
"isSupervisor": task.is_supervisor,
}),
).await;
// Notify supervisor of new task creation if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor {
let pool = pool.clone();
let state_clone = state.clone();
let task_clone = task.clone();
tokio::spawn(async move {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
state_clone.notify_supervisor_of_task_created(
supervisor.id,
supervisor.daemon_id,
task_clone.id,
&task_clone.name,
).await;
}
});
}
}
(StatusCode::CREATED, Json(task)).into_response()
}
Err(e) => {
tracing::error!("Failed to create task: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Update an existing task (scoped by owner).
#[utoipa::path(
put,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = UpdateTaskRequest,
responses(
(status = 200, description = "Task updated", body = Task),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 409, description = "Version conflict", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn update_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<UpdateTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Check if trying to set a supervisor task to a terminal status
if let Some(ref new_status) = req.status {
let terminal_statuses = ["done", "failed", "merged"];
if terminal_statuses.contains(&new_status.as_str()) {
// Get the task to check if it's a supervisor
if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
if task.is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"SUPERVISOR_CANNOT_COMPLETE",
"Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
)),
)
.into_response();
}
}
}
}
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
updated_fields.push("name".to_string());
}
if req.description.is_some() {
updated_fields.push("description".to_string());
}
if req.status.is_some() {
updated_fields.push("status".to_string());
}
if req.priority.is_some() {
updated_fields.push("priority".to_string());
}
if req.plan.is_some() {
updated_fields.push("plan".to_string());
}
if req.progress_summary.is_some() {
updated_fields.push("progress_summary".to_string());
}
if req.error_message.is_some() {
updated_fields.push("error_message".to_string());
}
match repository::update_task_for_owner(pool, id, auth.owner_id, req).await {
Ok(Some(task)) => {
let updated_fields_clone = updated_fields.clone();
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
owner_id: Some(auth.owner_id),
version: task.version,
status: task.status.clone(),
updated_fields,
updated_by: "user".to_string(),
});
// Notify supervisor of status change if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
let pool = pool.clone();
let state_clone = state.clone();
let task_clone = task.clone();
tokio::spawn(async move {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
state_clone.notify_supervisor_of_task_update(
supervisor.id,
supervisor.daemon_id,
task_clone.id,
&task_clone.name,
&task_clone.status,
&updated_fields_clone,
).await;
}
});
}
}
Json(task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(RepositoryError::VersionConflict { expected, actual }) => {
tracing::info!(
"Version conflict on task {}: expected {}, actual {}",
id,
expected,
actual
);
(
StatusCode::CONFLICT,
Json(serde_json::json!({
"code": "VERSION_CONFLICT",
"message": format!(
"Task was modified by another user. Expected version {}, actual version {}",
expected, actual
),
"expectedVersion": expected,
"actualVersion": actual,
})),
)
.into_response()
}
Err(RepositoryError::Database(e)) => {
tracing::error!("Failed to update task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Delete a task (scoped by owner).
#[utoipa::path(
delete,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 204, description = "Task deleted"),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn delete_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task first to check if it's running and needs to be stopped
if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
// If task is active and has a daemon, send interrupt command
if is_active {
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::warn!(
task_id = %id,
daemon_id = %daemon_id,
"Failed to send InterruptTask before delete: {}",
e
);
} else {
tracing::info!(
task_id = %id,
daemon_id = %daemon_id,
"Sent InterruptTask before delete"
);
}
}
}
}
// Clean up any pending supervisor questions for this task
state.remove_pending_questions_for_task(id);
match repository::delete_task_for_owner(pool, id, auth.owner_id).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to delete task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Start a task by sending it to an available daemon (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/start",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task started", body = Task),
(status = 400, description = "Task cannot be started", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or no daemons available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn start_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
// Extract authentication to log who is starting the task
let legacy_auth = extract_auth(&state, &headers);
match &legacy_auth {
AuthSource::ToolKey(orchestrator_id) => {
tracing::info!(
task_id = %id,
orchestrator_task_id = %orchestrator_id,
owner_id = %auth.owner_id,
"Orchestrator starting subtask via tool key"
);
}
AuthSource::Anonymous => {
tracing::info!(
task_id = %id,
owner_id = %auth.owner_id,
"Starting task (user request)"
);
}
AuthSource::UserToken(user_id) => {
tracing::info!(
task_id = %id,
user_id = %user_id,
owner_id = %auth.owner_id,
"Starting task via user token"
);
}
}
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task can be started (allow pending, failed, interrupted, done, or merged)
let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"];
if !startable_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be started from status: {}", task.status),
)),
)
.into_response();
}
// Get local_only and auto_merge_local flags from contract if task has one
let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
_ => (false, false),
}
} else {
(false, false)
};
// Get list of daemons that have previously failed this task
let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
// Find an available daemon belonging to this owner, excluding failed ones
let target_daemon_id = match state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
Some(id) => id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot start task.",
)),
)
.into_response();
}
};
// Check if this is an orchestrator (depth 0 with subtasks)
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => {
tracing::info!(
task_id = %id,
subtask_count = subtasks.len(),
subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::<Vec<_>>(),
"Counted subtasks for orchestrator check"
);
subtasks.len()
},
Err(e) => {
tracing::warn!("Failed to check subtasks for {}: {}", id, e);
0
}
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
tracing::info!(
task_id = %id,
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
is_supervisor = task.is_supervisor,
"Starting task with orchestrator/supervisor determination"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
tracing::info!(
task_id = %id,
is_supervisor = task.is_supervisor,
is_orchestrator = is_orchestrator,
daemon_id = %target_daemon_id,
"Sending SpawnTask command to daemon"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command.clone()).await {
tracing::warn!(
task_id = %id,
daemon_id = %target_daemon_id,
error = %e,
"Failed to send SpawnTask command, trying alternative daemon"
);
// Add this daemon to exclude list and try another
exclude_daemon_ids.push(target_daemon_id);
// Try to find an alternative daemon
if let Some(alt_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
// Update task with new daemon
let alt_update_req = UpdateTaskRequest {
daemon_id: Some(alt_daemon_id),
..Default::default()
};
if let Ok(Some(alt_updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, alt_update_req).await {
// Recreate command with same data but try new daemon
let alt_command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
tracing::info!(
task_id = %id,
old_daemon_id = %target_daemon_id,
new_daemon_id = %alt_daemon_id,
"Task started on alternative daemon after first daemon failed"
);
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: alt_updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
return Json(alt_updated_task).into_response();
}
}
}
// All daemons failed - rollback and return error
tracing::error!("Failed to start task on any daemon: {}", e);
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", "Failed to start task on any available daemon")),
)
.into_response();
}
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
Json(updated_task).into_response()
}
/// Stop a running task (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/stop",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task stopped", body = Task),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn stop_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running/active
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
if !is_active {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be stopped from status: {}", task.status),
)),
)
.into_response();
}
// Find the daemon running this task
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else {
// No daemon assigned, just update status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
};
// Send InterruptTask command to daemon
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::warn!("Failed to send InterruptTask command: {}", e);
// Daemon might be disconnected - update task status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user (daemon unavailable)".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
}
// Update task status to "failed" (stopped)
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Send a message to a running task's stdin (scoped by owner).
///
/// This can be used to provide input to Claude Code when it's waiting for user input,
/// or to inject context/instructions into a running task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/message",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = SendMessageRequest,
responses(
(status = 200, description = "Message sent successfully"),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn send_message(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<SendMessageRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a state that can receive messages
// Allow "running" and "starting" (to handle race between status update and message send)
// Also allow AUTH_CODE messages and supervisor tasks regardless of status
let is_auth_code = req.message.starts_with("AUTH_CODE:");
let is_supervisor = task.is_supervisor;
let can_receive_message = task.status == "running" || task.status == "starting";
if !can_receive_message && !is_auth_code && !is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Cannot send message to task in status: {}. Task must be running or starting.",
task.status
),
)),
)
.into_response();
}
// Find the daemon running this task
// For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else if is_supervisor {
// Supervisor without daemon - find one
match state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon available. Please start a daemon.",
)),
)
.into_response();
}
}
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"Task has no assigned daemon. Cannot send message.",
)),
)
.into_response();
};
// Check if daemon is connected before trying to send
if !state.is_daemon_connected(target_daemon_id) {
tracing::warn!(
task_id = %id,
daemon_id = %target_daemon_id,
"Daemon not connected, attempting to reallocate task"
);
// Get list of failed daemons for this task
let mut exclude_daemons = task.failed_daemon_ids.clone().unwrap_or_default();
exclude_daemons.push(target_daemon_id);
// Try to find an alternative daemon
if let Some(new_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemons) {
// Mark the task for retry and update with new daemon
if let Some(ref pool) = state.db_pool {
// Mark the old daemon as failed for this task
let _ = repository::mark_task_for_retry(pool, id, target_daemon_id).await;
// Update task with new daemon and restart
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(new_daemon_id),
..Default::default()
};
if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
// Get local_only and auto_merge_local from contract if task has one
let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
_ => (false, false),
}
} else {
(false, false)
};
// Send spawn command to new daemon
let spawn_cmd = DaemonCommand::SpawnTask {
task_id: id,
task_name: updated_task.name.clone(),
plan: updated_task.plan.clone(),
repo_url: updated_task.repository_url.clone(),
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: updated_task.parent_task_id,
depth: updated_task.depth,
is_orchestrator: false,
target_repo_path: updated_task.target_repo_path.clone(),
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: updated_task.directive_id,
};
if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
tracing::info!(
task_id = %id,
old_daemon_id = %target_daemon_id,
new_daemon_id = %new_daemon_id,
"Task reallocated to new daemon, will restart"
);
return (
StatusCode::ACCEPTED,
Json(serde_json::json!({
"success": true,
"reallocated": true,
"taskId": id,
"message": "Task was reallocated to a new daemon and is restarting. Please retry your message shortly."
})),
).into_response();
}
}
}
}
// Could not reallocate - return error with helpful message
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"DAEMON_DISCONNECTED",
"The daemon running this task has disconnected and no alternative daemon is available. Please start a daemon.",
)),
).into_response();
}
// Send SendMessage command to daemon
let command = DaemonCommand::SendMessage {
task_id: id,
message: req.message.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SendMessage command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task");
// Return success
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"messageLength": req.message.len()
})),
)
.into_response()
}
/// Get task output history (scoped by owner).
///
/// Retrieves all recorded output from a task's Claude Code process.
/// This allows the frontend to fetch missed output when subscribing late
/// or reconnecting after a disconnect.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/output",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task output history", body = TaskOutputResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task_output(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify task exists and belongs to owner
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Get output history (task already verified to belong to owner)
match repository::get_task_output(pool, id, None).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let total = entries.len();
Json(TaskOutputResponse {
entries,
total,
task_id: id,
})
.into_response()
}
Err(e) => {
tracing::error!("Failed to get task output: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// List subtasks for a parent task (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/subtasks",
params(
("id" = Uuid, Path, description = "Parent task ID")
),
responses(
(status = 200, description = "List of subtasks", body = TaskListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_subtasks(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(tasks) => {
let total = tasks.len() as i64;
Json(TaskListResponse { tasks, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list subtasks for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// List events for a task (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/events",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "List of task events", body = TaskEventListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_task_events(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify task exists and belongs to owner
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
match repository::list_task_events(pool, id, None).await {
Ok(events) => {
let total = events.len() as i64;
Json(TaskEventListResponse { events, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list events for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Retry completion action for a completed task (scoped by owner).
///
/// This allows retrying a completion action (push branch, merge, create PR)
/// after filling in the target_repo_path if it wasn't set when the task completed.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/retry-completion",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Completion action initiated"),
(status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn retry_completion_action(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a terminal state
let terminal_statuses = ["done", "failed", "merged"];
if !terminal_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Task must be completed to retry completion action. Current status: {}",
task.status
),
)),
)
.into_response();
}
// Check if completion action is set
let action = match &task.completion_action {
Some(action) if action != "none" => action.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_COMPLETION_ACTION",
"Task has no completion action configured (or is set to 'none')",
)),
)
.into_response();
}
};
// Check if target_repo_path is set
let target_repo_path = match &task.target_repo_path {
Some(path) if !path.is_empty() => path.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_TARGET_REPO",
"Target repository path must be set before retrying completion action",
)),
)
.into_response();
}
};
// Note: We don't check overlay_path here because the server may not have it
// The daemon will scan its worktrees directory to find the worktree by task ID
// Find a daemon to execute the action (must belong to this owner)
// Prefer the daemon that ran the task, but fall back to any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if this daemon is still connected and belongs to this owner
if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) {
daemon_id
} else {
// Fall back to any connected daemon for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
};
// Send RetryCompletionAction command to daemon
let command = DaemonCommand::RetryCompletionAction {
task_id: id,
task_name: task.name.clone(),
action: action.clone(),
target_repo_path: target_repo_path.clone(),
target_branch: task.target_branch.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send RetryCompletionAction command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %id,
action = %action,
target_repo = %target_repo_path,
"Retry completion action initiated"
);
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"action": action,
"targetRepoPath": target_repo_path,
"message": "Completion action initiated. Check task output for results."
})),
)
.into_response()
}
// =============================================================================
// Daemon Handlers
// =============================================================================
/// List all connected daemons (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons",
responses(
(status = 200, description = "List of daemons", body = DaemonListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_daemons(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only list daemons belonging to this owner
match repository::list_daemons_for_owner(pool, auth.owner_id).await {
Ok(daemons) => {
let total = daemons.len() as i64;
Json(DaemonListResponse { daemons, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list daemons: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single daemon by ID (requires authentication).
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/{id}",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Daemon details", body = crate::db::models::Daemon),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Only get daemon if it belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(daemon)) => Json(daemon).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get suggested directories from connected daemons (requires authentication).
///
/// Returns directories that can be used as target_repo_path for completion actions.
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/directories",
responses(
(status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse),
(status = 401, description = "Unauthorized", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon_directories(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let mut directories = Vec::new();
// Iterate over connected daemons belonging to this owner and collect their directories
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
// Only include daemons belonging to this owner
if daemon.owner_id != auth.owner_id {
continue;
}
// Add working directory if available
if let Some(ref working_dir) = daemon.working_directory {
directories.push(DaemonDirectory {
path: working_dir.clone(),
label: "Working Directory".to_string(),
directory_type: "working".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
// Add home directory if available (for cloning completed work)
if let Some(ref home_dir) = daemon.home_directory {
directories.push(DaemonDirectory {
path: home_dir.clone(),
label: "Makima Home".to_string(),
directory_type: "home".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
}
Json(DaemonDirectoriesResponse { directories })
}
/// Request to clone a worktree to a target directory.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CloneWorktreeRequest {
/// Path to the target directory.
pub target_dir: String,
}
/// Clone a task's worktree to a target directory (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/clone",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = CloneWorktreeRequest,
responses(
(status = 200, description = "Clone command sent"),
(status = 400, description = "Invalid request or task not completed", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn clone_worktree(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<CloneWorktreeRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify task is in a completed state
let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged");
if !is_completed {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task must be completed to clone (current status: {})", task.status),
)),
)
.into_response();
}
// Find a connected daemon belonging to this owner to send the command
let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
let daemon_id = match daemon_entry {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
};
// Send CloneWorktree command to daemon
let command = DaemonCommand::CloneWorktree {
task_id: id,
target_dir: body.target_dir.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::error!("Failed to send CloneWorktree command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
Json(serde_json::json!({
"status": "cloning",
"taskId": id.to_string(),
"targetDir": body.target_dir,
}))
.into_response()
}
// =============================================================================
// Worktree Info
// =============================================================================
/// Response for worktree info.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeInfoResponse {
/// Task ID.
pub task_id: Uuid,
/// Path to the worktree directory.
pub worktree_path: Option<String>,
/// Whether the worktree exists.
pub exists: bool,
/// Aggregate statistics.
pub stats: WorktreeStats,
/// Changed files list.
pub files: Vec<WorktreeFile>,
/// Current branch name.
pub branch: Option<String>,
/// Current HEAD commit SHA.
pub head_sha: Option<String>,
}
/// Statistics about worktree changes.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeStats {
/// Number of files changed.
pub files_changed: i32,
/// Total lines inserted.
pub insertions: i32,
/// Total lines deleted.
pub deletions: i32,
}
/// Information about a changed file in the worktree.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeFile {
/// File path relative to worktree root.
pub path: String,
/// Git status code (M, A, D, R, C, U, ?).
pub status: String,
/// Lines added.
pub lines_added: i32,
/// Lines removed.
pub lines_removed: i32,
}
/// Get worktree information for a task (files, stats, branch info).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/worktree-info",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Worktree info", body = WorktreeInfoResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_worktree_info(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
// Task has no daemon, return empty worktree info
return Json(WorktreeInfoResponse {
task_id: id,
worktree_path: None,
exists: false,
stats: WorktreeStats {
files_changed: 0,
insertions: 0,
deletions: 0,
},
files: vec![],
branch: None,
head_sha: None,
})
.into_response();
};
// Create oneshot channel for response
let (tx, rx) = oneshot::channel();
// Store the sender for the daemon message handler to use
state.pending_worktree_info.insert(id, tx);
// Send GetWorktreeInfo command to daemon
let command = DaemonCommand::GetWorktreeInfo { task_id: id };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
// Clean up pending request on error
state.pending_worktree_info.remove(&id);
tracing::error!("Failed to send GetWorktreeInfo command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Wait for daemon response with timeout
match tokio::time::timeout(Duration::from_secs(10), rx).await {
Ok(Ok(response)) => {
// Convert internal response to API response
let files: Vec<WorktreeFile> = response.files
.and_then(|f| f.as_array().cloned())
.unwrap_or_default()
.into_iter()
.filter_map(|v| {
Some(WorktreeFile {
path: v.get("path")?.as_str()?.to_string(),
status: v.get("status")?.as_str()?.to_string(),
lines_added: v.get("linesAdded")?.as_i64()? as i32,
lines_removed: v.get("linesRemoved")?.as_i64()? as i32,
})
})
.collect();
Json(WorktreeInfoResponse {
task_id: id,
worktree_path: response.worktree_path,
exists: response.exists,
stats: WorktreeStats {
files_changed: response.files_changed,
insertions: response.insertions,
deletions: response.deletions,
},
files,
branch: response.branch,
head_sha: response.head_sha,
})
.into_response()
}
Ok(Err(_)) => {
// Channel was dropped (sender side closed)
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")),
)
.into_response()
}
Err(_) => {
// Timeout - clean up pending request
state.pending_worktree_info.remove(&id);
(
StatusCode::GATEWAY_TIMEOUT,
Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")),
)
.into_response()
}
}
}
// =============================================================================
// Task Patches
// =============================================================================
/// Query parameters for listing task patches
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListPatchesQuery {
/// Contract ID to scope the patches
pub contract_id: Uuid,
}
/// Patch summary for API response
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct PatchSummary {
/// Patch ID
pub id: Uuid,
/// Patch name (generated from checkpoint message or timestamp)
pub name: String,
/// Description (checkpoint message)
pub description: Option<String>,
/// Task ID
pub task_id: Uuid,
/// Contract ID
pub contract_id: Uuid,
/// Number of files in the patch
pub files_count: i32,
/// Total lines added (estimated from patch size)
pub lines_added: i32,
/// Total lines removed (estimated from patch size)
pub lines_removed: i32,
/// List of file paths in the patch (if available from checkpoint)
pub files: Option<Vec<String>>,
/// When the patch was created
pub created_at: chrono::DateTime<chrono::Utc>,
/// When the patch was last updated
pub updated_at: chrono::DateTime<chrono::Utc>,
}
/// List patches (checkpoint patches) for a task.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/patches",
params(
("id" = Uuid, Path, description = "Task ID"),
("contractId" = Uuid, Query, description = "Contract ID")
),
responses(
(status = 200, description = "List of patches", body = Vec<PatchSummary>),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_task_patches(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
axum::extract::Query(query): axum::extract::Query<ListPatchesQuery>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify task belongs to the specified contract
if task.contract_id != Some(query.contract_id) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")),
)
.into_response();
}
// Get checkpoint patches for this task
let patches = match repository::list_checkpoint_patches(pool, id).await {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to list patches for task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get checkpoints to get file lists and messages
let checkpoints = match repository::list_task_checkpoints(pool, id).await {
Ok(c) => c,
Err(e) => {
tracing::warn!("Failed to list checkpoints for task {}: {}", id, e);
vec![]
}
};
// Create a map of checkpoint_id to checkpoint info
let checkpoint_map: std::collections::HashMap<Uuid, _> = checkpoints
.into_iter()
.map(|c| (c.id, c))
.collect();
// Transform to PatchSummary
let summaries: Vec<PatchSummary> = patches
.into_iter()
.map(|p| {
let checkpoint = p.checkpoint_id.and_then(|cid| checkpoint_map.get(&cid));
let name = checkpoint
.map(|c| c.message.clone())
.unwrap_or_else(|| format!("Patch {}", p.created_at.format("%Y-%m-%d %H:%M")));
let description = checkpoint.map(|c| c.message.clone());
let files = checkpoint.and_then(|c| {
c.files_changed.as_ref().and_then(|f| {
f.as_array().map(|arr| {
arr.iter()
.filter_map(|v| v.get("path").and_then(|p| p.as_str()).map(|s| s.to_string()))
.collect()
})
})
});
let lines_added = checkpoint.and_then(|c| c.lines_added).unwrap_or(0);
let lines_removed = checkpoint.and_then(|c| c.lines_removed).unwrap_or(0);
PatchSummary {
id: p.id,
name,
description,
task_id: p.task_id,
contract_id: query.contract_id,
files_count: p.files_count,
lines_added,
lines_removed,
files,
created_at: p.created_at,
updated_at: p.created_at, // Use created_at as updated_at
}
})
.collect();
Json(summaries).into_response()
}
/// Response containing the latest checkpoint patch data for a task.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskPatchDataResponse {
/// Task ID
pub task_id: Uuid,
/// Base64-encoded patch data (gzip-compressed git diff)
pub patch_data: String,
/// The commit SHA that the patch should be applied on top of
pub base_commit_sha: String,
/// Repository URL from the task
pub repository_url: Option<String>,
}
/// Get the latest checkpoint patch data for a task (for worktree restoration).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/patch-data",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Latest patch data", body = TaskPatchDataResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or patch not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task_patch_data(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get latest checkpoint patch (with binary data)
let patch = match repository::get_latest_checkpoint_patch(pool, id).await {
Ok(Some(p)) => p,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "No checkpoint patch found for this task")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get patch for task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
let patch_data_b64 = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
Json(TaskPatchDataResponse {
task_id: id,
patch_data: patch_data_b64,
base_commit_sha: patch.base_commit_sha,
repository_url: task.repository_url,
})
.into_response()
}
/// Request to check if a target directory exists.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsRequest {
/// Path to check.
pub target_dir: String,
}
/// Response for check target exists.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsResponse {
/// Whether the target directory exists.
pub exists: bool,
/// The path that was checked (expanded).
pub target_dir: String,
}
/// Check if a target directory exists (for clone validation, requires authentication).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/check-target",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = CheckTargetExistsRequest,
responses(
(status = 200, description = "Check result", body = CheckTargetExistsResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "No daemon connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn check_target_exists(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<CheckTargetExistsRequest>,
) -> impl IntoResponse {
// Find a connected daemon belonging to this owner to send the command
let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
let daemon_id = match daemon_entry {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
};
// Send CheckTargetExists command to daemon
let command = DaemonCommand::CheckTargetExists {
task_id: id,
target_dir: body.target_dir.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::error!("Failed to send CheckTargetExists command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// The actual result will be sent back via WebSocket
// For now, just acknowledge the request was sent
Json(serde_json::json!({
"status": "checking",
"taskId": id.to_string(),
"targetDir": body.target_dir,
}))
.into_response()
}
// =============================================================================
// Task Reassignment (Daemon Failover)
// =============================================================================
/// Request to reassign a task to a new daemon after daemon disconnect.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskRequest {
/// Target daemon ID to reassign to. If not provided, will select any available daemon.
pub target_daemon_id: Option<Uuid>,
/// Whether to include conversation context from previous run.
#[serde(default = "default_include_context")]
pub include_context: bool,
}
fn default_include_context() -> bool {
true
}
/// Response from task reassignment.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskResponse {
/// The new task that was created.
pub task: Task,
/// The new daemon ID.
pub daemon_id: Uuid,
/// The ID of the old task that was deleted.
pub old_task_id: Uuid,
/// Whether conversation context was included.
pub context_included: bool,
/// Number of context entries from previous conversation.
pub context_entries: usize,
}
/// Build a conversation context summary from task output entries.
/// Returns a formatted string that can be prepended to the task plan.
/// Only includes user and assistant messages - tool use is excluded for cleaner context.
/// Limited to ~4000 characters to avoid bloating the plan.
fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
const MAX_CONTEXT_LEN: usize = 4000;
const MAX_MESSAGE_LEN: usize = 500;
if entries.is_empty() {
return String::new();
}
// Collect only user and assistant messages
let mut messages: Vec<String> = Vec::new();
for entry in entries.iter() {
match entry.message_type.as_str() {
"assistant" => {
// Truncate long messages
let content = if entry.content.len() > MAX_MESSAGE_LEN {
format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN])
} else {
entry.content.clone()
};
messages.push(format!("Assistant: {}\n", content));
}
"user" | "user_input" => {
let content = if entry.content.len() > MAX_MESSAGE_LEN {
format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN])
} else {
entry.content.clone()
};
messages.push(format!("User: {}\n", content));
}
// Skip tool_use, tool_result, and other message types for cleaner context
_ => {}
}
}
if messages.is_empty() {
return String::new();
}
// Build context from the end (most recent messages) up to the max length
let mut context_body = String::new();
let mut included_count = 0;
for msg in messages.iter().rev() {
if context_body.len() + msg.len() > MAX_CONTEXT_LEN {
break;
}
context_body = format!("{}{}\n", msg, context_body);
included_count += 1;
}
// If we couldn't include all messages, note how many were skipped
let skipped = messages.len() - included_count;
let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
context.push_str("The daemon running this task disconnected. Here is what happened so far:\n");
if skipped > 0 {
context.push_str(&format!("(Showing last {} of {} messages)\n", included_count, messages.len()));
}
context.push('\n');
context.push_str(&context_body);
context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
context
}
/// Reassign a task to a new daemon after the original daemon disconnected.
///
/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
/// the task can be reassigned to a new daemon with the conversation context preserved.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/reassign",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = ReassignTaskRequest,
responses(
(status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
(status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "No daemon available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn reassign_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<ReassignTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a state that can be reassigned
// Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected
// Helper closure to check if a daemon is connected by its UUID
let is_daemon_connected = |daemon_id: Uuid| {
state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
};
let can_reassign = matches!(
task.status.as_str(),
"failed" | "interrupted" | "pending" | "starting"
) || {
// Also allow if daemon is not connected
if let Some(daemon_id) = task.daemon_id {
!is_daemon_connected(daemon_id)
} else {
true
}
};
if !can_reassign && task.status == "running" {
// Running task - check if its daemon is still connected
if let Some(daemon_id) = task.daemon_id {
if is_daemon_connected(daemon_id) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"TASK_RUNNING",
"Task is running on a connected daemon. Stop it first to reassign.",
)),
)
.into_response();
}
}
}
// Find a target daemon
let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
// Verify the requested daemon is connected and belongs to the owner
let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
if let Some(daemon) = daemon {
if daemon.owner_id != auth.owner_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
)
.into_response();
}
requested_daemon_id
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
)
.into_response();
}
} else {
// Find any available daemon for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
}
};
// Build conversation context if requested
let (context_str, context_entries) = if body.include_context {
match repository::get_task_output(pool, id, Some(500)).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let context = build_conversation_context(&entries);
let count = entries.len();
(context, count)
}
Err(e) => {
tracing::warn!("Failed to get task output for context: {}", e);
(String::new(), 0)
}
}
} else {
(String::new(), 0)
};
// Build updated plan with context prepended
let updated_plan = if !context_str.is_empty() {
format!("{}{}", context_str, task.plan)
} else {
task.plan.clone()
};
// Create a NEW task with the conversation context
let create_req = CreateTaskRequest {
contract_id: task.contract_id,
name: format!("{} (resumed)", task.name),
description: task.description.clone(),
plan: updated_plan.clone(),
parent_task_id: task.parent_task_id,
is_supervisor: task.is_supervisor,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
merge_mode: task.merge_mode.clone(),
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
copy_files: None,
checkpoint_sha: task.last_checkpoint_sha.clone(),
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to create new task for reassignment: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Update new task to starting and assign daemon
let start_update = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(new_task.version),
..Default::default()
};
let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "New task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update new task daemon assignment: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Fetch latest checkpoint patch for worktree recovery during reassignment
let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await {
Ok(Some(patch)) => {
tracing::info!(
old_task_id = %id,
new_task_id = %new_task.id,
patch_size = patch.patch_size_bytes,
base_sha = %patch.base_commit_sha,
files_count = patch.files_count,
"Including checkpoint patch for task reassignment recovery"
);
let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
(Some(encoded), Some(patch.base_commit_sha))
}
Ok(None) => {
tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment");
(None, None)
}
Err(e) => {
tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment");
(None, None)
}
};
// Get local_only and auto_merge_local from contract if task has one
let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
_ => (false, false),
}
} else {
(false, false)
};
// Send SpawnTask command to daemon for the new task
let command = DaemonCommand::SpawnTask {
task_id: new_task.id,
task_name: final_task.name.clone(),
plan: updated_plan,
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator: false, // New task starts fresh
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(id), // Continue from old task's worktree
copy_files: None,
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data,
patch_base_sha,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
tracing::info!(
old_task_id = %id,
new_task_id = %new_task.id,
new_daemon_id = %target_daemon_id,
context_entries = context_entries,
"Reassigning task: creating new task and deleting old one"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
// Rollback: delete the new task we created
let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Delete the old task now that the new one is spawned
let old_task_id = id;
if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
// Don't fail the request, the new task is already running
}
// Notify the contract's supervisor about the reassignment (if applicable)
if let Some(contract_id) = task.contract_id {
if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
if let Some(supervisor_task_id) = contract.supervisor_task_id {
// Don't notify if we're reassigning the supervisor itself
if supervisor_task_id != old_task_id {
// Find the supervisor's daemon and send a message
if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
if supervisor_task.status == "running" {
if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
// Find the daemon by its UUID
if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
let notification_msg = format!(
"\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
A new task '{}' (ID: {}) has been created to continue the work. \
The new task has {} context entries from the previous conversation.\n\n",
task.name,
old_task_id,
final_task.name,
new_task.id,
context_entries
);
let notify_cmd = DaemonCommand::SendMessage {
task_id: supervisor_task_id,
message: notification_msg,
};
if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
tracing::warn!(
supervisor_id = %supervisor_task_id,
error = %e,
"Failed to notify supervisor about task reassignment"
);
} else {
tracing::info!(
supervisor_id = %supervisor_task_id,
old_task_id = %old_task_id,
new_task_id = %new_task.id,
"Notified supervisor about task reassignment"
);
}
}
}
}
}
}
}
}
}
// Broadcast task update for the new task
state.broadcast_task_update(TaskUpdateNotification {
task_id: new_task.id,
owner_id: Some(auth.owner_id),
version: final_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "reassignment".to_string(),
});
Json(ReassignTaskResponse {
task: final_task,
daemon_id: target_daemon_id,
old_task_id,
context_included: !context_str.is_empty(),
context_entries,
})
.into_response()
}
// =============================================================================
// Task Continue (Restart with Context)
// =============================================================================
/// Request to continue a task after daemon disconnect (restart in-place with context).
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskRequest {
/// Target daemon ID to continue on. If not provided, will select any available daemon.
pub target_daemon_id: Option<Uuid>,
}
/// Response from continuing a task.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskResponse {
/// The continued task (same ID, updated plan with context).
pub task: Task,
/// The daemon ID running the task.
pub daemon_id: Uuid,
/// Number of context entries from previous conversation.
pub context_entries: usize,
}
/// Continue a task after daemon disconnect by restarting it with conversation context.
///
/// Unlike reassign, this keeps the same task ID and just restarts it with the
/// previous conversation context prepended to the plan. Useful for supervisors.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/continue",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = ContinueTaskRequest,
responses(
(status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
(status = 400, description = "Task cannot be continued", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "No daemon available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn continue_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<ContinueTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Helper closure to check if a daemon is connected by its UUID
let is_daemon_connected = |daemon_id: Uuid| {
state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
};
// Check if task can be continued (not currently running on a connected daemon)
let can_continue = matches!(
task.status.as_str(),
"failed" | "interrupted" | "pending" | "starting" | "completed"
) || {
if let Some(daemon_id) = task.daemon_id {
!is_daemon_connected(daemon_id)
} else {
true
}
};
if !can_continue && task.status == "running" {
if let Some(daemon_id) = task.daemon_id {
if is_daemon_connected(daemon_id) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"TASK_RUNNING",
"Task is running on a connected daemon. Stop it first to continue.",
)),
)
.into_response();
}
}
}
// Find a target daemon
let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
if let Some(daemon) = daemon {
if daemon.owner_id != auth.owner_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
)
.into_response();
}
requested_daemon_id
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
)
.into_response();
}
} else {
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
)
.into_response();
}
}
};
// Build conversation context from task output
let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let context = build_conversation_context(&entries);
let count = entries.len();
(context, count)
}
Err(e) => {
tracing::warn!("Failed to get task output for context: {}", e);
(String::new(), 0)
}
};
// Build updated plan with context prepended
let updated_plan = if !context_str.is_empty() {
format!("{}{}", context_str, task.plan)
} else {
task.plan.clone()
};
// Update task in database: reset status, update plan with context, assign daemon
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
plan: Some(updated_plan.clone()),
daemon_id: Some(target_daemon_id),
error_message: None,
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task for continuation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if this is an orchestrator
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => subtasks.len(),
Err(_) => 0,
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
// Get local_only and auto_merge_local from contract if task has one
let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
_ => (false, false),
}
} else {
(false, false)
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: updated_plan,
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
tracing::info!(
task_id = %id,
daemon_id = %target_daemon_id,
context_entries = context_entries,
is_supervisor = task.is_supervisor,
"Continuing task with conversation context"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
// Rollback
let rollback_req = UpdateTaskRequest {
status: Some("failed".to_string()),
clear_daemon_id: true,
error_message: Some(format!("Continuation failed: {}", e)),
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Broadcast task update
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()],
updated_by: "continuation".to_string(),
});
Json(ContinueTaskResponse {
task: updated_task,
daemon_id: target_daemon_id,
context_entries,
})
.into_response()
}
// =============================================================================
// Task Rewind and Fork (Resume and History System)
// =============================================================================
/// Rewind task code to specified checkpoint.
///
/// POST /api/v1/mesh/tasks/{id}/rewind
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/rewind",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = crate::db::models::RewindTaskRequest,
responses(
(status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 409, description = "Cannot rewind a running task", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn rewind_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(task_id): Path<Uuid>,
Json(req): Json<crate::db::models::RewindTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Task cannot be running during rewind
if task.status == "running" {
return (
StatusCode::CONFLICT,
Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")),
)
.into_response();
}
// Get checkpoint info
let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id {
match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
} else if let Some(ref sha) = req.checkpoint_sha {
match repository::get_task_checkpoint_by_sha(pool, sha).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint by SHA: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
} else {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"MISSING_CHECKPOINT",
"Must provide checkpoint_id or checkpoint_sha",
)),
)
.into_response();
};
// Verify checkpoint belongs to this task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// TODO: Send rewind command to daemon when daemon integration is complete
// For now, return a success response with checkpoint info
tracing::info!(
task_id = %task_id,
checkpoint_number = checkpoint.checkpoint_number,
commit_sha = %checkpoint.commit_sha,
"Task rewind requested"
);
Json(crate::db::models::RewindTaskResponse {
task_id,
rewinded_to: crate::db::models::CheckpointInfo {
checkpoint_number: checkpoint.checkpoint_number,
sha: checkpoint.commit_sha.clone(),
message: checkpoint.message,
},
preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState {
state_type: "branch".to_string(),
reference: name,
}),
})
.into_response()
}
/// Fork task from historical point.
///
/// POST /api/v1/mesh/tasks/{id}/fork
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/fork",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = crate::db::models::ForkTaskRequest,
responses(
(status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn fork_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(task_id): Path<Uuid>,
Json(req): Json<crate::db::models::ForkTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get source task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Find the checkpoint to fork from
let checkpoint = match req.fork_from_type.as_str() {
"checkpoint" => {
// fork_from_value is checkpoint number
let checkpoint_num: i32 = match req.fork_from_value.parse() {
Ok(n) => n,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")),
)
.into_response();
}
};
let checkpoints = match repository::list_task_checkpoints(pool, task_id).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to list checkpoints: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
match checkpoints
.into_iter()
.find(|c| c.checkpoint_number == checkpoint_num)
{
Some(c) => c,
None => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
}
}
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"UNSUPPORTED_FORK_TYPE",
"Only 'checkpoint' fork type is currently supported",
)),
)
.into_response();
}
};
// Create the new forked task
let create_req = CreateTaskRequest {
contract_id: task.contract_id,
name: req.new_task_name.clone(),
description: task.description.clone(),
plan: req.new_task_plan.clone(),
parent_task_id: None, // Forked tasks are independent
is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: None, // New branch for forked work
merge_mode: task.merge_mode.clone(),
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to create forked task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
tracing::info!(
source_task_id = %task_id,
new_task_id = %new_task.id,
checkpoint_number = checkpoint.checkpoint_number,
"Task forked from checkpoint"
);
(
StatusCode::CREATED,
Json(crate::db::models::ForkTaskResponse {
new_task_id: new_task.id,
source_task_id: task_id,
fork_point: crate::db::models::ForkPoint {
fork_type: "checkpoint".to_string(),
checkpoint: Some(checkpoint.clone()),
timestamp: checkpoint.created_at,
},
branch_name: req.branch_name,
conversation_included: req.include_conversation.unwrap_or(false),
message_count: None,
}),
)
.into_response()
}
/// Create new task starting from specific checkpoint.
///
/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume",
params(
("id" = Uuid, Path, description = "Task ID"),
("cid" = Uuid, Path, description = "Checkpoint ID")
),
request_body = crate::db::models::ResumeFromCheckpointRequest,
responses(
(status = 201, description = "Task created from checkpoint", body = Task),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn resume_from_checkpoint(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get source task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get checkpoint
let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify checkpoint belongs to the task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// Create the new task that will start from checkpoint
let task_name = req.task_name.unwrap_or_else(|| {
format!(
"{} (resumed from checkpoint {})",
task.name, checkpoint.checkpoint_number
)
});
let create_req = CreateTaskRequest {
contract_id: task.contract_id,
name: task_name,
description: task.description.clone(),
plan: req.plan,
parent_task_id: None,
is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: None, // New branch for resumed work
merge_mode: task.merge_mode.clone(),
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(task_id), // Copy worktree from original task
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to create resumed task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
tracing::info!(
source_task_id = %task_id,
new_task_id = %new_task.id,
checkpoint_id = %checkpoint_id,
checkpoint_number = checkpoint.checkpoint_number,
"Task resumed from checkpoint"
);
(StatusCode::CREATED, Json(new_task)).into_response()
}
/// Request to create branch from checkpoint.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchFromCheckpointRequest {
pub branch_name: String,
#[serde(default)]
pub checkout: bool,
}
/// Response for branch creation from checkpoint.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct BranchCreatedResponse {
pub branch_name: String,
pub commit_sha: String,
pub task_id: Uuid,
pub checkpoint_number: i32,
}
/// Create git branch from checkpoint without starting task.
///
/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch",
params(
("id" = Uuid, Path, description = "Task ID"),
("cid" = Uuid, Path, description = "Checkpoint ID")
),
request_body = CreateBranchFromCheckpointRequest,
responses(
(status = 201, description = "Branch created", body = BranchCreatedResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn branch_from_checkpoint(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
Json(req): Json<CreateBranchFromCheckpointRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get checkpoint
let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify checkpoint belongs to the task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// Find a daemon to execute the branch creation
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if the original daemon is still connected
if state
.daemon_connections
.iter()
.any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id)
{
daemon_id
} else {
// Find any connected daemon for this owner
match state
.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon connected to create branch",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state
.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon connected to create branch",
)),
)
.into_response();
}
}
};
// Send CreateBranch command to daemon
let cmd = DaemonCommand::CreateBranch {
task_id,
branch_name: req.branch_name.clone(),
from_ref: Some(checkpoint.commit_sha.clone()),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await {
tracing::error!("Failed to send CreateBranch command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %task_id,
checkpoint_id = %checkpoint_id,
branch_name = %req.branch_name,
commit_sha = %checkpoint.commit_sha,
"Branch creation requested from checkpoint"
);
(
StatusCode::CREATED,
Json(BranchCreatedResponse {
branch_name: req.branch_name,
commit_sha: checkpoint.commit_sha,
task_id,
checkpoint_number: checkpoint.checkpoint_number,
}),
)
.into_response()
}
// =============================================================================
// Task Branching
// =============================================================================
/// Branch a task, creating a new anonymous task from an existing task's conversation.
///
/// Creates a new task that:
/// - Has no contract_id (anonymous task)
/// - Has branched_from_task_id pointing to the source task
/// - Optionally includes conversation history from the source task
/// - Can be started on an available daemon
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/branch",
params(
("id" = Uuid, Path, description = "Source task ID to branch from")
),
request_body = BranchTaskRequest,
responses(
(status = 201, description = "Task branched successfully", body = BranchTaskResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Source task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn branch_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(source_task_id): Path<Uuid>,
Json(req): Json<BranchTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the source task (must belong to the same owner)
let source_task = match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await {
Ok(Some(task)) => task,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Source task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get source task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Build conversation history if requested
let (conversation_history, message_count) = if req.include_conversation {
match repository::get_task_output(pool, source_task_id, Some(500)).await {
Ok(events) => {
let entries: Vec<TaskOutputEntry> = events
.into_iter()
.filter_map(TaskOutputEntry::from_task_event)
.collect();
let count = entries.len();
// Convert entries to a JSON array for conversation_history
let history_json = serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null);
(Some(history_json), count)
}
Err(e) => {
tracing::warn!("Failed to get task output for branching: {}", e);
(None, 0)
}
}
} else {
(None, 0)
};
// Generate task name if not provided
let task_name = req.name.unwrap_or_else(|| {
format!("{} (branch)", source_task.name)
});
// Create the branched task (anonymous - no contract_id)
let create_req = CreateTaskRequest {
contract_id: None, // Anonymous task
name: task_name,
description: Some(format!("Branched from task: {}", source_task.name)),
plan: req.message,
parent_task_id: None,
is_supervisor: false,
priority: source_task.priority,
repository_url: source_task.repository_url.clone(),
base_branch: source_task.base_branch.clone(),
target_branch: None, // Branched tasks don't auto-merge
merge_mode: None,
target_repo_path: source_task.target_repo_path.clone(),
completion_action: Some("none".to_string()), // Don't auto-complete
continue_from_task_id: Some(source_task_id), // Continue from source task's worktree
copy_files: None,
checkpoint_sha: None,
branched_from_task_id: Some(source_task_id),
conversation_history,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(task) => task,
Err(e) => {
tracing::error!("Failed to create branched task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Record history event for task branching
let _ = repository::record_history_event(
pool,
auth.owner_id,
None, // No contract for anonymous tasks
Some(task.id),
"task",
Some("branched"),
None,
serde_json::json!({
"name": &task.name,
"sourceTaskId": source_task_id,
"sourceTaskName": &source_task.name,
"messageCount": message_count,
}),
).await;
// Fetch latest checkpoint patch from source task for worktree recovery
let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, source_task_id).await {
Ok(Some(patch)) => {
tracing::info!(
source_task_id = %source_task_id,
new_task_id = %task.id,
patch_size = patch.patch_size_bytes,
base_sha = %patch.base_commit_sha,
files_count = patch.files_count,
"Including checkpoint patch for task branching recovery"
);
let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
(Some(encoded), Some(patch.base_commit_sha))
}
Ok(None) => {
tracing::debug!(source_task_id = %source_task_id, "No checkpoint patch found for branching");
(None, None)
}
Err(e) => {
tracing::warn!(source_task_id = %source_task_id, error = %e, "Failed to fetch checkpoint patch for branching");
(None, None)
}
};
// Try to find an available daemon to start the task
let daemon_id = state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
.map(|d| d.value().id);
// If a daemon is available, start the task
if let Some(target_daemon_id) = daemon_id {
// Update task with daemon assignment
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
..Default::default()
};
if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await {
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: task.id,
task_name: updated_task.name.clone(),
plan: updated_task.plan.clone(),
repo_url: updated_task.repository_url.clone(),
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: None,
depth: 0,
is_orchestrator: false,
target_repo_path: updated_task.target_repo_path.clone(),
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: None,
contract_id: None,
is_supervisor: false,
autonomous_loop: false,
resume_session: message_count > 0, // Resume if we have conversation history
conversation_history: updated_task.conversation_state.clone(),
patch_data,
patch_base_sha,
local_only: false, // No contract, so not local_only
auto_merge_local: false, // No contract, so no auto_merge_local
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::warn!(
task_id = %task.id,
daemon_id = %target_daemon_id,
error = %e,
"Failed to send SpawnTask command for branched task, task created but not started"
);
// Task was created but not started - return without daemon_id
return (
StatusCode::CREATED,
Json(BranchTaskResponse {
task,
message_count,
daemon_id: None,
}),
)
.into_response();
}
tracing::info!(
task_id = %task.id,
source_task_id = %source_task_id,
daemon_id = %target_daemon_id,
message_count = message_count,
"Branched task created and started"
);
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
return (
StatusCode::CREATED,
Json(BranchTaskResponse {
task: updated_task,
message_count,
daemon_id: Some(target_daemon_id),
}),
)
.into_response();
}
}
// No daemon available or failed to start - return task without daemon_id
tracing::info!(
task_id = %task.id,
source_task_id = %source_task_id,
message_count = message_count,
"Branched task created (no daemon available to start)"
);
(
StatusCode::CREATED,
Json(BranchTaskResponse {
task,
message_count,
daemon_id: None,
}),
)
.into_response()
}
// =============================================================================
// Daemon Management
// =============================================================================
/// Response for restart daemon request.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RestartDaemonResponse {
/// Whether the restart command was sent successfully.
pub success: bool,
/// The daemon ID that received the restart command.
pub daemon_id: Uuid,
/// Message describing the result.
pub message: String,
}
/// Restart a daemon by ID (requires authentication).
///
/// Sends a restart command to the specified daemon, which will cause it to
/// gracefully terminate and restart. Any running tasks will be interrupted.
#[utoipa::path(
post,
path = "/api/v1/mesh/daemons/{id}/restart",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Restart command sent", body = RestartDaemonResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found or not connected", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn restart_daemon(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify the daemon exists and belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Check if daemon is connected
if !state.is_daemon_connected(id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new(
"DAEMON_NOT_CONNECTED",
"Daemon is not currently connected",
)),
)
.into_response();
}
// Send restart command to daemon
let command = DaemonCommand::RestartDaemon;
if let Err(e) = state.send_daemon_command(id, command).await {
tracing::error!("Failed to send restart command to daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
daemon_id = %id,
owner_id = %auth.owner_id,
"Restart command sent to daemon"
);
Json(RestartDaemonResponse {
success: true,
daemon_id: id,
message: "Restart command sent. The daemon will restart shortly.".to_string(),
})
.into_response()
}
// =============================================================================
// Daemon Reauthorization
// =============================================================================
/// Response from the trigger reauth endpoint.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct TriggerReauthResponse {
/// Whether the reauth command was sent successfully.
pub success: bool,
/// The daemon ID that received the reauth command.
#[serde(rename = "daemonId")]
pub daemon_id: Uuid,
/// Unique request ID for tracking this reauth flow.
#[serde(rename = "requestId")]
pub request_id: Uuid,
}
/// Request body for submitting an auth code.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct SubmitAuthCodeRequest {
/// The auth code obtained from the OAuth login flow.
pub code: String,
/// The request ID from the trigger reauth response.
#[serde(rename = "requestId")]
pub request_id: Uuid,
}
/// Response from the submit auth code endpoint.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct SubmitAuthCodeResponse {
/// Whether the auth code was sent to the daemon successfully.
pub success: bool,
}
/// Response from the reauth status polling endpoint.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct ReauthStatusResponse {
/// Current status of the reauth flow: "pending", "url_ready", "completed", "failed"
pub status: String,
/// OAuth login URL (present when status is "url_ready")
#[serde(rename = "loginUrl", skip_serializing_if = "Option::is_none")]
pub login_url: Option<String>,
/// Error message (present when status is "failed")
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Trigger OAuth re-authentication on a daemon.
///
/// Sends a reauth command to the specified daemon, which will spawn `claude setup-token`
/// and return the OAuth login URL via a status update.
#[utoipa::path(
post,
path = "/api/v1/mesh/daemons/{id}/reauth",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Reauth command sent", body = TriggerReauthResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found or not connected", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn trigger_daemon_reauth(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify the daemon exists and belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Check if daemon is connected
if !state.is_daemon_connected(id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new(
"DAEMON_NOT_CONNECTED",
"Daemon is not currently connected",
)),
)
.into_response();
}
// Generate a unique request ID for this reauth flow
let request_id = Uuid::new_v4();
// Initialize the status as "pending"
state.set_daemon_reauth_status(id, request_id, "pending".to_string(), None, None);
// Send reauth command to daemon
let command = DaemonCommand::TriggerReauth { request_id };
if let Err(e) = state.send_daemon_command(id, command).await {
tracing::error!("Failed to send reauth command to daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
daemon_id = %id,
request_id = %request_id,
owner_id = %auth.owner_id,
"Reauth command sent to daemon"
);
Json(TriggerReauthResponse {
success: true,
daemon_id: id,
request_id,
})
.into_response()
}
/// Submit an OAuth auth code to a daemon's pending reauth flow.
///
/// Sends the auth code to the daemon, which will forward it to the `claude setup-token` process.
#[utoipa::path(
post,
path = "/api/v1/mesh/daemons/{id}/reauth/code",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
request_body = SubmitAuthCodeRequest,
responses(
(status = 200, description = "Auth code submitted", body = SubmitAuthCodeResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found or not connected", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn submit_daemon_auth_code(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(body): Json<SubmitAuthCodeRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify the daemon exists and belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Check if daemon is connected
if !state.is_daemon_connected(id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new(
"DAEMON_NOT_CONNECTED",
"Daemon is not currently connected",
)),
)
.into_response();
}
// Send auth code command to daemon
let command = DaemonCommand::SubmitAuthCode {
request_id: body.request_id,
code: body.code,
};
if let Err(e) = state.send_daemon_command(id, command).await {
tracing::error!("Failed to send auth code to daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
daemon_id = %id,
request_id = %body.request_id,
owner_id = %auth.owner_id,
"Auth code submitted to daemon"
);
Json(SubmitAuthCodeResponse { success: true }).into_response()
}
/// Get the status of a daemon reauth request.
///
/// Used by the frontend to poll for reauth status updates.
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/{id}/reauth/{request_id}/status",
params(
("id" = Uuid, Path, description = "Daemon ID"),
("request_id" = Uuid, Path, description = "Reauth request ID")
),
responses(
(status = 200, description = "Reauth status", body = ReauthStatusResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Reauth request not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon_reauth_status(
State(state): State<SharedState>,
Authenticated(_auth): Authenticated,
Path((id, request_id)): Path<(Uuid, Uuid)>,
) -> impl IntoResponse {
match state.get_daemon_reauth_status(id, request_id) {
Some(status) => Json(ReauthStatusResponse {
status: status.status,
login_url: status.login_url,
error: status.error,
})
.into_response(),
None => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Reauth request not found")),
)
.into_response(),
}
}