diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/server/handlers/mesh_supervisor.rs | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 1153 |
1 files changed, 1153 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs new file mode 100644 index 0000000..ac59130 --- /dev/null +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -0,0 +1,1153 @@ +//! HTTP handlers for supervisor-specific mesh operations. +//! +//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate +//! contract work: spawning tasks, waiting for completion, reading worktree files, etc. + +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; +use crate::db::repository; +use crate::server::handlers::mesh::{extract_auth, AuthSource}; +use crate::server::messages::ApiError; +use crate::server::state::{DaemonCommand, SharedState}; + +// ============================================================================= +// Request/Response Types +// ============================================================================= + +/// Request to spawn a new task from supervisor. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SpawnTaskRequest { + pub name: String, + pub plan: String, + pub contract_id: Uuid, + pub parent_task_id: Option<Uuid>, + pub checkpoint_sha: Option<String>, + /// Repository URL for the task (supervisor should provide this) + pub repository_url: Option<String>, +} + +/// Request to wait for task completion. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct WaitForTaskRequest { + #[serde(default = "default_timeout")] + pub timeout_seconds: i32, +} + +fn default_timeout() -> i32 { + 300 +} + +/// Request to read a file from task worktree. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReadWorktreeFileRequest { + pub file_path: String, +} + +/// Request to create a checkpoint. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateCheckpointRequest { + pub message: String, +} + +/// Response for task tree. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskTreeResponse { + pub tasks: Vec<TaskSummary>, + pub supervisor_task_id: Option<Uuid>, + pub total_count: usize, +} + +/// Response for wait operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct WaitResponse { + pub task_id: Uuid, + pub status: String, + pub completed: bool, + pub output_summary: Option<String>, +} + +/// Response for read file operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReadFileResponse { + pub task_id: Uuid, + pub file_path: String, + pub content: String, + pub exists: bool, +} + +/// Response for checkpoint operations. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointResponse { + pub task_id: Uuid, + pub checkpoint_number: i32, + pub commit_sha: String, + pub message: String, +} + +/// Task checkpoint info. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskCheckpoint { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_number: i32, + pub commit_sha: String, + pub branch_name: String, + pub message: String, + pub files_changed: Option<serde_json::Value>, + pub lines_added: i32, + pub lines_removed: i32, + pub created_at: chrono::DateTime<chrono::Utc>, +} + +/// Response for list checkpoints. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointListResponse { + pub task_id: Uuid, + pub checkpoints: Vec<TaskCheckpoint>, +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/// Verify the request comes from a supervisor task and extract ownership info. +async fn verify_supervisor_auth( + state: &SharedState, + headers: &HeaderMap, + contract_id: Option<Uuid>, +) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> { + let auth = extract_auth(state, headers); + + let task_id = match auth { + AuthSource::ToolKey(task_id) => task_id, + _ => { + return Err(( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), + )); + } + }; + + // Get the task to verify it's a supervisor and get owner_id + let pool = state.db_pool.as_ref().ok_or_else(|| { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + })?; + + let task = repository::get_task(pool, task_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get supervisor task"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), + ) + })? + .ok_or_else(|| { + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + })?; + + // Verify task is a supervisor + if !task.is_supervisor { + return Err(( + StatusCode::FORBIDDEN, + Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor tasks can use these endpoints")), + )); + } + + // If contract_id provided, verify the supervisor belongs to that contract + if let Some(cid) = contract_id { + if task.contract_id != Some(cid) { + return Err(( + StatusCode::FORBIDDEN, + Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), + )); + } + } + + Ok((task_id, task.owner_id)) +} + +// ============================================================================= +// Contract Task Handlers +// ============================================================================= + +/// List all tasks in a contract's tree. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", + params( + ("contract_id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn list_contract_tasks( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + headers: HeaderMap, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get all tasks for this contract + match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { + Ok(tasks) => { + let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); + let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect(); + let total_count = summaries.len(); + + ( + StatusCode::OK, + Json(TaskTreeResponse { + tasks: summaries, + supervisor_task_id, + total_count, + }), + ).into_response() + } + Err(e) => { + tracing::error!(error = %e, "Failed to list contract tasks"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to list tasks")), + ).into_response() + } + } +} + +/// Get full task tree structure for a contract. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", + params( + ("contract_id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Task tree structure", body = TaskTreeResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn get_contract_tree( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + headers: HeaderMap, +) -> impl IntoResponse { + // Same as list_contract_tasks for now - can add tree structure later + list_contract_tasks(State(state), Path(contract_id), headers).await +} + +// ============================================================================= +// Task Spawn Handler +// ============================================================================= + +/// Spawn a new task (supervisor only). +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks", + request_body = SpawnTaskRequest, + responses( + (status = 201, description = "Task created", body = Task), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn spawn_task( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<SpawnTaskRequest>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Verify contract exists + let _contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get contract"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get contract")), + ).into_response(); + } + }; + + // Get repository URL from the contract's primary repository + let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await { + Ok(repos) => { + // Prefer primary repo, fallback to first repo + repos.iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone()) + } + Err(e) => { + tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL"); + None + } + }; + + // Supervisor can override with explicit repository_url + let repo_url = request.repository_url.clone().or(repo_url); + + // Create task request + let create_req = CreateTaskRequest { + name: request.name.clone(), + description: None, + plan: request.plan.clone(), + repository_url: repo_url.clone(), + contract_id: request.contract_id, + parent_task_id: request.parent_task_id, + is_supervisor: false, + checkpoint_sha: request.checkpoint_sha.clone(), + merge_mode: Some("manual".to_string()), + priority: 0, + base_branch: None, + target_branch: None, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + }; + + // Create task in DB + let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!(error = %e, "Failed to create task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to create task")), + ).into_response(); + } + }; + + tracing::info!( + supervisor_id = %supervisor_id, + task_id = %task.id, + task_name = %task.name, + "Supervisor spawned new task" + ); + + // Start task on a daemon + // Find a daemon that belongs to this owner + for entry in state.daemon_connections.iter() { + let daemon = entry.value(); + if daemon.owner_id == owner_id { + // Send spawn command to first available daemon + let cmd = DaemonCommand::SpawnTask { + task_id: task.id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: repo_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator: false, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: false, + }; + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); + } else { + tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); + } + break; + } + } + + (StatusCode::CREATED, Json(task)).into_response() +} + +// ============================================================================= +// Wait for Task Handler +// ============================================================================= + +/// Wait for a task to complete. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", + params( + ("task_id" = Uuid, Path, description = "Task ID to wait for") + ), + request_body = WaitForTaskRequest, + responses( + (status = 200, description = "Task completed or timed out", body = WaitResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn wait_for_task( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, + Json(request): Json<WaitForTaskRequest>, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Verify task belongs to same owner + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Check if already done + if task.status == "done" || task.status == "failed" || task.status == "merged" { + return ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: task.status, + completed: true, + output_summary: None, + }), + ).into_response(); + } + + // Subscribe to task completions + let mut rx = state.task_completions.subscribe(); + let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); + + // Wait for completion or timeout + let result = tokio::time::timeout(timeout, async { + loop { + match rx.recv().await { + Ok(notification) => { + if notification.task_id == task_id { + return Some(notification); + } + } + Err(_) => { + // Channel closed or lagged - check DB directly + if let Ok(Some(t)) = repository::get_task(pool, task_id).await { + if t.status == "done" || t.status == "failed" || t.status == "merged" { + return Some(crate::server::state::TaskCompletionNotification { + task_id: t.id, + owner_id: Some(t.owner_id), + contract_id: t.contract_id, + parent_task_id: t.parent_task_id, + status: t.status, + output_summary: None, + worktree_path: None, + error_message: t.error_message, + }); + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + }).await; + + match result { + Ok(Some(notification)) => { + ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: notification.status, + completed: true, + output_summary: notification.output_summary, + }), + ).into_response() + } + Ok(None) | Err(_) => { + // Timeout - check final status + let final_status = repository::get_task(pool, task_id) + .await + .ok() + .flatten() + .map(|t| t.status) + .unwrap_or_else(|| "unknown".to_string()); + + ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: final_status.clone(), + completed: final_status == "done" || final_status == "failed" || final_status == "merged", + output_summary: None, + }), + ).into_response() + } + } +} + +// ============================================================================= +// Read Worktree File Handler +// ============================================================================= + +/// Read a file from a task's worktree. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + request_body = ReadWorktreeFileRequest, + responses( + (status = 200, description = "File content", body = ReadFileResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn read_worktree_file( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, + Json(request): Json<ReadWorktreeFileRequest>, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get task to verify ownership + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // TODO: Implement file reading via worktree path + // For now, return not implemented - supervisor should use local file access via worktree + let _ = (task, request); + + ( + StatusCode::NOT_IMPLEMENTED, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.", + )), + ).into_response() +} + +// ============================================================================= +// Checkpoint Handlers +// ============================================================================= + +/// Create a git checkpoint for a task. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{task_id}/checkpoint", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + request_body = CreateCheckpointRequest, + responses( + (status = 201, description = "Checkpoint created", body = CheckpointResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_checkpoint( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, + Json(request): Json<CreateCheckpointRequest>, +) -> impl IntoResponse { + let auth = extract_auth(&state, &headers); + + let task_id_from_auth = match auth { + AuthSource::ToolKey(tid) => tid, + _ => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ).into_response(); + } + }; + + // Can only create checkpoint for own task + if task_id_from_auth != task_id { + return ( + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), + ).into_response(); + } + + let pool = state.db_pool.as_ref().unwrap(); + + // Get task + let task = match repository::get_task(pool, task_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // TODO: Implement checkpoint creation via daemon command + // For now, checkpoints should be created by the task itself via git commands + let _ = (task, request); + + ( + StatusCode::NOT_IMPLEMENTED, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "Checkpoint creation via API not yet implemented. Use git commands directly in the task.", + )), + ).into_response() +} + +/// List checkpoints for a task. +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{task_id}/checkpoints", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "List of checkpoints", body = CheckpointListResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn list_checkpoints( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, +) -> impl IntoResponse { + let auth = extract_auth(&state, &headers); + + let _task_id_from_auth = match auth { + AuthSource::ToolKey(tid) => tid, + _ => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ).into_response(); + } + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get checkpoints from DB + match repository::list_task_checkpoints(pool, task_id).await { + Ok(checkpoints) => { + let checkpoint_list: Vec<TaskCheckpoint> = checkpoints + .into_iter() + .map(|c| TaskCheckpoint { + id: c.id, + task_id: c.task_id, + checkpoint_number: c.checkpoint_number, + commit_sha: c.commit_sha, + branch_name: c.branch_name, + message: c.message, + files_changed: c.files_changed, + lines_added: c.lines_added.unwrap_or(0), + lines_removed: c.lines_removed.unwrap_or(0), + created_at: c.created_at, + }) + .collect(); + + ( + StatusCode::OK, + Json(CheckpointListResponse { + task_id, + checkpoints: checkpoint_list, + }), + ).into_response() + } + Err(e) => { + tracing::error!(error = %e, "Failed to list checkpoints"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), + ).into_response() + } + } +} + +// ============================================================================= +// Git Operations - Request/Response Types +// ============================================================================= + +/// Request to create a new branch. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchRequest { + pub branch_name: String, + pub from_ref: Option<String>, +} + +/// Response for branch creation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchResponse { + pub success: bool, + pub branch_name: String, + pub message: String, +} + +/// Request to merge task changes. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeTaskRequest { + pub target_branch: Option<String>, + #[serde(default)] + pub squash: bool, +} + +/// Response for merge operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeTaskResponse { + pub task_id: Uuid, + pub success: bool, + pub message: String, + pub commit_sha: Option<String>, + pub conflicts: Option<Vec<String>>, +} + +/// Request to create a pull request. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreatePRRequest { + pub task_id: Uuid, + pub title: String, + pub body: Option<String>, + #[serde(default = "default_base_branch")] + pub base_branch: String, +} + +fn default_base_branch() -> String { + "main".to_string() +} + +/// Response for PR creation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreatePRResponse { + pub task_id: Uuid, + pub success: bool, + pub message: String, + pub pr_url: Option<String>, + pub pr_number: Option<i32>, +} + +/// Response for task diff. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskDiffResponse { + pub task_id: Uuid, + pub success: bool, + pub diff: Option<String>, + pub error: Option<String>, +} + +// ============================================================================= +// Git Operations - Handlers +// ============================================================================= + +/// Create a new branch from supervisor's worktree. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/branches", + request_body = CreateBranchRequest, + responses( + (status = 201, description = "Branch created", body = CreateBranchResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_branch( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<CreateBranchRequest>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + // Find daemon running supervisor + let daemon_id = { + let pool = state.db_pool.as_ref().unwrap(); + match repository::get_task(pool, supervisor_id).await { + Ok(Some(task)) => task.daemon_id, + _ => None, + } + }; + + let Some(daemon_id) = daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), + ).into_response(); + }; + + // Send CreateBranch command to daemon + let cmd = DaemonCommand::CreateBranch { + task_id: supervisor_id, + branch_name: request.branch_name.clone(), + from_ref: request.from_ref, + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send CreateBranch command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + // Note: Real implementation would wait for daemon response + // For now, return success immediately - daemon will send response via WebSocket + ( + StatusCode::CREATED, + Json(CreateBranchResponse { + success: true, + branch_name: request.branch_name, + message: "Branch creation command sent".to_string(), + }), + ).into_response() +} + +/// Merge a task's changes to a target branch. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", + params( + ("task_id" = Uuid, Path, description = "Task ID to merge") + ), + request_body = MergeTaskRequest, + responses( + (status = 200, description = "Merge initiated", body = MergeTaskResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn merge_task( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, + Json(request): Json<MergeTaskRequest>, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send MergeTaskToTarget command to daemon + let cmd = DaemonCommand::MergeTaskToTarget { + task_id, + target_branch: request.target_branch, + squash: request.squash, + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::OK, + Json(MergeTaskResponse { + task_id, + success: true, + message: "Merge command sent".to_string(), + commit_sha: None, + conflicts: None, + }), + ).into_response() +} + +/// Create a pull request for a task's changes. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/pr", + request_body = CreatePRRequest, + responses( + (status = 201, description = "PR created", body = CreatePRResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_pr( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<CreatePRRequest>, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, request.task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send CreatePR command to daemon + let cmd = DaemonCommand::CreatePR { + task_id: request.task_id, + title: request.title.clone(), + body: request.body.clone(), + base_branch: request.base_branch.clone(), + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send CreatePR command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::CREATED, + Json(CreatePRResponse { + task_id: request.task_id, + success: true, + message: "PR creation command sent".to_string(), + pr_url: None, + pr_number: None, + }), + ).into_response() +} + +/// Get the diff for a task's changes. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task diff", body = TaskDiffResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn get_task_diff( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + headers: HeaderMap, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send GetTaskDiff command to daemon + let cmd = DaemonCommand::GetTaskDiff { task_id }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send GetTaskDiff command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::OK, + Json(TaskDiffResponse { + task_id, + success: true, + diff: None, + error: Some("Diff command sent - response will be streamed".to_string()), + }), + ).into_response() +} |
