//! HTTP handlers for task and daemon mesh operations.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use base64::Engine;
use uuid::Uuid;
use crate::db::models::{
BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory,
DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task,
TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse,
TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
// =============================================================================
// Authentication Types
// =============================================================================
/// Source of authentication for mesh endpoints.
///
/// Produced by [`extract_auth`], which in this file only ever returns
/// [`AuthSource::ToolKey`] or [`AuthSource::Anonymous`].
#[derive(Debug, Clone)]
pub enum AuthSource {
    /// Authenticated via tool key (orchestrator accessing API).
    /// Contains the ID of the task that owns this key.
    ToolKey(Uuid),
    /// Authenticated via user token (web client).
    /// Contains the user ID. Not implemented yet, so this variant is never
    /// constructed here — hence the `dead_code` allowance below.
    #[allow(dead_code)]
    UserToken(Uuid),
    /// No authentication provided (anonymous access), or the provided
    /// credentials could not be validated.
    Anonymous,
}
/// Header name for tool key authentication.
///
/// This is the header [`extract_auth`] looks up first when classifying a request.
pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key";
/// Extract the authentication source from request headers.
///
/// Checks, in order:
/// 1. The `X-Makima-Tool-Key` header; when `state.validate_tool_key` accepts
///    it, returns [`AuthSource::ToolKey`] with the owning task ID.
/// 2. An `Authorization: Bearer` header. User-token validation is not
///    implemented yet, so this only emits a debug log.
/// 3. Otherwise falls back to [`AuthSource::Anonymous`].
///
/// A tool key that is present but rejected is logged as a warning. The key
/// value itself is deliberately NOT logged: a rejected key may still be a
/// real credential (mistyped, expired, or from another environment), and
/// secrets must not end up in log storage.
pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
    // Tool key takes precedence. Header values that are not valid visible
    // ASCII are ignored, exactly as if the header were absent.
    let tool_key = headers
        .get(TOOL_KEY_HEADER)
        .and_then(|value| value.to_str().ok());
    if let Some(key_str) = tool_key {
        if let Some(task_id) = state.validate_tool_key(key_str) {
            return AuthSource::ToolKey(task_id);
        }
        // Log only non-sensitive metadata about the rejected key.
        tracing::warn!(key_len = key_str.len(), "Invalid tool key provided");
    }

    // Authorization header (future user auth).
    let has_bearer = headers
        .get("authorization")
        .and_then(|value| value.to_str().ok())
        .map_or(false, |value| value.starts_with("Bearer "));
    if has_bearer {
        // Future: validate JWT and extract user ID
        tracing::debug!("Bearer token auth not yet implemented");
    }

    // Default to anonymous
    AuthSource::Anonymous
}
// =============================================================================
// Task Handlers
// =============================================================================
/// List all tasks for the current owner.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks",
responses(
(status = 200, description = "List of tasks", body = TaskListResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn list_tasks(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::list_tasks_for_owner(pool, auth.owner_id).await {
Ok(tasks) => {
let total = tasks.len() as i64;
Json(TaskListResponse { tasks, total }).into_response()
}
Err(e) => {
tracing::error!("Failed to list tasks: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Get a single task by ID with its subtasks (scoped by owner).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task details with subtasks", body = TaskWithSubtasks),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(task)) => {
// Get subtasks for this task (also scoped by owner)
match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(),
Err(e) => {
tracing::error!("Failed to get subtasks for task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Create a new task for the authenticated owner.
///
/// After the task row is created this handler:
/// 1. Records a `"task"` / `"created"` history event. This is best-effort: a
///    failure is logged as a warning and does not fail the request.
/// 2. If the task belongs to a contract and is not itself the supervisor,
///    spawns a background job that looks up the contract's supervisor task
///    and notifies it of the new task. Lookup failures are ignored.
///
/// Responds `201` with the created task, `503` when no database pool is
/// configured, or `500` when the create call fails.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks",
    request_body = CreateTaskRequest,
    responses(
        (status = 201, description = "Task created", body = Task),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn create_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Json(req): Json<CreateTaskRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    let task = match repository::create_task_for_owner(pool, auth.owner_id, req).await {
        Ok(task) => task,
        Err(e) => {
            tracing::error!("Failed to create task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Record history event for task creation. The task already exists, so a
    // history failure must not fail the request — but it should be visible.
    if let Err(e) = repository::record_history_event(
        pool,
        auth.owner_id,
        task.contract_id,
        Some(task.id),
        "task",
        Some("created"),
        None,
        serde_json::json!({
            "name": &task.name,
            "isSupervisor": task.is_supervisor,
        }),
    )
    .await
    {
        tracing::warn!(
            task_id = %task.id,
            error = ?e,
            "Failed to record task creation history event"
        );
    }

    // Notify supervisor of new task creation if task belongs to a contract.
    // Done in the background so the HTTP response is not delayed.
    if let Some(contract_id) = task.contract_id {
        if !task.is_supervisor {
            let pool = pool.clone();
            let state_clone = state.clone();
            let task_clone = task.clone();
            tokio::spawn(async move {
                if let Ok(Some(supervisor)) =
                    repository::get_contract_supervisor_task(&pool, contract_id).await
                {
                    state_clone
                        .notify_supervisor_of_task_created(
                            supervisor.id,
                            supervisor.daemon_id,
                            task_clone.id,
                            &task_clone.name,
                        )
                        .await;
                }
            });
        }
    }

    (StatusCode::CREATED, Json(task)).into_response()
}
/// Update an existing task (scoped by owner).
#[utoipa::path(
put,
path = "/api/v1/mesh/tasks/{id}",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = UpdateTaskRequest,
responses(
(status = 200, description = "Task updated", body = Task),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 409, description = "Version conflict", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn update_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<UpdateTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Check if trying to set a supervisor task to a terminal status
if let Some(ref new_status) = req.status {
let terminal_statuses = ["done", "failed", "merged"];
if terminal_statuses.contains(&new_status.as_str()) {
// Get the task to check if it's a supervisor
if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
if task.is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"SUPERVISOR_CANNOT_COMPLETE",
"Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
)),
)
.into_response();
}
}
}
}
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
updated_fields.push("name".to_string());
}
if req.description.is_some() {
updated_fields.push("description".to_string());
}
if req.status.is_some() {
updated_fields.push("status".to_string());
}
if req.priority.is_some() {
updated_fields.push("priority".to_string());
}
if req.plan.is_some() {
updated_fields.push("plan".to_string());
}
if req.progress_summary.is_some() {
updated_fields.push("progress_summary".to_string());
}
if req.error_message.is_some() {
updated_fields.push("error_message".to_string());
}
match repository::update_task_for_owner(pool, id, auth.owner_id, req).await {
Ok(Some(task)) => {
let updated_fields_clone = updated_fields.clone();
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
owner_id: Some(auth.owner_id),
version: task.version,
status: task.status.clone(),
updated_fields,
updated_by: "user".to_string(),
});
// Notify supervisor of status change if task belongs to a contract
if let Some(contract_id) = task.contract_id {
if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
let pool = pool.clone();
let state_clone = state.clone();
let task_clone = task.clone();
tokio::spawn(async move {
if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
state_clone.notify_supervisor_of_task_update(
supervisor.id,
supervisor.daemon_id,
task_clone.id,
&task_clone.name,
&task_clone.status,
&updated_fields_clone,
).await;
}
});
}
}
Json(task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(RepositoryError::VersionConflict { expected, actual }) => {
tracing::info!(
"Version conflict on task {}: expected {}, actual {}",
id,
expected,
actual
);
(
StatusCode::CONFLICT,
Json(serde_json::json!({
"code": "VERSION_CONFLICT",
"message": format!(
"Task was modified by another user. Expected version {}, actual version {}",
expected, actual
),
"expectedVersion": expected,
"actualVersion": actual,
})),
)
.into_response()
}
Err(RepositoryError::Database(e)) => {
tracing::error!("Failed to update task {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Delete a task owned by the authenticated caller.
///
/// Steps:
/// 1. Look the task up (owner-scoped). If it is in an active status
///    (`running`, `starting`, `initializing`, `paused`) and has a daemon
///    assigned, send that daemon a non-graceful `InterruptTask`. A failure to
///    look up the task or to reach the daemon does not abort the delete.
/// 2. Perform the owner-scoped delete.
/// 3. Only once the delete has succeeded, drop any pending supervisor
///    questions registered for the task.
///
/// Responds `204` on success, `404` when the task is not found for this
/// owner, `503` without a database pool and `500` on database errors.
#[utoipa::path(
    delete,
    path = "/api/v1/mesh/tasks/{id}",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 204, description = "Task deleted"),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn delete_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task first to check if it's running and needs to be stopped
    if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
        let is_active = matches!(
            task.status.as_str(),
            "running" | "starting" | "initializing" | "paused"
        );

        // If task is active and has a daemon, send interrupt command
        if is_active {
            if let Some(daemon_id) = task.daemon_id {
                let command = DaemonCommand::InterruptTask {
                    task_id: id,
                    graceful: false,
                };
                if let Err(e) = state.send_daemon_command(daemon_id, command).await {
                    tracing::warn!(
                        task_id = %id,
                        daemon_id = %daemon_id,
                        "Failed to send InterruptTask before delete: {}",
                        e
                    );
                } else {
                    tracing::info!(
                        task_id = %id,
                        daemon_id = %daemon_id,
                        "Sent InterruptTask before delete"
                    );
                }
            }
        }
    }

    match repository::delete_task_for_owner(pool, id, auth.owner_id).await {
        Ok(true) => {
            // Clean up pending supervisor questions only after the
            // owner-scoped delete succeeded. The cleanup call takes just the
            // task ID, so running it unconditionally would let a caller clear
            // questions for a task that belongs to a different owner.
            state.remove_pending_questions_for_task(id);
            StatusCode::NO_CONTENT.into_response()
        }
        Ok(false) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Task not found")),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to delete task {}: {}", id, e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response()
        }
    }
}
/// Start a task by sending it to an available daemon (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/start",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task started", body = Task),
(status = 400, description = "Task cannot be started", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or no daemons available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn start_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
// Extract authentication to log who is starting the task
let legacy_auth = extract_auth(&state, &headers);
match &legacy_auth {
AuthSource::ToolKey(orchestrator_id) => {
tracing::info!(
task_id = %id,
orchestrator_task_id = %orchestrator_id,
owner_id = %auth.owner_id,
"Orchestrator starting subtask via tool key"
);
}
AuthSource::Anonymous => {
tracing::info!(
task_id = %id,
owner_id = %auth.owner_id,
"Starting task (user request)"
);
}
AuthSource::UserToken(user_id) => {
tracing::info!(
task_id = %id,
user_id = %user_id,
owner_id = %auth.owner_id,
"Starting task via user token"
);
}
}
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task can be started (allow pending, failed, interrupted, done, or merged)
let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"];
if !startable_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be started from status: {}", task.status),
)),
)
.into_response();
}
// Get local_only flag from contract if task has one
let local_only = if let Some(contract_id) = task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => contract.local_only,
_ => false,
}
} else {
false
};
// Get list of daemons that have previously failed this task
let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
// Find an available daemon belonging to this owner, excluding failed ones
let target_daemon_id = match state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
Some(id) => id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot start task.",
)),
)
.into_response();
}
};
// Check if this is an orchestrator (depth 0 with subtasks)
let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
Ok(subtasks) => {
tracing::info!(
task_id = %id,
subtask_count = subtasks.len(),
subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::<Vec<_>>(),
"Counted subtasks for orchestrator check"
);
subtasks.len()
},
Err(e) => {
tracing::warn!("Failed to check subtasks for {}: {}", id, e);
0
}
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
tracing::info!(
task_id = %id,
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
is_supervisor = task.is_supervisor,
"Starting task with orchestrator/supervisor determination"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
supervisor_worktree_task_id: None, // Not spawned by supervisor
};
tracing::info!(
task_id = %id,
is_supervisor = task.is_supervisor,
is_orchestrator = is_orchestrator,
daemon_id = %target_daemon_id,
"Sending SpawnTask command to daemon"
);
if let Err(e) = state.send_daemon_command(target_daemon_id, command.clone()).await {
tracing::warn!(
task_id = %id,
daemon_id = %target_daemon_id,
error = %e,
"Failed to send SpawnTask command, trying alternative daemon"
);
// Add this daemon to exclude list and try another
exclude_daemon_ids.push(target_daemon_id);
// Try to find an alternative daemon
if let Some(alt_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) {
// Update task with new daemon
let alt_update_req = UpdateTaskRequest {
daemon_id: Some(alt_daemon_id),
..Default::default()
};
if let Ok(Some(alt_updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, alt_update_req).await {
// Recreate command with same data but try new daemon
let alt_command = DaemonCommand::SpawnTask {
task_id: id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
supervisor_worktree_task_id: None, // Not spawned by supervisor
};
if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
tracing::info!(
task_id = %id,
old_daemon_id = %target_daemon_id,
new_daemon_id = %alt_daemon_id,
"Task started on alternative daemon after first daemon failed"
);
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: alt_updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
return Json(alt_updated_task).into_response();
}
}
}
// All daemons failed - rollback and return error
tracing::error!("Failed to start task on any daemon: {}", e);
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", "Failed to start task on any available daemon")),
)
.into_response();
}
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
Json(updated_task).into_response()
}
/// Stop a running task (scoped by owner).
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/stop",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task stopped", body = Task),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn stop_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running/active
let is_active = matches!(
task.status.as_str(),
"running" | "starting" | "initializing" | "paused"
);
if !is_active {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!("Task cannot be stopped from status: {}", task.status),
)),
)
.into_response();
}
// Find the daemon running this task
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else {
// No daemon assigned, just update status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
};
// Send InterruptTask command to daemon
let command = DaemonCommand::InterruptTask {
task_id: id,
graceful: false,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::warn!("Failed to send InterruptTask command: {}", e);
// Daemon might be disconnected - update task status directly
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user (daemon unavailable)".to_string()),
version: Some(task.version),
..Default::default()
};
return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
};
}
// Update task status to "failed" (stopped)
let update_req = UpdateTaskRequest {
status: Some("failed".to_string()),
error_message: Some("Task stopped by user".to_string()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
Ok(Some(updated_task)) => {
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: id,
owner_id: Some(auth.owner_id),
version: updated_task.version,
status: "failed".to_string(),
updated_fields: vec!["status".to_string(), "error_message".to_string()],
updated_by: "user".to_string(),
});
Json(updated_task).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response(),
Err(e) => {
tracing::error!("Failed to update task status: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response()
}
}
}
/// Send a message to a running task's stdin (scoped by owner).
///
/// This can be used to provide input to Claude Code when it's waiting for user input,
/// or to inject context/instructions into a running task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/message",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = SendMessageRequest,
responses(
(status = 200, description = "Message sent successfully"),
(status = 400, description = "Task is not running", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn send_message(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
Json(req): Json<SendMessageRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is running (except for AUTH_CODE messages and supervisor tasks)
// Supervisor tasks can receive messages even when not running - daemon will respawn Claude
let is_auth_code = req.message.starts_with("AUTH_CODE:");
let is_supervisor = task.is_supervisor;
if task.status != "running" && !is_auth_code && !is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Cannot send message to task in status: {}. Task must be running.",
task.status
),
)),
)
.into_response();
}
// Find the daemon running this task
// For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
} else if is_supervisor {
// Supervisor without daemon - find one
match state.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(entry) => entry.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon available. Please start a daemon.",
)),
)
.into_response();
}
}
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"Task has no assigned daemon. Cannot send message.",
)),
)
.into_response();
};
// Check if daemon is connected before trying to send
if !state.is_daemon_connected(target_daemon_id) {
tracing::warn!(
task_id = %id,
daemon_id = %target_daemon_id,
"Daemon not connected, attempting to reallocate task"
);
// Get list of failed daemons for this task
let mut exclude_daemons = task.failed_daemon_ids.clone().unwrap_or_default();
exclude_daemons.push(target_daemon_id);
// Try to find an alternative daemon
if let Some(new_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemons) {
// Mark the task for retry and update with new daemon
if let Some(ref pool) = state.db_pool {
// Mark the old daemon as failed for this task
let _ = repository::mark_task_for_retry(pool, id, target_daemon_id).await;
// Update task with new daemon and restart
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(new_daemon_id),
..Default::default()
};
if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
// Get local_only from contract if task has one
let local_only = if let Some(contract_id) = updated_task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
Ok(Some(contract)) => contract.local_only,
_ => false,
}
} else {
false
};
// Send spawn command to new daemon
let spawn_cmd = DaemonCommand::SpawnTask {
task_id: id,
task_name: updated_task.name.clone(),
plan: updated_task.plan.clone(),
repo_url: updated_task.repository_url.clone(),
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: updated_task.parent_task_id,
depth: updated_task.depth,
is_orchestrator: false,
target_repo_path: updated_task.target_repo_path.clone(),
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
supervisor_worktree_task_id: None, // Not spawned by supervisor
};
if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
tracing::info!(
task_id = %id,
old_daemon_id = %target_daemon_id,
new_daemon_id = %new_daemon_id,
"Task reallocated to new daemon, will restart"
);
return (
StatusCode::ACCEPTED,
Json(serde_json::json!({
"success": true,
"reallocated": true,
"taskId": id,
"message": "Task was reallocated to a new daemon and is restarting. Please retry your message shortly."
})),
).into_response();
}
}
}
}
// Could not reallocate - return error with helpful message
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"DAEMON_DISCONNECTED",
"The daemon running this task has disconnected and no alternative daemon is available. Please start a daemon.",
)),
).into_response();
}
// Send SendMessage command to daemon
let command = DaemonCommand::SendMessage {
task_id: id,
message: req.message.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SendMessage command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task");
// Return success
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"messageLength": req.message.len()
})),
)
.into_response()
}
/// Get task output history (scoped by owner).
///
/// Retrieves all recorded output from a task's Claude Code process.
/// This allows the frontend to fetch missed output when subscribing late
/// or reconnecting after a disconnect.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/output",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "Task output history", body = TaskOutputResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn get_task_output(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    // Nothing can be served without a database pool.
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // Ownership gate: the output query below is keyed by task ID only, so the
    // owner-scoped lookup has to succeed first.
    match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Ok(Some(_)) => {}
    }

    // Fetch the stored events with no limit.
    let events = match repository::get_task_output(pool, id, None).await {
        Ok(events) => events,
        Err(e) => {
            tracing::error!("Failed to get task output: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Events that do not convert into an output entry are dropped, so `total`
    // counts the entries actually returned.
    let entries: Vec<TaskOutputEntry> = events
        .into_iter()
        .filter_map(TaskOutputEntry::from_task_event)
        .collect();
    let total = entries.len();

    Json(TaskOutputResponse {
        entries,
        total,
        task_id: id,
    })
    .into_response()
}
/// List subtasks for a parent task (scoped by owner).
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/subtasks",
    params(
        ("id" = Uuid, Path, description = "Parent task ID")
    ),
    responses(
        (status = 200, description = "List of subtasks", body = TaskListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_subtasks(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    // No pool means no storage to query.
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // The repository call receives the owner ID itself, so no separate
    // parent-task lookup is made here.
    let tasks = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
        Ok(rows) => rows,
        Err(e) => {
            tracing::error!("Failed to list subtasks for task {}: {}", id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    let total = tasks.len() as i64;
    Json(TaskListResponse { tasks, total }).into_response()
}
/// List events for a task (scoped by owner).
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/events",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "List of task events", body = TaskEventListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_task_events(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // The event listing is keyed by task ID alone, so confirm the caller
    // owns the task before reading its events.
    match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Ok(Some(_)) => {}
    }

    // Listed with no limit.
    let events = match repository::list_task_events(pool, id, None).await {
        Ok(rows) => rows,
        Err(e) => {
            tracing::error!("Failed to list events for task {}: {}", id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    let total = events.len() as i64;
    Json(TaskEventListResponse { events, total }).into_response()
}
/// Retry completion action for a completed task (scoped by owner).
///
/// This allows retrying a completion action (push branch, merge, create PR)
/// after filling in the target_repo_path if it wasn't set when the task completed.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/retry-completion",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Completion action initiated"),
(status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn retry_completion_action(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if task is in a terminal state
let terminal_statuses = ["done", "failed", "merged"];
if !terminal_statuses.contains(&task.status.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_STATE",
format!(
"Task must be completed to retry completion action. Current status: {}",
task.status
),
)),
)
.into_response();
}
// Check if completion action is set
let action = match &task.completion_action {
Some(action) if action != "none" => action.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_COMPLETION_ACTION",
"Task has no completion action configured (or is set to 'none')",
)),
)
.into_response();
}
};
// Check if target_repo_path is set
let target_repo_path = match &task.target_repo_path {
Some(path) if !path.is_empty() => path.clone(),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_TARGET_REPO",
"Target repository path must be set before retrying completion action",
)),
)
.into_response();
}
};
// Note: We don't check overlay_path here because the server may not have it
// The daemon will scan its worktrees directory to find the worktree by task ID
// Find a daemon to execute the action (must belong to this owner)
// Prefer the daemon that ran the task, but fall back to any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if this daemon is still connected and belongs to this owner
if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) {
daemon_id
} else {
// Fall back to any connected daemon for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot execute completion action.",
)),
)
.into_response();
}
}
};
// Send RetryCompletionAction command to daemon
let command = DaemonCommand::RetryCompletionAction {
task_id: id,
task_name: task.name.clone(),
action: action.clone(),
target_repo_path: target_repo_path.clone(),
target_branch: task.target_branch.clone(),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send RetryCompletionAction command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %id,
action = %action,
target_repo = %target_repo_path,
"Retry completion action initiated"
);
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"taskId": id,
"action": action,
"targetRepoPath": target_repo_path,
"message": "Completion action initiated. Check task output for results."
})),
)
.into_response()
}
// =============================================================================
// Daemon Handlers
// =============================================================================
/// List all connected daemons (requires authentication).
#[utoipa::path(
    get,
    path = "/api/v1/mesh/daemons",
    responses(
        (status = 200, description = "List of daemons", body = DaemonListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_daemons(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // The listing comes from the database (not the in-memory connection map)
    // and is restricted to the caller's own daemons.
    let daemons = match repository::list_daemons_for_owner(pool, auth.owner_id).await {
        Ok(rows) => rows,
        Err(e) => {
            tracing::error!("Failed to list daemons: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    let total = daemons.len() as i64;
    Json(DaemonListResponse { daemons, total }).into_response()
}
/// Get a single daemon by ID (requires authentication).
#[utoipa::path(
    get,
    path = "/api/v1/mesh/daemons/{id}",
    params(
        ("id" = Uuid, Path, description = "Daemon ID")
    ),
    responses(
        (status = 200, description = "Daemon details", body = crate::db::models::Daemon),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Daemon not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn get_daemon(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // Owner-scoped lookup; an empty result is reported as 404.
    let lookup = repository::get_daemon_for_owner(pool, id, auth.owner_id).await;
    match lookup {
        Err(e) => {
            tracing::error!("Failed to get daemon {}: {}", id, e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response()
        }
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Daemon not found")),
        )
            .into_response(),
        Ok(Some(daemon)) => Json(daemon).into_response(),
    }
}
/// Get suggested directories from connected daemons (requires authentication).
///
/// Returns directories that can be used as target_repo_path for completion actions.
#[utoipa::path(
get,
path = "/api/v1/mesh/daemons/directories",
responses(
(status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse),
(status = 401, description = "Unauthorized", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_daemon_directories(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let mut directories = Vec::new();
// Iterate over connected daemons belonging to this owner and collect their directories
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
// Only include daemons belonging to this owner
if daemon.owner_id != auth.owner_id {
continue;
}
// Add working directory if available
if let Some(ref working_dir) = daemon.working_directory {
directories.push(DaemonDirectory {
path: working_dir.clone(),
label: "Working Directory".to_string(),
directory_type: "working".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
// Add home directory if available (for cloning completed work)
if let Some(ref home_dir) = daemon.home_directory {
directories.push(DaemonDirectory {
path: home_dir.clone(),
label: "Makima Home".to_string(),
directory_type: "home".to_string(),
hostname: daemon.hostname.clone(),
exists: None,
});
}
}
Json(DaemonDirectoriesResponse { directories })
}
// JSON body of the clone endpoint. Because of `rename_all = "camelCase"` the
// field is read from the key `targetDir`.
/// Request to clone a worktree to a target directory.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CloneWorktreeRequest {
// The `clone_worktree` handler forwards this string to the daemon unchanged
// inside `DaemonCommand::CloneWorktree`.
/// Path to the target directory.
pub target_dir: String,
}
/// Clone a task's worktree to a target directory (scoped by owner).
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/clone",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CloneWorktreeRequest,
    responses(
        (status = 200, description = "Clone command sent"),
        (status = 400, description = "Invalid request or task not completed", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn clone_worktree(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<CloneWorktreeRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task (scoped by owner)
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Verify task is in a completed state
    let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged");
    if !is_completed {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new(
                "INVALID_STATE",
                format!("Task must be completed to clone (current status: {})", task.status),
            )),
        )
            .into_response();
    }

    // Pick the daemon that receives the command. The command carries only the
    // task ID, so the receiving daemon has to find the worktree on its own disk.
    // Prefer the daemon recorded on the task when it is still connected and owned
    // by the caller (same policy as `retry_completion_action`); otherwise fall
    // back to any connected daemon of this owner, which was the previous behaviour.
    let owner_id = auth.owner_id;
    let original_daemon = task.daemon_id.filter(|wanted| {
        state
            .daemon_connections
            .iter()
            .any(|d| d.value().id == *wanted && d.value().owner_id == owner_id)
    });
    let selected_daemon = original_daemon.or_else(|| {
        state
            .daemon_connections
            .iter()
            .find(|d| d.value().owner_id == owner_id)
            .map(|d| d.value().id)
    });
    let Some(daemon_id) = selected_daemon else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
        )
            .into_response();
    };

    // Send CloneWorktree command to daemon
    let command = DaemonCommand::CloneWorktree {
        task_id: id,
        target_dir: body.target_dir.clone(),
    };
    if let Err(e) = state.send_daemon_command(daemon_id, command).await {
        tracing::error!("Failed to send CloneWorktree command: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // The response only acknowledges that the command was dispatched.
    Json(serde_json::json!({
        "status": "cloning",
        "taskId": id.to_string(),
        "targetDir": body.target_dir,
    }))
    .into_response()
}
// =============================================================================
// Worktree Info
// =============================================================================
// Serialized with camelCase keys (`taskId`, `worktreePath`, `headSha`, ...).
// NOTE(review): the `get_worktree_info` handler in this file only ever returns
// this type with placeholder values (`exists: false`, zeroed stats, no files);
// its comments say the real data arrives over WebSocket.
/// Response for worktree info.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeInfoResponse {
/// Task ID.
pub task_id: Uuid,
/// Path to the worktree directory.
pub worktree_path: Option<String>,
/// Whether the worktree exists.
pub exists: bool,
/// Aggregate statistics.
pub stats: WorktreeStats,
/// Changed files list.
pub files: Vec<WorktreeFile>,
/// Current branch name.
pub branch: Option<String>,
/// Current HEAD commit SHA.
pub head_sha: Option<String>,
}
// Embedded in `WorktreeInfoResponse::stats`. Serialized with camelCase keys,
// so `files_changed` is emitted as `filesChanged`.
/// Statistics about worktree changes.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeStats {
/// Number of files changed.
pub files_changed: i32,
/// Total lines inserted.
pub insertions: i32,
/// Total lines deleted.
pub deletions: i32,
}
// Element type of `WorktreeInfoResponse::files`. Serialized with camelCase
// keys (`linesAdded`, `linesRemoved`).
/// Information about a changed file in the worktree.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorktreeFile {
/// File path relative to worktree root.
pub path: String,
// Kept as a free-form string; nothing in this type restricts it to the
// codes listed below.
/// Git status code (M, A, D, R, C, U, ?).
pub status: String,
/// Lines added.
pub lines_added: i32,
/// Lines removed.
pub lines_removed: i32,
}
/// Get worktree information for a task (files, stats, branch info).
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/worktree-info",
params(
("id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Worktree info", body = WorktreeInfoResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "Database not configured or daemon not connected", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_worktree_info(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get the task (scoped by owner)
let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
// Task has no daemon, return empty worktree info
return Json(WorktreeInfoResponse {
task_id: id,
worktree_path: None,
exists: false,
stats: WorktreeStats {
files_changed: 0,
insertions: 0,
deletions: 0,
},
files: vec![],
branch: None,
head_sha: None,
})
.into_response();
};
// Send GetWorktreeInfo command to daemon
let command = DaemonCommand::GetWorktreeInfo { task_id: id };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
tracing::error!("Failed to send GetWorktreeInfo command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
// Return placeholder - actual data will be streamed via WebSocket
// For now, return empty data indicating the request is being processed
Json(WorktreeInfoResponse {
task_id: id,
worktree_path: None,
exists: false,
stats: WorktreeStats {
files_changed: 0,
insertions: 0,
deletions: 0,
},
files: vec![],
branch: None,
head_sha: None,
})
.into_response()
}
// JSON body of the check-target endpoint. With `rename_all = "camelCase"` the
// field is read from the key `targetDir`.
/// Request to check if a target directory exists.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsRequest {
// The `check_target_exists` handler forwards this string to the daemon
// unchanged inside `DaemonCommand::CheckTargetExists`.
/// Path to check.
pub target_dir: String,
}
// Serialized with camelCase keys (`exists`, `targetDir`).
// NOTE(review): the `check_target_exists` handler in this file declares this
// type as its 200 body but actually returns an ad-hoc JSON acknowledgement
// (`status`, `taskId`, `targetDir`) - confirm whether this type is built elsewhere.
/// Response for check target exists.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckTargetExistsResponse {
/// Whether the target directory exists.
pub exists: bool,
/// The path that was checked (expanded).
pub target_dir: String,
}
/// Check if a target directory exists (for clone validation, requires authentication).
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/check-target",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CheckTargetExistsRequest,
    responses(
        (status = 200, description = "Check result", body = CheckTargetExistsResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "No daemon connected", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn check_target_exists(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<CheckTargetExistsRequest>,
) -> impl IntoResponse {
    // No database access here: the first connected daemon owned by the caller
    // is asked to perform the check.
    let owned_daemon = state
        .daemon_connections
        .iter()
        .find(|conn| conn.value().owner_id == auth.owner_id)
        .map(|conn| conn.value().id);
    let Some(daemon_id) = owned_daemon else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
        )
            .into_response();
    };

    // Forward the path to the daemon exactly as received.
    let request = DaemonCommand::CheckTargetExists {
        task_id: id,
        target_dir: body.target_dir.clone(),
    };
    let dispatch = state.send_daemon_command(daemon_id, request).await;
    if let Err(e) = dispatch {
        tracing::error!("Failed to send CheckTargetExists command: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // The actual result is sent back via WebSocket; this response only
    // acknowledges that the request was dispatched.
    let ack = serde_json::json!({
        "status": "checking",
        "taskId": id.to_string(),
        "targetDir": body.target_dir,
    });
    Json(ack).into_response()
}
// =============================================================================
// Task Reassignment (Daemon Failover)
// =============================================================================
// JSON body of the reassign endpoint; keys are camelCase (`targetDaemonId`,
// `includeContext`).
/// Request to reassign a task to a new daemon after daemon disconnect.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskRequest {
/// Target daemon ID to reassign to. If not provided, will select any available daemon.
pub target_daemon_id: Option<Uuid>,
// When the key is absent from the JSON body, serde calls
// `default_include_context`, which yields `true`.
/// Whether to include conversation context from previous run.
#[serde(default = "default_include_context")]
pub include_context: bool,
}
// Serde default provider for `ReassignTaskRequest::include_context`:
// conversation context is included unless the client explicitly opts out.
fn default_include_context() -> bool {
true
}
// Serialized with camelCase keys (`daemonId`, `oldTaskId`, `contextIncluded`,
// `contextEntries`).
/// Response from task reassignment.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReassignTaskResponse {
/// The new task that was created.
pub task: Task,
/// The new daemon ID.
pub daemon_id: Uuid,
/// The ID of the old task that was deleted.
pub old_task_id: Uuid,
/// Whether conversation context was included.
pub context_included: bool,
/// Number of context entries from previous conversation.
pub context_entries: usize,
}
/// Build a conversation context summary from task output entries.
///
/// Returns a formatted block that can be prepended to the task plan, or an
/// empty string when `entries` holds no user/assistant messages.
///
/// * Only `assistant`, `user` and `user_input` entries are kept; tool use and
///   every other message type are dropped for cleaner context.
/// * Each message is cut to at most 500 bytes. The cut is moved back to the
///   nearest UTF-8 character boundary, so multi-byte text never causes a panic.
/// * The most recent messages are kept until the body would exceed roughly
///   4000 bytes; when older messages had to be skipped, a
///   "(Showing last N of M messages)" line is added.
/// * Messages appear in chronological order, each followed by a blank line.
fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
    const MAX_CONTEXT_LEN: usize = 4000;
    const MAX_MESSAGE_LEN: usize = 500;

    // Renders one "<Speaker>: <content>\n" line, truncating long content.
    let render = |speaker: &str, content: &str| -> String {
        if content.len() > MAX_MESSAGE_LEN {
            // Slicing a `str` in the middle of a character panics, so back up
            // to a boundary first (index 0 always is one, so this terminates).
            let mut cut = MAX_MESSAGE_LEN;
            while !content.is_char_boundary(cut) {
                cut -= 1;
            }
            format!("{}: {}... [truncated]\n", speaker, &content[..cut])
        } else {
            format!("{}: {}\n", speaker, content)
        }
    };

    // Collect only user and assistant messages.
    let messages: Vec<String> = entries
        .iter()
        .filter_map(|entry| match entry.message_type.as_str() {
            "assistant" => Some(render("Assistant", &entry.content)),
            "user" | "user_input" => Some(render("User", &entry.content)),
            // Skip tool_use, tool_result, and other message types.
            _ => None,
        })
        .collect();

    if messages.is_empty() {
        return String::new();
    }

    // Walk backwards from the newest message to find how many fit the budget.
    // Each included message costs its own length plus one separator newline.
    let mut body_len = 0usize;
    let mut included_count = 0usize;
    for msg in messages.iter().rev() {
        if body_len + msg.len() > MAX_CONTEXT_LEN {
            break;
        }
        body_len += msg.len() + 1;
        included_count += 1;
    }
    let skipped = messages.len() - included_count;

    let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
    context.push_str("The daemon running this task disconnected. Here is what happened so far:\n");
    if skipped > 0 {
        context.push_str(&format!(
            "(Showing last {} of {} messages)\n",
            included_count,
            messages.len()
        ));
    }
    context.push('\n');

    // Emit the retained messages oldest-first, appending in place rather than
    // rebuilding the whole body for every message.
    context.reserve(body_len);
    for msg in &messages[skipped..] {
        context.push_str(msg);
        context.push('\n');
    }

    context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
    context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
    context
}
/// Reassign a task to a new daemon after the original daemon disconnected.
///
/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
/// the task can be reassigned to a new daemon with the conversation context preserved.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/reassign",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = ReassignTaskRequest,
responses(
(status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
(status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task not found", body = ApiError),
(status = 503, description = "No daemon available", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
// Reassigns a task to a connected daemon by creating a replacement task.
//
// Steps performed by this handler:
//   1. Load the task for the authenticated owner (404 if absent).
//   2. Refuse (400 `TASK_RUNNING`) when the task is `running` on a daemon that is
//      still connected.
//   3. Pick the requested daemon (must be connected and owned by the caller), or the
//      first connected daemon of the owner (503 if none).
//   4. Create a NEW "<name> (resumed)" task, optionally with the previous conversation
//      prepended to the plan, and move it to `starting` on the target daemon. If that
//      update fails the new task is deleted again so no orphan is left behind.
//   5. Send `SpawnTask` (with the latest checkpoint patch, if any). On failure the new
//      task is deleted and 500 `DAEMON_ERROR` is returned.
//   6. Delete the old task (best effort), notify the contract's supervisor (best
//      effort) and broadcast the update for the new task.
pub async fn reassign_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<ReassignTaskRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task (lookup is scoped to the authenticated owner).
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Helper closure to check if a daemon is connected by its UUID.
    // It returns a plain bool, so no map guard outlives the call.
    let is_daemon_connected = |daemon_id: Uuid| {
        state
            .daemon_connections
            .iter()
            .any(|d| d.value().id == daemon_id)
    };

    // Check if task is in a state that can be reassigned.
    // Allow reassignment for: failed, interrupted, pending, starting, or tasks whose
    // daemon is not connected (or that have no daemon at all).
    let can_reassign = matches!(
        task.status.as_str(),
        "failed" | "interrupted" | "pending" | "starting"
    ) || {
        // Also allow if daemon is not connected
        if let Some(daemon_id) = task.daemon_id {
            !is_daemon_connected(daemon_id)
        } else {
            true
        }
    };

    // NOTE(review): only `running` is rejected here. Any other status outside the list
    // above still falls through and is reassigned even when its daemon is connected.
    if !can_reassign && task.status == "running" {
        // Running task - check if its daemon is still connected
        if let Some(daemon_id) = task.daemon_id {
            if is_daemon_connected(daemon_id) {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new(
                        "TASK_RUNNING",
                        "Task is running on a connected daemon. Stop it first to reassign.",
                    )),
                )
                    .into_response();
            }
        }
    }

    // Find a target daemon
    let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
        // Verify the requested daemon is connected and belongs to the owner
        let daemon = state
            .daemon_connections
            .iter()
            .find(|d| d.value().id == requested_daemon_id);
        if let Some(daemon) = daemon {
            if daemon.owner_id != auth.owner_id {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new(
                        "INVALID_DAEMON",
                        "Daemon does not belong to your account",
                    )),
                )
                    .into_response();
            }
            requested_daemon_id
        } else {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new(
                    "DAEMON_NOT_CONNECTED",
                    "Requested daemon is not connected",
                )),
            )
                .into_response();
        }
    } else {
        // Find any available daemon for this owner
        match state
            .daemon_connections
            .iter()
            .find(|d| d.value().owner_id == auth.owner_id)
        {
            Some(entry) => entry.value().id,
            None => {
                return (
                    StatusCode::SERVICE_UNAVAILABLE,
                    Json(ApiError::new(
                        "NO_DAEMON",
                        "No daemon connected for your account",
                    )),
                )
                    .into_response();
            }
        }
    };

    // Build conversation context if requested. A failure to read the old output is
    // not fatal: the new task simply starts without context.
    let (context_str, context_entries) = if body.include_context {
        match repository::get_task_output(pool, id, Some(500)).await {
            Ok(events) => {
                let entries: Vec<TaskOutputEntry> = events
                    .into_iter()
                    .filter_map(TaskOutputEntry::from_task_event)
                    .collect();
                let context = build_conversation_context(&entries);
                let count = entries.len();
                (context, count)
            }
            Err(e) => {
                tracing::warn!("Failed to get task output for context: {}", e);
                (String::new(), 0)
            }
        }
    } else {
        (String::new(), 0)
    };

    // Build updated plan with context prepended
    let updated_plan = if !context_str.is_empty() {
        format!("{}{}", context_str, task.plan)
    } else {
        task.plan.clone()
    };

    // Create a NEW task with the conversation context
    let create_req = CreateTaskRequest {
        contract_id: task.contract_id,
        name: format!("{} (resumed)", task.name),
        description: task.description.clone(),
        plan: updated_plan.clone(),
        parent_task_id: task.parent_task_id,
        is_supervisor: task.is_supervisor,
        is_red_team: task.is_red_team,
        priority: task.priority,
        repository_url: task.repository_url.clone(),
        base_branch: task.base_branch.clone(),
        target_branch: task.target_branch.clone(),
        merge_mode: task.merge_mode.clone(),
        target_repo_path: task.target_repo_path.clone(),
        completion_action: task.completion_action.clone(),
        continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
        copy_files: None,
        checkpoint_sha: task.last_checkpoint_sha.clone(),
        branched_from_task_id: None,
        conversation_history: None,
    };
    let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await
    {
        Ok(t) => t,
        Err(e) => {
            tracing::error!("Failed to create new task for reassignment: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Update new task to starting and assign daemon
    let start_update = UpdateTaskRequest {
        status: Some("starting".to_string()),
        daemon_id: Some(target_daemon_id),
        version: Some(new_task.version),
        ..Default::default()
    };
    let final_task = match repository::update_task_for_owner(
        pool,
        new_task.id,
        auth.owner_id,
        start_update,
    )
    .await
    {
        Ok(Some(t)) => t,
        Ok(None) => {
            // Rollback: do not leave the freshly created task behind as an orphan.
            if let Err(rollback_err) =
                repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await
            {
                tracing::warn!(
                    "Failed to roll back new task {}: {}",
                    new_task.id,
                    rollback_err
                );
            }
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "New task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to update new task daemon assignment: {}", e);
            // Rollback: same reasoning as above, mirrors the SpawnTask failure path.
            if let Err(rollback_err) =
                repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await
            {
                tracing::warn!(
                    "Failed to roll back new task {}: {}",
                    new_task.id,
                    rollback_err
                );
            }
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Fetch latest checkpoint patch for worktree recovery during reassignment.
    // The patch bytes are base64-encoded before being put into the command.
    let (patch_data, patch_base_sha) =
        match repository::get_latest_checkpoint_patch(pool, id).await {
            Ok(Some(patch)) => {
                tracing::info!(
                    old_task_id = %id,
                    new_task_id = %new_task.id,
                    patch_size = patch.patch_size_bytes,
                    base_sha = %patch.base_commit_sha,
                    files_count = patch.files_count,
                    "Including checkpoint patch for task reassignment recovery"
                );
                let encoded =
                    base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
                (Some(encoded), Some(patch.base_commit_sha))
            }
            Ok(None) => {
                tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment");
                (None, None)
            }
            Err(e) => {
                tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment");
                (None, None)
            }
        };

    // Load the task's contract once; it supplies `local_only` for the spawn command
    // and the supervisor to notify afterwards.
    let contract = match task.contract_id {
        Some(contract_id) => {
            match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
                Ok(found) => found,
                Err(e) => {
                    // Keep the original best-effort default, but make the fallback visible.
                    tracing::warn!(
                        "Failed to load contract {} for reassignment, assuming local_only = false: {}",
                        contract_id,
                        e
                    );
                    None
                }
            }
        }
        None => None,
    };
    let local_only = contract.as_ref().map(|c| c.local_only).unwrap_or(false);

    // Send SpawnTask command to daemon for the new task
    let command = DaemonCommand::SpawnTask {
        task_id: new_task.id,
        task_name: final_task.name.clone(),
        plan: updated_plan,
        repo_url: task.repository_url.clone(),
        base_branch: task.base_branch.clone(),
        target_branch: task.target_branch.clone(),
        parent_task_id: task.parent_task_id,
        depth: task.depth,
        is_orchestrator: false, // New task starts fresh
        target_repo_path: task.target_repo_path.clone(),
        completion_action: task.completion_action.clone(),
        continue_from_task_id: Some(id), // Continue from old task's worktree
        copy_files: None,
        contract_id: task.contract_id,
        is_supervisor: task.is_supervisor,
        autonomous_loop: false,
        resume_session: false,
        conversation_history: None,
        patch_data,
        patch_base_sha,
        local_only,
        supervisor_worktree_task_id: None, // Not spawned by supervisor
    };
    tracing::info!(
        old_task_id = %id,
        new_task_id = %new_task.id,
        new_daemon_id = %target_daemon_id,
        context_entries = context_entries,
        "Reassigning task: creating new task and deleting old one"
    );
    if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
        tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
        // Rollback: delete the new task we created
        let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // Delete the old task now that the new one is spawned
    let old_task_id = id;
    if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
        tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
        // Don't fail the request, the new task is already running
    }

    // Notify the contract's supervisor about the reassignment (if applicable).
    // Don't notify if we're reassigning the supervisor itself.
    let supervisor_to_notify = contract
        .as_ref()
        .and_then(|c| c.supervisor_task_id)
        .filter(|supervisor_task_id| *supervisor_task_id != old_task_id);
    if let Some(supervisor_task_id) = supervisor_to_notify {
        if let Ok(Some(supervisor_task)) =
            repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await
        {
            // Only a running supervisor whose daemon is currently connected is notified.
            // The connectivity check yields a bool, so no `daemon_connections` guard is
            // alive while the send below is awaited.
            let reachable_daemon = if supervisor_task.status == "running" {
                supervisor_task
                    .daemon_id
                    .filter(|daemon_id| is_daemon_connected(*daemon_id))
            } else {
                None
            };
            if let Some(supervisor_daemon_id) = reachable_daemon {
                let notification_msg = format!(
                    "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
                     A new task '{}' (ID: {}) has been created to continue the work. \
                     The new task has {} context entries from the previous conversation.\n\n",
                    task.name,
                    old_task_id,
                    final_task.name,
                    new_task.id,
                    context_entries
                );
                let notify_cmd = DaemonCommand::SendMessage {
                    task_id: supervisor_task_id,
                    message: notification_msg,
                };
                if let Err(e) = state
                    .send_daemon_command(supervisor_daemon_id, notify_cmd)
                    .await
                {
                    tracing::warn!(
                        supervisor_id = %supervisor_task_id,
                        error = %e,
                        "Failed to notify supervisor about task reassignment"
                    );
                } else {
                    tracing::info!(
                        supervisor_id = %supervisor_task_id,
                        old_task_id = %old_task_id,
                        new_task_id = %new_task.id,
                        "Notified supervisor about task reassignment"
                    );
                }
            }
        }
    }

    // Broadcast task update for the new task
    state.broadcast_task_update(TaskUpdateNotification {
        task_id: new_task.id,
        owner_id: Some(auth.owner_id),
        version: final_task.version,
        status: "starting".to_string(),
        updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
        updated_by: "reassignment".to_string(),
    });

    Json(ReassignTaskResponse {
        task: final_task,
        daemon_id: target_daemon_id,
        old_task_id,
        context_included: !context_str.is_empty(),
        context_entries,
    })
    .into_response()
}
// =============================================================================
// Task Continue (Restart with Context)
// =============================================================================
/// Request to continue a task after daemon disconnect (restart in-place with context).
// JSON body of `POST /api/v1/mesh/tasks/{id}/continue`. Because of
// `rename_all = "camelCase"` the field is spelled `targetDaemonId` on the wire.
// (Extra notes are plain `//` comments so the generated OpenAPI text stays unchanged.)
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskRequest {
/// Target daemon ID to continue on. If not provided, will select any available daemon.
// When set, `continue_task` answers 400 `INVALID_DAEMON` if the daemon belongs to a
// different owner and 503 `DAEMON_NOT_CONNECTED` if it is not currently connected.
// When absent, the first connected daemon of the caller's owner is used.
pub target_daemon_id: Option<Uuid>,
}
/// Response from continuing a task.
// Serialized with camelCase keys: `task`, `daemonId`, `contextEntries`.
// (Extra notes are plain `//` comments so the generated OpenAPI text stays unchanged.)
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ContinueTaskResponse {
/// The continued task (same ID, updated plan with context).
// This is the row returned by the update that sets status `starting`, the new plan
// and the target daemon.
pub task: Task,
/// The daemon ID running the task.
// The daemon the `SpawnTask` command was sent to.
pub daemon_id: Uuid,
/// Number of context entries from previous conversation.
// Count of `TaskOutputEntry` values built from the task's stored output; 0 when the
// output could not be loaded.
pub context_entries: usize,
}
/// Continue a task after daemon disconnect by restarting it with conversation context.
///
/// Unlike reassign, this keeps the same task ID and just restarts it with the
/// previous conversation context prepended to the plan. Useful for supervisors.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/continue",
    params(
        ("id" = Uuid, Path, description = "Task ID")
    ),
    request_body = ContinueTaskRequest,
    responses(
        (status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
        (status = 400, description = "Task cannot be continued", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "No daemon available", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
// Steps performed by this handler:
//   1. Load the task for the authenticated owner (404 if absent).
//   2. Refuse (400 `TASK_RUNNING`) when it is `running` on a still-connected daemon.
//   3. Pick the requested daemon or the first connected daemon of the owner.
//   4. Persist status `starting`, the context-prefixed plan and the daemon assignment.
//   5. Send `SpawnTask` for the SAME task id. On failure the task is marked `failed`,
//      the daemon is cleared and the original plan is restored.
//   6. Broadcast the update and return the updated task.
pub async fn continue_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(body): Json<ContinueTaskRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the task (lookup is scoped to the authenticated owner).
    let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Helper closure to check if a daemon is connected by its UUID
    let is_daemon_connected = |daemon_id: Uuid| {
        state
            .daemon_connections
            .iter()
            .any(|d| d.value().id == daemon_id)
    };

    // Check if task can be continued (not currently running on a connected daemon)
    let can_continue = matches!(
        task.status.as_str(),
        "failed" | "interrupted" | "pending" | "starting" | "completed"
    ) || {
        if let Some(daemon_id) = task.daemon_id {
            !is_daemon_connected(daemon_id)
        } else {
            true
        }
    };
    // NOTE(review): only `running` is rejected; other statuses outside the list above
    // still proceed even when their daemon is connected.
    if !can_continue && task.status == "running" {
        if let Some(daemon_id) = task.daemon_id {
            if is_daemon_connected(daemon_id) {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new(
                        "TASK_RUNNING",
                        "Task is running on a connected daemon. Stop it first to continue.",
                    )),
                )
                    .into_response();
            }
        }
    }

    // Find a target daemon
    let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
        // The requested daemon must be connected and owned by the caller.
        let daemon = state
            .daemon_connections
            .iter()
            .find(|d| d.value().id == requested_daemon_id);
        if let Some(daemon) = daemon {
            if daemon.owner_id != auth.owner_id {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(ApiError::new(
                        "INVALID_DAEMON",
                        "Daemon does not belong to your account",
                    )),
                )
                    .into_response();
            }
            requested_daemon_id
        } else {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new(
                    "DAEMON_NOT_CONNECTED",
                    "Requested daemon is not connected",
                )),
            )
                .into_response();
        }
    } else {
        // No preference: take the first connected daemon of this owner.
        match state
            .daemon_connections
            .iter()
            .find(|d| d.value().owner_id == auth.owner_id)
        {
            Some(entry) => entry.value().id,
            None => {
                return (
                    StatusCode::SERVICE_UNAVAILABLE,
                    Json(ApiError::new(
                        "NO_DAEMON",
                        "No daemon connected for your account",
                    )),
                )
                    .into_response();
            }
        }
    };

    // Build conversation context from task output. Failing to read it is not fatal:
    // the task is simply restarted with its plan unchanged.
    let (context_str, context_entries) =
        match repository::get_task_output(pool, id, Some(500)).await {
            Ok(events) => {
                let entries: Vec<TaskOutputEntry> = events
                    .into_iter()
                    .filter_map(TaskOutputEntry::from_task_event)
                    .collect();
                let context = build_conversation_context(&entries);
                let count = entries.len();
                (context, count)
            }
            Err(e) => {
                tracing::warn!("Failed to get task output for context: {}", e);
                (String::new(), 0)
            }
        };

    // Build updated plan with context prepended
    let updated_plan = if !context_str.is_empty() {
        format!("{}{}", context_str, task.plan)
    } else {
        task.plan.clone()
    };

    // Update task in database: reset status, update plan with context, assign daemon
    let update_req = UpdateTaskRequest {
        status: Some("starting".to_string()),
        plan: Some(updated_plan.clone()),
        daemon_id: Some(target_daemon_id),
        error_message: None,
        ..Default::default()
    };
    let updated_task =
        match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
            Ok(Some(t)) => t,
            Ok(None) => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(ApiError::new("NOT_FOUND", "Task not found")),
                )
                    .into_response();
            }
            Err(e) => {
                tracing::error!("Failed to update task for continuation: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("DB_ERROR", e.to_string())),
                )
                    .into_response();
            }
        };

    // Check if this is an orchestrator: a depth-0 task that has subtasks.
    // A listing error still falls back to "no subtasks", but is now logged because it
    // changes how the task is spawned.
    let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await
    {
        Ok(subtasks) => subtasks.len(),
        Err(e) => {
            tracing::warn!(
                "Failed to list subtasks of {} for continuation, treating as none: {}",
                id,
                e
            );
            0
        }
    };
    let is_orchestrator = task.depth == 0 && subtask_count > 0;

    // Get local_only from contract if task has one (defaults to false otherwise).
    let local_only = if let Some(contract_id) = task.contract_id {
        match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
            Ok(Some(contract)) => contract.local_only,
            Ok(None) => false,
            Err(e) => {
                tracing::warn!(
                    "Failed to load contract {} for continuation, assuming local_only = false: {}",
                    contract_id,
                    e
                );
                false
            }
        }
    } else {
        false
    };

    // Send SpawnTask command to daemon
    let command = DaemonCommand::SpawnTask {
        task_id: id,
        task_name: task.name.clone(),
        plan: updated_plan,
        repo_url: task.repository_url.clone(),
        base_branch: task.base_branch.clone(),
        target_branch: task.target_branch.clone(),
        parent_task_id: task.parent_task_id,
        depth: task.depth,
        is_orchestrator,
        target_repo_path: task.target_repo_path.clone(),
        completion_action: task.completion_action.clone(),
        continue_from_task_id: task.continue_from_task_id,
        // Stored as JSON on the task; an undecodable value is treated as "no files".
        copy_files: task
            .copy_files
            .as_ref()
            .and_then(|v| serde_json::from_value(v.clone()).ok()),
        contract_id: task.contract_id,
        is_supervisor: task.is_supervisor,
        autonomous_loop: false,
        resume_session: false,
        conversation_history: None,
        patch_data: None,
        patch_base_sha: None,
        local_only,
        supervisor_worktree_task_id: None, // Not spawned by supervisor
    };
    tracing::info!(
        task_id = %id,
        daemon_id = %target_daemon_id,
        context_entries = context_entries,
        is_supervisor = task.is_supervisor,
        "Continuing task with conversation context"
    );
    if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
        tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
        // Rollback: mark failed, detach the daemon, and undo the plan rewrite so that a
        // retry does not prepend the conversation context a second time.
        let rollback_req = UpdateTaskRequest {
            status: Some("failed".to_string()),
            plan: (!context_str.is_empty()).then(|| task.plan.clone()),
            clear_daemon_id: true,
            error_message: Some(format!("Continuation failed: {}", e)),
            ..Default::default()
        };
        let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DAEMON_ERROR", e)),
        )
            .into_response();
    }

    // Broadcast task update
    state.broadcast_task_update(TaskUpdateNotification {
        task_id: id,
        owner_id: Some(auth.owner_id),
        version: updated_task.version,
        status: "starting".to_string(),
        updated_fields: vec![
            "status".to_string(),
            "daemon_id".to_string(),
            "plan".to_string(),
        ],
        updated_by: "continuation".to_string(),
    });

    Json(ContinueTaskResponse {
        task: updated_task,
        daemon_id: target_daemon_id,
        context_entries,
    })
    .into_response()
}
// =============================================================================
// Task Rewind and Fork (Resume and History System)
// =============================================================================
/// Rewind task code to specified checkpoint.
///
/// POST /api/v1/mesh/tasks/{id}/rewind
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/rewind",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = crate::db::models::RewindTaskRequest,
responses(
(status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 409, description = "Cannot rewind a running task", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn rewind_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(task_id): Path<Uuid>,
Json(req): Json<crate::db::models::RewindTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Task cannot be running during rewind
if task.status == "running" {
return (
StatusCode::CONFLICT,
Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")),
)
.into_response();
}
// Get checkpoint info
let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id {
match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
} else if let Some(ref sha) = req.checkpoint_sha {
match repository::get_task_checkpoint_by_sha(pool, sha).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint by SHA: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
} else {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"MISSING_CHECKPOINT",
"Must provide checkpoint_id or checkpoint_sha",
)),
)
.into_response();
};
// Verify checkpoint belongs to this task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// TODO: Send rewind command to daemon when daemon integration is complete
// For now, return a success response with checkpoint info
tracing::info!(
task_id = %task_id,
checkpoint_number = checkpoint.checkpoint_number,
commit_sha = %checkpoint.commit_sha,
"Task rewind requested"
);
Json(crate::db::models::RewindTaskResponse {
task_id,
rewinded_to: crate::db::models::CheckpointInfo {
checkpoint_number: checkpoint.checkpoint_number,
sha: checkpoint.commit_sha.clone(),
message: checkpoint.message,
},
preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState {
state_type: "branch".to_string(),
reference: name,
}),
})
.into_response()
}
/// Fork task from historical point.
///
/// POST /api/v1/mesh/tasks/{id}/fork
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/fork",
params(
("id" = Uuid, Path, description = "Task ID")
),
request_body = crate::db::models::ForkTaskRequest,
responses(
(status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn fork_task(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(task_id): Path<Uuid>,
Json(req): Json<crate::db::models::ForkTaskRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get source task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Find the checkpoint to fork from
let checkpoint = match req.fork_from_type.as_str() {
"checkpoint" => {
// fork_from_value is checkpoint number
let checkpoint_num: i32 = match req.fork_from_value.parse() {
Ok(n) => n,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")),
)
.into_response();
}
};
let checkpoints = match repository::list_task_checkpoints(pool, task_id).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to list checkpoints: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
match checkpoints
.into_iter()
.find(|c| c.checkpoint_number == checkpoint_num)
{
Some(c) => c,
None => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
}
}
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"UNSUPPORTED_FORK_TYPE",
"Only 'checkpoint' fork type is currently supported",
)),
)
.into_response();
}
};
// Create the new forked task
let create_req = CreateTaskRequest {
contract_id: task.contract_id,
name: req.new_task_name.clone(),
description: task.description.clone(),
plan: req.new_task_plan.clone(),
parent_task_id: None, // Forked tasks are independent
is_supervisor: false,
is_red_team: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: None, // New branch for forked work
merge_mode: task.merge_mode.clone(),
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to create forked task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
tracing::info!(
source_task_id = %task_id,
new_task_id = %new_task.id,
checkpoint_number = checkpoint.checkpoint_number,
"Task forked from checkpoint"
);
(
StatusCode::CREATED,
Json(crate::db::models::ForkTaskResponse {
new_task_id: new_task.id,
source_task_id: task_id,
fork_point: crate::db::models::ForkPoint {
fork_type: "checkpoint".to_string(),
checkpoint: Some(checkpoint.clone()),
timestamp: checkpoint.created_at,
},
branch_name: req.branch_name,
conversation_included: req.include_conversation.unwrap_or(false),
message_count: None,
}),
)
.into_response()
}
/// Create new task starting from specific checkpoint.
///
/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume",
params(
("id" = Uuid, Path, description = "Task ID"),
("cid" = Uuid, Path, description = "Checkpoint ID")
),
request_body = crate::db::models::ResumeFromCheckpointRequest,
responses(
(status = 201, description = "Task created from checkpoint", body = Task),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn resume_from_checkpoint(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get source task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get checkpoint
let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify checkpoint belongs to the task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// Create the new task that will start from checkpoint
let task_name = req.task_name.unwrap_or_else(|| {
format!(
"{} (resumed from checkpoint {})",
task.name, checkpoint.checkpoint_number
)
});
let create_req = CreateTaskRequest {
contract_id: task.contract_id,
name: task_name,
description: task.description.clone(),
plan: req.plan,
parent_task_id: None,
is_supervisor: false,
is_red_team: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: None, // New branch for resumed work
merge_mode: task.merge_mode.clone(),
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(task_id), // Copy worktree from original task
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to create resumed task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
tracing::info!(
source_task_id = %task_id,
new_task_id = %new_task.id,
checkpoint_id = %checkpoint_id,
checkpoint_number = checkpoint.checkpoint_number,
"Task resumed from checkpoint"
);
(StatusCode::CREATED, Json(new_task)).into_response()
}
/// Request to create branch from checkpoint.
// JSON body of `POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch`.
// Wire names (camelCase): `branchName`, `checkout`.
// (Extra notes are plain `//` comments so the generated OpenAPI text stays unchanged.)
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchFromCheckpointRequest {
// Name of the branch; forwarded to the daemon in `DaemonCommand::CreateBranch`.
pub branch_name: String,
// Optional in the payload: `#[serde(default)]` makes it `false` when omitted.
// NOTE(review): `branch_from_checkpoint` in this file never reads this field -
// confirm whether it should be forwarded to the daemon.
#[serde(default)]
pub checkout: bool,
}
/// Response for branch creation from checkpoint.
// Serialized with camelCase keys: `branchName`, `commitSha`, `taskId`,
// `checkpointNumber`. `branch_from_checkpoint` returns it with status 201 once the
// `CreateBranch` command has been handed to a daemon.
// (Extra notes are plain `//` comments so the generated OpenAPI text stays unchanged.)
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct BranchCreatedResponse {
// Branch name exactly as supplied in the request.
pub branch_name: String,
// Commit SHA of the checkpoint the branch was requested from.
pub commit_sha: String,
// Task that owns the checkpoint (the `{id}` path parameter).
pub task_id: Uuid,
// Number of the checkpoint, copied from the checkpoint record.
pub checkpoint_number: i32,
}
/// Create git branch from checkpoint without starting task.
///
/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch",
params(
("id" = Uuid, Path, description = "Task ID"),
("cid" = Uuid, Path, description = "Checkpoint ID")
),
request_body = CreateBranchFromCheckpointRequest,
responses(
(status = 201, description = "Branch created", body = BranchCreatedResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Task or checkpoint not found", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn branch_from_checkpoint(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
Json(req): Json<CreateBranchFromCheckpointRequest>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get task and verify ownership
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get task {}: {}", task_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get checkpoint
let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get checkpoint: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Verify checkpoint belongs to the task
if checkpoint.task_id != task_id {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"CHECKPOINT_MISMATCH",
"Checkpoint does not belong to this task",
)),
)
.into_response();
}
// Find a daemon to execute the branch creation
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
// Check if the original daemon is still connected
if state
.daemon_connections
.iter()
.any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id)
{
daemon_id
} else {
// Find any connected daemon for this owner
match state
.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon connected to create branch",
)),
)
.into_response();
}
}
}
} else {
// No daemon assigned - use any available for this owner
match state
.daemon_connections
.iter()
.find(|d| d.value().owner_id == auth.owner_id)
{
Some(d) => d.value().id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemon connected to create branch",
)),
)
.into_response();
}
}
};
// Send CreateBranch command to daemon
let cmd = DaemonCommand::CreateBranch {
task_id,
branch_name: req.branch_name.clone(),
from_ref: Some(checkpoint.commit_sha.clone()),
};
if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await {
tracing::error!("Failed to send CreateBranch command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
task_id = %task_id,
checkpoint_id = %checkpoint_id,
branch_name = %req.branch_name,
commit_sha = %checkpoint.commit_sha,
"Branch creation requested from checkpoint"
);
(
StatusCode::CREATED,
Json(BranchCreatedResponse {
branch_name: req.branch_name,
commit_sha: checkpoint.commit_sha,
task_id,
checkpoint_number: checkpoint.checkpoint_number,
}),
)
.into_response()
}
// =============================================================================
// Task Branching
// =============================================================================
/// Branch a task, creating a new anonymous task from an existing task's conversation.
///
/// Creates a new task that:
/// - Has no contract_id (anonymous task)
/// - Has branched_from_task_id pointing to the source task
/// - Optionally includes conversation history from the source task
/// - Can be started on an available daemon
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{id}/branch",
    params(
        ("id" = Uuid, Path, description = "Source task ID to branch from")
    ),
    request_body = BranchTaskRequest,
    responses(
        (status = 201, description = "Task branched successfully", body = BranchTaskResponse),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Source task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn branch_task(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(source_task_id): Path<Uuid>,
    Json(req): Json<BranchTaskRequest>,
) -> impl IntoResponse {
    // Implementation overview:
    //   1. Load the source task, scoped to the authenticated owner.
    //   2. Optionally snapshot its output as conversation history.
    //   3. Insert the new anonymous task and record a history event (best effort).
    //   4. Look up the latest checkpoint patch of the source task (best effort).
    //   5. If a daemon of this owner is connected, assign the task and send
    //      `SpawnTask`; otherwise return the task unstarted.
    // Once the task row exists the handler always answers 201; start-up problems
    // are reported through `daemon_id: None` in the response and a log entry.
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the source task (must belong to the same owner)
    let source_task =
        match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await {
            Ok(Some(task)) => task,
            Ok(None) => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(ApiError::new("NOT_FOUND", "Source task not found")),
                )
                    .into_response();
            }
            Err(e) => {
                tracing::error!("Failed to get source task: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("DB_ERROR", e.to_string())),
                )
                    .into_response();
            }
        };

    // Build conversation history if requested. A failure to read the output is
    // not fatal: the branch is still created, just without history.
    let (conversation_history, message_count) = if req.include_conversation {
        // NOTE(review): `Some(500)` presumably caps the number of events fetched —
        // confirm against `repository::get_task_output`.
        match repository::get_task_output(pool, source_task_id, Some(500)).await {
            Ok(events) => {
                let entries: Vec<TaskOutputEntry> = events
                    .into_iter()
                    .filter_map(TaskOutputEntry::from_task_event)
                    .collect();
                let count = entries.len();
                // Convert entries to a JSON array for conversation_history
                let history_json =
                    serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null);
                (Some(history_json), count)
            }
            Err(e) => {
                tracing::warn!("Failed to get task output for branching: {}", e);
                (None, 0)
            }
        }
    } else {
        (None, 0)
    };

    // Generate task name if not provided
    let task_name = req
        .name
        .unwrap_or_else(|| format!("{} (branch)", source_task.name));

    // Create the branched task (anonymous - no contract_id)
    let create_req = CreateTaskRequest {
        contract_id: None, // Anonymous task
        name: task_name,
        description: Some(format!("Branched from task: {}", source_task.name)),
        plan: req.message,
        parent_task_id: None,
        is_supervisor: false,
        is_red_team: false,
        priority: source_task.priority,
        repository_url: source_task.repository_url.clone(),
        base_branch: source_task.base_branch.clone(),
        target_branch: None, // Branched tasks don't auto-merge
        merge_mode: None,
        target_repo_path: source_task.target_repo_path.clone(),
        completion_action: Some("none".to_string()), // Don't auto-complete
        continue_from_task_id: Some(source_task_id), // Continue from source task's worktree
        copy_files: None,
        checkpoint_sha: None,
        branched_from_task_id: Some(source_task_id),
        conversation_history,
    };
    let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
        Ok(task) => task,
        Err(e) => {
            tracing::error!("Failed to create branched task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Record history event for task branching. This stays best-effort (the
    // request does not fail), but a failed write is logged instead of being
    // silently discarded.
    if let Err(e) = repository::record_history_event(
        pool,
        auth.owner_id,
        None, // No contract for anonymous tasks
        Some(task.id),
        "task",
        Some("branched"),
        None,
        serde_json::json!({
            "name": &task.name,
            "sourceTaskId": source_task_id,
            "sourceTaskName": &source_task.name,
            "messageCount": message_count,
        }),
    )
    .await
    {
        tracing::warn!(
            task_id = %task.id,
            source_task_id = %source_task_id,
            error = %e,
            "Failed to record history event for branched task"
        );
    }

    // Fetch latest checkpoint patch from source task for worktree recovery
    let (patch_data, patch_base_sha) =
        match repository::get_latest_checkpoint_patch(pool, source_task_id).await {
            Ok(Some(patch)) => {
                tracing::info!(
                    source_task_id = %source_task_id,
                    new_task_id = %task.id,
                    patch_size = patch.patch_size_bytes,
                    base_sha = %patch.base_commit_sha,
                    files_count = patch.files_count,
                    "Including checkpoint patch for task branching recovery"
                );
                // The patch is base64-encoded before being placed in the daemon command.
                let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
                (Some(encoded), Some(patch.base_commit_sha))
            }
            Ok(None) => {
                tracing::debug!(
                    source_task_id = %source_task_id,
                    "No checkpoint patch found for branching"
                );
                (None, None)
            }
            Err(e) => {
                tracing::warn!(
                    source_task_id = %source_task_id,
                    error = %e,
                    "Failed to fetch checkpoint patch for branching"
                );
                (None, None)
            }
        };

    // Pick a daemon to start the task on. The new task continues from the source
    // task's worktree, so prefer the daemon the source task is assigned to when
    // it is still connected for this owner (the checkpoint-branch handler in this
    // file selects its daemon the same way); otherwise take any connected daemon
    // belonging to the owner.
    let daemon_id = source_task
        .daemon_id
        .filter(|preferred| {
            state
                .daemon_connections
                .iter()
                .any(|d| d.value().id == *preferred && d.value().owner_id == auth.owner_id)
        })
        .or_else(|| {
            state
                .daemon_connections
                .iter()
                .find(|d| d.value().owner_id == auth.owner_id)
                .map(|d| d.value().id)
        });

    if let Some(target_daemon_id) = daemon_id {
        // Update task with daemon assignment
        let update_req = UpdateTaskRequest {
            status: Some("starting".to_string()),
            daemon_id: Some(target_daemon_id),
            ..Default::default()
        };
        match repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await {
            Ok(Some(updated_task)) => {
                // Send SpawnTask command to daemon
                let command = DaemonCommand::SpawnTask {
                    task_id: task.id,
                    task_name: updated_task.name.clone(),
                    plan: updated_task.plan.clone(),
                    repo_url: updated_task.repository_url.clone(),
                    base_branch: updated_task.base_branch.clone(),
                    target_branch: updated_task.target_branch.clone(),
                    parent_task_id: None,
                    depth: 0,
                    is_orchestrator: false,
                    target_repo_path: updated_task.target_repo_path.clone(),
                    completion_action: updated_task.completion_action.clone(),
                    continue_from_task_id: updated_task.continue_from_task_id,
                    copy_files: None,
                    contract_id: None,
                    is_supervisor: false,
                    autonomous_loop: false,
                    resume_session: message_count > 0, // Resume if we have conversation history
                    conversation_history: updated_task.conversation_state.clone(),
                    patch_data,
                    patch_base_sha,
                    local_only: false, // No contract, so not local_only
                    supervisor_worktree_task_id: None, // Not spawned by supervisor
                };
                if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
                    tracing::warn!(
                        task_id = %task.id,
                        daemon_id = %target_daemon_id,
                        error = %e,
                        "Failed to send SpawnTask command for branched task, task created but not started"
                    );
                    // Task was created but not started - return without daemon_id.
                    // NOTE(review): the row was already updated to `starting` with this
                    // daemon assigned above and is not reverted here, so the stored task
                    // and the pre-update `task` returned below can disagree.
                    return (
                        StatusCode::CREATED,
                        Json(BranchTaskResponse {
                            task,
                            message_count,
                            daemon_id: None,
                        }),
                    )
                        .into_response();
                }
                tracing::info!(
                    task_id = %task.id,
                    source_task_id = %source_task_id,
                    daemon_id = %target_daemon_id,
                    message_count = message_count,
                    "Branched task created and started"
                );
                // Broadcast task update notification
                state.broadcast_task_update(TaskUpdateNotification {
                    task_id: task.id,
                    owner_id: Some(auth.owner_id),
                    version: updated_task.version,
                    status: "starting".to_string(),
                    updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
                    updated_by: "system".to_string(),
                });
                return (
                    StatusCode::CREATED,
                    Json(BranchTaskResponse {
                        task: updated_task,
                        message_count,
                        daemon_id: Some(target_daemon_id),
                    }),
                )
                    .into_response();
            }
            Ok(None) => {
                // The update matched no row for this owner; report it rather than
                // pretending that no daemon was available.
                tracing::warn!(
                    task_id = %task.id,
                    daemon_id = %target_daemon_id,
                    "Branched task not found while assigning daemon, task created but not started"
                );
            }
            Err(e) => {
                tracing::warn!(
                    task_id = %task.id,
                    daemon_id = %target_daemon_id,
                    error = %e,
                    "Failed to assign daemon to branched task, task created but not started"
                );
            }
        }
    } else {
        tracing::info!(
            task_id = %task.id,
            source_task_id = %source_task_id,
            message_count = message_count,
            "Branched task created (no daemon available to start)"
        );
    }

    // No daemon available or failed to start - return task without daemon_id
    (
        StatusCode::CREATED,
        Json(BranchTaskResponse {
            task,
            message_count,
            daemon_id: None,
        }),
    )
        .into_response()
}
// =============================================================================
// Daemon Management
// =============================================================================
// JSON body produced by `restart_daemon` once the restart command has been
// handed to the daemon. Field names are serialized in camelCase (for example
// `daemon_id` becomes `daemonId`) because of the `rename_all` attribute below.
// The handler only builds this value with `success: true`; its failure paths
// respond with `ApiError` instead.
/// Response for restart daemon request.
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RestartDaemonResponse {
/// Whether the restart command was sent successfully.
pub success: bool,
/// The daemon ID that received the restart command.
pub daemon_id: Uuid,
/// Message describing the result.
pub message: String,
}
/// Restart a daemon by ID (requires authentication).
///
/// Sends a restart command to the specified daemon, which will cause it to
/// gracefully terminate and restart. Any running tasks will be interrupted.
#[utoipa::path(
post,
path = "/api/v1/mesh/daemons/{id}/restart",
params(
("id" = Uuid, Path, description = "Daemon ID")
),
responses(
(status = 200, description = "Restart command sent", body = RestartDaemonResponse),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Daemon not found or not connected", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn restart_daemon(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Verify the daemon exists and belongs to this owner
match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Daemon not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
// Check if daemon is connected
if !state.is_daemon_connected(id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new(
"DAEMON_NOT_CONNECTED",
"Daemon is not currently connected",
)),
)
.into_response();
}
// Send restart command to daemon
let command = DaemonCommand::RestartDaemon;
if let Err(e) = state.send_daemon_command(id, command).await {
tracing::error!("Failed to send restart command to daemon {}: {}", id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DAEMON_ERROR", e)),
)
.into_response();
}
tracing::info!(
daemon_id = %id,
owner_id = %auth.owner_id,
"Restart command sent to daemon"
);
Json(RestartDaemonResponse {
success: true,
daemon_id: id,
message: "Restart command sent. The daemon will restart shortly.".to_string(),
})
.into_response()
}