//! HTTP handlers for supervisor-specific mesh operations. //! //! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate //! contract work: spawning tasks, waiting for completion, reading worktree files, etc. use axum::{ extract::{Path, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; use crate::db::repository; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; use crate::server::state::{DaemonCommand, SharedState}; // ============================================================================= // Request/Response Types // ============================================================================= /// Request to spawn a new task from supervisor. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct SpawnTaskRequest { pub name: String, pub plan: String, pub contract_id: Uuid, pub parent_task_id: Option, pub checkpoint_sha: Option, /// Repository URL for the task (supervisor should provide this) pub repository_url: Option, } /// Request to wait for task completion. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct WaitForTaskRequest { #[serde(default = "default_timeout")] pub timeout_seconds: i32, } fn default_timeout() -> i32 { 300 } /// Request to read a file from task worktree. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReadWorktreeFileRequest { pub file_path: String, } /// Request to create a checkpoint. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateCheckpointRequest { pub message: String, } /// Response for task tree. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskTreeResponse { pub tasks: Vec, pub supervisor_task_id: Option, pub total_count: usize, } /// Response for wait operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct WaitResponse { pub task_id: Uuid, pub status: String, pub completed: bool, pub output_summary: Option, } /// Response for read file operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReadFileResponse { pub task_id: Uuid, pub file_path: String, pub content: String, pub exists: bool, } /// Response for checkpoint operations. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckpointResponse { pub task_id: Uuid, pub checkpoint_number: i32, pub commit_sha: String, pub message: String, } /// Task checkpoint info. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskCheckpoint { pub id: Uuid, pub task_id: Uuid, pub checkpoint_number: i32, pub commit_sha: String, pub branch_name: String, pub message: String, pub files_changed: Option, pub lines_added: i32, pub lines_removed: i32, pub created_at: chrono::DateTime, } /// Response for list checkpoints. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckpointListResponse { pub task_id: Uuid, pub checkpoints: Vec, } // ============================================================================= // Helper Functions // ============================================================================= /// Verify the request comes from a supervisor task and extract ownership info. async fn verify_supervisor_auth( state: &SharedState, headers: &HeaderMap, contract_id: Option, ) -> Result<(Uuid, Uuid), (StatusCode, Json)> { let auth = extract_auth(state, headers); let task_id = match auth { AuthSource::ToolKey(task_id) => task_id, _ => { return Err(( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), )); } }; // Get the task to verify it's a supervisor and get owner_id let pool = state.db_pool.as_ref().ok_or_else(|| { ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) })?; let task = repository::get_task(pool, task_id) .await .map_err(|e| { tracing::error!(error = %e, "Failed to get supervisor task"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), ) })? .ok_or_else(|| { ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) })?; // Verify task is a supervisor if !task.is_supervisor { return Err(( StatusCode::FORBIDDEN, Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor tasks can use these endpoints")), )); } // If contract_id provided, verify the supervisor belongs to that contract if let Some(cid) = contract_id { if task.contract_id != Some(cid) { return Err(( StatusCode::FORBIDDEN, Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), )); } } Ok((task_id, task.owner_id)) } // ============================================================================= // Contract Task Handlers // ============================================================================= /// List all tasks in a contract's tree. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", params( ("contract_id" = Uuid, Path, description = "Contract ID") ), responses( (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn list_contract_tasks( State(state): State, Path(contract_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get all tasks for this contract match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { Ok(tasks) => { let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); let summaries: Vec = tasks.into_iter().map(TaskSummary::from).collect(); let total_count = summaries.len(); ( StatusCode::OK, Json(TaskTreeResponse { tasks: summaries, supervisor_task_id, total_count, }), ).into_response() } Err(e) => { tracing::error!(error = %e, "Failed to list contract tasks"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to list tasks")), ).into_response() } } } /// Get full task tree structure for a contract. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", params( ("contract_id" = Uuid, Path, description = "Contract ID") ), responses( (status = 200, description = "Task tree structure", body = TaskTreeResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn get_contract_tree( State(state): State, Path(contract_id): Path, headers: HeaderMap, ) -> impl IntoResponse { // Same as list_contract_tasks for now - can add tree structure later list_contract_tasks(State(state), Path(contract_id), headers).await } // ============================================================================= // Task Spawn Handler // ============================================================================= /// Spawn a new task (supervisor only). #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks", request_body = SpawnTaskRequest, responses( (status = 201, description = "Task created", body = Task), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn spawn_task( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Verify contract exists let _contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Contract not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get contract"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get contract")), ).into_response(); } }; // Get repository URL from the contract's primary repository let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await { Ok(repos) => { // Prefer primary repo, fallback to first repo repos.iter() .find(|r| r.is_primary) .or(repos.first()) .and_then(|r| r.repository_url.clone()) } Err(e) => { tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL"); None } }; // Supervisor can override with explicit repository_url let repo_url = request.repository_url.clone().or(repo_url); // Create task request let create_req = CreateTaskRequest { name: request.name.clone(), description: None, plan: request.plan.clone(), repository_url: repo_url.clone(), contract_id: request.contract_id, parent_task_id: request.parent_task_id, is_supervisor: false, checkpoint_sha: request.checkpoint_sha.clone(), merge_mode: Some("manual".to_string()), priority: 0, base_branch: None, target_branch: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, }; // Create task in DB let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { Ok(t) => t, Err(e) => { tracing::error!(error = %e, "Failed to create task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to create task")), ).into_response(); } }; tracing::info!( supervisor_id = %supervisor_id, task_id = %task.id, task_name = %task.name, "Supervisor spawned new task" ); // Start task on a daemon // Find a daemon that belongs to this owner for entry in state.daemon_connections.iter() { let daemon = entry.value(); if daemon.owner_id == owner_id { // Send spawn command to first available daemon let cmd = DaemonCommand::SpawnTask { task_id: task.id, task_name: task.name.clone(), plan: task.plan.clone(), repo_url: repo_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator: false, target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: false, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); } else { tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); } break; } } (StatusCode::CREATED, Json(task)).into_response() } // ============================================================================= // Wait for Task Handler // ============================================================================= /// Wait for a task to complete. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", params( ("task_id" = Uuid, Path, description = "Task ID to wait for") ), request_body = WaitForTaskRequest, responses( (status = 200, description = "Task completed or timed out", body = WaitResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn wait_for_task( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Verify task belongs to same owner let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Check if already done if task.status == "done" || task.status == "failed" || task.status == "merged" { return ( StatusCode::OK, Json(WaitResponse { task_id, status: task.status, completed: true, output_summary: None, }), ).into_response(); } // Subscribe to task completions let mut rx = state.task_completions.subscribe(); let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); // Wait for completion or timeout let result = tokio::time::timeout(timeout, async { loop { match rx.recv().await { Ok(notification) => { if notification.task_id == task_id { return Some(notification); } } Err(_) => { // Channel closed or lagged - check DB directly if let Ok(Some(t)) = repository::get_task(pool, task_id).await { if t.status == "done" || t.status == "failed" || t.status == "merged" { return Some(crate::server::state::TaskCompletionNotification { task_id: t.id, owner_id: Some(t.owner_id), contract_id: t.contract_id, parent_task_id: t.parent_task_id, status: t.status, output_summary: None, worktree_path: None, error_message: t.error_message, }); } } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } } } }).await; match result { Ok(Some(notification)) => { ( StatusCode::OK, Json(WaitResponse { task_id, status: notification.status, completed: true, output_summary: notification.output_summary, }), ).into_response() } Ok(None) | Err(_) => { // Timeout - check final status let final_status = repository::get_task(pool, task_id) .await .ok() .flatten() .map(|t| t.status) .unwrap_or_else(|| "unknown".to_string()); ( StatusCode::OK, Json(WaitResponse { task_id, status: final_status.clone(), completed: final_status == "done" || final_status == "failed" || final_status == "merged", output_summary: None, }), ).into_response() } } } // ============================================================================= // Read Worktree File Handler // ============================================================================= /// Read a file from a task's worktree. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", params( ("task_id" = Uuid, Path, description = "Task ID") ), request_body = ReadWorktreeFileRequest, responses( (status = 200, description = "File content", body = ReadFileResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn read_worktree_file( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get task to verify ownership let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // TODO: Implement file reading via worktree path // For now, return not implemented - supervisor should use local file access via worktree let _ = (task, request); ( StatusCode::NOT_IMPLEMENTED, Json(ApiError::new( "NOT_IMPLEMENTED", "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.", )), ).into_response() } // ============================================================================= // Checkpoint Handlers // ============================================================================= /// Create a git checkpoint for a task. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{task_id}/checkpoint", params( ("task_id" = Uuid, Path, description = "Task ID") ), request_body = CreateCheckpointRequest, responses( (status = 201, description = "Checkpoint created", body = CheckpointResponse), (status = 401, description = "Unauthorized"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_checkpoint( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let auth = extract_auth(&state, &headers); let task_id_from_auth = match auth { AuthSource::ToolKey(tid) => tid, _ => { return ( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Tool key required")), ).into_response(); } }; // Can only create checkpoint for own task if task_id_from_auth != task_id { return ( StatusCode::FORBIDDEN, Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), ).into_response(); } let pool = state.db_pool.as_ref().unwrap(); // Get task let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // TODO: Implement checkpoint creation via daemon command // For now, checkpoints should be created by the task itself via git commands let _ = (task, request); ( StatusCode::NOT_IMPLEMENTED, Json(ApiError::new( "NOT_IMPLEMENTED", "Checkpoint creation via API not yet implemented. Use git commands directly in the task.", )), ).into_response() } /// List checkpoints for a task. #[utoipa::path( get, path = "/api/v1/mesh/tasks/{task_id}/checkpoints", params( ("task_id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "List of checkpoints", body = CheckpointListResponse), (status = 401, description = "Unauthorized"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn list_checkpoints( State(state): State, Path(task_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let auth = extract_auth(&state, &headers); let _task_id_from_auth = match auth { AuthSource::ToolKey(tid) => tid, _ => { return ( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Tool key required")), ).into_response(); } }; let pool = state.db_pool.as_ref().unwrap(); // Get checkpoints from DB match repository::list_task_checkpoints(pool, task_id).await { Ok(checkpoints) => { let checkpoint_list: Vec = checkpoints .into_iter() .map(|c| TaskCheckpoint { id: c.id, task_id: c.task_id, checkpoint_number: c.checkpoint_number, commit_sha: c.commit_sha, branch_name: c.branch_name, message: c.message, files_changed: c.files_changed, lines_added: c.lines_added.unwrap_or(0), lines_removed: c.lines_removed.unwrap_or(0), created_at: c.created_at, }) .collect(); ( StatusCode::OK, Json(CheckpointListResponse { task_id, checkpoints: checkpoint_list, }), ).into_response() } Err(e) => { tracing::error!(error = %e, "Failed to list checkpoints"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), ).into_response() } } } // ============================================================================= // Git Operations - Request/Response Types // ============================================================================= /// Request to create a new branch. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateBranchRequest { pub branch_name: String, pub from_ref: Option, } /// Response for branch creation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateBranchResponse { pub success: bool, pub branch_name: String, pub message: String, } /// Request to merge task changes. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MergeTaskRequest { pub target_branch: Option, #[serde(default)] pub squash: bool, } /// Response for merge operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MergeTaskResponse { pub task_id: Uuid, pub success: bool, pub message: String, pub commit_sha: Option, pub conflicts: Option>, } /// Request to create a pull request. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreatePRRequest { pub task_id: Uuid, pub title: String, pub body: Option, #[serde(default = "default_base_branch")] pub base_branch: String, } fn default_base_branch() -> String { "main".to_string() } /// Response for PR creation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreatePRResponse { pub task_id: Uuid, pub success: bool, pub message: String, pub pr_url: Option, pub pr_number: Option, } /// Response for task diff. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskDiffResponse { pub task_id: Uuid, pub success: bool, pub diff: Option, pub error: Option, } // ============================================================================= // Git Operations - Handlers // ============================================================================= /// Create a new branch from supervisor's worktree. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/branches", request_body = CreateBranchRequest, responses( (status = 201, description = "Branch created", body = CreateBranchResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_branch( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; // Find daemon running supervisor let daemon_id = { let pool = state.db_pool.as_ref().unwrap(); match repository::get_task(pool, supervisor_id).await { Ok(Some(task)) => task.daemon_id, _ => None, } }; let Some(daemon_id) = daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), ).into_response(); }; // Send CreateBranch command to daemon let cmd = DaemonCommand::CreateBranch { task_id: supervisor_id, branch_name: request.branch_name.clone(), from_ref: request.from_ref, }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send CreateBranch command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } // Note: Real implementation would wait for daemon response // For now, return success immediately - daemon will send response via WebSocket ( StatusCode::CREATED, Json(CreateBranchResponse { success: true, branch_name: request.branch_name, message: "Branch creation command sent".to_string(), }), ).into_response() } /// Merge a task's changes to a target branch. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", params( ("task_id" = Uuid, Path, description = "Task ID to merge") ), request_body = MergeTaskRequest, responses( (status = 200, description = "Merge initiated", body = MergeTaskResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn merge_task( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the target task let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Send MergeTaskToTarget command to daemon let cmd = DaemonCommand::MergeTaskToTarget { task_id, target_branch: request.target_branch, squash: request.squash, }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } ( StatusCode::OK, Json(MergeTaskResponse { task_id, success: true, message: "Merge command sent".to_string(), commit_sha: None, conflicts: None, }), ).into_response() } /// Create a pull request for a task's changes. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/pr", request_body = CreatePRRequest, responses( (status = 201, description = "PR created", body = CreatePRResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_pr( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the target task let task = match repository::get_task_for_owner(pool, request.task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Send CreatePR command to daemon let cmd = DaemonCommand::CreatePR { task_id: request.task_id, title: request.title.clone(), body: request.body.clone(), base_branch: request.base_branch.clone(), }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send CreatePR command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } ( StatusCode::CREATED, Json(CreatePRResponse { task_id: request.task_id, success: true, message: "PR creation command sent".to_string(), pr_url: None, pr_number: None, }), ).into_response() } /// Get the diff for a task's changes. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", params( ("task_id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task diff", body = TaskDiffResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn get_task_diff( State(state): State, Path(task_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the target task let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Send GetTaskDiff command to daemon let cmd = DaemonCommand::GetTaskDiff { task_id }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send GetTaskDiff command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } ( StatusCode::OK, Json(TaskDiffResponse { task_id, success: true, diff: None, error: Some("Diff command sent - response will be streamed".to_string()), }), ).into_response() }