diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 2980 |
1 files changed, 173 insertions, 2807 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index ebde52b..4a9a00b 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1,7 +1,13 @@ -//! HTTP handlers for supervisor-specific mesh operations. +//! Question + order backchannel for directive-spawned tasks. //! -//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate -//! contract work: spawning tasks, waiting for completion, reading worktree files, etc. +//! Originally a much larger handler that orchestrated contract-supervisor +//! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and +//! supervisor tasks have been removed; what remains is the in-memory +//! question machinery (`makima directive ask`) and order creation +//! (`makima directive create-order`). +//! +//! Module name is kept as `mesh_supervisor` for route-path stability — +//! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints. use axum::{ extract::{Path, State}, @@ -9,238 +15,38 @@ use axum::{ response::IntoResponse, Json, }; -use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; +use crate::db::models::CreateOrderRequest; use crate::db::repository; -use sqlx::PgPool; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; -use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification}; - -// ============================================================================= -// Request/Response Types -// ============================================================================= - -/// Request to spawn a new task from supervisor. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct SpawnTaskRequest { - pub name: String, - pub plan: String, - pub contract_id: Uuid, - pub parent_task_id: Option<Uuid>, - pub checkpoint_sha: Option<String>, - /// Repository URL for the task (optional - if not provided, will be looked up from contract). - pub repository_url: Option<String>, -} - -/// Request to wait for task completion. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct WaitForTaskRequest { - #[serde(default = "default_timeout")] - pub timeout_seconds: i32, -} - -fn default_timeout() -> i32 { - 300 -} - -/// Request to read a file from task worktree. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ReadWorktreeFileRequest { - pub file_path: String, -} - -/// Request to ask a question and wait for user feedback. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AskQuestionRequest { - /// The question to ask the user - pub question: String, - /// Optional choices (if empty, free-form text response) - #[serde(default)] - pub choices: Vec<String>, - /// Optional context about what this relates to - pub context: Option<String>, - /// How long to wait for a response (seconds) - #[serde(default = "default_question_timeout")] - pub timeout_seconds: i32, - /// When true, the request will block indefinitely until user responds (no timeout) - #[serde(default)] - pub phaseguard: bool, - /// When true, allow selecting multiple choices (response will be comma-separated) - #[serde(default)] - pub multi_select: bool, - /// When true, return immediately without waiting for response - #[serde(default)] - pub non_blocking: bool, - /// Question type: general, phase_confirmation, or contract_complete - #[serde(default = "default_question_type")] - pub question_type: String, -} - -fn default_question_type() -> String { - "general".to_string() -} - -fn default_question_timeout() -> i32 { - 3600 // 1 hour default -} - -/// Response from asking a question. -#[derive(Debug, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AskQuestionResponse { - /// The question ID for tracking - pub question_id: Uuid, - /// The user's response (None if timed out) - pub response: Option<String>, - /// Whether the question timed out - pub timed_out: bool, - /// Whether the question is still pending (server-side timeout reached but question not removed). - /// The client should poll the poll endpoint to continue waiting. - #[serde(default)] - pub still_pending: bool, -} - -/// Request to answer a supervisor question. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AnswerQuestionRequest { - /// The user's response - pub response: String, -} - -/// Response to answering a question. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AnswerQuestionResponse { - /// Whether the answer was accepted - pub success: bool, -} - -/// Pending question summary. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PendingQuestionSummary { - pub question_id: Uuid, - pub task_id: Uuid, - pub contract_id: Uuid, - /// Directive this question relates to (if from a directive task) - #[serde(skip_serializing_if = "Option::is_none")] - pub directive_id: Option<Uuid>, - pub question: String, - pub choices: Vec<String>, - pub context: Option<String>, - pub created_at: chrono::DateTime<chrono::Utc>, - /// Whether multiple choices can be selected - #[serde(default)] - pub multi_select: bool, - /// Question type: general, phase_confirmation, or contract_complete - #[serde(default)] - pub question_type: String, -} - -/// Request to create a checkpoint. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateCheckpointRequest { - pub message: String, -} - -/// Response for task tree. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TaskTreeResponse { - pub tasks: Vec<TaskSummary>, - pub supervisor_task_id: Option<Uuid>, - pub total_count: usize, -} - -/// Response for wait operation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct WaitResponse { - pub task_id: Uuid, - pub status: String, - pub completed: bool, - pub output_summary: Option<String>, -} - -/// Response for read file operation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ReadFileResponse { - pub task_id: Uuid, - pub file_path: String, - pub content: String, - pub exists: bool, -} - -/// Response for checkpoint operations. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CheckpointResponse { - pub task_id: Uuid, - pub checkpoint_number: i32, - pub commit_sha: String, - pub message: String, -} - -/// Task checkpoint info. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TaskCheckpoint { - pub id: Uuid, - pub task_id: Uuid, - pub checkpoint_number: i32, - pub commit_sha: String, - pub branch_name: String, - pub message: String, - pub files_changed: Option<serde_json::Value>, - pub lines_added: i32, - pub lines_removed: i32, - pub created_at: chrono::DateTime<chrono::Utc>, -} - -/// Response for list checkpoints. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CheckpointListResponse { - pub task_id: Uuid, - pub checkpoints: Vec<TaskCheckpoint>, -} +use crate::server::state::SharedState; // ============================================================================= -// Helper Functions +// Auth helper // ============================================================================= -/// Verify the request comes from a supervisor task and extract ownership info. -async fn verify_supervisor_auth( +/// Verify the request comes from a directive task (tool-key auth) and +/// return the calling task id + owner id. +async fn verify_task_auth( state: &SharedState, headers: &HeaderMap, - contract_id: Option<Uuid>, ) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> { let auth = extract_auth(state, headers); - let task_id = match auth { AuthSource::ToolKey(task_id) => task_id, _ => { return Err(( StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), + Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")), )); } }; - // Get the task to verify it's a supervisor and get owner_id let pool = state.db_pool.as_ref().ok_or_else(|| { ( StatusCode::SERVICE_UNAVAILABLE, @@ -251,10 +57,10 @@ async fn verify_supervisor_auth( let task = repository::get_task(pool, task_id) .await .map_err(|e| { - tracing::error!(error = %e, "Failed to get supervisor task"); + tracing::error!(error = %e, "Failed to load task"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), + Json(ApiError::new("DB_ERROR", "Failed to load task")), ) })? .ok_or_else(|| { @@ -264,1411 +70,113 @@ async fn verify_supervisor_auth( ) })?; - // Verify task is a supervisor or a directive task - if !task.is_supervisor && task.directive_id.is_none() { + // Only directive-attached tasks may use this backchannel. + if task.directive_id.is_none() { return Err(( StatusCode::FORBIDDEN, - Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor or directive tasks can use these endpoints")), + Json(ApiError::new( + "NOT_DIRECTIVE_TASK", + "Only directive-attached tasks can use these endpoints", + )), )); } - // If contract_id provided, verify the supervisor belongs to that contract - if let Some(cid) = contract_id { - if task.contract_id != Some(cid) { - return Err(( - StatusCode::FORBIDDEN, - Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), - )); - } - } - Ok((task_id, task.owner_id)) } -/// Try to start a pending task on an available daemon. -/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. -/// For retried tasks, excludes daemons that previously failed the task and includes -/// checkpoint patch data for worktree recovery. -pub async fn try_start_pending_task( - state: &SharedState, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Option<Task>, String> { - let pool = state.db_pool.as_ref().ok_or("Database not configured")?; - - // Get pending tasks for this contract (includes interrupted tasks awaiting retry) - let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get pending tasks: {}", e))?; - - if pending_tasks.is_empty() { - return Ok(None); - } - - // Get contract to check local_only flag - let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get contract: {}", e))? - .ok_or_else(|| "Contract not found".to_string())?; - - // Try each pending task until we find one we can start - for task in &pending_tasks { - // Get excluded daemon IDs for this task (daemons that have already failed it) - let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); - - // Get available daemons excluding failed ones for this task - let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids) - .await - .map_err(|e| format!("Failed to get available daemons: {}", e))?; - - // Find a daemon with capacity - let available_daemon = daemons.iter().find(|d| { - d.current_task_count < d.max_concurrent_tasks - && state.daemon_connections.contains_key(&d.connection_id) - }); - - let daemon = match available_daemon { - Some(d) => d, - None => continue, // Try next task - }; - - // Get repo URL from task or contract - let repo_url = if let Some(url) = &task.repository_url { - Some(url.clone()) - } else { - match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos - .iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), - Err(_) => None, - } - }; - - // Update task with daemon assignment - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(task.version), - ..Default::default() - }; - - let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => continue, // Task was modified concurrently, try next - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); - continue; // Try next task - } - }; - - // For retried tasks, fetch checkpoint patch for worktree recovery - let (patch_data, patch_base_sha) = if task.retry_count > 0 { - // This is a retry - try to restore from checkpoint - match repository::get_latest_checkpoint_patch(pool, task.id).await { - Ok(Some(patch)) => { - tracing::info!( - task_id = %task.id, - retry_count = task.retry_count, - patch_size = patch.patch_size_bytes, - base_sha = %patch.base_commit_sha, - "Including checkpoint patch for task retry recovery" - ); - let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); - (Some(encoded), Some(patch.base_commit_sha)) - } - Ok(None) => { - tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry"); - (None, None) - } - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry"); - (None, None) - } - } - } else { - (None, None) - }; - - // Send spawn command - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url, - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, - autonomous_loop: updated_task.is_supervisor, - resume_session: task.retry_count > 0, // Use --continue for retried tasks - conversation_history: None, - patch_data, - patch_base_sha, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - // For retried tasks, use their own worktree (they already have state from previous attempt) - supervisor_worktree_task_id: None, - directive_id: updated_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); - // Rollback - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; - continue; // Try next task - } - - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); - return Ok(Some(updated_task)); - } - - // No tasks could be started - Ok(None) -} - -// ============================================================================= -// Contract Task Handlers -// ============================================================================= - -/// List all tasks in a contract's tree. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", - params( - ("contract_id" = Uuid, Path, description = "Contract ID") - ), - responses( - (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn list_contract_tasks( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get all tasks for this contract - match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(tasks) => { - let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); - let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect(); - let total_count = summaries.len(); - - ( - StatusCode::OK, - Json(TaskTreeResponse { - tasks: summaries, - supervisor_task_id, - total_count, - }), - ).into_response() - } - Err(e) => { - tracing::error!(error = %e, "Failed to list contract tasks"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to list tasks")), - ).into_response() - } - } -} - -/// Get full task tree structure for a contract. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", - params( - ("contract_id" = Uuid, Path, description = "Contract ID") - ), - responses( - (status = 200, description = "Task tree structure", body = TaskTreeResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn get_contract_tree( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - // Same as list_contract_tasks for now - can add tree structure later - list_contract_tasks(State(state), Path(contract_id), headers).await -} - -// ============================================================================= -// Task Spawn Handler -// ============================================================================= - -/// Spawn a new task (supervisor only). -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks", - request_body = SpawnTaskRequest, - responses( - (status = 201, description = "Task created", body = Task), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn spawn_task( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<SpawnTaskRequest>, -) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Verify contract exists and get local_only flag - let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get contract"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get contract")), - ).into_response(); - } - }; - - // Get repository URL - either from request or from contract's repositories - let repo_url = if let Some(url) = request.repository_url.clone() { - if !url.trim().is_empty() { - Some(url) - } else { - None - } - } else { - None - }; - - // If no repo URL provided, look it up from the contract - let repo_url = match repo_url { - Some(url) => Some(url), - None => { - match repository::list_contract_repositories(pool, request.contract_id).await { - Ok(repos) => { - // Prefer primary repo, fallback to first repo - let repo = repos.iter() - .find(|r| r.is_primary) - .or(repos.first()); - - // Use repository_url if set, otherwise use local_path - repo.and_then(|r| { - r.repository_url.clone() - .or_else(|| r.local_path.clone()) - }) - } - Err(e) => { - tracing::warn!(error = %e, "Failed to get contract repositories"); - None - } - } - } - }; - - // Validate that we have a repo URL - if repo_url.is_none() { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")), - ).into_response(); - } - - // Create task request - // All tasks share the supervisor's worktree - let supervisor_worktree_task_id = Some(supervisor_id); - - let create_req = CreateTaskRequest { - name: request.name.clone(), - description: None, - plan: request.plan.clone(), - repository_url: repo_url.clone(), - contract_id: Some(request.contract_id), - parent_task_id: request.parent_task_id, - is_supervisor: false, - checkpoint_sha: request.checkpoint_sha.clone(), - merge_mode: Some("manual".to_string()), - priority: 0, - base_branch: None, - target_branch: None, - target_repo_path: None, - completion_action: None, - continue_from_task_id: None, - copy_files: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id, - directive_id: None, - directive_step_id: None, - }; - - // Create task in DB - let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { - Ok(t) => t, - Err(e) => { - tracing::error!(error = %e, "Failed to create task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to create task")), - ).into_response(); - } - }; - - tracing::info!( - supervisor_id = %supervisor_id, - task_id = %task.id, - task_name = %task.name, - "Supervisor spawned new task" - ); - - // Record history event for task spawned by supervisor - let _ = repository::record_history_event( - pool, - owner_id, - task.contract_id, - Some(task.id), - "task", - Some("spawned"), - None, - serde_json::json!({ - "name": &task.name, - "spawnedBy": supervisor_id.to_string(), - }), - ).await; - - // Broadcast task creation notification to WebSocket subscribers - state.broadcast_task_update(TaskUpdateNotification { - task_id: task.id, - owner_id: Some(owner_id), - version: task.version, - status: task.status.clone(), - updated_fields: vec!["created".to_string()], - updated_by: "supervisor".to_string(), - }); - - // Start task on a daemon - // Find a daemon that belongs to this owner - let mut updated_task = task; - for entry in state.daemon_connections.iter() { - let daemon = entry.value(); - if daemon.owner_id == owner_id { - // IMPORTANT: Update database FIRST to assign daemon_id before sending command - // This prevents race conditions where the task starts but daemon_id is not set - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(updated_task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await { - Ok(Some(t)) => { - updated_task = t; - } - Ok(None) => { - tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id"); - break; - } - Err(e) => { - tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id"); - break; - } - } - - // Send spawn command to daemon - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url: repo_url.clone(), - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: false, - autonomous_loop: false, - resume_session: false, - conversation_history: None, - patch_data: None, - patch_base_sha: None, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - // All tasks share the supervisor's worktree - supervisor_worktree_task_id: Some(supervisor_id), - directive_id: updated_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); - // Rollback: clear daemon_id and reset status since command failed - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await; - } else { - tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); - - // Save state: task spawn is a key save point (Task 3.3) - save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await; - - // Broadcast task status update notification to WebSocket subscribers - state.broadcast_task_update(TaskUpdateNotification { - task_id: updated_task.id, - owner_id: Some(owner_id), - version: updated_task.version, - status: "starting".to_string(), - updated_fields: vec!["status".to_string(), "daemon_id".to_string()], - updated_by: "supervisor".to_string(), - }); - - } - break; - } - } - - (StatusCode::CREATED, Json(updated_task)).into_response() -} - // ============================================================================= -// Wait for Task Handler +// Question types // ============================================================================= -/// Wait for a task to complete. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", - params( - ("task_id" = Uuid, Path, description = "Task ID to wait for") - ), - request_body = WaitForTaskRequest, - responses( - (status = 200, description = "Task completed or timed out", body = WaitResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn wait_for_task( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<WaitForTaskRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Verify task belongs to same owner - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Check if already done - if task.status == "done" || task.status == "failed" || task.status == "merged" { - return ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: task.status, - completed: true, - output_summary: None, - }), - ).into_response(); - } - - // Get contract_id for pending task scheduling - let contract_id = task.contract_id; - - // Subscribe to task completions - let mut rx = state.task_completions.subscribe(); - let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); - - // Wait for completion or timeout, periodically trying to start pending tasks - let result = tokio::time::timeout(timeout, async { - let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); - pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - tokio::select! { - // Check for task completion notifications - recv_result = rx.recv() => { - match recv_result { - Ok(notification) => { - if notification.task_id == task_id { - return Some(notification); - } - } - Err(_) => { - // Channel closed or lagged - check DB directly - if let Ok(Some(t)) = repository::get_task(pool, task_id).await { - if t.status == "done" || t.status == "failed" || t.status == "merged" { - return Some(crate::server::state::TaskCompletionNotification { - task_id: t.id, - owner_id: Some(t.owner_id), - contract_id: t.contract_id, - parent_task_id: t.parent_task_id, - status: t.status, - output_summary: None, - worktree_path: None, - error_message: t.error_message, - }); - } - } - } - } - } - // Periodically try to start pending tasks - _ = pending_check_interval.tick() => { - if let Some(cid) = contract_id { - match try_start_pending_task(&state, cid, owner_id).await { - Ok(Some(started_task)) => { - tracing::debug!( - task_id = %started_task.id, - task_name = %started_task.name, - "Started pending task while waiting" - ); - } - Ok(None) => { - // No pending tasks or no capacity - that's fine - } - Err(e) => { - tracing::warn!(error = %e, "Error trying to start pending task"); - } - } - } - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: notification.status, - completed: true, - output_summary: notification.output_summary, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout - check final status - let final_status = repository::get_task(pool, task_id) - .await - .ok() - .flatten() - .map(|t| t.status) - .unwrap_or_else(|| "unknown".to_string()); - - ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: final_status.clone(), - completed: final_status == "done" || final_status == "failed" || final_status == "merged", - output_summary: None, - }), - ).into_response() - } - } -} - -// ============================================================================= -// Read Worktree File Handler -// ============================================================================= - -/// Read a file from a task's worktree. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - request_body = ReadWorktreeFileRequest, - responses( - (status = 200, description = "File content", body = ReadFileResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn read_worktree_file( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<ReadWorktreeFileRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get task to verify ownership - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // TODO: Implement file reading via worktree path - // For now, return not implemented - supervisor should use local file access via worktree - let _ = (task, request); - - ( - StatusCode::NOT_IMPLEMENTED, - Json(ApiError::new( - "NOT_IMPLEMENTED", - "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.", - )), - ).into_response() -} - -// ============================================================================= -// Checkpoint Handlers -// ============================================================================= - -/// Create a git checkpoint for a task. -#[utoipa::path( - post, - path = "/api/v1/mesh/tasks/{task_id}/checkpoint", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - request_body = CreateCheckpointRequest, - responses( - (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - can only create checkpoint for own task"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - (status = 503, description = "Task has no assigned daemon"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_checkpoint( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<CreateCheckpointRequest>, -) -> impl IntoResponse { - let auth = extract_auth(&state, &headers); - - let task_id_from_auth = match auth { - AuthSource::ToolKey(tid) => tid, - _ => { - return ( - StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Tool key required")), - ).into_response(); - } - }; - - // Can only create checkpoint for own task - if task_id_from_auth != task_id { - return ( - StatusCode::FORBIDDEN, - Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), - ).into_response(); - } - - let pool = state.db_pool.as_ref().unwrap(); - - // Get task and daemon_id - let task = match repository::get_task(pool, task_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Send CreateCheckpoint command to daemon - let cmd = DaemonCommand::CreateCheckpoint { - task_id, - message: request.message.clone(), - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreateCheckpoint command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Return accepted - the checkpoint result will be delivered via WebSocket - // and stored in the database by the daemon message handler - ( - StatusCode::ACCEPTED, - Json(CheckpointResponse { - task_id, - checkpoint_number: 0, // Will be assigned by DB on actual creation - commit_sha: "pending".to_string(), - message: request.message, - }), - ).into_response() -} - -/// List checkpoints for a task. -#[utoipa::path( - get, - path = "/api/v1/mesh/tasks/{task_id}/checkpoints", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - responses( - (status = 200, description = "List of checkpoints", body = CheckpointListResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn list_checkpoints( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let auth = extract_auth(&state, &headers); - - let _task_id_from_auth = match auth { - AuthSource::ToolKey(tid) => tid, - _ => { - return ( - StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Tool key required")), - ).into_response(); - } - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get checkpoints from DB - match repository::list_task_checkpoints(pool, task_id).await { - Ok(checkpoints) => { - let checkpoint_list: Vec<TaskCheckpoint> = checkpoints - .into_iter() - .map(|c| TaskCheckpoint { - id: c.id, - task_id: c.task_id, - checkpoint_number: c.checkpoint_number, - commit_sha: c.commit_sha, - branch_name: c.branch_name, - message: c.message, - files_changed: c.files_changed, - lines_added: c.lines_added.unwrap_or(0), - lines_removed: c.lines_removed.unwrap_or(0), - created_at: c.created_at, - }) - .collect(); - - ( - StatusCode::OK, - Json(CheckpointListResponse { - task_id, - checkpoints: checkpoint_list, - }), - ).into_response() - } - Err(e) => { - tracing::error!(error = %e, "Failed to list checkpoints"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), - ).into_response() - } - } -} - -// ============================================================================= -// Git Operations - Request/Response Types -// ============================================================================= - -/// Request to create a new branch. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreateBranchRequest { - pub branch_name: String, - pub from_ref: Option<String>, +pub struct AskQuestionRequest { + pub question: String, + #[serde(default)] + pub choices: Vec<String>, + pub context: Option<String>, + #[serde(default = "default_question_timeout")] + pub timeout_seconds: i32, + /// When true the request blocks until the user responds (no + /// timeout) — the CLI reconnects via the poll endpoint if the + /// server-side timeout is reached. + #[serde(default)] + pub phaseguard: bool, + #[serde(default)] + pub multi_select: bool, + /// Return immediately without waiting for a response. + #[serde(default)] + pub non_blocking: bool, + /// Question type: general, phase_confirmation, contract_complete. + #[serde(default = "default_question_type")] + pub question_type: String, } -/// Response for branch creation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateBranchResponse { - pub success: bool, - pub branch_name: String, - pub message: String, +fn default_question_type() -> String { + "general".to_string() } -/// Request to merge task changes. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MergeTaskRequest { - pub target_branch: Option<String>, - #[serde(default)] - pub squash: bool, +fn default_question_timeout() -> i32 { + 3600 } -/// Response for merge operation. -#[derive(Debug, Serialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct MergeTaskResponse { - pub task_id: Uuid, - pub success: bool, - pub message: String, - pub commit_sha: Option<String>, - pub conflicts: Option<Vec<String>>, +pub struct AskQuestionResponse { + pub question_id: Uuid, + pub response: Option<String>, + pub timed_out: bool, + /// Server-side timeout was reached but the question is still + /// pending. CLI should re-poll via `/poll`. + #[serde(default)] + pub still_pending: bool, } -/// Request to create a pull request. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreatePRRequest { - pub branch: String, - pub title: String, - pub body: Option<String>, +pub struct AnswerQuestionRequest { + pub response: String, } -/// Response for PR creation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreatePRResponse { - pub task_id: Uuid, +pub struct AnswerQuestionResponse { pub success: bool, - pub message: String, - pub pr_url: Option<String>, - pub pr_number: Option<i32>, } -/// Response for task diff. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct TaskDiffResponse { +pub struct PendingQuestionSummary { + pub question_id: Uuid, pub task_id: Uuid, - pub success: bool, - pub diff: Option<String>, - pub error: Option<String>, -} - -// ============================================================================= -// Git Operations - Handlers -// ============================================================================= - -/// Create a new branch from supervisor's worktree. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/branches", - request_body = CreateBranchRequest, - responses( - (status = 201, description = "Branch created", body = CreateBranchResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_branch( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<CreateBranchRequest>, -) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - // Find daemon running supervisor - let daemon_id = { - let pool = state.db_pool.as_ref().unwrap(); - match repository::get_task(pool, supervisor_id).await { - Ok(Some(task)) => task.daemon_id, - _ => None, - } - }; - - let Some(daemon_id) = daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), - ).into_response(); - }; - - // Send CreateBranch command to daemon - let cmd = DaemonCommand::CreateBranch { - task_id: supervisor_id, - branch_name: request.branch_name.clone(), - from_ref: request.from_ref, - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreateBranch command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Note: Real implementation would wait for daemon response - // For now, return success immediately - daemon will send response via WebSocket - ( - StatusCode::CREATED, - Json(CreateBranchResponse { - success: true, - branch_name: request.branch_name, - message: "Branch creation command sent".to_string(), - }), - ).into_response() -} - -/// Merge a task's changes to a target branch. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", - params( - ("task_id" = Uuid, Path, description = "Task ID to merge") - ), - request_body = MergeTaskRequest, - responses( - (status = 200, description = "Merge initiated", body = MergeTaskResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn merge_task( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<MergeTaskRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the target task - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Get daemon running the task - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Subscribe to merge results BEFORE sending the command - let mut rx = state.merge_results.subscribe(); - - // Send MergeTaskToTarget command to daemon - let cmd = DaemonCommand::MergeTaskToTarget { - task_id, - target_branch: request.target_branch, - squash: request.squash, - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Wait for the merge result with a timeout (60 seconds should be plenty for a merge) - let timeout = tokio::time::Duration::from_secs(60); - let result = tokio::time::timeout(timeout, async { - loop { - match rx.recv().await { - Ok(notification) => { - if notification.task_id == task_id { - return Some(notification); - } - // Not our task, keep waiting - } - Err(_) => { - // Channel closed or lagged - return None; - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - ( - StatusCode::OK, - Json(MergeTaskResponse { - task_id, - success: notification.success, - message: notification.message, - commit_sha: notification.commit_sha, - conflicts: notification.conflicts, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout or channel error - return error status - ( - StatusCode::GATEWAY_TIMEOUT, - Json(MergeTaskResponse { - task_id, - success: false, - message: "Merge operation timed out waiting for daemon response".to_string(), - commit_sha: None, - conflicts: None, - }), - ).into_response() - } - } -} - -/// Create a pull request for a task's changes. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/pr", - request_body = CreatePRRequest, - responses( - (status = 201, description = "PR created", body = CreatePRResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_pr( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<CreatePRRequest>, -) -> impl IntoResponse { - let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the supervisor's own task to find daemon and base_branch - let task = match repository::get_task(pool, supervisor_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get supervisor task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), - ).into_response(); - } - }; - - // Get daemon running the supervisor - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), - ).into_response(); - }; - - // Subscribe to PR results BEFORE sending the command - let mut rx = state.pr_results.subscribe(); - - // Send CreatePR command to daemon using the supervisor's task ID - // (the branch is in the supervisor's worktree) - // Pass base_branch from task if available, otherwise daemon will auto-detect - let cmd = DaemonCommand::CreatePR { - task_id: supervisor_id, - title: request.title.clone(), - body: request.body.clone(), - base_branch: task.base_branch.clone(), - branch: request.branch.clone(), - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreatePR command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation) - let timeout = tokio::time::Duration::from_secs(60); - let result = tokio::time::timeout(timeout, async { - loop { - match rx.recv().await { - Ok(notification) => { - if notification.task_id == supervisor_id { - return Some(notification); - } - // Not our task, keep waiting - } - Err(_) => { - // Channel closed or lagged - return None; - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - let status = if notification.success { - StatusCode::CREATED - } else { - StatusCode::INTERNAL_SERVER_ERROR - }; - ( - status, - Json(CreatePRResponse { - task_id: supervisor_id, - success: notification.success, - message: notification.message, - pr_url: notification.pr_url, - pr_number: notification.pr_number, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout or channel error - return error status - ( - StatusCode::GATEWAY_TIMEOUT, - Json(CreatePRResponse { - task_id: supervisor_id, - success: false, - message: "PR creation timed out waiting for daemon response".to_string(), - pr_url: None, - pr_number: None, - }), - ).into_response() - } - } -} - -/// Get the diff for a task's changes. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - responses( - (status = 200, description = "Task diff", body = TaskDiffResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn get_task_diff( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the target task - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Get daemon running the task - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Send GetTaskDiff command to daemon - let cmd = DaemonCommand::GetTaskDiff { task_id }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send GetTaskDiff command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - ( - StatusCode::OK, - Json(TaskDiffResponse { - task_id, - success: true, - diff: None, - error: Some("Diff command sent - response will be streamed".to_string()), - }), - ).into_response() + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_id: Option<Uuid>, + pub question: String, + pub choices: Vec<String>, + pub context: Option<String>, + pub created_at: chrono::DateTime<chrono::Utc>, + #[serde(default)] + pub multi_select: bool, + #[serde(default)] + pub question_type: String, } // ============================================================================= -// Supervisor Question Handlers +// Question handlers // ============================================================================= -/// Ask a question and wait for user feedback. -/// -/// The supervisor calls this to ask a question. The endpoint will poll until -/// either the user responds or the timeout is reached. +/// Ask the user a question from a directive task. Blocks until the user +/// answers, the timeout fires, or `non_blocking` returns immediately. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/questions", request_body = AskQuestionRequest, responses( - (status = 200, description = "Question answered", body = AskQuestionResponse), - (status = 408, description = "Question timed out", body = AskQuestionResponse), + (status = 200, description = "Question asked", body = AskQuestionResponse), (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - security( - ("tool_key" = []) + (status = 403, description = "Not a directive task"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn ask_question( @@ -1676,67 +184,49 @@ pub async fn ask_question( headers: HeaderMap, Json(request): Json<AskQuestionRequest>, ) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); - // Get the supervisor task to find its contract - let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + // Pull the directive_id off the calling task so subscribers can + // route the question to the right directive view. + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ).into_response(); + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); } Err(e) => { - tracing::error!(error = %e, "Failed to get supervisor task"); + tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), - ).into_response(); + Json(ApiError::new("DB_ERROR", "Failed to fetch task")), + ) + .into_response(); } }; - // Determine context: contract or directive - let contract_id = supervisor.contract_id; - let directive_id = supervisor.directive_id; + let directive_id = task.directive_id; - if contract_id.is_none() && directive_id.is_none() { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")), - ).into_response(); - } - - let is_directive_context = directive_id.is_some() && contract_id.is_none(); - - // For directive context, check reconcile_mode to determine behavior - let directive_reconcile_mode: String = if let Some(did) = directive_id { - if is_directive_context { - match repository::get_directive_for_owner(pool, owner_id, did).await { - Ok(Some(d)) => d.reconcile_mode.clone(), - Ok(None) => "auto".to_string(), - Err(e) => { - tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check"); - "auto".to_string() - } - } - } else { - "auto".to_string() - } - } else { - "auto".to_string() + // Reconcile mode controls block-vs-timeout behaviour on directive + // tasks: semi-auto / manual block indefinitely (effectively + // phaseguard); auto times out after 30s. + let reconcile_mode: String = match directive_id { + Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await { + Ok(Some(d)) => d.reconcile_mode.clone(), + _ => "auto".to_string(), + }, + None => "auto".to_string(), }; - // Add the question (use Uuid::nil() for contract_id in directive-only context) - let effective_contract_id = contract_id.unwrap_or(Uuid::nil()); - let question_id = state.add_supervisor_question_with_directive( - supervisor_id, - effective_contract_id, + let question_id = state.add_supervisor_question( + task_id, directive_id, owner_id, request.question.clone(), @@ -1746,60 +236,6 @@ pub async fn ask_question( request.question_type.clone(), ); - // Save state: question asked is a key save point (Task 3.3) - // Only for contract context — directive tasks don't use supervisor_states table - if let Some(cid) = contract_id { - let pending_question = PendingQuestion { - id: question_id, - question: request.question.clone(), - choices: request.choices.clone(), - context: request.context.clone(), - question_type: request.question_type.clone(), - asked_at: chrono::Utc::now(), - }; - save_state_on_question_asked(pool, cid, pending_question).await; - } - - // Broadcast question as task output entry for the task's chat - let question_data = serde_json::json!({ - "question_id": question_id.to_string(), - "choices": request.choices, - "context": request.context, - "multi_select": request.multi_select, - "question_type": request.question_type, - }); - state.broadcast_task_output(TaskOutputNotification { - task_id: supervisor_id, - owner_id: Some(owner_id), - message_type: "supervisor_question".to_string(), - content: request.question.clone(), - tool_name: None, - tool_input: Some(question_data.clone()), - is_error: None, - cost_usd: None, - duration_ms: None, - is_partial: false, - }); - - // Persist to database so it appears when reloading the page - // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format - if let Some(pool) = state.db_pool.as_ref() { - let event_data = serde_json::json!({ - "messageType": "supervisor_question", - "content": request.question, - "toolInput": question_data, - }); - let _ = repository::create_task_event( - pool, - supervisor_id, - "output", - None, - None, - Some(event_data), - ).await; - } - - // If non_blocking mode, return immediately if request.non_blocking { return ( StatusCode::OK, @@ -1809,41 +245,28 @@ pub async fn ask_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Determine if we should block indefinitely (phaseguard or directive reconcile mode) - let use_phaseguard = request.phaseguard || (is_directive_context && (directive_reconcile_mode == "semi-auto" || directive_reconcile_mode == "manual")); - - // Poll for response with timeout - // - Phaseguard: block indefinitely until user responds - // - Directive tasks without reconcile mode: 30s default timeout - // - Contract tasks: use requested timeout_seconds + // Determine block behaviour. + let use_phaseguard = + request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual"; let timeout_secs = if use_phaseguard { - // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit). - // The CLI will automatically reconnect via the poll endpoint. 300 - } else if is_directive_context && directive_reconcile_mode == "auto" { + } else if reconcile_mode == "auto" { 30 } else { request.timeout_seconds.max(1) as u64 }; + let timeout_duration = std::time::Duration::from_secs(timeout_secs); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { - // Check if response has been submitted if let Some(response) = state.get_question_response(question_id) { - // Clean up the response state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state (Task 3.3) - // Skip for directive context — no supervisor_states for directives - if let Some(cid) = contract_id { - clear_pending_question(pool, cid, question_id).await; - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1852,14 +275,12 @@ pub async fn ask_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Check timeout if start.elapsed() >= timeout_duration { if use_phaseguard { - // Phaseguard/reconcile: DON'T remove the pending question. - // Return still_pending so the CLI can reconnect via the poll endpoint. return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1868,18 +289,10 @@ pub async fn ask_question( timed_out: false, still_pending: true, }), - ).into_response(); + ) + .into_response(); } - - // Non-phaseguard: remove the pending question on timeout state.remove_pending_question(question_id); - - // Clear pending question from supervisor state on timeout (Task 3.3) - // Skip for directive context — no supervisor_states for directives - if let Some(cid) = contract_id { - clear_pending_question(pool, cid, question_id).await; - } - return ( StatusCode::REQUEST_TIMEOUT, Json(AskQuestionResponse { @@ -1888,34 +301,25 @@ pub async fn ask_question( timed_out: true, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Wait before polling again tokio::time::sleep(poll_interval).await; } } -/// Poll for a question response by question_id. -/// -/// Used by the CLI to reconnect after a still_pending response from ask_question. -/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout -/// is reached without a response, allowing the CLI to reconnect again. +/// Re-poll a question by id. Used by the CLI to reconnect after +/// `still_pending` from `ask_question`. Blocks up to 5 minutes. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/questions/{question_id}/poll", - params( - ("question_id" = Uuid, Path, description = "The question ID to poll for"), - ), + params(("question_id" = Uuid, Path, description = "Question id")), responses( - (status = 200, description = "Question answered or still pending", body = AskQuestionResponse), - (status = 404, description = "Question not found"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - ), - security( - ("tool_key" = []) + (status = 200, description = "Answered or still pending", body = AskQuestionResponse), + (status = 404, description = "Not found"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn poll_question( @@ -1923,23 +327,16 @@ pub async fn poll_question( headers: HeaderMap, Path(question_id): Path<Uuid>, ) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; + if verify_task_auth(&state, &headers).await.is_err() { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ) + .into_response(); + } - // Check if a response is already available if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state for contract context - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { - if let Some(cid) = task.contract_id { - clear_pending_question(pool, cid, question_id).await; - } - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1948,35 +345,25 @@ pub async fn poll_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Check if the question exists at all (pending or response) - if !state.has_pending_question(question_id) { + if state.get_pending_question(question_id).is_none() { return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), - ).into_response(); + Json(ApiError::new("NOT_FOUND", "Question not found")), + ) + .into_response(); } - // Block for up to 5 minutes polling every 500ms - let timeout_duration = std::time::Duration::from_secs(300); + let timeout = std::time::Duration::from_secs(300); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { - // Check if response has been submitted if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state for contract context - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { - if let Some(cid) = task.contract_id { - clear_pending_question(pool, cid, question_id).await; - } - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1985,19 +372,10 @@ pub async fn poll_question( timed_out: false, still_pending: false, }), - ).into_response(); - } - - // Check if the question was removed (e.g., task deleted) - if !state.has_pending_question(question_id) { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question no longer exists")), - ).into_response(); + ) + .into_response(); } - - // Check timeout - if start.elapsed() >= timeout_duration { + if start.elapsed() >= timeout { return ( StatusCode::OK, Json(AskQuestionResponse { @@ -2006,27 +384,21 @@ pub async fn poll_question( timed_out: false, still_pending: true, }), - ).into_response(); + ) + .into_response(); } - - // Wait before polling again tokio::time::sleep(poll_interval).await; } } -/// Get all pending questions for the current user. +/// List currently-pending questions for the caller. #[utoipa::path( get, path = "/api/v1/mesh/questions", responses( - (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error"), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) + (status = 200, description = "Pending questions", body = Vec<PendingQuestionSummary>), ), + security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn list_pending_questions( @@ -2039,7 +411,6 @@ pub async fn list_pending_questions( .map(|q| PendingQuestionSummary { question_id: q.question_id, task_id: q.task_id, - contract_id: q.contract_id, directive_id: q.directive_id, question: q.question, choices: q.choices, @@ -2053,1051 +424,59 @@ pub async fn list_pending_questions( Json(questions).into_response() } -/// Answer a pending supervisor question. +/// Answer a pending question. #[utoipa::path( post, path = "/api/v1/mesh/questions/{question_id}/answer", - params( - ("question_id" = Uuid, Path, description = "Question ID") - ), + params(("question_id" = Uuid, Path, description = "Question id")), request_body = AnswerQuestionRequest, responses( - (status = 200, description = "Question answered", body = AnswerQuestionResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Question not found"), - (status = 500, description = "Internal server error"), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) + (status = 200, description = "Answered", body = AnswerQuestionResponse), + (status = 404, description = "Not found"), ), + security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn answer_question( State(state): State<SharedState>, Authenticated(auth): Authenticated, Path(question_id): Path<Uuid>, - Json(request): Json<AnswerQuestionRequest>, + Json(req): Json<AnswerQuestionRequest>, ) -> impl IntoResponse { - // Verify the question exists and belongs to this owner + // Ownership check: only the owner of the question can answer it. let question = match state.get_pending_question(question_id) { - Some(q) if q.owner_id == auth.owner_id => q, - Some(_) => { - return ( - StatusCode::FORBIDDEN, - Json(ApiError::new("FORBIDDEN", "Question belongs to another user")), - ).into_response(); - } - None => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), - ).into_response(); - } - }; - - // Submit the response - let success = state.submit_question_response(question_id, request.response.clone()); - - if success { - tracing::info!( - question_id = %question_id, - task_id = %question.task_id, - "User answered supervisor question" - ); - - // Send the response to the task as a message - // This will auto-resume the task if it was paused (phaseguard) - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await { - if let Some(daemon_id) = task.daemon_id { - // Format the response message - let response_msg = format!( - "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n", - question.question, - request.response - ); - let cmd = DaemonCommand::SendMessage { - task_id: question.task_id, - message: response_msg, - }; - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::warn!( - task_id = %question.task_id, - error = %e, - "Failed to send response message to task" - ); - } else { - tracing::info!( - task_id = %question.task_id, - "Sent response message to task (will auto-resume if paused)" - ); - } - } - } - } - - Json(AnswerQuestionResponse { success }).into_response() -} - -// ============================================================================= -// Supervisor Resume and Conversation Rewind -// ============================================================================= - -/// Response for supervisor resume -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ResumeSupervisorResponse { - pub supervisor_task_id: Uuid, - pub daemon_id: Option<Uuid>, - pub resumed_from: ResumedFromInfo, - pub status: String, - /// Restoration context (Task 3.4) - pub restoration: Option<RestorationInfo>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ResumedFromInfo { - pub phase: String, - pub last_activity: chrono::DateTime<chrono::Utc>, - pub message_count: i32, -} - -/// Information about supervisor restoration (Task 3.4) -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct RestorationInfo { - /// Previous state before restoration - pub previous_state: String, - /// How many times this supervisor has been restored - pub restoration_count: i32, - /// Number of pending questions to re-deliver - pub pending_questions_count: usize, - /// Number of tasks being waited on - pub waiting_tasks_count: usize, - /// Number of tasks spawned before crash - pub spawned_tasks_count: usize, - /// Any warnings from state validation - pub warnings: Vec<String>, -} - -/// Resume interrupted supervisor with specified mode. -/// -/// POST /api/v1/contracts/{id}/supervisor/resume -#[utoipa::path( - post, - path = "/api/v1/contracts/{id}/supervisor/resume", - params( - ("id" = Uuid, Path, description = "Contract ID") - ), - request_body = crate::db::models::ResumeSupervisorRequest, - responses( - (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse), - (status = 400, description = "Invalid request", body = ApiError), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 404, description = "Contract or supervisor not found", body = ApiError), - (status = 409, description = "Supervisor is already running", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh Supervisor" -)] -pub async fn resume_supervisor( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - auth: crate::server::auth::Authenticated, - Json(req): Json<crate::db::models::ResumeSupervisorRequest>, -) -> impl IntoResponse { - let crate::server::auth::Authenticated(auth_info) = auth; - - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get contract and verify ownership - let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get existing supervisor state - let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new( - "NO_SUPERVISOR_STATE", - "No supervisor state found - supervisor may not have been started", - )), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor state: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get supervisor task - let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor task: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Check if already running - but only if daemon is actually connected - // (daemon disconnect handler may not have updated status yet) - if supervisor_task.status == "running" { - let daemon_connected = supervisor_task - .daemon_id - .map(|d| state.is_daemon_connected(d)) - .unwrap_or(false); - - if daemon_connected { - return ( - StatusCode::CONFLICT, - Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), - ) - .into_response(); - } - // Daemon not connected - allow resume (treat as interrupted) - tracing::info!( - supervisor_task_id = %supervisor_task.id, - daemon_id = ?supervisor_task.daemon_id, - "Supervisor status is 'running' but daemon is not connected, allowing resume" - ); - } - - // Calculate message count from conversation history - let message_count = supervisor_state - .conversation_history - .as_array() - .map(|arr| arr.len() as i32) - .unwrap_or(0); - - // Find a connected daemon for this owner - let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { - Some(id) => id, + Some(q) => q, None => { return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemons connected for your account. Cannot resume supervisor.", - )), - ) - .into_response(); - } - }; - - // Track response values (may be updated by resume modes) - let mut response_daemon_id = supervisor_task.daemon_id; - let mut response_status = "pending".to_string(); - - // Based on resume mode, handle differently - match req.resume_mode.as_str() { - "continue" => { - // Update task status to starting and assign daemon - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") - .bind(target_daemon_id) - .bind(supervisor_state.task_id) - .execute(pool) - .await - { - tracing::error!("Failed to update task for resume: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - - // Fetch latest checkpoint patch for worktree recovery - let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { - Ok(Some(patch)) => { - tracing::info!( - task_id = %supervisor_state.task_id, - patch_size = patch.patch_size_bytes, - base_sha = %patch.base_commit_sha, - "Including checkpoint patch for worktree recovery" - ); - // Encode patch as base64 for JSON transport - let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); - (Some(encoded), Some(patch.base_commit_sha)) - } - Ok(None) => { - tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); - (None, None) - } - Err(e) => { - tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); - (None, None) - } - }; - - // Send SpawnTask with resume_session=true to use Claude's --continue - // Include conversation_history as fallback if worktree doesn't exist on target daemon - let command = DaemonCommand::SpawnTask { - task_id: supervisor_state.task_id, - task_name: supervisor_task.name.clone(), - plan: supervisor_task.plan.clone(), - repo_url: supervisor_task.repository_url.clone(), - base_branch: supervisor_task.base_branch.clone(), - target_branch: supervisor_task.target_branch.clone(), - parent_task_id: supervisor_task.parent_task_id, - depth: supervisor_task.depth, - is_orchestrator: false, - target_repo_path: supervisor_task.target_repo_path.clone(), - completion_action: supervisor_task.completion_action.clone(), - continue_from_task_id: supervisor_task.continue_from_task_id, - copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: supervisor_task.contract_id, - is_supervisor: true, - autonomous_loop: false, - resume_session: true, // Use --continue to preserve conversation - conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing - patch_data, - patch_base_sha, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - supervisor_worktree_task_id: None, // Supervisor uses its own worktree - directive_id: supervisor_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { - // Rollback status on failure - let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") - .bind(supervisor_state.task_id) - .execute(pool) - .await; - tracing::error!("Failed to send SpawnTask to daemon: {}", e); - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), - ) - .into_response(); - } - - tracing::info!( - contract_id = %contract_id, - supervisor_task_id = %supervisor_state.task_id, - daemon_id = %target_daemon_id, - message_count = message_count, - "Supervisor resumed with --continue (resume_session=true)" - ); - - // Update response values for successful resume - response_daemon_id = Some(target_daemon_id); - response_status = "starting".to_string(); - } - "restart_phase" => { - // Clear conversation but keep phase progress - if let Err(e) = repository::update_supervisor_conversation( - pool, - contract_id, - serde_json::json!([]), - ) - .await - { - tracing::error!("Failed to clear conversation: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") - .bind(supervisor_state.task_id) - .execute(pool) - .await - { - tracing::error!("Failed to update task status: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - } - "from_checkpoint" => { - // This would require more complex handling with checkpoint system - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NOT_IMPLEMENTED", - "from_checkpoint mode not yet implemented", - )), - ) - .into_response(); - } - _ => { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "INVALID_RESUME_MODE", - "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", - )), - ) - .into_response(); - } - } - - tracing::info!( - contract_id = %contract_id, - supervisor_task_id = %supervisor_state.task_id, - resume_mode = %req.resume_mode, - message_count = message_count, - "Supervisor resume requested" - ); - - // Build restoration info (Task 3.4) - let pending_questions: Vec<PendingQuestion> = serde_json::from_value( - supervisor_state.pending_questions.clone() - ).unwrap_or_default(); - - let restoration_info = RestorationInfo { - previous_state: supervisor_state.state.clone(), - restoration_count: supervisor_state.restoration_count, - pending_questions_count: pending_questions.len(), - waiting_tasks_count: supervisor_state.pending_task_ids.len(), - spawned_tasks_count: supervisor_state.spawned_task_ids.len(), - warnings: vec![], // Could add validation warnings here - }; - - // Re-deliver pending questions if any (Task 3.4) - if !pending_questions.is_empty() { - redeliver_pending_questions( - &state, - supervisor_state.task_id, - contract_id, - auth_info.owner_id, - &pending_questions, - ).await; - } - - Json(ResumeSupervisorResponse { - supervisor_task_id: supervisor_state.task_id, - daemon_id: response_daemon_id, - resumed_from: ResumedFromInfo { - phase: contract.phase, - last_activity: supervisor_state.last_activity, - message_count, - }, - status: response_status, - restoration: Some(restoration_info), - }) - .into_response() -} - -/// Response for conversation rewind -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct RewindConversationResponse { - pub contract_id: Uuid, - pub messages_removed: i32, - pub new_message_count: i32, - pub code_rewound: bool, -} - -/// Rewind supervisor conversation to specified point. -/// -/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind -#[utoipa::path( - post, - path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", - params( - ("id" = Uuid, Path, description = "Contract ID") - ), - request_body = crate::db::models::RewindConversationRequest, - responses( - (status = 200, description = "Conversation rewound", body = RewindConversationResponse), - (status = 400, description = "Invalid request", body = ApiError), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 404, description = "Contract or supervisor not found", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh Supervisor" -)] -pub async fn rewind_conversation( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - auth: crate::server::auth::Authenticated, - Json(req): Json<crate::db::models::RewindConversationRequest>, -) -> impl IntoResponse { - let crate::server::auth::Authenticated(auth_info) = auth; - - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get contract and verify ownership - let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), + Json(ApiError::new("NOT_FOUND", "Question not found")), ) .into_response(); } }; - - // Get supervisor state - let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor state: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - let conversation = supervisor_state - .conversation_history - .as_array() - .cloned() - .unwrap_or_default(); - - let original_count = conversation.len() as i32; - - // Determine how many messages to keep - let new_count = if let Some(by_count) = req.by_message_count { - (original_count - by_count).max(0) - } else if let Some(ref to_id) = req.to_message_id { - // Find message by ID and keep up to and including it - let index = conversation - .iter() - .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str())) - .map(|i| i as i32) - .unwrap_or(original_count - 1); - (index + 1).min(original_count).max(0) - } else { - // Default to removing last message - (original_count - 1).max(0) - }; - - // Truncate conversation - let new_conversation: Vec<serde_json::Value> = conversation - .into_iter() - .take(new_count as usize) - .collect(); - - // Update the conversation - if let Err(e) = repository::update_supervisor_conversation( - pool, - contract_id, - serde_json::Value::Array(new_conversation), - ) - .await - { - tracing::error!("Failed to update conversation: {}", e); + if question.owner_id != auth.owner_id { return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Not your question")), ) .into_response(); } - tracing::info!( - contract_id = %contract_id, - original_count = original_count, - new_count = new_count, - messages_removed = original_count - new_count, - "Conversation rewound" - ); - - Json(RewindConversationResponse { - contract_id, - messages_removed: original_count - new_count, - new_message_count: new_count, - code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind - }) - .into_response() -} - -// ============================================================================= -// Supervisor State Persistence Helpers (Task 3.3) -// ============================================================================= - -use crate::db::models::{ - SupervisorRestorationContext, SupervisorStateEnum, - StateValidationResult, StateRecoveryAction, -}; - -/// Save supervisor state on task spawn. -/// This is called when a supervisor spawns a new task. -pub async fn save_state_on_task_spawn( - pool: &PgPool, - contract_id: Uuid, - spawned_task_id: Uuid, -) { - if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await { - tracing::warn!( - contract_id = %contract_id, - spawned_task_id = %spawned_task_id, - error = %e, - "Failed to save spawned task to supervisor state" - ); + if state.submit_question_response(question_id, req.response) { + Json(AnswerQuestionResponse { success: true }).into_response() } else { - tracing::debug!( - contract_id = %contract_id, - spawned_task_id = %spawned_task_id, - "Saved spawned task to supervisor state" - ); - } - - // Also update state to working - if let Err(e) = repository::update_supervisor_detailed_state( - pool, - contract_id, - "working", - Some(&format!("Spawned task {}", spawned_task_id)), - 0, // Progress resets when spawning new work - None, - ).await { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn"); - } -} - -/// Save supervisor state on question asked. -/// This is called when a supervisor asks a question and is waiting for user input. -pub async fn save_state_on_question_asked( - pool: &PgPool, - contract_id: Uuid, - question: PendingQuestion, -) { - let question_json = match serde_json::to_value(&[&question]) { - Ok(v) => v, - Err(e) => { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question"); - return; - } - }; - - if let Err(e) = repository::add_supervisor_pending_question(pool, contract_id, question_json).await { - tracing::warn!( - contract_id = %contract_id, - question_id = %question.id, - error = %e, - "Failed to save pending question to supervisor state" - ); - } else { - tracing::debug!( - contract_id = %contract_id, - question_id = %question.id, - "Saved pending question to supervisor state" - ); - } -} - -/// Clear pending question after it's answered. -pub async fn clear_pending_question( - pool: &PgPool, - contract_id: Uuid, - question_id: Uuid, -) { - if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await { - tracing::warn!( - contract_id = %contract_id, - question_id = %question_id, - error = %e, - "Failed to remove pending question from supervisor state" - ); - } - - // Update state back to working (if no more pending questions) - match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(state)) => { - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - if questions.is_empty() { - let _ = repository::update_supervisor_detailed_state( - pool, - contract_id, - "working", - Some("Resumed after user response"), - state.progress, - None, - ).await; - } - } - Ok(None) => {} - Err(e) => { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question"); - } - } -} - -/// Save supervisor state on phase change. -pub async fn save_state_on_phase_change( - pool: &PgPool, - contract_id: Uuid, - new_phase: &str, -) { - if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await { - tracing::warn!( - contract_id = %contract_id, - new_phase = %new_phase, - error = %e, - "Failed to update supervisor state on phase change" - ); - } else { - tracing::info!( - contract_id = %contract_id, - new_phase = %new_phase, - "Updated supervisor state on phase change" - ); - } -} - -// ============================================================================= -// Supervisor Restoration Protocol (Task 3.4) -// ============================================================================= - -/// Validate supervisor state consistency before restoration. -/// Checks that spawned tasks and pending questions are in expected states. -pub async fn validate_supervisor_state( - pool: &PgPool, - state: &crate::db::models::SupervisorState, -) -> StateValidationResult { - let mut issues = Vec::new(); - - // Validate spawned tasks - if !state.spawned_task_ids.is_empty() { - match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await { - Ok(task_statuses) => { - for task_id in &state.spawned_task_ids { - if !task_statuses.contains_key(task_id) { - issues.push(format!("Spawned task {} not found in database", task_id)); - } - } - } - Err(e) => { - issues.push(format!("Failed to validate spawned tasks: {}", e)); - } - } - } - - // Validate pending questions - let pending_questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - - // Check if questions are not too old (e.g., more than 24 hours) - for question in &pending_questions { - let age = chrono::Utc::now() - question.asked_at; - if age.num_hours() > 24 { - issues.push(format!( - "Pending question {} is {} hours old, may be stale", - question.id, age.num_hours() - )); - } - } - - // Validate conversation history - if let Some(history) = state.conversation_history.as_array() { - if history.is_empty() && state.restoration_count > 0 { - issues.push("Conversation history is empty after previous restoration".to_string()); - } - } - - // Determine recovery action - let recovery_action = if issues.is_empty() { - StateRecoveryAction::Proceed - } else if issues.iter().any(|i| i.contains("not found")) { - // Missing tasks suggest corruption - use checkpoint - StateRecoveryAction::UseCheckpoint - } else if issues.len() > 3 { - // Many issues suggest manual intervention needed - StateRecoveryAction::ManualIntervention - } else { - // Minor issues - proceed with warnings - StateRecoveryAction::Proceed - }; - - StateValidationResult { - is_valid: issues.is_empty(), - issues, - recovery_action, - } -} - -/// Restore supervisor from saved state after daemon crash or task reassignment. -/// Returns restoration context to send to the supervisor. -pub async fn restore_supervisor( - pool: &PgPool, - contract_id: Uuid, - restoration_source: &str, -) -> Result<SupervisorRestorationContext, String> { - // Step 1: Load supervisor state - let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - tracing::warn!( - contract_id = %contract_id, - "No supervisor state found for restoration - starting fresh" - ); - return Ok(SupervisorRestorationContext { - success: true, - previous_state: SupervisorStateEnum::Initializing, - conversation_history: serde_json::json!([]), - pending_questions: vec![], - waiting_task_ids: vec![], - spawned_task_ids: vec![], - restoration_count: 0, - restoration_context_message: "No previous state found. Starting fresh.".to_string(), - warnings: vec!["No previous supervisor state found".to_string()], - }); - } - Err(e) => { - return Err(format!("Failed to load supervisor state: {}", e)); - } - }; - - // Step 2: Parse previous state - let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted); - - // Step 3: Validate state consistency - let validation = validate_supervisor_state(pool, &state).await; - let mut warnings = validation.issues.clone(); - - // Step 4: Handle based on validation result - let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action { - StateRecoveryAction::Proceed => { - // State is valid, use it - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - - let message = format!( - "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.", - state.state, - questions.len(), - state.spawned_task_ids.len(), - state.pending_task_ids.len() - ); - - (state.conversation_history.clone(), questions, message) - } - StateRecoveryAction::UseCheckpoint => { - // State is corrupted, try to use checkpoint - warnings.push("State validation failed, attempting checkpoint recovery".to_string()); - - // TODO: Implement checkpoint-based recovery - // For now, start with empty questions but preserve conversation - let message = "Restored from last checkpoint due to state inconsistency.".to_string(); - (state.conversation_history.clone(), vec![], message) - } - StateRecoveryAction::StartFresh => { - warnings.push("Starting fresh due to unrecoverable state".to_string()); - let message = "Starting fresh due to unrecoverable state corruption.".to_string(); - (serde_json::json!([]), vec![], message) - } - StateRecoveryAction::ManualIntervention => { - warnings.push("Manual intervention may be required".to_string()); - // Still try to restore but with warning - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - let message = "Restored with warnings - manual intervention may be required.".to_string(); - (state.conversation_history.clone(), questions, message) - } - }; - - // Step 5: Mark supervisor as restored - let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await { - Ok(s) => s, - Err(e) => { - return Err(format!("Failed to mark supervisor as restored: {}", e)); - } - }; - - // Step 6: Build restoration context - let context = SupervisorRestorationContext { - success: true, - previous_state, - conversation_history, - pending_questions, - waiting_task_ids: state.pending_task_ids.clone(), - spawned_task_ids: state.spawned_task_ids.clone(), - restoration_count: new_state.restoration_count, - restoration_context_message: restoration_message, - warnings, - }; - - tracing::info!( - contract_id = %contract_id, - restoration_source = %restoration_source, - restoration_count = new_state.restoration_count, - pending_questions_count = context.pending_questions.len(), - waiting_tasks_count = context.waiting_task_ids.len(), - spawned_tasks_count = context.spawned_task_ids.len(), - "Supervisor restoration completed" - ); - - Ok(context) -} - -/// Re-deliver pending questions to the user after restoration. -/// This ensures questions asked before crash are shown to the user again. -pub async fn redeliver_pending_questions( - state: &SharedState, - supervisor_id: Uuid, - contract_id: Uuid, - owner_id: Uuid, - questions: &[PendingQuestion], -) { - for question in questions { - // Add to in-memory question state - state.add_supervisor_question( - supervisor_id, - contract_id, - owner_id, - question.question.clone(), - question.choices.clone(), - question.context.clone(), - false, // Assume single select for restored questions - question.question_type.clone(), - ); - - // Broadcast to WebSocket clients - let question_data = serde_json::json!({ - "question_id": question.id.to_string(), - "choices": question.choices, - "context": question.context, - "question_type": question.question_type, - "is_restored": true, - "originally_asked_at": question.asked_at.to_rfc3339(), - }); - - state.broadcast_task_output(TaskOutputNotification { - task_id: supervisor_id, - owner_id: Some(owner_id), - message_type: "supervisor_question".to_string(), - content: question.question.clone(), - tool_name: None, - tool_input: Some(question_data), - is_error: None, - cost_usd: None, - duration_ms: None, - is_partial: false, - }); - - tracing::info!( - supervisor_id = %supervisor_id, - question_id = %question.id, - "Re-delivered pending question after restoration" - ); - } -} - -/// Generate restoration context message for Claude. -/// This message is injected into the conversation to inform Claude about the restoration. -pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String { - let mut message = String::new(); - - message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n"); - message.push_str(&format!("This supervisor has been restored after interruption. {}\n\n", context.restoration_context_message)); - message.push_str(&format!("Restoration count: {}\n", context.restoration_count)); - - if !context.pending_questions.is_empty() { - message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len())); - for q in &context.pending_questions { - message.push_str(&format!(" - {}: {}\n", q.id, q.question)); - } - } - - if !context.waiting_task_ids.is_empty() { - message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len())); - } - - if !context.spawned_task_ids.is_empty() { - message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len())); - } - - if !context.warnings.is_empty() { - message.push_str("\nWarnings:\n"); - for warning in &context.warnings { - message.push_str(&format!(" - {}\n", warning)); - } + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found")), + ) + .into_response() } - - message.push_str("\n=== END RESTORATION NOTICE ===\n"); - - message } // ============================================================================= -// Order Creation from Directive Tasks +// Order creation (from directive tasks) // ============================================================================= -/// Request to create an order from a directive task. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateOrderForTaskRequest { @@ -3117,30 +496,25 @@ pub struct CreateOrderForTaskRequest { fn default_order_priority() -> String { "medium".to_string() } - fn default_order_type() -> String { "spike".to_string() } - fn default_order_labels() -> serde_json::Value { serde_json::json!([]) } -/// Create an order for future work from a directive task. -/// -/// Only spike and chore order types are allowed. The order is automatically -/// linked to the directive associated with the calling task. +/// Create a follow-up order from a directive task (spike/chore only). #[utoipa::path( post, path = "/api/v1/mesh/supervisor/orders", request_body = CreateOrderForTaskRequest, responses( (status = 201, description = "Order created"), - (status = 400, description = "Invalid order type"), + (status = 400, description = "Invalid order type or no directive context"), (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor/directive task"), - (status = 500, description = "Internal server error"), + (status = 403, description = "Not a directive task"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn create_order_for_task( @@ -3148,14 +522,11 @@ pub async fn create_order_for_task( headers: HeaderMap, Json(request): Json<CreateOrderForTaskRequest>, ) -> impl IntoResponse { - let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; - let pool = state.db_pool.as_ref().unwrap(); - - // Validate order_type is spike or chore if request.order_type != "spike" && request.order_type != "chore" { return ( StatusCode::BAD_REQUEST, @@ -3167,7 +538,8 @@ pub async fn create_order_for_task( .into_response(); } - // Get the task to find its directive_id + let pool = state.db_pool.as_ref().unwrap(); + let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { @@ -3178,10 +550,10 @@ pub async fn create_order_for_task( .into_response(); } Err(e) => { - tracing::error!(error = %e, "Failed to get task"); + tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), + Json(ApiError::new("DB_ERROR", "Failed to fetch task")), ) .into_response(); } @@ -3192,27 +564,21 @@ pub async fn create_order_for_task( None => { return ( StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NO_DIRECTIVE", - "Task is not associated with a directive", - )), + Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")), ) .into_response(); } }; - // Determine repository_url: use request value, or fall back to directive's repository_url let repository_url = if request.repository_url.is_some() { request.repository_url } else { - // Look up directive for its repository_url match repository::get_directive_for_owner(pool, owner_id, directive_id).await { Ok(Some(d)) => d.repository_url, _ => None, } }; - // Create the order let order_req = CreateOrderRequest { title: request.title, description: request.description, |
