//! HTTP handlers for supervisor-specific mesh operations. //! //! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate //! contract work: spawning tasks, waiting for completion, reading worktree files, etc. use axum::{ extract::{Path, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, Json, }; use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; use sqlx::PgPool; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification}; // ============================================================================= // Request/Response Types // ============================================================================= /// Request to spawn a new task from supervisor. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct SpawnTaskRequest { pub name: String, pub plan: String, pub contract_id: Uuid, pub parent_task_id: Option, pub checkpoint_sha: Option, /// Repository URL for the task (optional - if not provided, will be looked up from contract). pub repository_url: Option, } /// Request to wait for task completion. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct WaitForTaskRequest { #[serde(default = "default_timeout")] pub timeout_seconds: i32, } fn default_timeout() -> i32 { 300 } /// Request to read a file from task worktree. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReadWorktreeFileRequest { pub file_path: String, } /// Request to ask a question and wait for user feedback. 
#[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionRequest { /// The question to ask the user pub question: String, /// Optional choices (if empty, free-form text response) #[serde(default)] pub choices: Vec, /// Optional context about what this relates to pub context: Option, /// How long to wait for a response (seconds) #[serde(default = "default_question_timeout")] pub timeout_seconds: i32, /// When true, the request will block indefinitely until user responds (no timeout) #[serde(default)] pub phaseguard: bool, /// When true, allow selecting multiple choices (response will be comma-separated) #[serde(default)] pub multi_select: bool, /// When true, return immediately without waiting for response #[serde(default)] pub non_blocking: bool, /// Question type: general, phase_confirmation, or contract_complete #[serde(default = "default_question_type")] pub question_type: String, } fn default_question_type() -> String { "general".to_string() } fn default_question_timeout() -> i32 { 3600 // 1 hour default } /// Response from asking a question. #[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionResponse { /// The question ID for tracking pub question_id: Uuid, /// The user's response (None if timed out) pub response: Option, /// Whether the question timed out pub timed_out: bool, /// Whether the question is still pending (server-side timeout reached but question not removed). /// The client should poll the poll endpoint to continue waiting. #[serde(default)] pub still_pending: bool, } /// Request to answer a supervisor question. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AnswerQuestionRequest { /// The user's response pub response: String, } /// Response to answering a question. 
#[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AnswerQuestionResponse { /// Whether the answer was accepted pub success: bool, } /// Pending question summary. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct PendingQuestionSummary { pub question_id: Uuid, pub task_id: Uuid, pub contract_id: Uuid, /// Directive this question relates to (if from a directive task) #[serde(skip_serializing_if = "Option::is_none")] pub directive_id: Option, pub question: String, pub choices: Vec, pub context: Option, pub created_at: chrono::DateTime, /// Whether multiple choices can be selected #[serde(default)] pub multi_select: bool, /// Question type: general, phase_confirmation, or contract_complete #[serde(default)] pub question_type: String, } /// Request to create a checkpoint. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateCheckpointRequest { pub message: String, } /// Response for task tree. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskTreeResponse { pub tasks: Vec, pub supervisor_task_id: Option, pub total_count: usize, } /// Response for wait operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct WaitResponse { pub task_id: Uuid, pub status: String, pub completed: bool, pub output_summary: Option, } /// Response for read file operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReadFileResponse { pub task_id: Uuid, pub file_path: String, pub content: String, pub exists: bool, } /// Response for checkpoint operations. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckpointResponse { pub task_id: Uuid, pub checkpoint_number: i32, pub commit_sha: String, pub message: String, } /// Task checkpoint info. 
#[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskCheckpoint { pub id: Uuid, pub task_id: Uuid, pub checkpoint_number: i32, pub commit_sha: String, pub branch_name: String, pub message: String, pub files_changed: Option, pub lines_added: i32, pub lines_removed: i32, pub created_at: chrono::DateTime, } /// Response for list checkpoints. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CheckpointListResponse { pub task_id: Uuid, pub checkpoints: Vec, } // ============================================================================= // Helper Functions // ============================================================================= /// Verify the request comes from a supervisor task and extract ownership info. async fn verify_supervisor_auth( state: &SharedState, headers: &HeaderMap, contract_id: Option, ) -> Result<(Uuid, Uuid), (StatusCode, Json)> { let auth = extract_auth(state, headers); let task_id = match auth { AuthSource::ToolKey(task_id) => task_id, _ => { return Err(( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), )); } }; // Get the task to verify it's a supervisor and get owner_id let pool = state.db_pool.as_ref().ok_or_else(|| { ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) })?; let task = repository::get_task(pool, task_id) .await .map_err(|e| { tracing::error!(error = %e, "Failed to get supervisor task"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), ) })? 
.ok_or_else(|| { ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) })?; // Verify task is a supervisor or a directive task if !task.is_supervisor && task.directive_id.is_none() { return Err(( StatusCode::FORBIDDEN, Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor or directive tasks can use these endpoints")), )); } // If contract_id provided, verify the supervisor belongs to that contract if let Some(cid) = contract_id { if task.contract_id != Some(cid) { return Err(( StatusCode::FORBIDDEN, Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), )); } } Ok((task_id, task.owner_id)) } /// Try to start a pending task on an available daemon. /// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. /// For retried tasks, excludes daemons that previously failed the task and includes /// checkpoint patch data for worktree recovery. pub async fn try_start_pending_task( state: &SharedState, contract_id: Uuid, owner_id: Uuid, ) -> Result, String> { let pool = state.db_pool.as_ref().ok_or("Database not configured")?; // Get pending tasks for this contract (includes interrupted tasks awaiting retry) let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get pending tasks: {}", e))?; if pending_tasks.is_empty() { return Ok(None); } // Get contract to check local_only flag let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get contract: {}", e))? 
.ok_or_else(|| "Contract not found".to_string())?; // Try each pending task until we find one we can start for task in &pending_tasks { // Get excluded daemon IDs for this task (daemons that have already failed it) let exclude_ids: Vec = task.failed_daemon_ids.clone().unwrap_or_default(); // Get available daemons excluding failed ones for this task let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids) .await .map_err(|e| format!("Failed to get available daemons: {}", e))?; // Find a daemon with capacity let available_daemon = daemons.iter().find(|d| { d.current_task_count < d.max_concurrent_tasks && state.daemon_connections.contains_key(&d.connection_id) }); let daemon = match available_daemon { Some(d) => d, None => continue, // Try next task }; // Get repo URL from task or contract let repo_url = if let Some(url) = &task.repository_url { Some(url.clone()) } else { match repository::list_contract_repositories(pool, contract_id).await { Ok(repos) => repos .iter() .find(|r| r.is_primary) .or(repos.first()) .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), Err(_) => None, } }; // Update task with daemon assignment let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(daemon.id), version: Some(task.version), ..Default::default() }; let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { Ok(Some(t)) => t, Ok(None) => continue, // Task was modified concurrently, try next Err(e) => { tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); continue; // Try next task } }; // For retried tasks, fetch checkpoint patch for worktree recovery let (patch_data, patch_base_sha) = if task.retry_count > 0 { // This is a retry - try to restore from checkpoint match repository::get_latest_checkpoint_patch(pool, task.id).await { Ok(Some(patch)) => { tracing::info!( task_id = %task.id, retry_count = 
task.retry_count, patch_size = patch.patch_size_bytes, base_sha = %patch.base_commit_sha, "Including checkpoint patch for task retry recovery" ); let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); (Some(encoded), Some(patch.base_commit_sha)) } Ok(None) => { tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry"); (None, None) } Err(e) => { tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry"); (None, None) } } } else { (None, None) }; // Send spawn command let cmd = DaemonCommand::SpawnTask { task_id: updated_task.id, task_name: updated_task.name.clone(), plan: updated_task.plan.clone(), repo_url, base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: updated_task.parent_task_id, depth: updated_task.depth, is_orchestrator: false, target_repo_path: updated_task.target_repo_path.clone(), completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, autonomous_loop: updated_task.is_supervisor, resume_session: task.retry_count > 0, // Use --continue for retried tasks conversation_history: None, patch_data, patch_base_sha, local_only: contract.local_only, auto_merge_local: contract.auto_merge_local, // For retried tasks, use their own worktree (they already have state from previous attempt) supervisor_worktree_task_id: None, directive_id: updated_task.directive_id, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); // Rollback let rollback_req = UpdateTaskRequest { status: Some("pending".to_string()), clear_daemon_id: true, ..Default::default() }; let _ = 
repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; continue; // Try next task } tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); return Ok(Some(updated_task)); } // No tasks could be started Ok(None) } // ============================================================================= // Contract Task Handlers // ============================================================================= /// List all tasks in a contract's tree. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", params( ("contract_id" = Uuid, Path, description = "Contract ID") ), responses( (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn list_contract_tasks( State(state): State, Path(contract_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get all tasks for this contract match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { Ok(tasks) => { let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); let summaries: Vec = tasks.into_iter().map(TaskSummary::from).collect(); let total_count = summaries.len(); ( StatusCode::OK, Json(TaskTreeResponse { tasks: summaries, supervisor_task_id, total_count, }), ).into_response() } Err(e) => { tracing::error!(error = %e, "Failed to list contract tasks"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to list tasks")), ).into_response() } } } /// Get full task tree structure for a contract. 
#[utoipa::path( get, path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", params( ("contract_id" = Uuid, Path, description = "Contract ID") ), responses( (status = 200, description = "Task tree structure", body = TaskTreeResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn get_contract_tree( State(state): State, Path(contract_id): Path, headers: HeaderMap, ) -> impl IntoResponse { // Same as list_contract_tasks for now - can add tree structure later list_contract_tasks(State(state), Path(contract_id), headers).await } // ============================================================================= // Task Spawn Handler // ============================================================================= /// Spawn a new task (supervisor only). #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks", request_body = SpawnTaskRequest, responses( (status = 201, description = "Task created", body = Task), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn spawn_task( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Verify contract exists and get local_only flag let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Contract not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to 
get contract"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get contract")), ).into_response(); } }; // Get repository URL - either from request or from contract's repositories let repo_url = if let Some(url) = request.repository_url.clone() { if !url.trim().is_empty() { Some(url) } else { None } } else { None }; // If no repo URL provided, look it up from the contract let repo_url = match repo_url { Some(url) => Some(url), None => { match repository::list_contract_repositories(pool, request.contract_id).await { Ok(repos) => { // Prefer primary repo, fallback to first repo let repo = repos.iter() .find(|r| r.is_primary) .or(repos.first()); // Use repository_url if set, otherwise use local_path repo.and_then(|r| { r.repository_url.clone() .or_else(|| r.local_path.clone()) }) } Err(e) => { tracing::warn!(error = %e, "Failed to get contract repositories"); None } } } }; // Validate that we have a repo URL if repo_url.is_none() { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. 
Either provide one or ensure the contract has repositories configured.")), ).into_response(); } // Create task request // All tasks share the supervisor's worktree let supervisor_worktree_task_id = Some(supervisor_id); let create_req = CreateTaskRequest { name: request.name.clone(), description: None, plan: request.plan.clone(), repository_url: repo_url.clone(), contract_id: Some(request.contract_id), parent_task_id: request.parent_task_id, is_supervisor: false, checkpoint_sha: request.checkpoint_sha.clone(), merge_mode: Some("manual".to_string()), priority: 0, base_branch: None, target_branch: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id, directive_id: None, directive_step_id: None, }; // Create task in DB let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { Ok(t) => t, Err(e) => { tracing::error!(error = %e, "Failed to create task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to create task")), ).into_response(); } }; tracing::info!( supervisor_id = %supervisor_id, task_id = %task.id, task_name = %task.name, "Supervisor spawned new task" ); // Record history event for task spawned by supervisor let _ = repository::record_history_event( pool, owner_id, task.contract_id, Some(task.id), "task", Some("spawned"), None, serde_json::json!({ "name": &task.name, "spawnedBy": supervisor_id.to_string(), }), ).await; // Broadcast task creation notification to WebSocket subscribers state.broadcast_task_update(TaskUpdateNotification { task_id: task.id, owner_id: Some(owner_id), version: task.version, status: task.status.clone(), updated_fields: vec!["created".to_string()], updated_by: "supervisor".to_string(), }); // Start task on a daemon // Find a daemon that belongs to this owner let mut updated_task = task; for entry in state.daemon_connections.iter() { let 
daemon = entry.value(); if daemon.owner_id == owner_id { // IMPORTANT: Update database FIRST to assign daemon_id before sending command // This prevents race conditions where the task starts but daemon_id is not set let update_req = UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(daemon.id), version: Some(updated_task.version), ..Default::default() }; match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await { Ok(Some(t)) => { updated_task = t; } Ok(None) => { tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id"); break; } Err(e) => { tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id"); break; } } // Send spawn command to daemon let cmd = DaemonCommand::SpawnTask { task_id: updated_task.id, task_name: updated_task.name.clone(), plan: updated_task.plan.clone(), repo_url: repo_url.clone(), base_branch: updated_task.base_branch.clone(), target_branch: updated_task.target_branch.clone(), parent_task_id: updated_task.parent_task_id, depth: updated_task.depth, is_orchestrator: false, target_repo_path: updated_task.target_repo_path.clone(), completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only: contract.local_only, auto_merge_local: contract.auto_merge_local, // All tasks share the supervisor's worktree supervisor_worktree_task_id: Some(supervisor_id), directive_id: updated_task.directive_id, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); // Rollback: clear daemon_id and reset status since 
command failed let rollback_req = UpdateTaskRequest { status: Some("pending".to_string()), clear_daemon_id: true, ..Default::default() }; let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await; } else { tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); // Save state: task spawn is a key save point (Task 3.3) save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await; // Broadcast task status update notification to WebSocket subscribers state.broadcast_task_update(TaskUpdateNotification { task_id: updated_task.id, owner_id: Some(owner_id), version: updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "supervisor".to_string(), }); } break; } } (StatusCode::CREATED, Json(updated_task)).into_response() } // ============================================================================= // Wait for Task Handler // ============================================================================= /// Wait for a task to complete. 
#[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", params( ("task_id" = Uuid, Path, description = "Task ID to wait for") ), request_body = WaitForTaskRequest, responses( (status = 200, description = "Task completed or timed out", body = WaitResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn wait_for_task( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Verify task belongs to same owner let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Check if already done if task.status == "done" || task.status == "failed" || task.status == "merged" { return ( StatusCode::OK, Json(WaitResponse { task_id, status: task.status, completed: true, output_summary: None, }), ).into_response(); } // Get contract_id for pending task scheduling let contract_id = task.contract_id; // Subscribe to task completions let mut rx = state.task_completions.subscribe(); let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); // Wait for completion or timeout, periodically trying to start pending tasks let result = tokio::time::timeout(timeout, async { let mut pending_check_interval = 
tokio::time::interval(tokio::time::Duration::from_secs(5)); pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { // Check for task completion notifications recv_result = rx.recv() => { match recv_result { Ok(notification) => { if notification.task_id == task_id { return Some(notification); } } Err(_) => { // Channel closed or lagged - check DB directly if let Ok(Some(t)) = repository::get_task(pool, task_id).await { if t.status == "done" || t.status == "failed" || t.status == "merged" { return Some(crate::server::state::TaskCompletionNotification { task_id: t.id, owner_id: Some(t.owner_id), contract_id: t.contract_id, parent_task_id: t.parent_task_id, status: t.status, output_summary: None, worktree_path: None, error_message: t.error_message, }); } } } } } // Periodically try to start pending tasks _ = pending_check_interval.tick() => { if let Some(cid) = contract_id { match try_start_pending_task(&state, cid, owner_id).await { Ok(Some(started_task)) => { tracing::debug!( task_id = %started_task.id, task_name = %started_task.name, "Started pending task while waiting" ); } Ok(None) => { // No pending tasks or no capacity - that's fine } Err(e) => { tracing::warn!(error = %e, "Error trying to start pending task"); } } } } } } }).await; match result { Ok(Some(notification)) => { ( StatusCode::OK, Json(WaitResponse { task_id, status: notification.status, completed: true, output_summary: notification.output_summary, }), ).into_response() } Ok(None) | Err(_) => { // Timeout - check final status let final_status = repository::get_task(pool, task_id) .await .ok() .flatten() .map(|t| t.status) .unwrap_or_else(|| "unknown".to_string()); ( StatusCode::OK, Json(WaitResponse { task_id, status: final_status.clone(), completed: final_status == "done" || final_status == "failed" || final_status == "merged", output_summary: None, }), ).into_response() } } } // 
============================================================================= // Read Worktree File Handler // ============================================================================= /// Read a file from a task's worktree. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", params( ("task_id" = Uuid, Path, description = "Task ID") ), request_body = ReadWorktreeFileRequest, responses( (status = 200, description = "File content", body = ReadFileResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn read_worktree_file( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get task to verify ownership let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // TODO: Implement file reading via worktree path // For now, return not implemented - supervisor should use local file access via worktree let _ = (task, request); ( StatusCode::NOT_IMPLEMENTED, Json(ApiError::new( "NOT_IMPLEMENTED", "Worktree file reading via API not yet implemented. 
Use local filesystem access via worktree path.", )), ).into_response() } // ============================================================================= // Checkpoint Handlers // ============================================================================= /// Create a git checkpoint for a task. #[utoipa::path( post, path = "/api/v1/mesh/tasks/{task_id}/checkpoint", params( ("task_id" = Uuid, Path, description = "Task ID") ), request_body = CreateCheckpointRequest, responses( (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - can only create checkpoint for own task"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), (status = 503, description = "Task has no assigned daemon"), ), tag = "Mesh Supervisor" )] pub async fn create_checkpoint( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let auth = extract_auth(&state, &headers); let task_id_from_auth = match auth { AuthSource::ToolKey(tid) => tid, _ => { return ( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Tool key required")), ).into_response(); } }; // Can only create checkpoint for own task if task_id_from_auth != task_id { return ( StatusCode::FORBIDDEN, Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), ).into_response(); } let pool = state.db_pool.as_ref().unwrap(); // Get task and daemon_id let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; let Some(daemon_id) = task.daemon_id else { return ( 
StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Send CreateCheckpoint command to daemon let cmd = DaemonCommand::CreateCheckpoint { task_id, message: request.message.clone(), }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send CreateCheckpoint command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } // Return accepted - the checkpoint result will be delivered via WebSocket // and stored in the database by the daemon message handler ( StatusCode::ACCEPTED, Json(CheckpointResponse { task_id, checkpoint_number: 0, // Will be assigned by DB on actual creation commit_sha: "pending".to_string(), message: request.message, }), ).into_response() } /// List checkpoints for a task. #[utoipa::path( get, path = "/api/v1/mesh/tasks/{task_id}/checkpoints", params( ("task_id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "List of checkpoints", body = CheckpointListResponse), (status = 401, description = "Unauthorized"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn list_checkpoints( State(state): State, Path(task_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let auth = extract_auth(&state, &headers); let _task_id_from_auth = match auth { AuthSource::ToolKey(tid) => tid, _ => { return ( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "Tool key required")), ).into_response(); } }; let pool = state.db_pool.as_ref().unwrap(); // Get checkpoints from DB match repository::list_task_checkpoints(pool, task_id).await { Ok(checkpoints) => { let checkpoint_list: Vec = checkpoints .into_iter() .map(|c| TaskCheckpoint { id: c.id, task_id: c.task_id, checkpoint_number: c.checkpoint_number, commit_sha: 
c.commit_sha, branch_name: c.branch_name, message: c.message, files_changed: c.files_changed, lines_added: c.lines_added.unwrap_or(0), lines_removed: c.lines_removed.unwrap_or(0), created_at: c.created_at, }) .collect(); ( StatusCode::OK, Json(CheckpointListResponse { task_id, checkpoints: checkpoint_list, }), ).into_response() } Err(e) => { tracing::error!(error = %e, "Failed to list checkpoints"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), ).into_response() } } } // ============================================================================= // Git Operations - Request/Response Types // ============================================================================= /// Request to create a new branch. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateBranchRequest { pub branch_name: String, pub from_ref: Option, } /// Response for branch creation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateBranchResponse { pub success: bool, pub branch_name: String, pub message: String, } /// Request to merge task changes. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MergeTaskRequest { pub target_branch: Option, #[serde(default)] pub squash: bool, } /// Response for merge operation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MergeTaskResponse { pub task_id: Uuid, pub success: bool, pub message: String, pub commit_sha: Option, pub conflicts: Option>, } /// Request to create a pull request. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreatePRRequest { pub branch: String, pub title: String, pub body: Option, } /// Response for PR creation. 
#[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreatePRResponse { pub task_id: Uuid, pub success: bool, pub message: String, pub pr_url: Option, pub pr_number: Option, } /// Response for task diff. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct TaskDiffResponse { pub task_id: Uuid, pub success: bool, pub diff: Option, pub error: Option, } // ============================================================================= // Git Operations - Handlers // ============================================================================= /// Create a new branch from supervisor's worktree. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/branches", request_body = CreateBranchRequest, responses( (status = 201, description = "Branch created", body = CreateBranchResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_branch( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; // Find daemon running supervisor let daemon_id = { let pool = state.db_pool.as_ref().unwrap(); match repository::get_task(pool, supervisor_id).await { Ok(Some(task)) => task.daemon_id, _ => None, } }; let Some(daemon_id) = daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), ).into_response(); }; // Send CreateBranch command to daemon let cmd = DaemonCommand::CreateBranch { task_id: supervisor_id, branch_name: request.branch_name.clone(), from_ref: request.from_ref, }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, 
"Failed to send CreateBranch command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } // Note: Real implementation would wait for daemon response // For now, return success immediately - daemon will send response via WebSocket ( StatusCode::CREATED, Json(CreateBranchResponse { success: true, branch_name: request.branch_name, message: "Branch creation command sent".to_string(), }), ).into_response() } /// Merge a task's changes to a target branch. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", params( ("task_id" = Uuid, Path, description = "Task ID to merge") ), request_body = MergeTaskRequest, responses( (status = 200, description = "Merge initiated", body = MergeTaskResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn merge_task( State(state): State, Path(task_id): Path, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the target task let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, 
Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Subscribe to merge results BEFORE sending the command let mut rx = state.merge_results.subscribe(); // Send MergeTaskToTarget command to daemon let cmd = DaemonCommand::MergeTaskToTarget { task_id, target_branch: request.target_branch, squash: request.squash, }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } // Wait for the merge result with a timeout (60 seconds should be plenty for a merge) let timeout = tokio::time::Duration::from_secs(60); let result = tokio::time::timeout(timeout, async { loop { match rx.recv().await { Ok(notification) => { if notification.task_id == task_id { return Some(notification); } // Not our task, keep waiting } Err(_) => { // Channel closed or lagged return None; } } } }).await; match result { Ok(Some(notification)) => { ( StatusCode::OK, Json(MergeTaskResponse { task_id, success: notification.success, message: notification.message, commit_sha: notification.commit_sha, conflicts: notification.conflicts, }), ).into_response() } Ok(None) | Err(_) => { // Timeout or channel error - return error status ( StatusCode::GATEWAY_TIMEOUT, Json(MergeTaskResponse { task_id, success: false, message: "Merge operation timed out waiting for daemon response".to_string(), commit_sha: None, conflicts: None, }), ).into_response() } } } /// Create a pull request for a task's changes. 
#[utoipa::path( post, path = "/api/v1/mesh/supervisor/pr", request_body = CreatePRRequest, responses( (status = 201, description = "PR created", body = CreatePRResponse), (status = 400, description = "Invalid request"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_pr( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the supervisor's own task to find daemon and base_branch let task = match repository::get_task(pool, supervisor_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get supervisor task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), ).into_response(); } }; // Get daemon running the supervisor let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), ).into_response(); }; // Subscribe to PR results BEFORE sending the command let mut rx = state.pr_results.subscribe(); // Send CreatePR command to daemon using the supervisor's task ID // (the branch is in the supervisor's worktree) // Pass base_branch from task if available, otherwise daemon will auto-detect let cmd = DaemonCommand::CreatePR { task_id: supervisor_id, title: request.title.clone(), body: request.body.clone(), base_branch: task.base_branch.clone(), branch: request.branch.clone(), }; if let Err(e) = 
state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send CreatePR command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation) let timeout = tokio::time::Duration::from_secs(60); let result = tokio::time::timeout(timeout, async { loop { match rx.recv().await { Ok(notification) => { if notification.task_id == supervisor_id { return Some(notification); } // Not our task, keep waiting } Err(_) => { // Channel closed or lagged return None; } } } }).await; match result { Ok(Some(notification)) => { let status = if notification.success { StatusCode::CREATED } else { StatusCode::INTERNAL_SERVER_ERROR }; ( status, Json(CreatePRResponse { task_id: supervisor_id, success: notification.success, message: notification.message, pr_url: notification.pr_url, pr_number: notification.pr_number, }), ).into_response() } Ok(None) | Err(_) => { // Timeout or channel error - return error status ( StatusCode::GATEWAY_TIMEOUT, Json(CreatePRResponse { task_id: supervisor_id, success: false, message: "PR creation timed out waiting for daemon response".to_string(), pr_url: None, pr_number: None, }), ).into_response() } } } /// Get the diff for a task's changes. 
#[utoipa::path( get, path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", params( ("task_id" = Uuid, Path, description = "Task ID") ), responses( (status = 200, description = "Task diff", body = TaskDiffResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn get_task_diff( State(state): State, Path(task_id): Path, headers: HeaderMap, ) -> impl IntoResponse { let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the target task let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ).into_response(); } }; // Get daemon running the task let Some(daemon_id) = task.daemon_id else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), ).into_response(); }; // Send GetTaskDiff command to daemon let cmd = DaemonCommand::GetTaskDiff { task_id }; if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { tracing::error!(error = %e, "Failed to send GetTaskDiff command"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), ).into_response(); } ( StatusCode::OK, Json(TaskDiffResponse { task_id, success: true, diff: None, error: Some("Diff command sent - response will be streamed".to_string()), }), ).into_response() } // 
============================================================================= // Supervisor Question Handlers // ============================================================================= /// Ask a question and wait for user feedback. /// /// The supervisor calls this to ask a question. The endpoint will poll until /// either the user responds or the timeout is reached. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/questions", request_body = AskQuestionRequest, responses( (status = 200, description = "Question answered", body = AskQuestionResponse), (status = 408, description = "Question timed out", body = AskQuestionResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), (status = 500, description = "Internal server error"), ), security( ("tool_key" = []) ), tag = "Mesh Supervisor" )] pub async fn ask_question( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Get the supervisor task to find its contract let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), ).into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get supervisor task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), ).into_response(); } }; // Determine context: contract or directive let contract_id = supervisor.contract_id; let directive_id = supervisor.directive_id; if contract_id.is_none() && directive_id.is_none() { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")), ).into_response(); 
} let is_directive_context = directive_id.is_some() && contract_id.is_none(); // For directive context, check reconcile_mode to determine behavior let directive_reconcile_mode: String = if let Some(did) = directive_id { if is_directive_context { match repository::get_directive_for_owner(pool, owner_id, did).await { Ok(Some(d)) => d.reconcile_mode.clone(), Ok(None) => "auto".to_string(), Err(e) => { tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check"); "auto".to_string() } } } else { "auto".to_string() } } else { "auto".to_string() }; // Add the question (use Uuid::nil() for contract_id in directive-only context) let effective_contract_id = contract_id.unwrap_or(Uuid::nil()); let question_id = state.add_supervisor_question_with_directive( supervisor_id, effective_contract_id, directive_id, owner_id, request.question.clone(), request.choices.clone(), request.context.clone(), request.multi_select, request.question_type.clone(), ); // Save state: question asked is a key save point (Task 3.3) // Only for contract context — directive tasks don't use supervisor_states table if let Some(cid) = contract_id { let pending_question = PendingQuestion { id: question_id, question: request.question.clone(), choices: request.choices.clone(), context: request.context.clone(), question_type: request.question_type.clone(), asked_at: chrono::Utc::now(), }; save_state_on_question_asked(pool, cid, pending_question).await; } // Broadcast question as task output entry for the task's chat let question_data = serde_json::json!({ "question_id": question_id.to_string(), "choices": request.choices, "context": request.context, "multi_select": request.multi_select, "question_type": request.question_type, }); state.broadcast_task_output(TaskOutputNotification { task_id: supervisor_id, owner_id: Some(owner_id), message_type: "supervisor_question".to_string(), content: request.question.clone(), tool_name: None, tool_input: Some(question_data.clone()), is_error: None, 
cost_usd: None, duration_ms: None, is_partial: false, }); // Persist to database so it appears when reloading the page // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format if let Some(pool) = state.db_pool.as_ref() { let event_data = serde_json::json!({ "messageType": "supervisor_question", "content": request.question, "toolInput": question_data, }); let _ = repository::create_task_event( pool, supervisor_id, "output", None, None, Some(event_data), ).await; } // If non_blocking mode, return immediately if request.non_blocking { return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: None, timed_out: false, still_pending: false, }), ).into_response(); } // Determine if we should block indefinitely (phaseguard or directive reconcile mode) let use_phaseguard = request.phaseguard || (is_directive_context && (directive_reconcile_mode == "semi-auto" || directive_reconcile_mode == "manual")); // Poll for response with timeout // - Phaseguard: block indefinitely until user responds // - Directive tasks without reconcile mode: 30s default timeout // - Contract tasks: use requested timeout_seconds let timeout_secs = if use_phaseguard { // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit). // The CLI will automatically reconnect via the poll endpoint. 
300 } else if is_directive_context && directive_reconcile_mode == "auto" { 30 } else { request.timeout_seconds.max(1) as u64 }; let timeout_duration = std::time::Duration::from_secs(timeout_secs); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { // Check if response has been submitted if let Some(response) = state.get_question_response(question_id) { // Clean up the response state.cleanup_question_response(question_id); // Clear pending question from supervisor state (Task 3.3) // Skip for directive context — no supervisor_states for directives if let Some(cid) = contract_id { clear_pending_question(pool, cid, question_id).await; } return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: Some(response.response), timed_out: false, still_pending: false, }), ).into_response(); } // Check timeout if start.elapsed() >= timeout_duration { if use_phaseguard { // Phaseguard/reconcile: DON'T remove the pending question. // Return still_pending so the CLI can reconnect via the poll endpoint. return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: None, timed_out: false, still_pending: true, }), ).into_response(); } // Non-phaseguard: remove the pending question on timeout state.remove_pending_question(question_id); // Clear pending question from supervisor state on timeout (Task 3.3) // Skip for directive context — no supervisor_states for directives if let Some(cid) = contract_id { clear_pending_question(pool, cid, question_id).await; } return ( StatusCode::REQUEST_TIMEOUT, Json(AskQuestionResponse { question_id, response: None, timed_out: true, still_pending: false, }), ).into_response(); } // Wait before polling again tokio::time::sleep(poll_interval).await; } } /// Poll for a question response by question_id. /// /// Used by the CLI to reconnect after a still_pending response from ask_question. /// Blocks for up to 5 minutes polling every 500ms. 
Returns still_pending if timeout /// is reached without a response, allowing the CLI to reconnect again. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/questions/{question_id}/poll", params( ("question_id" = Uuid, Path, description = "The question ID to poll for"), ), responses( (status = 200, description = "Question answered or still pending", body = AskQuestionResponse), (status = 404, description = "Question not found"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor"), ), security( ("tool_key" = []) ), tag = "Mesh Supervisor" )] pub async fn poll_question( State(state): State, headers: HeaderMap, Path(question_id): Path, ) -> impl IntoResponse { let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; // Check if a response is already available if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); // Clear pending question from supervisor state for contract context let pool = state.db_pool.as_ref().unwrap(); if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { if let Some(cid) = task.contract_id { clear_pending_question(pool, cid, question_id).await; } } return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: Some(response.response), timed_out: false, still_pending: false, }), ).into_response(); } // Check if the question exists at all (pending or response) if !state.has_pending_question(question_id) { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), ).into_response(); } // Block for up to 5 minutes polling every 500ms let timeout_duration = std::time::Duration::from_secs(300); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { // Check if response has been submitted if let 
Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); // Clear pending question from supervisor state for contract context let pool = state.db_pool.as_ref().unwrap(); if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { if let Some(cid) = task.contract_id { clear_pending_question(pool, cid, question_id).await; } } return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: Some(response.response), timed_out: false, still_pending: false, }), ).into_response(); } // Check if the question was removed (e.g., task deleted) if !state.has_pending_question(question_id) { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Question no longer exists")), ).into_response(); } // Check timeout if start.elapsed() >= timeout_duration { return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: None, timed_out: false, still_pending: true, }), ).into_response(); } // Wait before polling again tokio::time::sleep(poll_interval).await; } } /// Get all pending questions for the current user. 
///
/// Returns every pending question registered for the authenticated owner,
/// mapped field-for-field into `PendingQuestionSummary`.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/questions",
    responses(
        (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
        (status = 401, description = "Unauthorized"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_pending_questions(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let questions: Vec<PendingQuestionSummary> = state
        .get_pending_questions_for_owner(auth.owner_id)
        .into_iter()
        .map(|q| PendingQuestionSummary {
            question_id: q.question_id,
            task_id: q.task_id,
            contract_id: q.contract_id,
            directive_id: q.directive_id,
            question: q.question,
            choices: q.choices,
            context: q.context,
            created_at: q.created_at,
            multi_select: q.multi_select,
            question_type: q.question_type,
        })
        .collect();

    Json(questions).into_response()
}

/// Answer a pending supervisor question.
///
/// Verifies the question is pending and owned by the caller, records the
/// response, and then forwards the answer to the asking task's daemon as a
/// `SendMessage` command. The forward is best effort: a failure is logged and
/// does not change the returned `success` flag.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/questions/{question_id}/answer",
    params(
        ("question_id" = Uuid, Path, description = "Question ID")
    ),
    request_body = AnswerQuestionRequest,
    responses(
        (status = 200, description = "Question answered", body = AnswerQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Question belongs to another user"),
        (status = 404, description = "Question not found"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn answer_question(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(question_id): Path<Uuid>,
    Json(request): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
    // Verify the question exists and belongs to this owner
    let question = match state.get_pending_question(question_id) {
        Some(q) if q.owner_id == auth.owner_id => q,
        Some(_) => {
            return (
                StatusCode::FORBIDDEN,
                Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
            )
                .into_response();
        }
        None => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
            )
                .into_response();
        }
    };

    // Submit the response
    let success = state.submit_question_response(question_id, request.response.clone());

    if success {
        tracing::info!(
            question_id = %question_id,
            task_id = %question.task_id,
            "User answered supervisor question"
        );

        // Send the response to the task as a message
        // This will auto-resume the task if it was paused (phaseguard)
        // The answer is already recorded, so a missing database only skips the
        // forward instead of panicking.
        if let Some(pool) = state.db_pool.as_ref() {
            if let Ok(Some(task)) =
                repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await
            {
                if let Some(daemon_id) = task.daemon_id {
                    // Format the response message
                    let response_msg = format!(
                        "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n",
                        question.question, request.response
                    );

                    let cmd = DaemonCommand::SendMessage {
                        task_id: question.task_id,
                        message: response_msg,
                    };

                    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
                        tracing::warn!(
                            task_id = %question.task_id,
                            error = %e,
                            "Failed to send response message to task"
                        );
                    } else {
                        tracing::info!(
                            task_id = %question.task_id,
                            "Sent response message to task (will auto-resume if paused)"
                        );
                    }
                }
            }
        } else {
            tracing::warn!(
                task_id = %question.task_id,
                "Database not configured; response message not forwarded to task"
            );
        }
    }

    Json(AnswerQuestionResponse { success }).into_response()
}

// =============================================================================
// Supervisor Resume and Conversation Rewind
// =============================================================================

/// Response for supervisor resume
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumeSupervisorResponse {
    pub supervisor_task_id: Uuid,
    pub daemon_id: Option<Uuid>,
    pub resumed_from: ResumedFromInfo,
    pub status: String,
    /// Restoration context (Task 3.4)
    pub restoration: Option<RestorationInfo>,
}

/// Snapshot of where a resumed supervisor left off.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumedFromInfo {
    pub phase: String,
    pub last_activity: chrono::DateTime<chrono::Utc>,
    pub message_count: i32,
}

/// Information about supervisor restoration (Task 3.4)
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RestorationInfo {
    /// Previous state before restoration
    pub previous_state: String,
    /// How many times this supervisor has been restored
    pub restoration_count: i32,
    /// Number of pending questions to re-deliver
    pub pending_questions_count: usize,
    /// Number of tasks being waited on
    pub waiting_tasks_count: usize,
    /// Number of tasks spawned before crash
    pub spawned_tasks_count: usize,
    /// Any warnings from state validation
    pub warnings: Vec<String>,
}

/// Resume interrupted supervisor with specified mode.
///
/// POST /api/v1/contracts/{id}/supervisor/resume
#[utoipa::path(
    post,
    path = "/api/v1/contracts/{id}/supervisor/resume",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    request_body = crate::db::models::ResumeSupervisorRequest,
    responses(
        (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Contract or supervisor not found", body = ApiError),
        (status = 409, description = "Supervisor is already running", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh Supervisor"
)]
pub async fn resume_supervisor(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    auth: crate::server::auth::Authenticated,
    Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
) -> impl IntoResponse {
    let crate::server::auth::Authenticated(auth_info) = auth;

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get contract and verify ownership
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
) .into_response(); } Err(e) => { tracing::error!("Failed to get contract {}: {}", contract_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get existing supervisor state let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { Ok(Some(s)) => s, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new( "NO_SUPERVISOR_STATE", "No supervisor state found - supervisor may not have been started", )), ) .into_response(); } Err(e) => { tracing::error!("Failed to get supervisor state: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get supervisor task let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get supervisor task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Check if already running - but only if daemon is actually connected // (daemon disconnect handler may not have updated status yet) if supervisor_task.status == "running" { let daemon_connected = supervisor_task .daemon_id .map(|d| state.is_daemon_connected(d)) .unwrap_or(false); if daemon_connected { return ( StatusCode::CONFLICT, Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), ) .into_response(); } // Daemon not connected - allow resume (treat as interrupted) tracing::info!( supervisor_task_id = %supervisor_task.id, daemon_id = ?supervisor_task.daemon_id, "Supervisor status is 'running' but daemon is not connected, allowing resume" ); } // Calculate message count from conversation history let message_count = supervisor_state .conversation_history 
.as_array() .map(|arr| arr.len() as i32) .unwrap_or(0); // Find a connected daemon for this owner let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { Some(id) => id, None => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new( "NO_DAEMON", "No daemons connected for your account. Cannot resume supervisor.", )), ) .into_response(); } }; // Track response values (may be updated by resume modes) let mut response_daemon_id = supervisor_task.daemon_id; let mut response_status = "pending".to_string(); // Based on resume mode, handle differently match req.resume_mode.as_str() { "continue" => { // Update task status to starting and assign daemon if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") .bind(target_daemon_id) .bind(supervisor_state.task_id) .execute(pool) .await { tracing::error!("Failed to update task for resume: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } // Fetch latest checkpoint patch for worktree recovery let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { Ok(Some(patch)) => { tracing::info!( task_id = %supervisor_state.task_id, patch_size = patch.patch_size_bytes, base_sha = %patch.base_commit_sha, "Including checkpoint patch for worktree recovery" ); // Encode patch as base64 for JSON transport let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); (Some(encoded), Some(patch.base_commit_sha)) } Ok(None) => { tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); (None, None) } Err(e) => { tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); (None, None) } }; // Send SpawnTask with resume_session=true to use Claude's --continue // Include conversation_history as fallback if worktree doesn't exist on target daemon let 
command = DaemonCommand::SpawnTask { task_id: supervisor_state.task_id, task_name: supervisor_task.name.clone(), plan: supervisor_task.plan.clone(), repo_url: supervisor_task.repository_url.clone(), base_branch: supervisor_task.base_branch.clone(), target_branch: supervisor_task.target_branch.clone(), parent_task_id: supervisor_task.parent_task_id, depth: supervisor_task.depth, is_orchestrator: false, target_repo_path: supervisor_task.target_repo_path.clone(), completion_action: supervisor_task.completion_action.clone(), continue_from_task_id: supervisor_task.continue_from_task_id, copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: supervisor_task.contract_id, is_supervisor: true, autonomous_loop: false, resume_session: true, // Use --continue to preserve conversation conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing patch_data, patch_base_sha, local_only: contract.local_only, auto_merge_local: contract.auto_merge_local, supervisor_worktree_task_id: None, // Supervisor uses its own worktree directive_id: supervisor_task.directive_id, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { // Rollback status on failure let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") .bind(supervisor_state.task_id) .execute(pool) .await; tracing::error!("Failed to send SpawnTask to daemon: {}", e); return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), ) .into_response(); } tracing::info!( contract_id = %contract_id, supervisor_task_id = %supervisor_state.task_id, daemon_id = %target_daemon_id, message_count = message_count, "Supervisor resumed with --continue (resume_session=true)" ); // Update response values for successful resume response_daemon_id = Some(target_daemon_id); response_status = "starting".to_string(); } 
"restart_phase" => { // Clear conversation but keep phase progress if let Err(e) = repository::update_supervisor_conversation( pool, contract_id, serde_json::json!([]), ) .await { tracing::error!("Failed to clear conversation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") .bind(supervisor_state.task_id) .execute(pool) .await { tracing::error!("Failed to update task status: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } } "from_checkpoint" => { // This would require more complex handling with checkpoint system return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "NOT_IMPLEMENTED", "from_checkpoint mode not yet implemented", )), ) .into_response(); } _ => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_RESUME_MODE", "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", )), ) .into_response(); } } tracing::info!( contract_id = %contract_id, supervisor_task_id = %supervisor_state.task_id, resume_mode = %req.resume_mode, message_count = message_count, "Supervisor resume requested" ); // Build restoration info (Task 3.4) let pending_questions: Vec = serde_json::from_value( supervisor_state.pending_questions.clone() ).unwrap_or_default(); let restoration_info = RestorationInfo { previous_state: supervisor_state.state.clone(), restoration_count: supervisor_state.restoration_count, pending_questions_count: pending_questions.len(), waiting_tasks_count: supervisor_state.pending_task_ids.len(), spawned_tasks_count: supervisor_state.spawned_task_ids.len(), warnings: vec![], // Could add validation warnings here }; // Re-deliver pending questions if any (Task 3.4) if !pending_questions.is_empty() { redeliver_pending_questions( &state, supervisor_state.task_id, contract_id, auth_info.owner_id, &pending_questions, 
).await; } Json(ResumeSupervisorResponse { supervisor_task_id: supervisor_state.task_id, daemon_id: response_daemon_id, resumed_from: ResumedFromInfo { phase: contract.phase, last_activity: supervisor_state.last_activity, message_count, }, status: response_status, restoration: Some(restoration_info), }) .into_response() } /// Response for conversation rewind #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct RewindConversationResponse { pub contract_id: Uuid, pub messages_removed: i32, pub new_message_count: i32, pub code_rewound: bool, } /// Rewind supervisor conversation to specified point. /// /// POST /api/v1/contracts/{id}/supervisor/conversation/rewind #[utoipa::path( post, path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", params( ("id" = Uuid, Path, description = "Contract ID") ), request_body = crate::db::models::RewindConversationRequest, responses( (status = 200, description = "Conversation rewound", body = RewindConversationResponse), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 404, description = "Contract or supervisor not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh Supervisor" )] pub async fn rewind_conversation( State(state): State, Path(contract_id): Path, auth: crate::server::auth::Authenticated, Json(req): Json, ) -> impl IntoResponse { let crate::server::auth::Authenticated(auth_info) = auth; let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get contract and verify ownership let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { Ok(Some(c)) => 
c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Contract not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get contract {}: {}", contract_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get supervisor state let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { Ok(Some(s)) => s, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get supervisor state: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; let conversation = supervisor_state .conversation_history .as_array() .cloned() .unwrap_or_default(); let original_count = conversation.len() as i32; // Determine how many messages to keep let new_count = if let Some(by_count) = req.by_message_count { (original_count - by_count).max(0) } else if let Some(ref to_id) = req.to_message_id { // Find message by ID and keep up to and including it let index = conversation .iter() .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str())) .map(|i| i as i32) .unwrap_or(original_count - 1); (index + 1).min(original_count).max(0) } else { // Default to removing last message (original_count - 1).max(0) }; // Truncate conversation let new_conversation: Vec = conversation .into_iter() .take(new_count as usize) .collect(); // Update the conversation if let Err(e) = repository::update_supervisor_conversation( pool, contract_id, serde_json::Value::Array(new_conversation), ) .await { tracing::error!("Failed to update conversation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } tracing::info!( contract_id = %contract_id, original_count = original_count, new_count = new_count, 
messages_removed = original_count - new_count, "Conversation rewound" ); Json(RewindConversationResponse { contract_id, messages_removed: original_count - new_count, new_message_count: new_count, code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind }) .into_response() } // ============================================================================= // Supervisor State Persistence Helpers (Task 3.3) // ============================================================================= use crate::db::models::{ SupervisorRestorationContext, SupervisorStateEnum, StateValidationResult, StateRecoveryAction, }; /// Save supervisor state on task spawn. /// This is called when a supervisor spawns a new task. pub async fn save_state_on_task_spawn( pool: &PgPool, contract_id: Uuid, spawned_task_id: Uuid, ) { if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await { tracing::warn!( contract_id = %contract_id, spawned_task_id = %spawned_task_id, error = %e, "Failed to save spawned task to supervisor state" ); } else { tracing::debug!( contract_id = %contract_id, spawned_task_id = %spawned_task_id, "Saved spawned task to supervisor state" ); } // Also update state to working if let Err(e) = repository::update_supervisor_detailed_state( pool, contract_id, "working", Some(&format!("Spawned task {}", spawned_task_id)), 0, // Progress resets when spawning new work None, ).await { tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn"); } } /// Save supervisor state on question asked. /// This is called when a supervisor asks a question and is waiting for user input. 
///
/// The question is serialized as a one-element JSON array and handed to
/// `repository::add_supervisor_pending_question`. Failures are logged and not propagated.
pub async fn save_state_on_question_asked(
    pool: &PgPool,
    contract_id: Uuid,
    question: PendingQuestion,
) {
    let question_json = match serde_json::to_value(&[&question]) {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                error = %e,
                "Failed to serialize pending question"
            );
            return;
        }
    };

    match repository::add_supervisor_pending_question(pool, contract_id, question_json).await {
        Ok(_) => {
            tracing::debug!(
                contract_id = %contract_id,
                question_id = %question.id,
                "Saved pending question to supervisor state"
            );
        }
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                question_id = %question.id,
                error = %e,
                "Failed to save pending question to supervisor state"
            );
        }
    }
}

