//! HTTP handlers for task and daemon mesh operations. use axum::{ extract::{Path, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, Json, }; use base64::Engine; use std::time::Duration; use tokio::sync::oneshot; use uuid::Uuid; use crate::db::models::{ BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest, }; use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; use crate::server::state::{DaemonCommand, DaemonReauthStatus, SharedState, TaskUpdateNotification}; // ============================================================================= // Authentication Types // ============================================================================= /// Source of authentication for mesh endpoints. #[derive(Debug, Clone)] pub enum AuthSource { /// Authenticated via tool key (orchestrator accessing API). /// Contains the task ID that owns this key. ToolKey(Uuid), /// Authenticated via user token (web client). /// Contains the user ID. (Not implemented yet) #[allow(dead_code)] UserToken(Uuid), /// No authentication provided (anonymous access). Anonymous, } /// Header name for tool key authentication. pub const TOOL_KEY_HEADER: &str = "x-makima-tool-key"; /// Extract authentication source from request headers. /// /// Checks for: /// 1. `X-Makima-Tool-Key` header for orchestrator tool access /// 2. `Authorization: Bearer` header for user access (future) /// 3. Falls back to Anonymous if no auth provided pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource { // Check for tool key header first if let Some(tool_key) = headers.get(TOOL_KEY_HEADER) { if let Ok(key_str) = tool_key.to_str() { if let Some(task_id) = state.validate_tool_key(key_str) { return AuthSource::ToolKey(task_id); } tracing::warn!("Invalid tool key provided: {}", key_str); } } // Check for Authorization header (future user auth) if let Some(auth_header) = headers.get("authorization") { if let Ok(auth_str) = auth_header.to_str() { if auth_str.starts_with("Bearer ") { // Future: validate JWT and extract user ID tracing::debug!("Bearer token auth not yet implemented"); } } } // Default to anonymous AuthSource::Anonymous } // ============================================================================= // Task Handlers // ============================================================================= /// Query parameters for `list_tasks`. Currently only the `orphan` filter — /// when set, returns tasks with NO parent_task_id AND NO directive_id, used /// by the document-mode sidebar's `tmp/` folder. #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct ListTasksQuery { #[serde(default)] pub orphan: bool, } /// List all tasks for the current owner. Pass `?orphan=true` to restrict to /// top-level tasks with no directive attachment. #[utoipa::path( get, path = "/api/v1/mesh/tasks", params( ("orphan" = Option, Query, description = "Filter to tasks with no directive_id and no parent_task_id"), ), responses( (status = 200, description = "List of tasks", body = TaskListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn list_tasks( State(state): State, Authenticated(auth): Authenticated, axum::extract::Query(query): axum::extract::Query, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let result = if query.orphan { // Backed by the per-owner tmp directive going forward — see // `list_tmp_tasks_for_owner` for the semantics. The query parameter // name (`?orphan=true`) is preserved for backwards compatibility // with existing frontend callers. repository::list_tmp_tasks_for_owner(pool, auth.owner_id).await } else { repository::list_tasks_for_owner(pool, auth.owner_id).await }; match result { Ok(tasks) => { let total = tasks.len() as i64; Json(TaskListResponse { tasks, total }).into_response() } Err(e) => { tracing::error!("Failed to list tasks: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Get a single task by ID with its subtasks (scoped by owner). #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task details with subtasks", body = TaskWithSubtasks), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(task)) => { // Get subtasks for this task (also scoped by owner) match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { Ok(subtasks) => Json(TaskWithSubtasks { task, subtasks }).into_response(), Err(e) => { tracing::error!("Failed to get subtasks for task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Create a new task. #[utoipa::path( post, path = "/api/v1/mesh/tasks", request_body = CreateTaskRequest, responses( (status = 201, description = "Task created", body = Task), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn create_task( State(state): State, Authenticated(auth): Authenticated, Json(mut req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Every top-level task must live under SOME directive going forward — // the unified directive surface is the only way users see tasks. If a // caller doesn't supply directive_id, attach to the owner's tmp // (scratchpad) directive, auto-creating it if needed. Subtasks // (parent_task_id set) inherit their parent's directive linkage and // are fine without an explicit directive_id. if req.directive_id.is_none() && req.parent_task_id.is_none() { match repository::get_or_create_tmp_directive(pool, auth.owner_id).await { Ok(tmp) => { req.directive_id = Some(tmp.id); } Err(e) => { tracing::error!( owner_id = %auth.owner_id, error = %e, "Failed to provision tmp directive for orphan task" ); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("TMP_PROVISION_FAILED", &e.to_string())), ) .into_response(); } } } match repository::create_task_for_owner(pool, auth.owner_id, req).await { Ok(task) => { // Record history event for task creation let _ = repository::record_history_event( pool, auth.owner_id, task.contract_id, Some(task.id), "task", Some("created"), None, serde_json::json!({ "name": &task.name, "isSupervisor": task.is_supervisor, }), ).await; // Notify supervisor of new task creation if task belongs to a contract if let Some(contract_id) = task.contract_id { if !task.is_supervisor { let pool = pool.clone(); let state_clone = state.clone(); let task_clone = task.clone(); tokio::spawn(async move { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { state_clone.notify_supervisor_of_task_created( supervisor.id, supervisor.daemon_id, task_clone.id, &task_clone.name, ).await; } }); } } (StatusCode::CREATED, Json(task)).into_response() } Err(e) => { tracing::error!("Failed to create task: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Update an existing task (scoped by owner). #[utoipa::path( put, path = "/api/v1/mesh/tasks/{id}", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = UpdateTaskRequest, responses( (status = 200, description = "Task updated", body = Task), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 409, description = "Version conflict", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn update_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Check if trying to set a supervisor task to a terminal status if let Some(ref new_status) = req.status { let terminal_statuses = ["done", "failed", "merged"]; if terminal_statuses.contains(&new_status.as_str()) { // Get the task to check if it's a supervisor if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { if task.is_supervisor { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "SUPERVISOR_CANNOT_COMPLETE", "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", )), ) .into_response(); } } } } // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { updated_fields.push("name".to_string()); } if req.description.is_some() { updated_fields.push("description".to_string()); } if req.status.is_some() { updated_fields.push("status".to_string()); } if req.priority.is_some() { updated_fields.push("priority".to_string()); } if req.plan.is_some() { updated_fields.push("plan".to_string()); } if req.progress_summary.is_some() { updated_fields.push("progress_summary".to_string()); } if req.error_message.is_some() { updated_fields.push("error_message".to_string()); } match repository::update_task_for_owner(pool, id, auth.owner_id, req).await { Ok(Some(task)) => { let updated_fields_clone = updated_fields.clone(); // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: task.id, owner_id: Some(auth.owner_id), version: task.version, status: task.status.clone(), updated_fields, updated_by: "user".to_string(), }); // Notify supervisor of status change if task belongs to a contract if let Some(contract_id) = task.contract_id { if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { let pool = pool.clone(); let state_clone = state.clone(); let task_clone = task.clone(); tokio::spawn(async move { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { state_clone.notify_supervisor_of_task_update( supervisor.id, supervisor.daemon_id, task_clone.id, &task_clone.name, &task_clone.status, &updated_fields_clone, ).await; } }); } } Json(task).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(RepositoryError::VersionConflict { expected, actual }) => { tracing::info!( "Version conflict on task {}: expected {}, actual {}", id, expected, actual ); ( StatusCode::CONFLICT, Json(serde_json::json!({ "code": "VERSION_CONFLICT", "message": format!( "Task was modified by another user. Expected version {}, actual version {}", expected, actual ), "expectedVersion": expected, "actualVersion": actual, })), ) .into_response() } Err(RepositoryError::Database(e)) => { tracing::error!("Failed to update task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Delete a task (scoped by owner). #[utoipa::path( delete, path = "/api/v1/mesh/tasks/{id}", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 204, description = "Task deleted"), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn delete_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task first to check if it's running and needs to be stopped if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { let is_active = matches!( task.status.as_str(), "running" | "starting" | "initializing" | "paused" ); // If task is active and has a daemon, send interrupt command if is_active { if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::InterruptTask { task_id: id, graceful: false, }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { tracing::warn!( task_id = %id, daemon_id = %daemon_id, "Failed to send InterruptTask before delete: {}", e ); } else { tracing::info!( task_id = %id, daemon_id = %daemon_id, "Sent InterruptTask before delete" ); } } } } // Clean up any pending supervisor questions for this task state.remove_pending_questions_for_task(id); match repository::delete_task_for_owner(pool, id, auth.owner_id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to delete task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Start a task by sending it to an available daemon (scoped by owner). #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/start", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task started", body = Task), (status = 400, description = "Task cannot be started", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or no daemons available", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn start_task( State(state): State, Authenticated(auth): Authenticated, headers: HeaderMap, Path(id): Path, ) -> impl IntoResponse { // Extract authentication to log who is starting the task let legacy_auth = extract_auth(&state, &headers); match &legacy_auth { AuthSource::ToolKey(orchestrator_id) => { tracing::info!( task_id = %id, orchestrator_task_id = %orchestrator_id, owner_id = %auth.owner_id, "Orchestrator starting subtask via tool key" ); } AuthSource::Anonymous => { tracing::info!( task_id = %id, owner_id = %auth.owner_id, "Starting task (user request)" ); } AuthSource::UserToken(user_id) => { tracing::info!( task_id = %id, user_id = %user_id, owner_id = %auth.owner_id, "Starting task via user token" ); } } let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if task can be started (allow pending, failed, interrupted, done, or merged) let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"]; if !startable_statuses.contains(&task.status.as_str()) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_STATE", format!("Task cannot be started from status: {}", task.status), )), ) .into_response(); } // Get local_only and auto_merge_local flags from contract if task has one let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), _ => (false, false), } } else { (false, false) }; // Get list of daemons that have previously failed this task let mut exclude_daemon_ids: Vec = task.failed_daemon_ids.clone().unwrap_or_default(); // Find an available daemon belonging to this owner, excluding failed ones let target_daemon_id = match state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) { Some(id) => id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemons connected for your account. Cannot start task.", )), ) .into_response(); } }; // Check if this is an orchestrator (depth 0 with subtasks) let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { Ok(subtasks) => { tracing::info!( task_id = %id, subtask_count = subtasks.len(), subtask_ids = ?subtasks.iter().map(|s| s.id.to_string()).collect::>(), "Counted subtasks for orchestrator check" ); subtasks.len() }, Err(e) => { tracing::warn!("Failed to check subtasks for {}: {}", id, e); 0 } }; let is_orchestrator = task.depth == 0 && subtask_count > 0; tracing::info!( task_id = %id, task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, is_supervisor = task.is_supervisor, "Starting task with orchestrator/supervisor determination" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command // This prevents race conditions where the task starts but daemon_id is not set let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(target_daemon_id), version: Some(task.version), ..Default::default() }; let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to update task status: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id: id, task_name: task.name.clone(), plan: task.plan.clone(), repo_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator, target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( task_id = %id, is_supervisor = task.is_supervisor, is_orchestrator = is_orchestrator, daemon_id = %target_daemon_id, "Sending SpawnTask command to daemon" ); if let Err(e) = state.send_daemon_command(target_daemon_id, command.clone()).await { tracing::warn!( task_id = %id, daemon_id = %target_daemon_id, error = %e, "Failed to send SpawnTask command, trying alternative daemon" ); // Add this daemon to exclude list and try another exclude_daemon_ids.push(target_daemon_id); // Try to find an alternative daemon if let Some(alt_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemon_ids) { // Update task with new daemon let alt_update_req = UpdateTaskRequest { daemon_id: Some(alt_daemon_id), ..Default::default() }; if let Ok(Some(alt_updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, alt_update_req).await { // Recreate command with same data but try new daemon let alt_command = DaemonCommand::SpawnTask { task_id: id, task_name: task.name.clone(), plan: task.plan.clone(), repo_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator, target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { tracing::info!( task_id = %id, old_daemon_id = %target_daemon_id, new_daemon_id = %alt_daemon_id, "Task started on alternative daemon after first daemon failed" ); // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: alt_updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "system".to_string(), }); return Json(alt_updated_task).into_response(); } } } // All daemons failed - rollback and return error tracing::error!("Failed to start task on any daemon: {}", e); let rollback_req = UpdateTaskRequest { status: Some("pending".to_string()), clear_daemon_id: true, ..Default::default() }; let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", "Failed to start task on any available daemon")), ) .into_response(); } // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "system".to_string(), }); Json(updated_task).into_response() } /// Stop a running task (scoped by owner). #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/stop", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task stopped", body = Task), (status = 400, description = "Task is not running", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn stop_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if task is running/active let is_active = matches!( task.status.as_str(), "running" | "starting" | "initializing" | "paused" ); if !is_active { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_STATE", format!("Task cannot be stopped from status: {}", task.status), )), ) .into_response(); } // Find the daemon running this task let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id } else { // No daemon assigned, just update status directly let update_req = UpdateTaskRequest { status: Some("failed".to_string()), error_message: Some("Task stopped by user".to_string()), version: Some(task.version), ..Default::default() }; return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { Ok(Some(updated_task)) => { state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "failed".to_string(), updated_fields: vec!["status".to_string(), "error_message".to_string()], updated_by: "user".to_string(), }); Json(updated_task).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to update task status: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } }; }; // Send InterruptTask command to daemon let command = DaemonCommand::InterruptTask { task_id: id, graceful: false, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::warn!("Failed to send InterruptTask command: {}", e); // Daemon might be disconnected - update task status directly let update_req = UpdateTaskRequest { status: Some("failed".to_string()), error_message: Some("Task stopped by user (daemon unavailable)".to_string()), version: Some(task.version), ..Default::default() }; return match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { Ok(Some(updated_task)) => { state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "failed".to_string(), updated_fields: vec!["status".to_string(), "error_message".to_string()], updated_by: "user".to_string(), }); Json(updated_task).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to update task status: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } }; } // Update task status to "failed" (stopped) let update_req = UpdateTaskRequest { status: Some("failed".to_string()), error_message: Some("Task stopped by user".to_string()), version: Some(task.version), ..Default::default() }; match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { Ok(Some(updated_task)) => { // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "failed".to_string(), updated_fields: vec!["status".to_string(), "error_message".to_string()], updated_by: "user".to_string(), }); Json(updated_task).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to update task status: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Send a message to a running task's stdin (scoped by owner). /// /// This can be used to provide input to Claude Code when it's waiting for user input, /// or to inject context/instructions into a running task. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/message", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = SendMessageRequest, responses( (status = 200, description = "Message sent successfully"), (status = 400, description = "Task is not running", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn send_message( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if task is in a state that can receive messages // Allow "running" and "starting" (to handle race between status update and message send) // Also allow AUTH_CODE messages and supervisor tasks regardless of status let is_auth_code = req.message.starts_with("AUTH_CODE:"); let is_supervisor = task.is_supervisor; let can_receive_message = task.status == "running" || task.status == "starting"; if !can_receive_message && !is_auth_code && !is_supervisor { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_STATE", format!( "Cannot send message to task in status: {}. Task must be running or starting.", task.status ), )), ) .into_response(); } // Find the daemon running this task // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id } else if is_supervisor { // Supervisor without daemon - find one match state.daemon_connections .iter() .find(|d| d.value().owner_id == auth.owner_id) { Some(entry) => entry.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemon available. Please start a daemon.", )), ) .into_response(); } } } else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "Task has no assigned daemon. Cannot send message.", )), ) .into_response(); }; // Check if daemon is connected before trying to send if !state.is_daemon_connected(target_daemon_id) { tracing::warn!( task_id = %id, daemon_id = %target_daemon_id, "Daemon not connected, attempting to reallocate task" ); // Get list of failed daemons for this task let mut exclude_daemons = task.failed_daemon_ids.clone().unwrap_or_default(); exclude_daemons.push(target_daemon_id); // Try to find an alternative daemon if let Some(new_daemon_id) = state.find_alternative_daemon(auth.owner_id, &exclude_daemons) { // Mark the task for retry and update with new daemon if let Some(ref pool) = state.db_pool { // Mark the old daemon as failed for this task let _ = repository::mark_task_for_retry(pool, id, target_daemon_id).await; // Update task with new daemon and restart let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(new_daemon_id), ..Default::default() }; if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { // Get local_only and auto_merge_local from contract if task has one let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id { match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), _ => (false, false), } } else { (false, false) }; // Send spawn command to new daemon let spawn_cmd = DaemonCommand::SpawnTask { task_id: id, task_name: updated_task.name.clone(), plan: updated_task.plan.clone(), repo_url: updated_task.repository_url.clone(), base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: updated_task.parent_task_id, depth: updated_task.depth, is_orchestrator: false, target_repo_path: updated_task.target_repo_path.clone(), completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: updated_task.directive_id, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { tracing::info!( task_id = %id, old_daemon_id = %target_daemon_id, new_daemon_id = %new_daemon_id, "Task reallocated to new daemon, will restart" ); return ( StatusCode::ACCEPTED, Json(serde_json::json!({ "success": true, "reallocated": true, "taskId": id, "message": "Task was reallocated to a new daemon and is restarting. Please retry your message shortly." })), ).into_response(); } } } } // Could not reallocate - return error with helpful message return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "DAEMON_DISCONNECTED", "The daemon running this task has disconnected and no alternative daemon is available. Please start a daemon.", )), ).into_response(); } // Send SendMessage command to daemon let command = DaemonCommand::SendMessage { task_id: id, message: req.message.clone(), }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send SendMessage command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!(task_id = %id, message_len = req.message.len(), "Message sent to task"); // Return success ( StatusCode::OK, Json(serde_json::json!({ "success": true, "taskId": id, "messageLength": req.message.len() })), ) .into_response() } /// Get task output history (scoped by owner). /// /// Retrieves all recorded output from a task's Claude Code process. /// This allows the frontend to fetch missed output when subscribing late /// or reconnecting after a disconnect. #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/output", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task output history", body = TaskOutputResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_task_output( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify task exists and belongs to owner match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } // Get output history (task already verified to belong to owner) match repository::get_task_output(pool, id, None).await { Ok(events) => { let entries: Vec = events .into_iter() .filter_map(TaskOutputEntry::from_task_event) .collect(); let total = entries.len(); Json(TaskOutputResponse { entries, total, task_id: id, }) .into_response() } Err(e) => { tracing::error!("Failed to get task output: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// List subtasks for a parent task (scoped by owner). #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/subtasks", params( ("id" = Uuid, Path, description = "Parent task ID") ), responses( (status = 200, description = "List of subtasks", body = TaskListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn list_subtasks( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { Ok(tasks) => { let total = tasks.len() as i64; Json(TaskListResponse { tasks, total }).into_response() } Err(e) => { tracing::error!("Failed to list subtasks for task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// List events for a task (scoped by owner). #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/events", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "List of task events", body = TaskEventListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn list_task_events( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify task exists and belongs to owner match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } match repository::list_task_events(pool, id, None).await { Ok(events) => { let total = events.len() as i64; Json(TaskEventListResponse { events, total }).into_response() } Err(e) => { tracing::error!("Failed to list events for task {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Retry completion action for a completed task (scoped by owner). /// /// This allows retrying a completion action (push branch, merge, create PR) /// after filling in the target_repo_path if it wasn't set when the task completed. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/retry-completion", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Completion action initiated"), (status = 400, description = "Invalid request (task not completed, no completion action, etc.)", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn retry_completion_action( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if task is in a terminal state let terminal_statuses = ["done", "failed", "merged"]; if !terminal_statuses.contains(&task.status.as_str()) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_STATE", format!( "Task must be completed to retry completion action. Current status: {}", task.status ), )), ) .into_response(); } // Check if completion action is set let action = match &task.completion_action { Some(action) if action != "none" => action.clone(), _ => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "NO_COMPLETION_ACTION", "Task has no completion action configured (or is set to 'none')", )), ) .into_response(); } }; // Check if target_repo_path is set let target_repo_path = match &task.target_repo_path { Some(path) if !path.is_empty() => path.clone(), _ => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "NO_TARGET_REPO", "Target repository path must be set before retrying completion action", )), ) .into_response(); } }; // Note: We don't check overlay_path here because the server may not have it // The daemon will scan its worktrees directory to find the worktree by task ID // Find a daemon to execute the action (must belong to this owner) // Prefer the daemon that ran the task, but fall back to any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { // Check if this daemon is still connected and belongs to this owner if state.daemon_connections.iter().any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) { daemon_id } else { // Fall back to any connected daemon for this owner match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { Some(d) => d.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemons connected for your account. Cannot execute completion action.", )), ) .into_response(); } } } } else { // No daemon assigned - use any available for this owner match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { Some(d) => d.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemons connected for your account. Cannot execute completion action.", )), ) .into_response(); } } }; // Send RetryCompletionAction command to daemon let command = DaemonCommand::RetryCompletionAction { task_id: id, task_name: task.name.clone(), action: action.clone(), target_repo_path: target_repo_path.clone(), target_branch: task.target_branch.clone(), }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send RetryCompletionAction command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!( task_id = %id, action = %action, target_repo = %target_repo_path, "Retry completion action initiated" ); ( StatusCode::OK, Json(serde_json::json!({ "success": true, "taskId": id, "action": action, "targetRepoPath": target_repo_path, "message": "Completion action initiated. Check task output for results." })), ) .into_response() } // ============================================================================= // Daemon Handlers // ============================================================================= /// List all connected daemons (requires authentication). #[utoipa::path( get, path = "/api/v1/mesh/daemons", responses( (status = 200, description = "List of daemons", body = DaemonListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn list_daemons( State(state): State, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Only list daemons belonging to this owner match repository::list_daemons_for_owner(pool, auth.owner_id).await { Ok(daemons) => { let total = daemons.len() as i64; Json(DaemonListResponse { daemons, total }).into_response() } Err(e) => { tracing::error!("Failed to list daemons: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Get a single daemon by ID (requires authentication). #[utoipa::path( get, path = "/api/v1/mesh/daemons/{id}", params( ("id" = Uuid, Path, description = "Daemon ID") ), responses( (status = 200, description = "Daemon details", body = crate::db::models::Daemon), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Daemon not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_daemon( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Only get daemon if it belongs to this owner match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { Ok(Some(daemon)) => Json(daemon).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Daemon not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to get daemon {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// Get suggested directories from connected daemons (requires authentication). /// /// Returns directories that can be used as target_repo_path for completion actions. #[utoipa::path( get, path = "/api/v1/mesh/daemons/directories", responses( (status = 200, description = "List of suggested directories", body = DaemonDirectoriesResponse), (status = 401, description = "Unauthorized", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_daemon_directories( State(state): State, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let mut directories = Vec::new(); // Iterate over connected daemons belonging to this owner and collect their directories for entry in state.daemon_connections.iter() { let daemon = entry.value(); // Only include daemons belonging to this owner if daemon.owner_id != auth.owner_id { continue; } // Add working directory if available if let Some(ref working_dir) = daemon.working_directory { directories.push(DaemonDirectory { path: working_dir.clone(), label: "Working Directory".to_string(), directory_type: "working".to_string(), hostname: daemon.hostname.clone(), exists: None, }); } // Add home directory if available (for cloning completed work) if let Some(ref home_dir) = daemon.home_directory { directories.push(DaemonDirectory { path: home_dir.clone(), label: "Makima Home".to_string(), directory_type: "home".to_string(), hostname: daemon.hostname.clone(), exists: None, }); } } Json(DaemonDirectoriesResponse { directories }) } /// Request to clone a worktree to a target directory. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CloneWorktreeRequest { /// Path to the target directory. pub target_dir: String, } /// Clone a task's worktree to a target directory (scoped by owner). #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/clone", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = CloneWorktreeRequest, responses( (status = 200, description = "Clone command sent"), (status = 400, description = "Invalid request or task not completed", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn clone_worktree( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Verify task is in a completed state let is_completed = matches!(task.status.as_str(), "done" | "failed" | "merged"); if !is_completed { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_STATE", format!("Task must be completed to clone (current status: {})", task.status), )), ) .into_response(); } // Find a connected daemon belonging to this owner to send the command let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id); let daemon_id = match daemon_entry { Some(entry) => entry.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), ) .into_response(); } }; // Send CloneWorktree command to daemon let command = DaemonCommand::CloneWorktree { task_id: id, target_dir: body.target_dir.clone(), }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { tracing::error!("Failed to send CloneWorktree command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } Json(serde_json::json!({ "status": "cloning", "taskId": id.to_string(), "targetDir": body.target_dir, })) .into_response() } // ============================================================================= // Worktree Info // ============================================================================= /// Response for worktree info. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct WorktreeInfoResponse { /// Task ID. pub task_id: Uuid, /// Path to the worktree directory. pub worktree_path: Option, /// Whether the worktree exists. pub exists: bool, /// Aggregate statistics. pub stats: WorktreeStats, /// Changed files list. pub files: Vec, /// Current branch name. pub branch: Option, /// Current HEAD commit SHA. pub head_sha: Option, } /// Statistics about worktree changes. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct WorktreeStats { /// Number of files changed. pub files_changed: i32, /// Total lines inserted. pub insertions: i32, /// Total lines deleted. pub deletions: i32, } /// Information about a changed file in the worktree. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct WorktreeFile { /// File path relative to worktree root. pub path: String, /// Git status code (M, A, D, R, C, U, ?). pub status: String, /// Lines added. pub lines_added: i32, /// Lines removed. pub lines_removed: i32, } /// Get worktree information for a task (files, stats, branch info). #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/worktree-info", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Worktree info", body = WorktreeInfoResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_worktree_info( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { // Task has no daemon, return empty worktree info return Json(WorktreeInfoResponse { task_id: id, worktree_path: None, exists: false, stats: WorktreeStats { files_changed: 0, insertions: 0, deletions: 0, }, files: vec![], branch: None, head_sha: None, }) .into_response(); }; // Create oneshot channel for response let (tx, rx) = oneshot::channel(); // Store the sender for the daemon message handler to use state.pending_worktree_info.insert(id, tx); // Send GetWorktreeInfo command to daemon let command = DaemonCommand::GetWorktreeInfo { task_id: id }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { // Clean up pending request on error state.pending_worktree_info.remove(&id); tracing::error!("Failed to send GetWorktreeInfo command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // Wait for daemon response with timeout match tokio::time::timeout(Duration::from_secs(10), rx).await { Ok(Ok(response)) => { // Convert internal response to API response let files: Vec = response.files .and_then(|f| f.as_array().cloned()) .unwrap_or_default() .into_iter() .filter_map(|v| { Some(WorktreeFile { path: v.get("path")?.as_str()?.to_string(), status: v.get("status")?.as_str()?.to_string(), lines_added: v.get("linesAdded")?.as_i64()? as i32, lines_removed: v.get("linesRemoved")?.as_i64()? as i32, }) }) .collect(); Json(WorktreeInfoResponse { task_id: id, worktree_path: response.worktree_path, exists: response.exists, stats: WorktreeStats { files_changed: response.files_changed, insertions: response.insertions, deletions: response.deletions, }, files, branch: response.branch, head_sha: response.head_sha, }) .into_response() } Ok(Err(_)) => { // Channel was dropped (sender side closed) ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")), ) .into_response() } Err(_) => { // Timeout - clean up pending request state.pending_worktree_info.remove(&id); ( StatusCode::GATEWAY_TIMEOUT, Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")), ) .into_response() } } } // ============================================================================= // Task Diff // ============================================================================= /// Response for the task diff endpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskDiffApiResponse { /// Task ID. pub task_id: Uuid, /// Whether the diff was retrieved successfully. pub success: bool, /// The diff content. pub diff: Option, /// Error message if failed. pub error: Option, } /// Get the diff for a task's changes. #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/diff", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task diff", body = TaskDiffApiResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_task_diff( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ) .into_response(); }; // Create oneshot channel for response let (tx, rx) = oneshot::channel(); // Store the sender for the daemon message handler to use state.pending_task_diff.insert(id, tx); // Send GetTaskDiff command to daemon let command = DaemonCommand::GetTaskDiff { task_id: id }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { // Clean up pending request on error state.pending_task_diff.remove(&id); tracing::error!("Failed to send GetTaskDiff command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // Wait for daemon response with timeout match tokio::time::timeout(Duration::from_secs(15), rx).await { Ok(Ok(response)) => { Json(TaskDiffApiResponse { task_id: id, success: response.success, diff: response.diff, error: response.error, }) .into_response() } Ok(Err(_)) => { // Channel was dropped (sender side closed) ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")), ) .into_response() } Err(_) => { // Timeout - clean up pending request state.pending_task_diff.remove(&id); ( StatusCode::GATEWAY_TIMEOUT, Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")), ) .into_response() } } } // ============================================================================= // Worktree Commit // ============================================================================= /// Request body for worktree commit. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CommitWorktreeRequest { /// Optional commit message. Defaults to "Worktree commit" if not provided. pub message: Option, } /// Response for the worktree commit endpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CommitWorktreeApiResponse { /// Task ID. pub task_id: Uuid, /// Whether the commit was successful. pub success: bool, /// The commit SHA if successful. pub commit_sha: Option, /// Error message if failed. pub error: Option, } /// Commit changes in a task's worktree. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/worktree-commit", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = CommitWorktreeRequest, responses( (status = 200, description = "Worktree commit result", body = CommitWorktreeApiResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured or daemon not connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn commit_worktree( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ) .into_response(); }; // Create oneshot channel for response let (tx, rx) = oneshot::channel(); // Store the sender for the daemon message handler to use state.pending_worktree_commit.insert(id, tx); // Send CommitWorktree command to daemon let command = DaemonCommand::CommitWorktree { task_id: id, message: body.message, }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { // Clean up pending request on error state.pending_worktree_commit.remove(&id); tracing::error!("Failed to send CommitWorktree command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // Wait for daemon response with timeout match tokio::time::timeout(Duration::from_secs(15), rx).await { Ok(Ok(response)) => { Json(CommitWorktreeApiResponse { task_id: id, success: response.success, commit_sha: response.commit_sha, error: response.error, }) .into_response() } Ok(Err(_)) => { // Channel was dropped (sender side closed) ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")), ) .into_response() } Err(_) => { // Timeout - clean up pending request state.pending_worktree_commit.remove(&id); ( StatusCode::GATEWAY_TIMEOUT, Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")), ) .into_response() } } } // ============================================================================= // Task Patches // ============================================================================= /// Query parameters for listing task patches #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct ListPatchesQuery { /// Contract ID to scope the patches pub contract_id: Uuid, } /// Patch summary for API response #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct PatchSummary { /// Patch ID pub id: Uuid, /// Patch name (generated from checkpoint message or timestamp) pub name: String, /// Description (checkpoint message) pub description: Option, /// Task ID pub task_id: Uuid, /// Contract ID pub contract_id: Uuid, /// Number of files in the patch pub files_count: i32, /// Total lines added (estimated from patch size) pub lines_added: i32, /// Total lines removed (estimated from patch size) pub lines_removed: i32, /// List of file paths in the patch (if available from checkpoint) pub files: Option>, /// When the patch was created pub created_at: chrono::DateTime, /// When the patch was last updated pub updated_at: chrono::DateTime, } /// List patches (checkpoint patches) for a task. #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/patches", params( ("id" = Uuid, Path, description = "Task ID"), ("contractId" = Uuid, Query, description = "Contract ID") ), responses( (status = 200, description = "List of patches", body = Vec), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn list_task_patches( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, axum::extract::Query(query): axum::extract::Query, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Verify task belongs to the specified contract if task.contract_id != Some(query.contract_id) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")), ) .into_response(); } // Get checkpoint patches for this task let patches = match repository::list_checkpoint_patches(pool, id).await { Ok(p) => p, Err(e) => { tracing::error!("Failed to list patches for task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get checkpoints to get file lists and messages let checkpoints = match repository::list_task_checkpoints(pool, id).await { Ok(c) => c, Err(e) => { tracing::warn!("Failed to list checkpoints for task {}: {}", id, e); vec![] } }; // Create a map of checkpoint_id to checkpoint info let checkpoint_map: std::collections::HashMap = checkpoints .into_iter() .map(|c| (c.id, c)) .collect(); // Transform to PatchSummary let summaries: Vec = patches .into_iter() .map(|p| { let checkpoint = p.checkpoint_id.and_then(|cid| checkpoint_map.get(&cid)); let name = checkpoint .map(|c| c.message.clone()) .unwrap_or_else(|| format!("Patch {}", p.created_at.format("%Y-%m-%d %H:%M"))); let description = checkpoint.map(|c| c.message.clone()); let files = checkpoint.and_then(|c| { c.files_changed.as_ref().and_then(|f| { f.as_array().map(|arr| { arr.iter() .filter_map(|v| v.get("path").and_then(|p| p.as_str()).map(|s| s.to_string())) .collect() }) }) }); let lines_added = checkpoint.and_then(|c| c.lines_added).unwrap_or(0); let lines_removed = checkpoint.and_then(|c| c.lines_removed).unwrap_or(0); PatchSummary { id: p.id, name, description, task_id: p.task_id, contract_id: query.contract_id, files_count: p.files_count, lines_added, lines_removed, files, created_at: p.created_at, updated_at: p.created_at, // Use created_at as updated_at } }) .collect(); Json(summaries).into_response() } /// Response containing the latest checkpoint patch data for a task. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskPatchDataResponse { /// Task ID pub task_id: Uuid, /// Base64-encoded patch data (gzip-compressed git diff) pub patch_data: String, /// The commit SHA that the patch should be applied on top of pub base_commit_sha: String, /// Repository URL from the task pub repository_url: Option, } /// Get the latest checkpoint patch data for a task (for worktree restoration). #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/patch-data", params( ("id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Latest patch data", body = TaskPatchDataResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task or patch not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_task_patch_data( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task (scoped by owner) let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get latest checkpoint patch (with binary data) let patch = match repository::get_latest_checkpoint_patch(pool, id).await { Ok(Some(p)) => p, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "No checkpoint patch found for this task")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get patch for task {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; let patch_data_b64 = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); Json(TaskPatchDataResponse { task_id: id, patch_data: patch_data_b64, base_commit_sha: patch.base_commit_sha, repository_url: task.repository_url, }) .into_response() } /// Request to check if a target directory exists. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckTargetExistsRequest { /// Path to check. pub target_dir: String, } /// Response for check target exists. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckTargetExistsResponse { /// Whether the target directory exists. pub exists: bool, /// The path that was checked (expanded). pub target_dir: String, } /// Check if a target directory exists (for clone validation, requires authentication). #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/check-target", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = CheckTargetExistsRequest, responses( (status = 200, description = "Check result", body = CheckTargetExistsResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "No daemon connected", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn check_target_exists( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { // Find a connected daemon belonging to this owner to send the command let daemon_entry = state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id); let daemon_id = match daemon_entry { Some(entry) => entry.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), ) .into_response(); } }; // Send CheckTargetExists command to daemon let command = DaemonCommand::CheckTargetExists { task_id: id, target_dir: body.target_dir.clone(), }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { tracing::error!("Failed to send CheckTargetExists command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // The actual result will be sent back via WebSocket // For now, just acknowledge the request was sent Json(serde_json::json!({ "status": "checking", "taskId": id.to_string(), "targetDir": body.target_dir, })) .into_response() } // ============================================================================= // Task Reassignment (Daemon Failover) // ============================================================================= /// Request to reassign a task to a new daemon after daemon disconnect. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReassignTaskRequest { /// Target daemon ID to reassign to. If not provided, will select any available daemon. pub target_daemon_id: Option, /// Whether to include conversation context from previous run. #[serde(default = "default_include_context")] pub include_context: bool, } fn default_include_context() -> bool { true } /// Response from task reassignment. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReassignTaskResponse { /// The new task that was created. pub task: Task, /// The new daemon ID. pub daemon_id: Uuid, /// The ID of the old task that was deleted. pub old_task_id: Uuid, /// Whether conversation context was included. pub context_included: bool, /// Number of context entries from previous conversation. pub context_entries: usize, } /// Build a conversation context summary from task output entries. /// Returns a formatted string that can be prepended to the task plan. /// Only includes user and assistant messages - tool use is excluded for cleaner context. /// Limited to ~4000 characters to avoid bloating the plan. fn build_conversation_context(entries: &[TaskOutputEntry]) -> String { const MAX_CONTEXT_LEN: usize = 4000; const MAX_MESSAGE_LEN: usize = 500; if entries.is_empty() { return String::new(); } // Collect only user and assistant messages let mut messages: Vec = Vec::new(); for entry in entries.iter() { match entry.message_type.as_str() { "assistant" => { // Truncate long messages let content = if entry.content.len() > MAX_MESSAGE_LEN { format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN]) } else { entry.content.clone() }; messages.push(format!("Assistant: {}\n", content)); } "user" | "user_input" => { let content = if entry.content.len() > MAX_MESSAGE_LEN { format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN]) } else { entry.content.clone() }; messages.push(format!("User: {}\n", content)); } // Skip tool_use, tool_result, and other message types for cleaner context _ => {} } } if messages.is_empty() { return String::new(); } // Build context from the end (most recent messages) up to the max length let mut context_body = String::new(); let mut included_count = 0; for msg in messages.iter().rev() { if context_body.len() + msg.len() > MAX_CONTEXT_LEN { break; } context_body = format!("{}{}\n", msg, context_body); included_count += 1; } // If we couldn't include all messages, note how many were skipped let skipped = messages.len() - included_count; let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); context.push_str("The daemon running this task disconnected. Here is what happened so far:\n"); if skipped > 0 { context.push_str(&format!("(Showing last {} of {} messages)\n", included_count, messages.len())); } context.push('\n'); context.push_str(&context_body); context.push_str("=== END PREVIOUS CONTEXT ===\n\n"); context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n"); context } /// Reassign a task to a new daemon after the original daemon disconnected. /// /// This endpoint is used for daemon failover - when a daemon restarts or disconnects, /// the task can be reassigned to a new daemon with the conversation context preserved. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/reassign", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = ReassignTaskRequest, responses( (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse), (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "No daemon available", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn reassign_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if task is in a state that can be reassigned // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected // Helper closure to check if a daemon is connected by its UUID let is_daemon_connected = |daemon_id: Uuid| { state.daemon_connections.iter().any(|d| d.value().id == daemon_id) }; let can_reassign = matches!( task.status.as_str(), "failed" | "interrupted" | "pending" | "starting" ) || { // Also allow if daemon is not connected if let Some(daemon_id) = task.daemon_id { !is_daemon_connected(daemon_id) } else { true } }; if !can_reassign && task.status == "running" { // Running task - check if its daemon is still connected if let Some(daemon_id) = task.daemon_id { if is_daemon_connected(daemon_id) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "TASK_RUNNING", "Task is running on a connected daemon. Stop it first to reassign.", )), ) .into_response(); } } } // Find a target daemon let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { // Verify the requested daemon is connected and belongs to the owner let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); if let Some(daemon) = daemon { if daemon.owner_id != auth.owner_id { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), ) .into_response(); } requested_daemon_id } else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), ) .into_response(); } } else { // Find any available daemon for this owner match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { Some(entry) => entry.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), ) .into_response(); } } }; // Build conversation context if requested let (context_str, context_entries) = if body.include_context { match repository::get_task_output(pool, id, Some(500)).await { Ok(events) => { let entries: Vec = events .into_iter() .filter_map(TaskOutputEntry::from_task_event) .collect(); let context = build_conversation_context(&entries); let count = entries.len(); (context, count) } Err(e) => { tracing::warn!("Failed to get task output for context: {}", e); (String::new(), 0) } } } else { (String::new(), 0) }; // Build updated plan with context prepended let updated_plan = if !context_str.is_empty() { format!("{}{}", context_str, task.plan) } else { task.plan.clone() }; // Create a NEW task with the conversation context let create_req = CreateTaskRequest { contract_id: task.contract_id, name: format!("{} (resumed)", task.name), description: task.description.clone(), plan: updated_plan.clone(), parent_task_id: task.parent_task_id, is_supervisor: task.is_supervisor, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), merge_mode: task.merge_mode.clone(), target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: Some(id), // Continue from the old task's worktree if possible copy_files: None, checkpoint_sha: task.last_checkpoint_sha.clone(), branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { Ok(t) => t, Err(e) => { tracing::error!("Failed to create new task for reassignment: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Update new task to starting and assign daemon let start_update = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(target_daemon_id), version: Some(new_task.version), ..Default::default() }; let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "New task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to update new task daemon assignment: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Fetch latest checkpoint patch for worktree recovery during reassignment let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await { Ok(Some(patch)) => { tracing::info!( old_task_id = %id, new_task_id = %new_task.id, patch_size = patch.patch_size_bytes, base_sha = %patch.base_commit_sha, files_count = patch.files_count, "Including checkpoint patch for task reassignment recovery" ); let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); (Some(encoded), Some(patch.base_commit_sha)) } Ok(None) => { tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment"); (None, None) } Err(e) => { tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment"); (None, None) } }; // Get local_only and auto_merge_local from contract if task has one let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), _ => (false, false), } } else { (false, false) }; // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { task_id: new_task.id, task_name: final_task.name.clone(), plan: updated_plan, repo_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator: false, // New task starts fresh target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: Some(id), // Continue from old task's worktree copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data, patch_base_sha, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( old_task_id = %id, new_task_id = %new_task.id, new_daemon_id = %target_daemon_id, context_entries = context_entries, "Reassigning task: creating new task and deleting old one" ); if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send SpawnTask command for reassignment: {}", e); // Rollback: delete the new task we created let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await; return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // Delete the old task now that the new one is spawned let old_task_id = id; if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await { tracing::warn!("Failed to delete old task {}: {}", old_task_id, e); // Don't fail the request, the new task is already running } // Notify the contract's supervisor about the reassignment (if applicable) if let Some(contract_id) = task.contract_id { if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { if let Some(supervisor_task_id) = contract.supervisor_task_id { // Don't notify if we're reassigning the supervisor itself if supervisor_task_id != old_task_id { // Find the supervisor's daemon and send a message if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { if supervisor_task.status == "running" { if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { // Find the daemon by its UUID if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { let notification_msg = format!( "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ A new task '{}' (ID: {}) has been created to continue the work. \ The new task has {} context entries from the previous conversation.\n\n", task.name, old_task_id, final_task.name, new_task.id, context_entries ); let notify_cmd = DaemonCommand::SendMessage { task_id: supervisor_task_id, message: notification_msg, }; if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { tracing::warn!( supervisor_id = %supervisor_task_id, error = %e, "Failed to notify supervisor about task reassignment" ); } else { tracing::info!( supervisor_id = %supervisor_task_id, old_task_id = %old_task_id, new_task_id = %new_task.id, "Notified supervisor about task reassignment" ); } } } } } } } } } // Broadcast task update for the new task state.broadcast_task_update(TaskUpdateNotification { task_id: new_task.id, owner_id: Some(auth.owner_id), version: final_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "reassignment".to_string(), }); Json(ReassignTaskResponse { task: final_task, daemon_id: target_daemon_id, old_task_id, context_included: !context_str.is_empty(), context_entries, }) .into_response() } // ============================================================================= // Task Continue (Restart with Context) // ============================================================================= /// Request to continue a task after daemon disconnect (restart in-place with context). #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ContinueTaskRequest { /// Target daemon ID to continue on. If not provided, will select any available daemon. pub target_daemon_id: Option, } /// Response from continuing a task. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ContinueTaskResponse { /// The continued task (same ID, updated plan with context). pub task: Task, /// The daemon ID running the task. pub daemon_id: Uuid, /// Number of context entries from previous conversation. pub context_entries: usize, } /// Continue a task after daemon disconnect by restarting it with conversation context. /// /// Unlike reassign, this keeps the same task ID and just restarts it with the /// previous conversation context prepended to the plan. Useful for supervisors. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/continue", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = ContinueTaskRequest, responses( (status = 200, description = "Task continued successfully", body = ContinueTaskResponse), (status = 400, description = "Task cannot be continued", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "No daemon available", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn continue_task( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the task let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Helper closure to check if a daemon is connected by its UUID let is_daemon_connected = |daemon_id: Uuid| { state.daemon_connections.iter().any(|d| d.value().id == daemon_id) }; // Check if task can be continued (not currently running on a connected daemon) let can_continue = matches!( task.status.as_str(), "failed" | "interrupted" | "pending" | "starting" | "completed" ) || { if let Some(daemon_id) = task.daemon_id { !is_daemon_connected(daemon_id) } else { true } }; if !can_continue && task.status == "running" { if let Some(daemon_id) = task.daemon_id { if is_daemon_connected(daemon_id) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "TASK_RUNNING", "Task is running on a connected daemon. Stop it first to continue.", )), ) .into_response(); } } } // Find a target daemon let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); if let Some(daemon) = daemon { if daemon.owner_id != auth.owner_id { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), ) .into_response(); } requested_daemon_id } else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), ) .into_response(); } } else { match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { Some(entry) => entry.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), ) .into_response(); } } }; // Build conversation context from task output let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await { Ok(events) => { let entries: Vec = events .into_iter() .filter_map(TaskOutputEntry::from_task_event) .collect(); let context = build_conversation_context(&entries); let count = entries.len(); (context, count) } Err(e) => { tracing::warn!("Failed to get task output for context: {}", e); (String::new(), 0) } }; // Build updated plan with context prepended let updated_plan = if !context_str.is_empty() { format!("{}{}", context_str, task.plan) } else { task.plan.clone() }; // Update task in database: reset status, update plan with context, assign daemon let update_req = UpdateTaskRequest { status: Some("starting".to_string()), plan: Some(updated_plan.clone()), daemon_id: Some(target_daemon_id), error_message: None, ..Default::default() }; let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to update task for continuation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if this is an orchestrator let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { Ok(subtasks) => subtasks.len(), Err(_) => 0, }; let is_orchestrator = task.depth == 0 && subtask_count > 0; // Get local_only and auto_merge_local from contract if task has one let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), _ => (false, false), } } else { (false, false) }; // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id: id, task_name: task.name.clone(), plan: updated_plan, repo_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator, target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( task_id = %id, daemon_id = %target_daemon_id, context_entries = context_entries, is_supervisor = task.is_supervisor, "Continuing task with conversation context" ); if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send SpawnTask command for continuation: {}", e); // Rollback let rollback_req = UpdateTaskRequest { status: Some("failed".to_string()), clear_daemon_id: true, error_message: Some(format!("Continuation failed: {}", e)), ..Default::default() }; let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } // Broadcast task update state.broadcast_task_update(TaskUpdateNotification { task_id: id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()], updated_by: "continuation".to_string(), }); Json(ContinueTaskResponse { task: updated_task, daemon_id: target_daemon_id, context_entries, }) .into_response() } // ============================================================================= // Task Rewind and Fork (Resume and History System) // ============================================================================= /// Rewind task code to specified checkpoint. /// /// POST /api/v1/mesh/tasks/{id}/rewind #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/rewind", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = crate::db::models::RewindTaskRequest, responses( (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task or checkpoint not found", body = ApiError), (status = 409, description = "Cannot rewind a running task", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn rewind_task( State(state): State, Authenticated(auth): Authenticated, Path(task_id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get task and verify ownership let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", task_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Task cannot be running during rewind if task.status == "running" { return ( StatusCode::CONFLICT, Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")), ) .into_response(); } // Get checkpoint info let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id { match repository::get_task_checkpoint(pool, checkpoint_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get checkpoint: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } } else if let Some(ref sha) = req.checkpoint_sha { match repository::get_task_checkpoint_by_sha(pool, sha).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get checkpoint by SHA: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } } else { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "MISSING_CHECKPOINT", "Must provide checkpoint_id or checkpoint_sha", )), ) .into_response(); }; // Verify checkpoint belongs to this task if checkpoint.task_id != task_id { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "CHECKPOINT_MISMATCH", "Checkpoint does not belong to this task", )), ) .into_response(); } // TODO: Send rewind command to daemon when daemon integration is complete // For now, return a success response with checkpoint info tracing::info!( task_id = %task_id, checkpoint_number = checkpoint.checkpoint_number, commit_sha = %checkpoint.commit_sha, "Task rewind requested" ); Json(crate::db::models::RewindTaskResponse { task_id, rewinded_to: crate::db::models::CheckpointInfo { checkpoint_number: checkpoint.checkpoint_number, sha: checkpoint.commit_sha.clone(), message: checkpoint.message, }, preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState { state_type: "branch".to_string(), reference: name, }), }) .into_response() } /// Fork task from historical point. /// /// POST /api/v1/mesh/tasks/{id}/fork #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/fork", params( ("id" = Uuid, Path, description = "Task ID") ), request_body = crate::db::models::ForkTaskRequest, responses( (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task or checkpoint not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn fork_task( State(state): State, Authenticated(auth): Authenticated, Path(task_id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get source task and verify ownership let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", task_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Find the checkpoint to fork from let checkpoint = match req.fork_from_type.as_str() { "checkpoint" => { // fork_from_value is checkpoint number let checkpoint_num: i32 = match req.fork_from_value.parse() { Ok(n) => n, Err(_) => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")), ) .into_response(); } }; let checkpoints = match repository::list_task_checkpoints(pool, task_id).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to list checkpoints: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; match checkpoints .into_iter() .find(|c| c.checkpoint_number == checkpoint_num) { Some(c) => c, None => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), ) .into_response(); } } } _ => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "UNSUPPORTED_FORK_TYPE", "Only 'checkpoint' fork type is currently supported", )), ) .into_response(); } }; // Create the new forked task let create_req = CreateTaskRequest { contract_id: task.contract_id, name: req.new_task_name.clone(), description: task.description.clone(), plan: req.new_task_plan.clone(), parent_task_id: None, // Forked tasks are independent is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: None, // New branch for forked work merge_mode: task.merge_mode.clone(), target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: None, copy_files: None, checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { Ok(t) => t, Err(e) => { tracing::error!("Failed to create forked task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; tracing::info!( source_task_id = %task_id, new_task_id = %new_task.id, checkpoint_number = checkpoint.checkpoint_number, "Task forked from checkpoint" ); ( StatusCode::CREATED, Json(crate::db::models::ForkTaskResponse { new_task_id: new_task.id, source_task_id: task_id, fork_point: crate::db::models::ForkPoint { fork_type: "checkpoint".to_string(), checkpoint: Some(checkpoint.clone()), timestamp: checkpoint.created_at, }, branch_name: req.branch_name, conversation_included: req.include_conversation.unwrap_or(false), message_count: None, }), ) .into_response() } /// Create new task starting from specific checkpoint. /// /// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume", params( ("id" = Uuid, Path, description = "Task ID"), ("cid" = Uuid, Path, description = "Checkpoint ID") ), request_body = crate::db::models::ResumeFromCheckpointRequest, responses( (status = 201, description = "Task created from checkpoint", body = Task), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task or checkpoint not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn resume_from_checkpoint( State(state): State, Authenticated(auth): Authenticated, Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get source task and verify ownership let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", task_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get checkpoint let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get checkpoint: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Verify checkpoint belongs to the task if checkpoint.task_id != task_id { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "CHECKPOINT_MISMATCH", "Checkpoint does not belong to this task", )), ) .into_response(); } // Create the new task that will start from checkpoint let task_name = req.task_name.unwrap_or_else(|| { format!( "{} (resumed from checkpoint {})", task.name, checkpoint.checkpoint_number ) }); let create_req = CreateTaskRequest { contract_id: task.contract_id, name: task_name, description: task.description.clone(), plan: req.plan, parent_task_id: None, is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: None, // New branch for resumed work merge_mode: task.merge_mode.clone(), target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: Some(task_id), // Copy worktree from original task copy_files: None, checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { Ok(t) => t, Err(e) => { tracing::error!("Failed to create resumed task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; tracing::info!( source_task_id = %task_id, new_task_id = %new_task.id, checkpoint_id = %checkpoint_id, checkpoint_number = checkpoint.checkpoint_number, "Task resumed from checkpoint" ); (StatusCode::CREATED, Json(new_task)).into_response() } /// Request to create branch from checkpoint. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateBranchFromCheckpointRequest { pub branch_name: String, #[serde(default)] pub checkout: bool, } /// Response for branch creation from checkpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct BranchCreatedResponse { pub branch_name: String, pub commit_sha: String, pub task_id: Uuid, pub checkpoint_number: i32, } /// Create git branch from checkpoint without starting task. /// /// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch", params( ("id" = Uuid, Path, description = "Task ID"), ("cid" = Uuid, Path, description = "Checkpoint ID") ), request_body = CreateBranchFromCheckpointRequest, responses( (status = 201, description = "Branch created", body = BranchCreatedResponse), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Task or checkpoint not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn branch_from_checkpoint( State(state): State, Authenticated(auth): Authenticated, Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get task and verify ownership let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", task_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get checkpoint let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get checkpoint: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Verify checkpoint belongs to the task if checkpoint.task_id != task_id { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "CHECKPOINT_MISMATCH", "Checkpoint does not belong to this task", )), ) .into_response(); } // Find a daemon to execute the branch creation let target_daemon_id = if let Some(daemon_id) = task.daemon_id { // Check if the original daemon is still connected if state .daemon_connections .iter() .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) { daemon_id } else { // Find any connected daemon for this owner match state .daemon_connections .iter() .find(|d| d.value().owner_id == auth.owner_id) { Some(d) => d.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemon connected to create branch", )), ) .into_response(); } } } } else { // No daemon assigned - use any available for this owner match state .daemon_connections .iter() .find(|d| d.value().owner_id == auth.owner_id) { Some(d) => d.value().id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemon connected to create branch", )), ) .into_response(); } } }; // Send CreateBranch command to daemon let cmd = DaemonCommand::CreateBranch { task_id, branch_name: req.branch_name.clone(), from_ref: Some(checkpoint.commit_sha.clone()), }; if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await { tracing::error!("Failed to send CreateBranch command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!( task_id = %task_id, checkpoint_id = %checkpoint_id, branch_name = %req.branch_name, commit_sha = %checkpoint.commit_sha, "Branch creation requested from checkpoint" ); ( StatusCode::CREATED, Json(BranchCreatedResponse { branch_name: req.branch_name, commit_sha: checkpoint.commit_sha, task_id, checkpoint_number: checkpoint.checkpoint_number, }), ) .into_response() } // ============================================================================= // Task Branching // ============================================================================= /// Branch a task, creating a new anonymous task from an existing task's conversation. /// /// Creates a new task that: /// - Has no contract_id (anonymous task) /// - Has branched_from_task_id pointing to the source task /// - Optionally includes conversation history from the source task /// - Can be started on an available daemon #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/branch", params( ("id" = Uuid, Path, description = "Source task ID to branch from") ), request_body = BranchTaskRequest, responses( (status = 201, description = "Task branched successfully", body = BranchTaskResponse), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Source task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn branch_task( State(state): State, Authenticated(auth): Authenticated, Path(source_task_id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get the source task (must belong to the same owner) let source_task = match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await { Ok(Some(task)) => task, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Source task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get source task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Build conversation history if requested let (conversation_history, message_count) = if req.include_conversation { match repository::get_task_output(pool, source_task_id, Some(500)).await { Ok(events) => { let entries: Vec = events .into_iter() .filter_map(TaskOutputEntry::from_task_event) .collect(); let count = entries.len(); // Convert entries to a JSON array for conversation_history let history_json = serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null); (Some(history_json), count) } Err(e) => { tracing::warn!("Failed to get task output for branching: {}", e); (None, 0) } } } else { (None, 0) }; // Generate task name if not provided let task_name = req.name.unwrap_or_else(|| { format!("{} (branch)", source_task.name) }); // Create the branched task (anonymous - no contract_id) let create_req = CreateTaskRequest { contract_id: None, // Anonymous task name: task_name, description: Some(format!("Branched from task: {}", source_task.name)), plan: req.message, parent_task_id: None, is_supervisor: false, priority: source_task.priority, repository_url: source_task.repository_url.clone(), base_branch: source_task.base_branch.clone(), target_branch: None, // Branched tasks don't auto-merge merge_mode: None, target_repo_path: source_task.target_repo_path.clone(), completion_action: Some("none".to_string()), // Don't auto-complete continue_from_task_id: Some(source_task_id), // Continue from source task's worktree copy_files: None, checkpoint_sha: None, branched_from_task_id: Some(source_task_id), conversation_history, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { Ok(task) => task, Err(e) => { tracing::error!("Failed to create branched task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Record history event for task branching let _ = repository::record_history_event( pool, auth.owner_id, None, // No contract for anonymous tasks Some(task.id), "task", Some("branched"), None, serde_json::json!({ "name": &task.name, "sourceTaskId": source_task_id, "sourceTaskName": &source_task.name, "messageCount": message_count, }), ).await; // Fetch latest checkpoint patch from source task for worktree recovery let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, source_task_id).await { Ok(Some(patch)) => { tracing::info!( source_task_id = %source_task_id, new_task_id = %task.id, patch_size = patch.patch_size_bytes, base_sha = %patch.base_commit_sha, files_count = patch.files_count, "Including checkpoint patch for task branching recovery" ); let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); (Some(encoded), Some(patch.base_commit_sha)) } Ok(None) => { tracing::debug!(source_task_id = %source_task_id, "No checkpoint patch found for branching"); (None, None) } Err(e) => { tracing::warn!(source_task_id = %source_task_id, error = %e, "Failed to fetch checkpoint patch for branching"); (None, None) } }; // Try to find an available daemon to start the task let daemon_id = state.daemon_connections .iter() .find(|d| d.value().owner_id == auth.owner_id) .map(|d| d.value().id); // If a daemon is available, start the task if let Some(target_daemon_id) = daemon_id { // Update task with daemon assignment let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(target_daemon_id), ..Default::default() }; if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await { // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id: task.id, task_name: updated_task.name.clone(), plan: updated_task.plan.clone(), repo_url: updated_task.repository_url.clone(), base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: None, depth: 0, is_orchestrator: false, target_repo_path: updated_task.target_repo_path.clone(), completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, contract_id: None, is_supervisor: false, autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), patch_data, patch_base_sha, local_only: false, // No contract, so not local_only auto_merge_local: false, // No contract, so no auto_merge_local supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::warn!( task_id = %task.id, daemon_id = %target_daemon_id, error = %e, "Failed to send SpawnTask command for branched task, task created but not started" ); // Task was created but not started - return without daemon_id return ( StatusCode::CREATED, Json(BranchTaskResponse { task, message_count, daemon_id: None, }), ) .into_response(); } tracing::info!( task_id = %task.id, source_task_id = %source_task_id, daemon_id = %target_daemon_id, message_count = message_count, "Branched task created and started" ); // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: task.id, owner_id: Some(auth.owner_id), version: updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "system".to_string(), }); return ( StatusCode::CREATED, Json(BranchTaskResponse { task: updated_task, message_count, daemon_id: Some(target_daemon_id), }), ) .into_response(); } } // No daemon available or failed to start - return task without daemon_id tracing::info!( task_id = %task.id, source_task_id = %source_task_id, message_count = message_count, "Branched task created (no daemon available to start)" ); ( StatusCode::CREATED, Json(BranchTaskResponse { task, message_count, daemon_id: None, }), ) .into_response() } // ============================================================================= // Daemon Management // ============================================================================= /// Response for restart daemon request. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct RestartDaemonResponse { /// Whether the restart command was sent successfully. pub success: bool, /// The daemon ID that received the restart command. pub daemon_id: Uuid, /// Message describing the result. pub message: String, } /// Restart a daemon by ID (requires authentication). /// /// Sends a restart command to the specified daemon, which will cause it to /// gracefully terminate and restart. Any running tasks will be interrupted. #[utoipa::path( post, path = "/api/v1/mesh/daemons/{id}/restart", params( ("id" = Uuid, Path, description = "Daemon ID") ), responses( (status = 200, description = "Restart command sent", body = RestartDaemonResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Daemon not found or not connected", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn restart_daemon( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify the daemon exists and belongs to this owner match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Daemon not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } // Check if daemon is connected if !state.is_daemon_connected(id) { return ( StatusCode::NOT_FOUND, Json(ApiError::new( "DAEMON_NOT_CONNECTED", "Daemon is not currently connected", )), ) .into_response(); } // Send restart command to daemon let command = DaemonCommand::RestartDaemon; if let Err(e) = state.send_daemon_command(id, command).await { tracing::error!("Failed to send restart command to daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!( daemon_id = %id, owner_id = %auth.owner_id, "Restart command sent to daemon" ); Json(RestartDaemonResponse { success: true, daemon_id: id, message: "Restart command sent. The daemon will restart shortly.".to_string(), }) .into_response() } // ============================================================================= // Daemon Reauthorization // ============================================================================= /// Response from the trigger reauth endpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct TriggerReauthResponse { /// Whether the reauth command was sent successfully. pub success: bool, /// The daemon ID that received the reauth command. #[serde(rename = "daemonId")] pub daemon_id: Uuid, /// Unique request ID for tracking this reauth flow. #[serde(rename = "requestId")] pub request_id: Uuid, } /// Request body for submitting an auth code. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] pub struct SubmitAuthCodeRequest { /// The auth code obtained from the OAuth login flow. pub code: String, /// The request ID from the trigger reauth response. #[serde(rename = "requestId")] pub request_id: Uuid, } /// Response from the submit auth code endpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct SubmitAuthCodeResponse { /// Whether the auth code was sent to the daemon successfully. pub success: bool, } /// Response from the reauth status polling endpoint. #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct ReauthStatusResponse { /// Current status of the reauth flow: "pending", "url_ready", "completed", "failed" pub status: String, /// OAuth login URL (present when status is "url_ready") #[serde(rename = "loginUrl", skip_serializing_if = "Option::is_none")] pub login_url: Option, /// Error message (present when status is "failed") #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } /// Trigger OAuth re-authentication on a daemon. /// /// Sends a reauth command to the specified daemon, which will spawn `claude setup-token` /// and return the OAuth login URL via a status update. #[utoipa::path( post, path = "/api/v1/mesh/daemons/{id}/reauth", params( ("id" = Uuid, Path, description = "Daemon ID") ), responses( (status = 200, description = "Reauth command sent", body = TriggerReauthResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Daemon not found or not connected", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn trigger_daemon_reauth( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify the daemon exists and belongs to this owner match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Daemon not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } // Check if daemon is connected if !state.is_daemon_connected(id) { return ( StatusCode::NOT_FOUND, Json(ApiError::new( "DAEMON_NOT_CONNECTED", "Daemon is not currently connected", )), ) .into_response(); } // Generate a unique request ID for this reauth flow let request_id = Uuid::new_v4(); // Initialize the status as "pending" state.set_daemon_reauth_status(id, request_id, "pending".to_string(), None, None); // Send reauth command to daemon let command = DaemonCommand::TriggerReauth { request_id }; if let Err(e) = state.send_daemon_command(id, command).await { tracing::error!("Failed to send reauth command to daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!( daemon_id = %id, request_id = %request_id, owner_id = %auth.owner_id, "Reauth command sent to daemon" ); Json(TriggerReauthResponse { success: true, daemon_id: id, request_id, }) .into_response() } /// Submit an OAuth auth code to a daemon's pending reauth flow. /// /// Sends the auth code to the daemon, which will forward it to the `claude setup-token` process. #[utoipa::path( post, path = "/api/v1/mesh/daemons/{id}/reauth/code", params( ("id" = Uuid, Path, description = "Daemon ID") ), request_body = SubmitAuthCodeRequest, responses( (status = 200, description = "Auth code submitted", body = SubmitAuthCodeResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Daemon not found or not connected", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn submit_daemon_auth_code( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify the daemon exists and belongs to this owner match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Daemon not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } // Check if daemon is connected if !state.is_daemon_connected(id) { return ( StatusCode::NOT_FOUND, Json(ApiError::new( "DAEMON_NOT_CONNECTED", "Daemon is not currently connected", )), ) .into_response(); } // Send auth code command to daemon let command = DaemonCommand::SubmitAuthCode { request_id: body.request_id, code: body.code, }; if let Err(e) = state.send_daemon_command(id, command).await { tracing::error!("Failed to send auth code to daemon {}: {}", id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DAEMON_ERROR", e)), ) .into_response(); } tracing::info!( daemon_id = %id, request_id = %body.request_id, owner_id = %auth.owner_id, "Auth code submitted to daemon" ); Json(SubmitAuthCodeResponse { success: true }).into_response() } /// Get the status of a daemon reauth request. /// /// Used by the frontend to poll for reauth status updates. #[utoipa::path( get, path = "/api/v1/mesh/daemons/{id}/reauth/{request_id}/status", params( ("id" = Uuid, Path, description = "Daemon ID"), ("request_id" = Uuid, Path, description = "Reauth request ID") ), responses( (status = 200, description = "Reauth status", body = ReauthStatusResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Reauth request not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_daemon_reauth_status( State(state): State, Authenticated(_auth): Authenticated, Path((id, request_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { match state.get_daemon_reauth_status(id, request_id) { Some(status) => Json(ReauthStatusResponse { status: status.status, login_url: status.login_url, error: status.error, }) .into_response(), None => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Reauth request not found")), ) .into_response(), } }