summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
-rw-r--r--makima/src/server/handlers/mesh.rs1679
1 files changed, 1679 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
new file mode 100644
index 0000000..760740c
--- /dev/null
+++ b/makima/src/server/handlers/mesh.rs
@@ -0,0 +1,1679 @@
+//! HTTP handlers for task and daemon mesh operations.
+
+use axum::{
+ extract::{Path, State},
+ http::{HeaderMap, StatusCode},
+ response::IntoResponse,
+ Json,
+};
+use uuid::Uuid;
+
+use crate::db::models::{
+ CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse,
+ SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry,
+ TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest,
+};
+use crate::db::repository::{self, RepositoryError};
+use crate::server::auth::Authenticated;
+use crate::server::messages::ApiError;
+use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
+
+// =============================================================================
+// Authentication Types
+// =============================================================================
+
+/// Source of authentication for mesh endpoints.
+#[derive(Debug, Clone)]
+pub enum AuthSource {
+ /// Authenticated via tool key (orchestrator accessing API).
+ /// Contains the task ID that owns this key.
+ ToolKey(Uuid),
+ /// Authenticated via user token (web client).
+ /// Contains the user ID. (Not implemented yet)
+ #[allow(dead_code)]
+ UserToken(Uuid),
+ /// No authentication provided (anonymous access).
+ Anonymous,
+}
+
+/// Header name for tool key authentication.
+pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key";
+
+/// Extract authentication source from request headers.
+///
+/// Checks for:
+/// 1. `X-Makima-Tool-Key` header for orchestrator tool access
+/// 2. `Authorization: Bearer` header for user access (future)
+/// 3. Falls back to Anonymous if no auth provided
+pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
+ // Check for tool key header first
+ if let Some(tool_key) = headers.get(TOOL_KEY_HEADER) {
+ if let Ok(key_str) = tool_key.to_str() {
+ if let Some(task_id) = state.validate_tool_key(key_str) {
+ return AuthSource::ToolKey(task_id);
+ }
+ tracing::warn!("Invalid tool key provided");
+ }
+ }
+
+ // Check for Authorization header (future user auth)
+ if let Some(auth_header) = headers.get("authorization") {
+ if let Ok(auth_str) = auth_header.to_str() {
+ if auth_str.starts_with("Bearer ") {
+ // Future: validate JWT and extract user ID
+ tracing::debug!("Bearer token auth not yet implemented");
+ }
+ }
+ }
+
+ // Default to anonymous
+ AuthSource::Anonymous
+}
+
+// =============================================================================
+// Task Handlers
+// =============================================================================
+
+/// List all tasks for the current owner.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks",
+ responses(
+ (status = 200, description = "List of tasks", body = TaskListResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_tasks(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match repository::list_tasks_for_owner(pool, auth.owner_id).await {
+ Ok(tasks) => {
+ let total = tasks.len() as i64;
+ Json(TaskListResponse { tasks, total }).into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to list tasks: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Get a single task by ID with its subtasks (scoped by owner).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task details with subtasks", body = TaskWithSubtasks),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(task)) => {
+ // Get subtasks for this task (also scoped by owner)
+ match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(),
+ Err(e) => {
+ tracing::error!("Failed to get subtasks for task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Create a new task.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks",
+ request_body = CreateTaskRequest,
+ responses(
+ (status = 201, description = "Task created", body = Task),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn create_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Json(req): Json<CreateTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match repository::create_task_for_owner(pool, auth.owner_id, req).await {
+ Ok(task) => (StatusCode::CREATED, Json(task)).into_response(),
+ Err(e) => {
+ tracing::error!("Failed to create task: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Update an existing task (scoped by owner).
+#[utoipa::path(
+ put,
+ path = "/api/v1/mesh/tasks/{id}",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = UpdateTaskRequest,
+ responses(
+ (status = 200, description = "Task updated", body = Task),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 409, description = "Version conflict", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn update_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(req): Json<UpdateTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Track which fields are being updated for the notification
+ let mut updated_fields = Vec::new();
+ if req.name.is_some() {
+ updated_fields.push("name".to_string());
+ }
+ if req.description.is_some() {
+ updated_fields.push("description".to_string());
+ }
+ if req.status.is_some() {
+ updated_fields.push("status".to_string());
+ }
+ if req.priority.is_some() {
+ updated_fields.push("priority".to_string());
+ }
+ if req.plan.is_some() {
+ updated_fields.push("plan".to_string());
+ }
+ if req.progress_summary.is_some() {
+ updated_fields.push("progress_summary".to_string());
+ }
+ if req.error_message.is_some() {
+ updated_fields.push("error_message".to_string());
+ }
+
+ match repository::update_task_for_owner(pool, id, auth.owner_id, req).await {
+ Ok(Some(task)) => {
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: task.id,
+ owner_id: Some(auth.owner_id),
+ version: task.version,
+ status: task.status.clone(),
+ updated_fields,
+ updated_by: "user".to_string(),
+ });
+ Json(task).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(RepositoryError::VersionConflict { expected, actual }) => {
+ tracing::info!(
+ "Version conflict on task {}: expected {}, actual {}",
+ id,
+ expected,
+ actual
+ );
+ (
+ StatusCode::CONFLICT,
+ Json(serde_json::json!({
+ "code": "VERSION_CONFLICT",
+ "message": format!(
+ "Task was modified by another user. Expected version {}, actual version {}",
+ expected, actual
+ ),
+ "expectedVersion": expected,
+ "actualVersion": actual,
+ })),
+ )
+ .into_response()
+ }
+ Err(RepositoryError::Database(e)) => {
+ tracing::error!("Failed to update task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Delete a task (scoped by owner).
+#[utoipa::path(
+ delete,
+ path = "/api/v1/mesh/tasks/{id}",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 204, description = "Task deleted"),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn delete_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task first to check if it's running and needs to be stopped
+ if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ let is_active = matches!(
+ task.status.as_str(),
+ "running" | "starting" | "initializing" | "paused"
+ );
+
+ // If task is active and has a daemon, send interrupt command
+ if is_active {
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::InterruptTask {
+ task_id: id,
+ graceful: false,
+ };
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ tracing::warn!(
+ task_id = %id,
+ daemon_id = %daemon_id,
+ "Failed to send InterruptTask before delete: {}",
+ e
+ );
+ } else {
+ tracing::info!(
+ task_id = %id,
+ daemon_id = %daemon_id,
+ "Sent InterruptTask before delete"
+ );
+ }
+ }
+ }
+ }
+
+ match repository::delete_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(true) => StatusCode::NO_CONTENT.into_response(),
+ Ok(false) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to delete task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Start a task by sending it to an available daemon (scoped by owner).
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/start",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task started", body = Task),
+ (status = 400, description = "Task cannot be started", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or no daemons available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn start_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ headers: HeaderMap,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ // Extract authentication to log who is starting the task
+ let legacy_auth = extract_auth(&state, &headers);
+ match &legacy_auth {
+ AuthSource::ToolKey(orchestrator_id) => {
+ tracing::info!(
+ task_id = %id,
+ orchestrator_task_id = %orchestrator_id,
+ owner_id = %auth.owner_id,
+ "Orchestrator starting subtask via tool key"
+ );
+ }
+ AuthSource::Anonymous => {
+ tracing::info!(
+ task_id = %id,
+ owner_id = %auth.owner_id,
+ "Starting task (user request)"
+ );
+ }
+ AuthSource::UserToken(user_id) => {
+ tracing::info!(
+ task_id = %id,
+ user_id = %user_id,
+ owner_id = %auth.owner_id,
+ "Starting task via user token"
+ );
+ }
+ }
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task can be started (allow pending, failed, interrupted, done, or merged)
+ let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"];
+ if !startable_statuses.contains(&task.status.as_str()) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_STATE",
+ format!("Task cannot be started from status: {}", task.status),
+ )),
+ )
+ .into_response();
+ }
+
+ // Find an available daemon belonging to this owner
+ let target_daemon_id = match state.daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemons connected for your account. Cannot start task.",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if this is an orchestrator (depth 0 with subtasks)
+ let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(subtasks) => {
+ tracing::info!(
+ task_id = %id,
+ subtask_count = subtasks.len(),
+ subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::<Vec<_>>(),
+ "Counted subtasks for orchestrator check"
+ );
+ subtasks.len()
+ },
+ Err(e) => {
+ tracing::warn!("Failed to check subtasks for {}: {}", id, e);
+ 0
+ }
+ };
+ let is_orchestrator = task.depth == 0 && subtask_count > 0;
+
+ tracing::info!(
+ task_id = %id,
+ task_depth = task.depth,
+ subtask_count = subtask_count,
+ is_orchestrator = is_orchestrator,
+ "Starting task with orchestrator determination"
+ );
+
+ // IMPORTANT: Update database FIRST to assign daemon_id before sending command
+ // This prevents race conditions where the task starts but daemon_id is not set
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: task.name.clone(),
+ plan: task.plan.clone(),
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command: {}", e);
+ // Rollback: clear daemon_id and reset status since command failed
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true, // Explicitly clear daemon_id
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "system".to_string(),
+ });
+
+ Json(updated_task).into_response()
+}
+
+/// Stop a running task (scoped by owner).
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/stop",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task stopped", body = Task),
+ (status = 400, description = "Task is not running", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn stop_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is running/active
+ let is_active = matches!(
+ task.status.as_str(),
+ "running" | "starting" | "initializing" | "paused"
+ );
+ if !is_active {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_STATE",
+ format!("Task cannot be stopped from status: {}", task.status),
+ )),
+ )
+ .into_response();
+ }
+
+ // Find the daemon running this task
+ let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
+ daemon_id
+ } else {
+ // No daemon assigned, just update status directly
+ let update_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ error_message: Some("Task stopped by user".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(updated_task)) => {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "failed".to_string(),
+ updated_fields: vec!["status".to_string(), "error_message".to_string()],
+ updated_by: "user".to_string(),
+ });
+ Json(updated_task).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to update task status: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ };
+ };
+
+ // Send InterruptTask command to daemon
+ let command = DaemonCommand::InterruptTask {
+ task_id: id,
+ graceful: false,
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::warn!("Failed to send InterruptTask command: {}", e);
+ // Daemon might be disconnected - update task status directly
+ let update_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ error_message: Some("Task stopped by user (daemon unavailable)".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(updated_task)) => {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "failed".to_string(),
+ updated_fields: vec!["status".to_string(), "error_message".to_string()],
+ updated_by: "user".to_string(),
+ });
+ Json(updated_task).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to update task status: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ };
+ }
+
+ // Update task status to "failed" (stopped)
+ let update_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ error_message: Some("Task stopped by user".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(updated_task)) => {
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "failed".to_string(),
+ updated_fields: vec!["status".to_string(), "error_message".to_string()],
+ updated_by: "user".to_string(),
+ });
+
+ Json(updated_task).into_response()
+ }
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to update task status: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Send a message to a running task's stdin (scoped by owner).
+///
+/// This can be used to provide input to Claude Code when it's waiting for user input,
+/// or to inject context/instructions into a running task.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/message",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = SendMessageRequest,
+ responses(
+ (status = 200, description = "Message sent successfully"),
+ (status = 400, description = "Task is not running", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn send_message(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(req): Json<SendMessageRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is running
+ if task.status != "running" {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_STATE",
+ format!(
+ "Cannot send message to task in status: {}. Task must be running.",
+ task.status
+ ),
+ )),
+ )
+ .into_response();
+ }
+
+ // Find the daemon running this task
+ let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
+ daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "Task has no assigned daemon. Cannot send message.",
+ )),
+ )
+ .into_response();
+ };
+
+ // Send SendMessage command to daemon
+ let command = DaemonCommand::SendMessage {
+ task_id: id,
+ message: req.message.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SendMessage command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task");
+
+ // Return success
+ (
+ StatusCode::OK,
+ Json(serde_json::json!({
+ "success": true,
+ "taskId": id,
+ "messageLength": req.message.len()
+ })),
+ )
+ .into_response()
+}
+
+/// Get task output history (scoped by owner).
+///
+/// Retrieves all recorded output from a task's Claude Code process.
+/// This allows the frontend to fetch missed output when subscribing late
+/// or reconnecting after a disconnect.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/output",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task output history", body = TaskOutputResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_task_output(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify task exists and belongs to owner
+ match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(_)) => {}
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+
+ // Get output history (task already verified to belong to owner)
+ match repository::get_task_output(pool, id, None).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let total = entries.len();
+
+ Json(TaskOutputResponse {
+ entries,
+ total,
+ task_id: id,
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task output: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// List subtasks for a parent task (scoped by owner).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/subtasks",
+ params(
+ ("id" = Uuid, Path, description = "Parent task ID")
+ ),
+ responses(
+ (status = 200, description = "List of subtasks", body = TaskListResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_subtasks(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(tasks) => {
+ let total = tasks.len() as i64;
+ Json(TaskListResponse { tasks, total }).into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to list subtasks for task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// List events for a task (scoped by owner).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/events",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "List of task events", body = TaskEventListResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_task_events(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify task exists and belongs to owner
+ match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(_)) => {}
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+
+ match repository::list_task_events(pool, id, None).await {
+ Ok(events) => {
+ let total = events.len() as i64;
+ Json(TaskEventListResponse { events, total }).into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to list events for task {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Retry completion action for a completed task (scoped by owner).
+///
+/// This allows retrying a completion action (push branch, merge, create PR)
+/// after filling in the target_repo_path if it wasn't set when the task completed.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/retry-completion",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Completion action initiated"),
+ (status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn retry_completion_action(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is in a terminal state
+ let terminal_statuses = ["done", "failed", "merged"];
+ if !terminal_statuses.contains(&task.status.as_str()) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_STATE",
+ format!(
+ "Task must be completed to retry completion action. Current status: {}",
+ task.status
+ ),
+ )),
+ )
+ .into_response();
+ }
+
+ // Check if completion action is set
+ let action = match &task.completion_action {
+ Some(action) if action != "none" => action.clone(),
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NO_COMPLETION_ACTION",
+ "Task has no completion action configured (or is set to 'none')",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if target_repo_path is set
+ let target_repo_path = match &task.target_repo_path {
+ Some(path) if !path.is_empty() => path.clone(),
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NO_TARGET_REPO",
+ "Target repository path must be set before retrying completion action",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Note: We don't check overlay_path here because the server may not have it
+ // The daemon will scan its worktrees directory to find the worktree by task ID
+
+ // Find a daemon to execute the action (must belong to this owner)
+ // Prefer the daemon that ran the task, but fall back to any available daemon for this owner
+ let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
+ // Check if this daemon is still connected and belongs to this owner
+ if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) {
+ daemon_id
+ } else {
+ // Fall back to any connected daemon for this owner
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemons connected for your account. Cannot execute completion action.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+ } else {
+ // No daemon assigned - use any available for this owner
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemons connected for your account. Cannot execute completion action.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Send RetryCompletionAction command to daemon
+ let command = DaemonCommand::RetryCompletionAction {
+ task_id: id,
+ task_name: task.name.clone(),
+ action: action.clone(),
+ target_repo_path: target_repo_path.clone(),
+ target_branch: task.target_branch.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send RetryCompletionAction command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ task_id = %id,
+ action = %action,
+ target_repo = %target_repo_path,
+ "Retry completion action initiated"
+ );
+
+ (
+ StatusCode::OK,
+ Json(serde_json::json!({
+ "success": true,
+ "taskId": id,
+ "action": action,
+ "targetRepoPath": target_repo_path,
+ "message": "Completion action initiated. Check task output for results."
+ })),
+ )
+ .into_response()
+}
+
+// =============================================================================
+// Daemon Handlers
+// =============================================================================
+
+/// List all connected daemons (requires authentication).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/daemons",
+ responses(
+ (status = 200, description = "List of daemons", body = DaemonListResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_daemons(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Only list daemons belonging to this owner
+ match repository::list_daemons_for_owner(pool, auth.owner_id).await {
+ Ok(daemons) => {
+ let total = daemons.len() as i64;
+ Json(DaemonListResponse { daemons, total }).into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to list daemons: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Get a single daemon by ID (requires authentication).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/daemons/{id}",
+ params(
+ ("id" = Uuid, Path, description = "Daemon ID")
+ ),
+ responses(
+ (status = 200, description = "Daemon details", body = crate::db::models::Daemon),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Daemon not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_daemon(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Only get daemon if it belongs to this owner
+ match repository::get_daemon_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(daemon)) => Json(daemon).into_response(),
+ Ok(None) => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Daemon not found")),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to get daemon {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// Get suggested directories from connected daemons (requires authentication).
+///
+/// Returns directories that can be used as target_repo_path for completion actions.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/daemons/directories",
+ responses(
+ (status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_daemon_directories(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let mut directories = Vec::new();
+
+ // Iterate over connected daemons belonging to this owner and collect their directories
+ for entry in state.daemon_connections.iter() {
+ let daemon = entry.value();
+
+ // Only include daemons belonging to this owner
+ if daemon.owner_id != auth.owner_id {
+ continue;
+ }
+
+ // Add working directory if available
+ if let Some(ref working_dir) = daemon.working_directory {
+ directories.push(DaemonDirectory {
+ path: working_dir.clone(),
+ label: "Working Directory".to_string(),
+ directory_type: "working".to_string(),
+ hostname: daemon.hostname.clone(),
+ exists: None,
+ });
+ }
+
+ // Add home directory if available (for cloning completed work)
+ if let Some(ref home_dir) = daemon.home_directory {
+ directories.push(DaemonDirectory {
+ path: home_dir.clone(),
+ label: "Makima Home".to_string(),
+ directory_type: "home".to_string(),
+ hostname: daemon.hostname.clone(),
+ exists: None,
+ });
+ }
+ }
+
+ Json(DaemonDirectoriesResponse { directories })
+}
+
+/// Request to clone a worktree to a target directory.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CloneWorktreeRequest {
+ /// Path to the target directory.
+ pub target_dir: String,
+}
+
+/// Clone a task's worktree to a target directory (scoped by owner).
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/clone",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = CloneWorktreeRequest,
+ responses(
+ (status = 200, description = "Clone command sent"),
+ (status = 400, description = "Invalid request or task not completed", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn clone_worktree(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<CloneWorktreeRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify task is in a completed state
+ let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged");
+ if !is_completed {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_STATE",
+ format!("Task must be completed to clone (current status: {})", task.status),
+ )),
+ )
+ .into_response();
+ }
+
+ // Find a connected daemon belonging to this owner to send the command
+ let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
+ let daemon_id = match daemon_entry {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ };
+
+ // Send CloneWorktree command to daemon
+ let command = DaemonCommand::CloneWorktree {
+ task_id: id,
+ target_dir: body.target_dir.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ tracing::error!("Failed to send CloneWorktree command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ Json(serde_json::json!({
+ "status": "cloning",
+ "taskId": id.to_string(),
+ "targetDir": body.target_dir,
+ }))
+ .into_response()
+}
+
+/// Request to check if a target directory exists.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckTargetExistsRequest {
+ /// Path to check.
+ pub target_dir: String,
+}
+
+/// Response for check target exists.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckTargetExistsResponse {
+ /// Whether the target directory exists.
+ pub exists: bool,
+ /// The path that was checked (expanded).
+ pub target_dir: String,
+}
+
+/// Check if a target directory exists (for clone validation, requires authentication).
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/check-target",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = CheckTargetExistsRequest,
+ responses(
+ (status = 200, description = "Check result", body = CheckTargetExistsResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "No daemon connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn check_target_exists(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<CheckTargetExistsRequest>,
+) -> impl IntoResponse {
+ // Find a connected daemon belonging to this owner to send the command
+ let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id);
+ let daemon_id = match daemon_entry {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ };
+
+ // Send CheckTargetExists command to daemon
+ let command = DaemonCommand::CheckTargetExists {
+ task_id: id,
+ target_dir: body.target_dir.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ tracing::error!("Failed to send CheckTargetExists command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // The actual result will be sent back via WebSocket
+ // For now, just acknowledge the request was sent
+ Json(serde_json::json!({
+ "status": "checking",
+ "taskId": id.to_string(),
+ "targetDir": body.target_dir,
+ }))
+ .into_response()
+}