/// Clear pending question after it's answered.
///
/// After removing the question, the supervisor state is re-read; when no pending
/// questions remain the detailed state is set back to `"working"`, keeping the stored
/// progress. Every step is best effort: failures are logged and not propagated.
pub async fn clear_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question_id: Uuid,
) {
    if let Err(e) =
        repository::remove_supervisor_pending_question(pool, contract_id, question_id).await
    {
        tracing::warn!(
            contract_id = %contract_id,
            question_id = %question_id,
            error = %e,
            "Failed to remove pending question from supervisor state"
        );
    }

    // Update state back to working (if no more pending questions)
    match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(state)) => {
            // Stored questions that fail to parse are treated as "none pending".
            let questions: Vec<PendingQuestion> =
                serde_json::from_value(state.pending_questions.clone()).unwrap_or_default();

            if questions.is_empty() {
                if let Err(e) = repository::update_supervisor_detailed_state(
                    pool,
                    contract_id,
                    "working",
                    Some("Resumed after user response"),
                    state.progress,
                    None,
                )
                .await
                {
                    // Previously discarded silently; surface it like the other failures here.
                    tracing::warn!(
                        contract_id = %contract_id,
                        error = %e,
                        "Failed to update supervisor state after clearing question"
                    );
                }
            }
        }
        Ok(None) => {}
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                error = %e,
                "Failed to check supervisor state after clearing question"
            );
        }
    }
}

