summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_supervisor.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/server/handlers/mesh_supervisor.rs
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs1153
1 files changed, 1153 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
new file mode 100644
index 0000000..ac59130
--- /dev/null
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -0,0 +1,1153 @@
+//! HTTP handlers for supervisor-specific mesh operations.
+//!
+//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate
+//! contract work: spawning tasks, waiting for completion, reading worktree files, etc.
+
+use axum::{
+ extract::{Path, State},
+ http::{HeaderMap, StatusCode},
+ response::IntoResponse,
+ Json,
+};
+use serde::{Deserialize, Serialize};
+use utoipa::ToSchema;
+use uuid::Uuid;
+
+use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
+use crate::db::repository;
+use crate::server::handlers::mesh::{extract_auth, AuthSource};
+use crate::server::messages::ApiError;
+use crate::server::state::{DaemonCommand, SharedState};
+
+// =============================================================================
+// Request/Response Types
+// =============================================================================
+
+/// Request to spawn a new task from supervisor.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SpawnTaskRequest {
+ pub name: String,
+ pub plan: String,
+ pub contract_id: Uuid,
+ pub parent_task_id: Option<Uuid>,
+ pub checkpoint_sha: Option<String>,
+ /// Repository URL for the task (supervisor should provide this)
+ pub repository_url: Option<String>,
+}
+
+/// Request to wait for task completion.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct WaitForTaskRequest {
+ #[serde(default = "default_timeout")]
+ pub timeout_seconds: i32,
+}
+
+fn default_timeout() -> i32 {
+ 300
+}
+
+/// Request to read a file from task worktree.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReadWorktreeFileRequest {
+ pub file_path: String,
+}
+
+/// Request to create a checkpoint.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateCheckpointRequest {
+ pub message: String,
+}
+
+/// Response for task tree.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskTreeResponse {
+ pub tasks: Vec<TaskSummary>,
+ pub supervisor_task_id: Option<Uuid>,
+ pub total_count: usize,
+}
+
+/// Response for wait operation.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct WaitResponse {
+ pub task_id: Uuid,
+ pub status: String,
+ pub completed: bool,
+ pub output_summary: Option<String>,
+}
+
+/// Response for read file operation.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReadFileResponse {
+ pub task_id: Uuid,
+ pub file_path: String,
+ pub content: String,
+ pub exists: bool,
+}
+
+/// Response for checkpoint operations.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckpointResponse {
+ pub task_id: Uuid,
+ pub checkpoint_number: i32,
+ pub commit_sha: String,
+ pub message: String,
+}
+
+/// Task checkpoint info.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskCheckpoint {
+ pub id: Uuid,
+ pub task_id: Uuid,
+ pub checkpoint_number: i32,
+ pub commit_sha: String,
+ pub branch_name: String,
+ pub message: String,
+ pub files_changed: Option<serde_json::Value>,
+ pub lines_added: i32,
+ pub lines_removed: i32,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Response for list checkpoints.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckpointListResponse {
+ pub task_id: Uuid,
+ pub checkpoints: Vec<TaskCheckpoint>,
+}
+
+// =============================================================================
+// Helper Functions
+// =============================================================================
+
+/// Verify the request comes from a supervisor task and extract ownership info.
+async fn verify_supervisor_auth(
+ state: &SharedState,
+ headers: &HeaderMap,
+ contract_id: Option<Uuid>,
+) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
+ let auth = extract_auth(state, headers);
+
+ let task_id = match auth {
+ AuthSource::ToolKey(task_id) => task_id,
+ _ => {
+ return Err((
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")),
+ ));
+ }
+ };
+
+ // Get the task to verify it's a supervisor and get owner_id
+ let pool = state.db_pool.as_ref().ok_or_else(|| {
+ (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ })?;
+
+ let task = repository::get_task(pool, task_id)
+ .await
+ .map_err(|e| {
+ tracing::error!(error = %e, "Failed to get supervisor task");
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")),
+ )
+ })?
+ .ok_or_else(|| {
+ (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ })?;
+
+ // Verify task is a supervisor
+ if !task.is_supervisor {
+ return Err((
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor tasks can use these endpoints")),
+ ));
+ }
+
+ // If contract_id provided, verify the supervisor belongs to that contract
+ if let Some(cid) = contract_id {
+ if task.contract_id != Some(cid) {
+ return Err((
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")),
+ ));
+ }
+ }
+
+ Ok((task_id, task.owner_id))
+}
+
+// =============================================================================
+// Contract Task Handlers
+// =============================================================================
+
+/// List all tasks in a contract's tree.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks",
+ params(
+ ("contract_id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "List of tasks in contract", body = TaskTreeResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn list_contract_tasks(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ headers: HeaderMap,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get all tasks for this contract
+ match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
+ Ok(tasks) => {
+ let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id);
+ let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect();
+ let total_count = summaries.len();
+
+ (
+ StatusCode::OK,
+ Json(TaskTreeResponse {
+ tasks: summaries,
+ supervisor_task_id,
+ total_count,
+ }),
+ ).into_response()
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to list contract tasks");
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to list tasks")),
+ ).into_response()
+ }
+ }
+}
+
+/// Get full task tree structure for a contract.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree",
+ params(
+ ("contract_id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Task tree structure", body = TaskTreeResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn get_contract_tree(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ headers: HeaderMap,
+) -> impl IntoResponse {
+ // Same as list_contract_tasks for now - can add tree structure later
+ list_contract_tasks(State(state), Path(contract_id), headers).await
+}
+
+// =============================================================================
+// Task Spawn Handler
+// =============================================================================
+
+/// Spawn a new task (supervisor only).
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/tasks",
+ request_body = SpawnTaskRequest,
+ responses(
+ (status = 201, description = "Task created", body = Task),
+ (status = 400, description = "Invalid request"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn spawn_task(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<SpawnTaskRequest>,
+) -> impl IntoResponse {
+ let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Verify contract exists
+ let _contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get contract");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get contract")),
+ ).into_response();
+ }
+ };
+
+ // Get repository URL from the contract's primary repository
+ let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await {
+ Ok(repos) => {
+ // Prefer primary repo, fallback to first repo
+ repos.iter()
+ .find(|r| r.is_primary)
+ .or(repos.first())
+ .and_then(|r| r.repository_url.clone())
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL");
+ None
+ }
+ };
+
+ // Supervisor can override with explicit repository_url
+ let repo_url = request.repository_url.clone().or(repo_url);
+
+ // Create task request
+ let create_req = CreateTaskRequest {
+ name: request.name.clone(),
+ description: None,
+ plan: request.plan.clone(),
+ repository_url: repo_url.clone(),
+ contract_id: request.contract_id,
+ parent_task_id: request.parent_task_id,
+ is_supervisor: false,
+ checkpoint_sha: request.checkpoint_sha.clone(),
+ merge_mode: Some("manual".to_string()),
+ priority: 0,
+ base_branch: None,
+ target_branch: None,
+ target_repo_path: None,
+ completion_action: None,
+ continue_from_task_id: None,
+ copy_files: None,
+ };
+
+ // Create task in DB
+ let task = match repository::create_task_for_owner(pool, owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to create task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to create task")),
+ ).into_response();
+ }
+ };
+
+ tracing::info!(
+ supervisor_id = %supervisor_id,
+ task_id = %task.id,
+ task_name = %task.name,
+ "Supervisor spawned new task"
+ );
+
+ // Start task on a daemon
+ // Find a daemon that belongs to this owner
+ for entry in state.daemon_connections.iter() {
+ let daemon = entry.value();
+ if daemon.owner_id == owner_id {
+ // Send spawn command to first available daemon
+ let cmd = DaemonCommand::SpawnTask {
+ task_id: task.id,
+ task_name: task.name.clone(),
+ plan: task.plan.clone(),
+ repo_url: repo_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator: false,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: false,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+ tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
+ } else {
+ tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
+ }
+ break;
+ }
+ }
+
+ (StatusCode::CREATED, Json(task)).into_response()
+}
+
+// =============================================================================
+// Wait for Task Handler
+// =============================================================================
+
+/// Wait for a task to complete.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID to wait for")
+ ),
+ request_body = WaitForTaskRequest,
+ responses(
+ (status = 200, description = "Task completed or timed out", body = WaitResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn wait_for_task(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+ Json(request): Json<WaitForTaskRequest>,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Verify task belongs to same owner
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // Check if already done
+ if task.status == "done" || task.status == "failed" || task.status == "merged" {
+ return (
+ StatusCode::OK,
+ Json(WaitResponse {
+ task_id,
+ status: task.status,
+ completed: true,
+ output_summary: None,
+ }),
+ ).into_response();
+ }
+
+ // Subscribe to task completions
+ let mut rx = state.task_completions.subscribe();
+ let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);
+
+ // Wait for completion or timeout
+ let result = tokio::time::timeout(timeout, async {
+ loop {
+ match rx.recv().await {
+ Ok(notification) => {
+ if notification.task_id == task_id {
+ return Some(notification);
+ }
+ }
+ Err(_) => {
+ // Channel closed or lagged - check DB directly
+ if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
+ if t.status == "done" || t.status == "failed" || t.status == "merged" {
+ return Some(crate::server::state::TaskCompletionNotification {
+ task_id: t.id,
+ owner_id: Some(t.owner_id),
+ contract_id: t.contract_id,
+ parent_task_id: t.parent_task_id,
+ status: t.status,
+ output_summary: None,
+ worktree_path: None,
+ error_message: t.error_message,
+ });
+ }
+ }
+ tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+ }
+ }
+ }
+ }).await;
+
+ match result {
+ Ok(Some(notification)) => {
+ (
+ StatusCode::OK,
+ Json(WaitResponse {
+ task_id,
+ status: notification.status,
+ completed: true,
+ output_summary: notification.output_summary,
+ }),
+ ).into_response()
+ }
+ Ok(None) | Err(_) => {
+ // Timeout - check final status
+ let final_status = repository::get_task(pool, task_id)
+ .await
+ .ok()
+ .flatten()
+ .map(|t| t.status)
+ .unwrap_or_else(|| "unknown".to_string());
+
+ (
+ StatusCode::OK,
+ Json(WaitResponse {
+ task_id,
+ status: final_status.clone(),
+ completed: final_status == "done" || final_status == "failed" || final_status == "merged",
+ output_summary: None,
+ }),
+ ).into_response()
+ }
+ }
+}
+
+// =============================================================================
+// Read Worktree File Handler
+// =============================================================================
+
+/// Read a file from a task's worktree.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ReadWorktreeFileRequest,
+ responses(
+ (status = 200, description = "File content", body = ReadFileResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn read_worktree_file(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+ Json(request): Json<ReadWorktreeFileRequest>,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get task to verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // TODO: Implement file reading via worktree path
+ // For now, return not implemented - supervisor should use local file access via worktree
+ let _ = (task, request);
+
+ (
+ StatusCode::NOT_IMPLEMENTED,
+ Json(ApiError::new(
+ "NOT_IMPLEMENTED",
+ "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.",
+ )),
+ ).into_response()
+}
+
+// =============================================================================
+// Checkpoint Handlers
+// =============================================================================
+
+/// Create a git checkpoint for a task.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{task_id}/checkpoint",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = CreateCheckpointRequest,
+ responses(
+ (status = 201, description = "Checkpoint created", body = CheckpointResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn create_checkpoint(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+ Json(request): Json<CreateCheckpointRequest>,
+) -> impl IntoResponse {
+ let auth = extract_auth(&state, &headers);
+
+ let task_id_from_auth = match auth {
+ AuthSource::ToolKey(tid) => tid,
+ _ => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
+ ).into_response();
+ }
+ };
+
+ // Can only create checkpoint for own task
+ if task_id_from_auth != task_id {
+ return (
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")),
+ ).into_response();
+ }
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get task
+ let task = match repository::get_task(pool, task_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // TODO: Implement checkpoint creation via daemon command
+ // For now, checkpoints should be created by the task itself via git commands
+ let _ = (task, request);
+
+ (
+ StatusCode::NOT_IMPLEMENTED,
+ Json(ApiError::new(
+ "NOT_IMPLEMENTED",
+ "Checkpoint creation via API not yet implemented. Use git commands directly in the task.",
+ )),
+ ).into_response()
+}
+
+/// List checkpoints for a task.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{task_id}/checkpoints",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "List of checkpoints", body = CheckpointListResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn list_checkpoints(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+) -> impl IntoResponse {
+ let auth = extract_auth(&state, &headers);
+
+ let _task_id_from_auth = match auth {
+ AuthSource::ToolKey(tid) => tid,
+ _ => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
+ ).into_response();
+ }
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get checkpoints from DB
+ match repository::list_task_checkpoints(pool, task_id).await {
+ Ok(checkpoints) => {
+ let checkpoint_list: Vec<TaskCheckpoint> = checkpoints
+ .into_iter()
+ .map(|c| TaskCheckpoint {
+ id: c.id,
+ task_id: c.task_id,
+ checkpoint_number: c.checkpoint_number,
+ commit_sha: c.commit_sha,
+ branch_name: c.branch_name,
+ message: c.message,
+ files_changed: c.files_changed,
+ lines_added: c.lines_added.unwrap_or(0),
+ lines_removed: c.lines_removed.unwrap_or(0),
+ created_at: c.created_at,
+ })
+ .collect();
+
+ (
+ StatusCode::OK,
+ Json(CheckpointListResponse {
+ task_id,
+ checkpoints: checkpoint_list,
+ }),
+ ).into_response()
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to list checkpoints");
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")),
+ ).into_response()
+ }
+ }
+}
+
+// =============================================================================
+// Git Operations - Request/Response Types
+// =============================================================================
+
+/// Request to create a new branch.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBranchRequest {
+ pub branch_name: String,
+ pub from_ref: Option<String>,
+}
+
+/// Response for branch creation.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBranchResponse {
+ pub success: bool,
+ pub branch_name: String,
+ pub message: String,
+}
+
+/// Request to merge task changes.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeTaskRequest {
+ pub target_branch: Option<String>,
+ #[serde(default)]
+ pub squash: bool,
+}
+
+/// Response for merge operation.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeTaskResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub message: String,
+ pub commit_sha: Option<String>,
+ pub conflicts: Option<Vec<String>>,
+}
+
+/// Request to create a pull request.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreatePRRequest {
+ pub task_id: Uuid,
+ pub title: String,
+ pub body: Option<String>,
+ #[serde(default = "default_base_branch")]
+ pub base_branch: String,
+}
+
+fn default_base_branch() -> String {
+ "main".to_string()
+}
+
+/// Response for PR creation.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreatePRResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub message: String,
+ pub pr_url: Option<String>,
+ pub pr_number: Option<i32>,
+}
+
+/// Response for task diff.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskDiffResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub diff: Option<String>,
+ pub error: Option<String>,
+}
+
+// =============================================================================
+// Git Operations - Handlers
+// =============================================================================
+
+/// Create a new branch from supervisor's worktree.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/branches",
+ request_body = CreateBranchRequest,
+ responses(
+ (status = 201, description = "Branch created", body = CreateBranchResponse),
+ (status = 400, description = "Invalid request"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn create_branch(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<CreateBranchRequest>,
+) -> impl IntoResponse {
+ let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ // Find daemon running supervisor
+ let daemon_id = {
+ let pool = state.db_pool.as_ref().unwrap();
+ match repository::get_task(pool, supervisor_id).await {
+ Ok(Some(task)) => task.daemon_id,
+ _ => None,
+ }
+ };
+
+ let Some(daemon_id) = daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
+ ).into_response();
+ };
+
+ // Send CreateBranch command to daemon
+ let cmd = DaemonCommand::CreateBranch {
+ task_id: supervisor_id,
+ branch_name: request.branch_name.clone(),
+ from_ref: request.from_ref,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::error!(error = %e, "Failed to send CreateBranch command");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
+ ).into_response();
+ }
+
+ // Note: Real implementation would wait for daemon response
+ // For now, return success immediately - daemon will send response via WebSocket
+ (
+ StatusCode::CREATED,
+ Json(CreateBranchResponse {
+ success: true,
+ branch_name: request.branch_name,
+ message: "Branch creation command sent".to_string(),
+ }),
+ ).into_response()
+}
+
+/// Merge a task's changes to a target branch.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID to merge")
+ ),
+ request_body = MergeTaskRequest,
+ responses(
+ (status = 200, description = "Merge initiated", body = MergeTaskResponse),
+ (status = 400, description = "Invalid request"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn merge_task(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+ Json(request): Json<MergeTaskRequest>,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get the target task
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // Get daemon running the task
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ ).into_response();
+ };
+
+ // Send MergeTaskToTarget command to daemon
+ let cmd = DaemonCommand::MergeTaskToTarget {
+ task_id,
+ target_branch: request.target_branch,
+ squash: request.squash,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::error!(error = %e, "Failed to send MergeTaskToTarget command");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
+ ).into_response();
+ }
+
+ (
+ StatusCode::OK,
+ Json(MergeTaskResponse {
+ task_id,
+ success: true,
+ message: "Merge command sent".to_string(),
+ commit_sha: None,
+ conflicts: None,
+ }),
+ ).into_response()
+}
+
+/// Create a pull request for a task's changes.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/pr",
+ request_body = CreatePRRequest,
+ responses(
+ (status = 201, description = "PR created", body = CreatePRResponse),
+ (status = 400, description = "Invalid request"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn create_pr(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<CreatePRRequest>,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get the target task
+ let task = match repository::get_task_for_owner(pool, request.task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // Get daemon running the task
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ ).into_response();
+ };
+
+ // Send CreatePR command to daemon
+ let cmd = DaemonCommand::CreatePR {
+ task_id: request.task_id,
+ title: request.title.clone(),
+ body: request.body.clone(),
+ base_branch: request.base_branch.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::error!(error = %e, "Failed to send CreatePR command");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
+ ).into_response();
+ }
+
+ (
+ StatusCode::CREATED,
+ Json(CreatePRResponse {
+ task_id: request.task_id,
+ success: true,
+ message: "PR creation command sent".to_string(),
+ pr_url: None,
+ pr_number: None,
+ }),
+ ).into_response()
+}
+
+/// Get the diff for a task's changes.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff",
+ params(
+ ("task_id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task diff", body = TaskDiffResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn get_task_diff(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ headers: HeaderMap,
+) -> impl IntoResponse {
+ let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get the target task
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ ).into_response();
+ }
+ };
+
+ // Get daemon running the task
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ ).into_response();
+ };
+
+ // Send GetTaskDiff command to daemon
+ let cmd = DaemonCommand::GetTaskDiff { task_id };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::error!(error = %e, "Failed to send GetTaskDiff command");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
+ ).into_response();
+ }
+
+ (
+ StatusCode::OK,
+ Json(TaskDiffResponse {
+ task_id,
+ success: true,
+ diff: None,
+ error: Some("Diff command sent - response will be streamed".to_string()),
+ }),
+ ).into_response()
+}