//! HTTP handlers for supervisor-specific mesh operations.
//!
//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate
//! contract work: spawning tasks, waiting for completion, reading worktree files, etc.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
use crate::db::repository;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState};
// =============================================================================
// Request/Response Types
// =============================================================================
/// Request to spawn a new task from supervisor.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SpawnTaskRequest {
pub name: String,
pub plan: String,
pub contract_id: Uuid,
pub parent_task_id: Option<Uuid>,
pub checkpoint_sha: Option<String>,
/// Repository URL for the task (supervisor should provide this)
pub repository_url: Option<String>,
}
/// Request to wait for task completion.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitForTaskRequest {
#[serde(default = "default_timeout")]
pub timeout_seconds: i32,
}
fn default_timeout() -> i32 {
300
}
/// Request to read a file from task worktree.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadWorktreeFileRequest {
pub file_path: String,
}
/// Request to create a checkpoint.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateCheckpointRequest {
pub message: String,
}
/// Response for task tree.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskTreeResponse {
pub tasks: Vec<TaskSummary>,
pub supervisor_task_id: Option<Uuid>,
pub total_count: usize,
}
/// Response for wait operation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitResponse {
pub task_id: Uuid,
pub status: String,
pub completed: bool,
pub output_summary: Option<String>,
}
/// Response for read file operation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadFileResponse {
pub task_id: Uuid,
pub file_path: String,
pub content: String,
pub exists: bool,
}
/// Response for checkpoint operations.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointResponse {
pub task_id: Uuid,
pub checkpoint_number: i32,
pub commit_sha: String,
pub message: String,
}
/// Task checkpoint info.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskCheckpoint {
pub id: Uuid,
pub task_id: Uuid,
pub checkpoint_number: i32,
pub commit_sha: String,
pub branch_name: String,
pub message: String,
pub files_changed: Option<serde_json::Value>,
pub lines_added: i32,
pub lines_removed: i32,
pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Response for list checkpoints.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointListResponse {
pub task_id: Uuid,
pub checkpoints: Vec<TaskCheckpoint>,
}
// =============================================================================
// Helper Functions
// =============================================================================
/// Verify the request comes from a supervisor task and extract ownership info.
async fn verify_supervisor_auth(
state: &SharedState,
headers: &HeaderMap,
contract_id: Option<Uuid>,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
let auth = extract_auth(state, headers);
let task_id = match auth {
AuthSource::ToolKey(task_id) => task_id,
_ => {
return Err((
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")),
));
}
};
// Get the task to verify it's a supervisor and get owner_id
let pool = state.db_pool.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
})?;
let task = repository::get_task(pool, task_id)
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to get supervisor task");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")),
)
})?
.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
})?;
// Verify task is a supervisor
if !task.is_supervisor {
return Err((
StatusCode::FORBIDDEN,
Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor tasks can use these endpoints")),
));
}
// If contract_id provided, verify the supervisor belongs to that contract
if let Some(cid) = contract_id {
if task.contract_id != Some(cid) {
return Err((
StatusCode::FORBIDDEN,
Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")),
));
}
}
Ok((task_id, task.owner_id))
}
// =============================================================================
// Contract Task Handlers
// =============================================================================
/// List all tasks in a contract's tree.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks",
params(
("contract_id" = Uuid, Path, description = "Contract ID")
),
responses(
(status = 200, description = "List of tasks in contract", body = TaskTreeResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn list_contract_tasks(
State(state): State<SharedState>,
Path(contract_id): Path<Uuid>,
headers: HeaderMap,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get all tasks for this contract
match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
Ok(tasks) => {
let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id);
let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect();
let total_count = summaries.len();
(
StatusCode::OK,
Json(TaskTreeResponse {
tasks: summaries,
supervisor_task_id,
total_count,
}),
).into_response()
}
Err(e) => {
tracing::error!(error = %e, "Failed to list contract tasks");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to list tasks")),
).into_response()
}
}
}
/// Get full task tree structure for a contract.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree",
params(
("contract_id" = Uuid, Path, description = "Contract ID")
),
responses(
(status = 200, description = "Task tree structure", body = TaskTreeResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn get_contract_tree(
State(state): State<SharedState>,
Path(contract_id): Path<Uuid>,
headers: HeaderMap,
) -> impl IntoResponse {
// Same as list_contract_tasks for now - can add tree structure later
list_contract_tasks(State(state), Path(contract_id), headers).await
}
// =============================================================================
// Task Spawn Handler
// =============================================================================
/// Spawn a new task (supervisor only).
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks",
request_body = SpawnTaskRequest,
responses(
(status = 201, description = "Task created", body = Task),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn spawn_task(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<SpawnTaskRequest>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Verify contract exists
let _contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Contract not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get contract");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get contract")),
).into_response();
}
};
// Get repository URL from the contract's primary repository
let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await {
Ok(repos) => {
// Prefer primary repo, fallback to first repo
repos.iter()
.find(|r| r.is_primary)
.or(repos.first())
.and_then(|r| r.repository_url.clone())
}
Err(e) => {
tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL");
None
}
};
// Supervisor can override with explicit repository_url
let repo_url = request.repository_url.clone().or(repo_url);
// Create task request
let create_req = CreateTaskRequest {
name: request.name.clone(),
description: None,
plan: request.plan.clone(),
repository_url: repo_url.clone(),
contract_id: request.contract_id,
parent_task_id: request.parent_task_id,
is_supervisor: false,
checkpoint_sha: request.checkpoint_sha.clone(),
merge_mode: Some("manual".to_string()),
priority: 0,
base_branch: None,
target_branch: None,
target_repo_path: None,
completion_action: None,
continue_from_task_id: None,
copy_files: None,
};
// Create task in DB
let task = match repository::create_task_for_owner(pool, owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!(error = %e, "Failed to create task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to create task")),
).into_response();
}
};
tracing::info!(
supervisor_id = %supervisor_id,
task_id = %task.id,
task_name = %task.name,
"Supervisor spawned new task"
);
// Start task on a daemon
// Find a daemon that belongs to this owner
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
if daemon.owner_id == owner_id {
// Send spawn command to first available daemon
let cmd = DaemonCommand::SpawnTask {
task_id: task.id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: repo_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator: false,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: false,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
} else {
tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
}
break;
}
}
(StatusCode::CREATED, Json(task)).into_response()
}
// =============================================================================
// Wait for Task Handler
// =============================================================================
/// Wait for a task to complete.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait",
params(
("task_id" = Uuid, Path, description = "Task ID to wait for")
),
request_body = WaitForTaskRequest,
responses(
(status = 200, description = "Task completed or timed out", body = WaitResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn wait_for_task(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<WaitForTaskRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Verify task belongs to same owner
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Check if already done
if task.status == "done" || task.status == "failed" || task.status == "merged" {
return (
StatusCode::OK,
Json(WaitResponse {
task_id,
status: task.status,
completed: true,
output_summary: None,
}),
).into_response();
}
// Subscribe to task completions
let mut rx = state.task_completions.subscribe();
let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);
// Wait for completion or timeout
let result = tokio::time::timeout(timeout, async {
loop {
match rx.recv().await {
Ok(notification) => {
if notification.task_id == task_id {
return Some(notification);
}
}
Err(_) => {
// Channel closed or lagged - check DB directly
if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
if t.status == "done" || t.status == "failed" || t.status == "merged" {
return Some(crate::server::state::TaskCompletionNotification {
task_id: t.id,
owner_id: Some(t.owner_id),
contract_id: t.contract_id,
parent_task_id: t.parent_task_id,
status: t.status,
output_summary: None,
worktree_path: None,
error_message: t.error_message,
});
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
}).await;
match result {
Ok(Some(notification)) => {
(
StatusCode::OK,
Json(WaitResponse {
task_id,
status: notification.status,
completed: true,
output_summary: notification.output_summary,
}),
).into_response()
}
Ok(None) | Err(_) => {
// Timeout - check final status
let final_status = repository::get_task(pool, task_id)
.await
.ok()
.flatten()
.map(|t| t.status)
.unwrap_or_else(|| "unknown".to_string());
(
StatusCode::OK,
Json(WaitResponse {
task_id,
status: final_status.clone(),
completed: final_status == "done" || final_status == "failed" || final_status == "merged",
output_summary: None,
}),
).into_response()
}
}
}
// =============================================================================
// Read Worktree File Handler
// =============================================================================
/// Read a file from a task's worktree.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file",
params(
("task_id" = Uuid, Path, description = "Task ID")
),
request_body = ReadWorktreeFileRequest,
responses(
(status = 200, description = "File content", body = ReadFileResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn read_worktree_file(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<ReadWorktreeFileRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get task to verify ownership
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// TODO: Implement file reading via worktree path
// For now, return not implemented - supervisor should use local file access via worktree
let _ = (task, request);
(
StatusCode::NOT_IMPLEMENTED,
Json(ApiError::new(
"NOT_IMPLEMENTED",
"Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.",
)),
).into_response()
}
// =============================================================================
// Checkpoint Handlers
// =============================================================================
/// Create a git checkpoint for a task.
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{task_id}/checkpoint",
params(
("task_id" = Uuid, Path, description = "Task ID")
),
request_body = CreateCheckpointRequest,
responses(
(status = 201, description = "Checkpoint created", body = CheckpointResponse),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_checkpoint(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<CreateCheckpointRequest>,
) -> impl IntoResponse {
let auth = extract_auth(&state, &headers);
let task_id_from_auth = match auth {
AuthSource::ToolKey(tid) => tid,
_ => {
return (
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
).into_response();
}
};
// Can only create checkpoint for own task
if task_id_from_auth != task_id {
return (
StatusCode::FORBIDDEN,
Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")),
).into_response();
}
let pool = state.db_pool.as_ref().unwrap();
// Get task
let task = match repository::get_task(pool, task_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// TODO: Implement checkpoint creation via daemon command
// For now, checkpoints should be created by the task itself via git commands
let _ = (task, request);
(
StatusCode::NOT_IMPLEMENTED,
Json(ApiError::new(
"NOT_IMPLEMENTED",
"Checkpoint creation via API not yet implemented. Use git commands directly in the task.",
)),
).into_response()
}
/// List checkpoints for a task.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{task_id}/checkpoints",
params(
("task_id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "List of checkpoints", body = CheckpointListResponse),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn list_checkpoints(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
) -> impl IntoResponse {
let auth = extract_auth(&state, &headers);
let _task_id_from_auth = match auth {
AuthSource::ToolKey(tid) => tid,
_ => {
return (
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
).into_response();
}
};
let pool = state.db_pool.as_ref().unwrap();
// Get checkpoints from DB
match repository::list_task_checkpoints(pool, task_id).await {
Ok(checkpoints) => {
let checkpoint_list: Vec<TaskCheckpoint> = checkpoints
.into_iter()
.map(|c| TaskCheckpoint {
id: c.id,
task_id: c.task_id,
checkpoint_number: c.checkpoint_number,
commit_sha: c.commit_sha,
branch_name: c.branch_name,
message: c.message,
files_changed: c.files_changed,
lines_added: c.lines_added.unwrap_or(0),
lines_removed: c.lines_removed.unwrap_or(0),
created_at: c.created_at,
})
.collect();
(
StatusCode::OK,
Json(CheckpointListResponse {
task_id,
checkpoints: checkpoint_list,
}),
).into_response()
}
Err(e) => {
tracing::error!(error = %e, "Failed to list checkpoints");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")),
).into_response()
}
}
}
// =============================================================================
// Git Operations - Request/Response Types
// =============================================================================
/// Request to create a new branch.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchRequest {
pub branch_name: String,
pub from_ref: Option<String>,
}
/// Response for branch creation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchResponse {
pub success: bool,
pub branch_name: String,
pub message: String,
}
/// Request to merge task changes.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskRequest {
pub target_branch: Option<String>,
#[serde(default)]
pub squash: bool,
}
/// Response for merge operation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskResponse {
pub task_id: Uuid,
pub success: bool,
pub message: String,
pub commit_sha: Option<String>,
pub conflicts: Option<Vec<String>>,
}
/// Request to create a pull request.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRRequest {
pub task_id: Uuid,
pub title: String,
pub body: Option<String>,
#[serde(default = "default_base_branch")]
pub base_branch: String,
}
fn default_base_branch() -> String {
"main".to_string()
}
/// Response for PR creation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRResponse {
pub task_id: Uuid,
pub success: bool,
pub message: String,
pub pr_url: Option<String>,
pub pr_number: Option<i32>,
}
/// Response for task diff.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResponse {
pub task_id: Uuid,
pub success: bool,
pub diff: Option<String>,
pub error: Option<String>,
}
// =============================================================================
// Git Operations - Handlers
// =============================================================================
/// Create a new branch from supervisor's worktree.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/branches",
request_body = CreateBranchRequest,
responses(
(status = 201, description = "Branch created", body = CreateBranchResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_branch(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreateBranchRequest>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
// Find daemon running supervisor
let daemon_id = {
let pool = state.db_pool.as_ref().unwrap();
match repository::get_task(pool, supervisor_id).await {
Ok(Some(task)) => task.daemon_id,
_ => None,
}
};
let Some(daemon_id) = daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
).into_response();
};
// Send CreateBranch command to daemon
let cmd = DaemonCommand::CreateBranch {
task_id: supervisor_id,
branch_name: request.branch_name.clone(),
from_ref: request.from_ref,
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send CreateBranch command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
// Note: Real implementation would wait for daemon response
// For now, return success immediately - daemon will send response via WebSocket
(
StatusCode::CREATED,
Json(CreateBranchResponse {
success: true,
branch_name: request.branch_name,
message: "Branch creation command sent".to_string(),
}),
).into_response()
}
/// Merge a task's changes to a target branch.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge",
params(
("task_id" = Uuid, Path, description = "Task ID to merge")
),
request_body = MergeTaskRequest,
responses(
(status = 200, description = "Merge initiated", body = MergeTaskResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn merge_task(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<MergeTaskRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the target task
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
).into_response();
};
// Send MergeTaskToTarget command to daemon
let cmd = DaemonCommand::MergeTaskToTarget {
task_id,
target_branch: request.target_branch,
squash: request.squash,
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send MergeTaskToTarget command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
(
StatusCode::OK,
Json(MergeTaskResponse {
task_id,
success: true,
message: "Merge command sent".to_string(),
commit_sha: None,
conflicts: None,
}),
).into_response()
}
/// Create a pull request for a task's changes.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/pr",
request_body = CreatePRRequest,
responses(
(status = 201, description = "PR created", body = CreatePRResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_pr(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreatePRRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the target task
let task = match repository::get_task_for_owner(pool, request.task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
).into_response();
};
// Send CreatePR command to daemon
let cmd = DaemonCommand::CreatePR {
task_id: request.task_id,
title: request.title.clone(),
body: request.body.clone(),
base_branch: request.base_branch.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send CreatePR command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
(
StatusCode::CREATED,
Json(CreatePRResponse {
task_id: request.task_id,
success: true,
message: "PR creation command sent".to_string(),
pr_url: None,
pr_number: None,
}),
).into_response()
}
/// Get the diff for a task's changes.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff",
params(
("task_id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "Task diff", body = TaskDiffResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn get_task_diff(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the target task
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
).into_response();
};
// Send GetTaskDiff command to daemon
let cmd = DaemonCommand::GetTaskDiff { task_id };
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send GetTaskDiff command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
(
StatusCode::OK,
Json(TaskDiffResponse {
task_id,
success: true,
diff: None,
error: Some("Diff command sent - response will be streamed".to_string()),
}),
).into_response()
}