/// Save supervisor state on phase change.
pub async fn save_state_on_phase_change( pool: &PgPool, contract_id: Uuid, new_phase: &str, ) { if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await { tracing::warn!( contract_id = %contract_id, new_phase = %new_phase, error = %e, "Failed to update supervisor state on phase change" ); } else { tracing::info!( contract_id = %contract_id, new_phase = %new_phase, "Updated supervisor state on phase change" ); } } // ============================================================================= // Supervisor Restoration Protocol (Task 3.4) // ============================================================================= /// Validate supervisor state consistency before restoration. /// Checks that spawned tasks and pending questions are in expected states. pub async fn validate_supervisor_state( pool: &PgPool, state: &crate::db::models::SupervisorState, ) -> StateValidationResult { let mut issues = Vec::new(); // Validate spawned tasks if !state.spawned_task_ids.is_empty() { match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await { Ok(task_statuses) => { for task_id in &state.spawned_task_ids { if !task_statuses.contains_key(task_id) { issues.push(format!("Spawned task {} not found in database", task_id)); } } } Err(e) => { issues.push(format!("Failed to validate spawned tasks: {}", e)); } } } // Validate pending questions let pending_questions: Vec = serde_json::from_value(state.pending_questions.clone()) .unwrap_or_default(); // Check if questions are not too old (e.g., more than 24 hours) for question in &pending_questions { let age = chrono::Utc::now() - question.asked_at; if age.num_hours() > 24 { issues.push(format!( "Pending question {} is {} hours old, may be stale", question.id, age.num_hours() )); } } // Validate conversation history if let Some(history) = state.conversation_history.as_array() { if history.is_empty() && state.restoration_count > 0 { issues.push("Conversation history is empty after previous 
restoration".to_string()); } } // Determine recovery action let recovery_action = if issues.is_empty() { StateRecoveryAction::Proceed } else if issues.iter().any(|i| i.contains("not found")) { // Missing tasks suggest corruption - use checkpoint StateRecoveryAction::UseCheckpoint } else if issues.len() > 3 { // Many issues suggest manual intervention needed StateRecoveryAction::ManualIntervention } else { // Minor issues - proceed with warnings StateRecoveryAction::Proceed }; StateValidationResult { is_valid: issues.is_empty(), issues, recovery_action, } } /// Restore supervisor from saved state after daemon crash or task reassignment. /// Returns restoration context to send to the supervisor. pub async fn restore_supervisor( pool: &PgPool, contract_id: Uuid, restoration_source: &str, ) -> Result { // Step 1: Load supervisor state let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await { Ok(Some(s)) => s, Ok(None) => { tracing::warn!( contract_id = %contract_id, "No supervisor state found for restoration - starting fresh" ); return Ok(SupervisorRestorationContext { success: true, previous_state: SupervisorStateEnum::Initializing, conversation_history: serde_json::json!([]), pending_questions: vec![], waiting_task_ids: vec![], spawned_task_ids: vec![], restoration_count: 0, restoration_context_message: "No previous state found. 
Starting fresh.".to_string(), warnings: vec!["No previous supervisor state found".to_string()], }); } Err(e) => { return Err(format!("Failed to load supervisor state: {}", e)); } }; // Step 2: Parse previous state let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted); // Step 3: Validate state consistency let validation = validate_supervisor_state(pool, &state).await; let mut warnings = validation.issues.clone(); // Step 4: Handle based on validation result let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action { StateRecoveryAction::Proceed => { // State is valid, use it let questions: Vec = serde_json::from_value(state.pending_questions.clone()) .unwrap_or_default(); let message = format!( "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.", state.state, questions.len(), state.spawned_task_ids.len(), state.pending_task_ids.len() ); (state.conversation_history.clone(), questions, message) } StateRecoveryAction::UseCheckpoint => { // State is corrupted, try to use checkpoint warnings.push("State validation failed, attempting checkpoint recovery".to_string()); // TODO: Implement checkpoint-based recovery // For now, start with empty questions but preserve conversation let message = "Restored from last checkpoint due to state inconsistency.".to_string(); (state.conversation_history.clone(), vec![], message) } StateRecoveryAction::StartFresh => { warnings.push("Starting fresh due to unrecoverable state".to_string()); let message = "Starting fresh due to unrecoverable state corruption.".to_string(); (serde_json::json!([]), vec![], message) } StateRecoveryAction::ManualIntervention => { warnings.push("Manual intervention may be required".to_string()); // Still try to restore but with warning let questions: Vec = serde_json::from_value(state.pending_questions.clone()) .unwrap_or_default(); let message = "Restored with warnings - manual 
intervention may be required.".to_string(); (state.conversation_history.clone(), questions, message) } }; // Step 5: Mark supervisor as restored let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await { Ok(s) => s, Err(e) => { return Err(format!("Failed to mark supervisor as restored: {}", e)); } }; // Step 6: Build restoration context let context = SupervisorRestorationContext { success: true, previous_state, conversation_history, pending_questions, waiting_task_ids: state.pending_task_ids.clone(), spawned_task_ids: state.spawned_task_ids.clone(), restoration_count: new_state.restoration_count, restoration_context_message: restoration_message, warnings, }; tracing::info!( contract_id = %contract_id, restoration_source = %restoration_source, restoration_count = new_state.restoration_count, pending_questions_count = context.pending_questions.len(), waiting_tasks_count = context.waiting_task_ids.len(), spawned_tasks_count = context.spawned_task_ids.len(), "Supervisor restoration completed" ); Ok(context) } /// Re-deliver pending questions to the user after restoration. /// This ensures questions asked before crash are shown to the user again. 
pub async fn redeliver_pending_questions( state: &SharedState, supervisor_id: Uuid, contract_id: Uuid, owner_id: Uuid, questions: &[PendingQuestion], ) { for question in questions { // Add to in-memory question state state.add_supervisor_question( supervisor_id, contract_id, owner_id, question.question.clone(), question.choices.clone(), question.context.clone(), false, // Assume single select for restored questions question.question_type.clone(), ); // Broadcast to WebSocket clients let question_data = serde_json::json!({ "question_id": question.id.to_string(), "choices": question.choices, "context": question.context, "question_type": question.question_type, "is_restored": true, "originally_asked_at": question.asked_at.to_rfc3339(), }); state.broadcast_task_output(TaskOutputNotification { task_id: supervisor_id, owner_id: Some(owner_id), message_type: "supervisor_question".to_string(), content: question.question.clone(), tool_name: None, tool_input: Some(question_data), is_error: None, cost_usd: None, duration_ms: None, is_partial: false, }); tracing::info!( supervisor_id = %supervisor_id, question_id = %question.id, "Re-delivered pending question after restoration" ); } } /// Generate restoration context message for Claude. /// This message is injected into the conversation to inform Claude about the restoration. pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String { let mut message = String::new(); message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n"); message.push_str(&format!("This supervisor has been restored after interruption. 
{}\n\n", context.restoration_context_message)); message.push_str(&format!("Restoration count: {}\n", context.restoration_count)); if !context.pending_questions.is_empty() { message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len())); for q in &context.pending_questions { message.push_str(&format!(" - {}: {}\n", q.id, q.question)); } } if !context.waiting_task_ids.is_empty() { message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len())); } if !context.spawned_task_ids.is_empty() { message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len())); } if !context.warnings.is_empty() { message.push_str("\nWarnings:\n"); for warning in &context.warnings { message.push_str(&format!(" - {}\n", warning)); } } message.push_str("\n=== END RESTORATION NOTICE ===\n"); message } // ============================================================================= // Order Creation from Directive Tasks // ============================================================================= /// Request to create an order from a directive task. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateOrderForTaskRequest { pub title: String, #[serde(default)] pub description: Option, #[serde(default = "default_order_priority")] pub priority: String, #[serde(default = "default_order_type")] pub order_type: String, #[serde(default = "default_order_labels")] pub labels: serde_json::Value, #[serde(default)] pub repository_url: Option, } fn default_order_priority() -> String { "medium".to_string() } fn default_order_type() -> String { "spike".to_string() } fn default_order_labels() -> serde_json::Value { serde_json::json!([]) } /// Create an order for future work from a directive task. /// /// Only spike and chore order types are allowed. 
The order is automatically /// linked to the directive associated with the calling task. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/orders", request_body = CreateOrderForTaskRequest, responses( (status = 201, description = "Order created"), (status = 400, description = "Invalid order type"), (status = 401, description = "Unauthorized"), (status = 403, description = "Forbidden - not a supervisor/directive task"), (status = 500, description = "Internal server error"), ), tag = "Mesh Supervisor" )] pub async fn create_order_for_task( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Validate order_type is spike or chore if request.order_type != "spike" && request.order_type != "chore" { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_ORDER_TYPE", "Only spike and chore order types are allowed from directive tasks", )), ) .into_response(); } // Get the task to find its directive_id let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to get task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to get task")), ) .into_response(); } }; let directive_id = match task.directive_id { Some(id) => id, None => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "NO_DIRECTIVE", "Task is not associated with a directive", )), ) .into_response(); } }; // Determine repository_url: use request value, or fall back to directive's repository_url let repository_url = if request.repository_url.is_some() { request.repository_url } else { // Look up directive for its repository_url match 
repository::get_directive_for_owner(pool, owner_id, directive_id).await { Ok(Some(d)) => d.repository_url, _ => None, } }; // Create the order let order_req = CreateOrderRequest { title: request.title, description: request.description, priority: Some(request.priority), status: Some("open".to_string()), order_type: Some(request.order_type), labels: request.labels, directive_id, repository_url, dog_id: None, }; match repository::create_order(pool, owner_id, order_req).await { Ok(order) => ( StatusCode::CREATED, Json(serde_json::json!({ "id": order.id, "title": order.title, "description": order.description, "priority": order.priority, "status": order.status, "orderType": order.order_type, "directiveId": order.directive_id, "labels": order.labels, "repositoryUrl": order.repository_url, "createdAt": order.created_at, })), ) .into_response(), Err(e) => { tracing::error!(error = %e, "Failed to create order"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to create order")), ) .into_response() } } }