summaryrefslogblamecommitdiff
path: root/makima/src/server/handlers/mesh_supervisor.rs
blob: 5e742518db13e8bb12b787a8a56c69622c771967 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                                                                       
                   



                                    
                                                                                 
                          
                                       

                                                              
                                                                                                       













                                                                                
                                                                                                  
                                       



                                                                                




















                                              













                                                            


                                                                                       


                                                                                      









                                                                        












































                                                  


                                                


                                                                        

 








































































































































                                                                                                        

                                                                                        


                                                                                    





                                                                        
                                                                                      







                                                                                               





                                                                                  



















                                                                                               
 












                                                                                             
 






                                                 
 







                                                                                                               
 



























                                                                                                                 
















                                                                                                              
                                   
                                                                                     
                                       

                           
                                            

                                                                                                        
          
 









                                                                                                                   
         
 

                                                                                                          

     

                                

 














































































































                                                                                                                     

                                                                                                        















                                                                          




                                                                               

                

            

      































                                                                                                                                                       

                          


                                                                                                        




                                         
                                               

                                               
                           








                                                       

                                    
                                    




















                                                                                          














                                                          









                                                                    

                                               
                                


                                                  























                                                                                                                    
                                                


                                                     
                                           



                                                                  
                                       




                                                                                                                  
                                     
                                       

                                           

                                     
                                                

                                                                                                               



                                                                                                   






                                                                                                               
                    
                                                                                                                                    









                                                                                        




























































                                                                                                          




                  
                                                             


































































                                                                                                 


                                                  



                                                                                   
                                                                                 
                                                      


                                                                                                    
              

























                                                                                                       

                     
















                                                                                                 

                         




















































































































                                                                                                                
                                                                                                
                                                     
                                                                                            

                                                              
                                                                    






























                                                                                        
                             
















                                                                      











                                                                            
 









                                                                                      
     






                                                                              






















































































































                                                                                           
                       

                             






















































































































































                                                                                                 


                                                            














                                                                                      













































                                                                                                 





















                                                                            
                                                                                                 





                                               

                                                                      



                                      
                                                                              


                              
                                                                         

                                                  
                                                                                 



                              
                                        


                                               
                                                                                  


                          


                                                         

                                                                     
                                                                                 
                                       
                               

                                     
                                              
                                       









                                                                                      





                                                                                          
                                                              





















                                                  
                                           











                                                             
                                           







                                                                                             












































































                                                                                                 





































































                                                                                                
                             
                                      

      




                                                                  
                                             
                                               































                                                                                                     

































                                                                                                                        









                                      




















                                                                                                 

                                                















































                                                                                                      
                                         
                                           

















































                                                                                           
                                                                                        






                                               





























                                                                                                             



                                                            






























































































































                                                                                                                          

                                                                          
                                            

















                                                                                         








                                                        


















                                                                                         


                                               


                                                                                                             



                                               
                                                                           





                                                                   
 






















                                                                                                                                    

















                                                                                                                     
                                       

                                                                                                                          

                               
                                                
                                                                                      


























                                                                                                              































































                                                                                            
                                      




                                                          
                                








































































































                                                                                                           







                                                                                           










































                                                                                      











































































































































                                                                                                                                                                                                        
                                                                                

















































                                                                                                
                                                          







































                                                                                                                              
//! HTTP handlers for supervisor-specific mesh operations.
//!
//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate
//! contract work: spawning tasks, waiting for completion, reading worktree files, etc.

use axum::{
    extract::{Path, State},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
    Json,
};
use base64::Engine;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;

use crate::db::models::{CreateTaskRequest, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification};

// =============================================================================
// Request/Response Types
// =============================================================================

/// Request to spawn a new task from supervisor.
///
/// Deserialized with camelCase field names (`contractId`, `parentTaskId`,
/// `checkpointSha`, `repositoryUrl`, `useOwnWorktree`). The `Option` fields may be
/// omitted from the payload.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SpawnTaskRequest {
    // NOTE(review): presumably the display name of the task to create — confirm against the spawn handler.
    pub name: String,
    // NOTE(review): presumably the instructions the spawned task should carry out — confirm.
    pub plan: String,
    pub contract_id: Uuid,
    pub parent_task_id: Option<Uuid>,
    // NOTE(review): looks like a git commit SHA to start the task from — confirm against the spawn handler.
    pub checkpoint_sha: Option<String>,
    /// Repository URL for the task (optional - if not provided, will be looked up from contract).
    pub repository_url: Option<String>,
    /// If true, create a separate worktree for the task (requires merge after).
    /// If false (default), the task shares the supervisor's worktree.
    #[serde(default)]
    pub use_own_worktree: bool,
}

/// Request to wait for task completion.
///
/// Deserialized with camelCase field names (`timeoutSeconds`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitForTaskRequest {
    /// Wait timeout (seconds, per the field name). Falls back to
    /// `default_timeout()` (300) when the field is omitted.
    #[serde(default = "default_timeout")]
    pub timeout_seconds: i32,
}

/// Serde fallback for `WaitForTaskRequest::timeout_seconds`: 300 (five minutes).
fn default_timeout() -> i32 {
    const DEFAULT_WAIT_TIMEOUT_SECS: i32 = 5 * 60;
    DEFAULT_WAIT_TIMEOUT_SECS
}

/// Request to read a file from task worktree.
///
/// Deserialized with camelCase field names (`filePath`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadWorktreeFileRequest {
    // NOTE(review): presumably a path relative to the task's worktree root — confirm
    // against the handler, including how absolute paths and `..` segments are treated.
    pub file_path: String,
}

/// Request to ask a question and wait for user feedback.
///
/// Deserialized with camelCase field names (`timeoutSeconds`, `multiSelect`,
/// `nonBlocking`, `questionType`). Only `question` is mandatory: every other
/// field is either an `Option` or carries a serde default.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionRequest {
    /// The question to ask the user
    pub question: String,
    /// Optional choices (if empty, free-form text response)
    #[serde(default)]
    pub choices: Vec<String>,
    /// Optional context about what this relates to
    pub context: Option<String>,
    /// How long to wait for a response (seconds)
    // Falls back to `default_question_timeout()` (3600) when omitted.
    #[serde(default = "default_question_timeout")]
    pub timeout_seconds: i32,
    /// When true, the request will block indefinitely until user responds (no timeout)
    #[serde(default)]
    pub phaseguard: bool,
    /// When true, allow selecting multiple choices (response will be comma-separated)
    #[serde(default)]
    pub multi_select: bool,
    /// When true, return immediately without waiting for response
    #[serde(default)]
    pub non_blocking: bool,
    /// Question type: general, phase_confirmation, or contract_complete
    // Falls back to `default_question_type()` ("general") when omitted.
    // NOTE(review): the value is a free-form string here; nothing in this struct
    // restricts it to the three documented values.
    #[serde(default = "default_question_type")]
    pub question_type: String,
}

/// Serde fallback for `AskQuestionRequest::question_type`: the "general" type.
fn default_question_type() -> String {
    String::from("general")
}

/// Serde fallback for `AskQuestionRequest::timeout_seconds`: 3600 (one hour).
fn default_question_timeout() -> i32 {
    const ONE_HOUR_SECS: i32 = 60 * 60;
    ONE_HOUR_SECS
}

/// Response from asking a question.
///
/// Serialized with camelCase field names (`questionId`, `timedOut`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionResponse {
    /// The question ID for tracking
    pub question_id: Uuid,
    /// The user's response (None if timed out)
    // A `None` here is emitted as `null` by serde; no `skip_serializing_if` is set.
    pub response: Option<String>,
    /// Whether the question timed out
    pub timed_out: bool,
}

/// Request to answer a supervisor question.
///
/// Deserialized from a payload with a single mandatory `response` field.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionRequest {
    /// The user's response
    // NOTE(review): for multi-select questions `AskQuestionRequest` documents a
    // comma-separated response; presumably this field carries that form — confirm.
    pub response: String,
}

/// Response to answering a question.
///
/// Serialized as an object with a single `success` field.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionResponse {
    /// Whether the answer was accepted
    pub success: bool,
}

/// Pending question summary.
///
/// Serialized with camelCase field names (`questionId`, `taskId`, `contractId`,
/// `createdAt`, `multiSelect`, `questionType`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct PendingQuestionSummary {
    pub question_id: Uuid,
    // NOTE(review): presumably the task that asked the question — confirm against the code that builds this summary.
    pub task_id: Uuid,
    pub contract_id: Uuid,
    pub question: String,
    pub choices: Vec<String>,
    pub context: Option<String>,
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Whether multiple choices can be selected
    // NOTE(review): `#[serde(default)]` only influences deserialization, and this
    // struct derives `Serialize` only, so the attribute does not change serde's
    // output. It may still be read by utoipa for schema generation — confirm
    // before removing.
    #[serde(default)]
    pub multi_select: bool,
    /// Question type: general, phase_confirmation, or contract_complete
    // NOTE(review): same remark as above about `#[serde(default)]` on a
    // Serialize-only struct.
    #[serde(default)]
    pub question_type: String,
}

/// Request to create a checkpoint.
///
/// Deserialized from a payload with a single mandatory `message` field.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateCheckpointRequest {
    // NOTE(review): presumably used as the checkpoint's commit message — confirm against the checkpoint handler.
    pub message: String,
}

/// Response for task tree.
///
/// Serialized with camelCase field names (`supervisorTaskId`, `totalCount`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskTreeResponse {
    /// Summaries of the tasks being returned.
    pub tasks: Vec<TaskSummary>,
    /// ID of the supervisor task among the returned tasks, if any.
    // `list_contract_tasks` fills this with the first task whose `is_supervisor` flag is set.
    pub supervisor_task_id: Option<Uuid>,
    /// Number of entries in `tasks`.
    // `list_contract_tasks` sets this to `tasks.len()`.
    pub total_count: usize,
}

/// Response for wait operation.
///
/// Serialized with camelCase field names (`taskId`, `outputSummary`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitResponse {
    pub task_id: Uuid,
    // NOTE(review): presumably the task's status string at the time the wait returned — confirm against the wait handler.
    pub status: String,
    // NOTE(review): presumably false when the wait timed out before the task finished — confirm.
    pub completed: bool,
    // Serialized as `null` when `None`; no `skip_serializing_if` is set.
    pub output_summary: Option<String>,
}

/// Response for read file operation.
///
/// Serialized with camelCase field names (`taskId`, `filePath`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadFileResponse {
    pub task_id: Uuid,
    pub file_path: String,
    // NOTE(review): `content` is a `String`, so the file is presumably expected to be
    // text; how non-UTF-8 files are handled must be checked in the handler.
    pub content: String,
    // NOTE(review): presumably false (with empty `content`) when the file is missing — confirm.
    pub exists: bool,
}

/// Response for checkpoint operations.
///
/// Serialized with camelCase field names (`taskId`, `checkpointNumber`, `commitSha`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointResponse {
    pub task_id: Uuid,
    // NOTE(review): presumably a per-task sequence number — confirm how it is assigned.
    pub checkpoint_number: i32,
    pub commit_sha: String,
    pub message: String,
}

/// Task checkpoint info.
///
/// Serialized with camelCase field names (`taskId`, `checkpointNumber`,
/// `commitSha`, `branchName`, `filesChanged`, `linesAdded`, `linesRemoved`,
/// `createdAt`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskCheckpoint {
    pub id: Uuid,
    pub task_id: Uuid,
    pub checkpoint_number: i32,
    pub commit_sha: String,
    pub branch_name: String,
    pub message: String,
    // Arbitrary JSON; the shape is not constrained by this type.
    // NOTE(review): the expected schema (list of paths? per-file stats?) should be documented where it is produced.
    pub files_changed: Option<serde_json::Value>,
    pub lines_added: i32,
    pub lines_removed: i32,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

/// Response for list checkpoints.
///
/// Serialized with camelCase field names (`taskId`, `checkpoints`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointListResponse {
    pub task_id: Uuid,
    // NOTE(review): ordering of the entries is decided by whoever builds this response — confirm.
    pub checkpoints: Vec<TaskCheckpoint>,
}

// =============================================================================
// Helper Functions
// =============================================================================

/// Authenticate a request as originating from a supervisor task.
///
/// The request must carry tool-key credentials (`AuthSource::ToolKey`); the task
/// named by that key is loaded and must have `is_supervisor` set. When
/// `contract_id` is `Some`, the task's own `contract_id` must match it.
///
/// Returns `(task_id, owner_id)` of the supervisor task on success.
///
/// # Errors
///
/// * `401 UNAUTHORIZED` - the request did not authenticate with a tool key.
/// * `503 DB_UNAVAILABLE` - no database pool is configured.
/// * `500 DB_ERROR` - the task lookup failed (the cause is logged).
/// * `404 NOT_FOUND` - the task does not exist.
/// * `403 NOT_SUPERVISOR` - the task is not a supervisor.
/// * `403 CONTRACT_MISMATCH` - the supervisor belongs to a different contract.
async fn verify_supervisor_auth(
    state: &SharedState,
    headers: &HeaderMap,
    contract_id: Option<Uuid>,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
    // Single place that shapes every rejection returned below.
    let reject = |status: StatusCode, code: &'static str, message: &'static str| {
        (status, Json(ApiError::new(code, message)))
    };

    // Only a tool key identifies a task; any other auth source is refused.
    let task_id = if let AuthSource::ToolKey(id) = extract_auth(state, headers) {
        id
    } else {
        return Err(reject(
            StatusCode::UNAUTHORIZED,
            "UNAUTHORIZED",
            "Supervisor endpoints require tool key auth",
        ));
    };

    // The task row supplies both the supervisor flag and the owner.
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return Err(reject(
                StatusCode::SERVICE_UNAVAILABLE,
                "DB_UNAVAILABLE",
                "Database not configured",
            ));
        }
    };

    let task = match repository::get_task(pool, task_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            return Err(reject(StatusCode::NOT_FOUND, "NOT_FOUND", "Task not found"));
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get supervisor task");
            return Err(reject(
                StatusCode::INTERNAL_SERVER_ERROR,
                "DB_ERROR",
                "Failed to verify supervisor",
            ));
        }
    };

    if !task.is_supervisor {
        return Err(reject(
            StatusCode::FORBIDDEN,
            "NOT_SUPERVISOR",
            "Only supervisor tasks can use these endpoints",
        ));
    }

    // The contract check applies only when the caller asked for a specific contract.
    let wrong_contract = contract_id.is_some() && task.contract_id != contract_id;
    if wrong_contract {
        return Err(reject(
            StatusCode::FORBIDDEN,
            "CONTRACT_MISMATCH",
            "Supervisor does not belong to this contract",
        ));
    }

    Ok((task_id, task.owner_id))
}

/// Try to start a pending task on an available daemon.
///
/// Walks the contract's pending tasks in the order the repository returns them and
/// starts the first one for which a connected daemon with spare capacity exists:
/// the task is moved to `starting` and assigned to the daemon, then a
/// `DaemonCommand::SpawnTask` is sent. If sending fails, the task is moved back to
/// `pending` with its daemon cleared and the next task is tried.
///
/// For retried tasks (`retry_count > 0`), daemons listed in the task's
/// `failed_daemon_ids` are excluded, the latest checkpoint patch (base64-encoded)
/// is attached for worktree recovery when one exists, and the session is resumed.
///
/// Returns `Ok(Some(task))` with the updated task if one was started, `Ok(None)`
/// if there were no pending tasks or none could be started.
///
/// # Errors
///
/// Returns a message when the database is not configured, when the pending tasks,
/// the contract, or the available daemons cannot be loaded, or when the contract
/// does not exist for this owner. Per-task failures (an update that matched no
/// row, a failed spawn command) are logged and skipped rather than returned.
pub async fn try_start_pending_task(
    state: &SharedState,
    contract_id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Task>, String> {
    let pool = state.db_pool.as_ref().ok_or("Database not configured")?;

    // Get pending tasks for this contract (includes interrupted tasks awaiting retry)
    let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
        .await
        .map_err(|e| format!("Failed to get pending tasks: {}", e))?;

    if pending_tasks.is_empty() {
        return Ok(None);
    }

    // Get contract to check local_only flag
    let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
        .await
        .map_err(|e| format!("Failed to get contract: {}", e))?
        .ok_or_else(|| "Contract not found".to_string())?;

    // Try each pending task until we find one we can start
    for task in &pending_tasks {
        // Get excluded daemon IDs for this task (daemons that have already failed it)
        let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();

        // Get available daemons excluding failed ones for this task.
        // This is queried per task because the exclusion list differs per task.
        let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids)
            .await
            .map_err(|e| format!("Failed to get available daemons: {}", e))?;

        // Find a daemon with capacity that also has a live connection to this server
        let available_daemon = daemons.iter().find(|d| {
            d.current_task_count < d.max_concurrent_tasks
                && state.daemon_connections.contains_key(&d.connection_id)
        });

        let daemon = match available_daemon {
            Some(d) => d,
            None => continue, // Try next task
        };

        // Get repo URL from task or contract: the task's own URL wins, otherwise the
        // contract's primary repository (or its first one), preferring a remote URL
        // over a local path.
        let repo_url = if let Some(url) = &task.repository_url {
            Some(url.clone())
        } else {
            match repository::list_contract_repositories(pool, contract_id).await {
                Ok(repos) => repos
                    .iter()
                    .find(|r| r.is_primary)
                    .or_else(|| repos.first())
                    .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
                Err(e) => {
                    // Best effort: the task is still started without a repo URL, but
                    // the lookup failure must be visible in the logs.
                    tracing::warn!(
                        task_id = %task.id,
                        contract_id = %contract_id,
                        error = %e,
                        "Failed to list contract repositories; starting task without repo URL"
                    );
                    None
                }
            }
        };

        // Update task with daemon assignment. The current version is passed along so
        // the update can be refused if the task changed in the meantime.
        let update_req = UpdateTaskRequest {
            status: Some("starting".to_string()),
            daemon_id: Some(daemon.id),
            version: Some(task.version),
            ..Default::default()
        };

        let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
            Ok(Some(t)) => t,
            Ok(None) => continue, // Task was modified concurrently, try next
            Err(e) => {
                tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
                continue; // Try next task
            }
        };

        // For retried tasks, fetch checkpoint patch for worktree recovery
        let (patch_data, patch_base_sha) = if task.retry_count > 0 {
            // This is a retry - try to restore from checkpoint
            match repository::get_latest_checkpoint_patch(pool, task.id).await {
                Ok(Some(patch)) => {
                    tracing::info!(
                        task_id = %task.id,
                        retry_count = task.retry_count,
                        patch_size = patch.patch_size_bytes,
                        base_sha = %patch.base_commit_sha,
                        "Including checkpoint patch for task retry recovery"
                    );
                    let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
                    (Some(encoded), Some(patch.base_commit_sha))
                }
                Ok(None) => {
                    tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry");
                    (None, None)
                }
                Err(e) => {
                    // Recovery data is optional: the retry proceeds without a patch.
                    tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // Send spawn command
        let cmd = DaemonCommand::SpawnTask {
            task_id: updated_task.id,
            task_name: updated_task.name.clone(),
            plan: updated_task.plan.clone(),
            repo_url,
            base_branch: updated_task.base_branch.clone(),
            target_branch: updated_task.target_branch.clone(),
            parent_task_id: updated_task.parent_task_id,
            depth: updated_task.depth,
            is_orchestrator: false,
            target_repo_path: updated_task.target_repo_path.clone(),
            completion_action: updated_task.completion_action.clone(),
            continue_from_task_id: updated_task.continue_from_task_id,
            // A `copy_files` value that does not deserialize is treated as absent.
            copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
            contract_id: updated_task.contract_id,
            is_supervisor: false,
            autonomous_loop: false,
            resume_session: task.retry_count > 0, // Use --continue for retried tasks
            conversation_history: None,
            patch_data,
            patch_base_sha,
            local_only: contract.local_only,
            // For retried tasks, use their own worktree (they already have state from previous attempt)
            supervisor_worktree_task_id: None,
        };

        if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
            tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
            // Rollback: put the task back to pending and detach it from the daemon.
            let rollback_req = UpdateTaskRequest {
                status: Some("pending".to_string()),
                clear_daemon_id: true,
                ..Default::default()
            };
            // The rollback stays best-effort, but a failure here leaves the task in
            // "starting" with no process behind it, so it must not go unnoticed.
            match repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await {
                Ok(Some(_)) => {}
                Ok(None) => {
                    tracing::warn!(
                        task_id = %task.id,
                        "Rollback to pending updated no task; it may be left in 'starting'"
                    );
                }
                Err(rollback_err) => {
                    tracing::warn!(
                        task_id = %task.id,
                        error = %rollback_err,
                        "Failed to roll task back to pending after spawn failure"
                    );
                }
            }
            continue; // Try next task
        }

        tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
        return Ok(Some(updated_task));
    }

    // No tasks could be started
    Ok(None)
}

// =============================================================================
// Contract Task Handlers
// =============================================================================

/// List all tasks in a contract's tree.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks",
    params(
        ("contract_id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "List of tasks in contract", body = TaskTreeResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn list_contract_tasks(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get all tasks for this contract
    match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
        Ok(tasks) => {
            let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id);
            let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect();
            let total_count = summaries.len();

            (
                StatusCode::OK,
                Json(TaskTreeResponse {
                    tasks: summaries,
                    supervisor_task_id,
                    total_count,
                }),
            ).into_response()
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to list contract tasks");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to list tasks")),
            ).into_response()
        }
    }
}

/// Get the task tree for a contract.
///
/// Currently this is a straight delegate to [`list_contract_tasks`], so the
/// response is the same flat listing that endpoint produces.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree",
    params(
        ("contract_id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "Task tree structure", body = TaskTreeResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn get_contract_tree(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    // No dedicated tree shape yet: re-wrap the extractors and reuse the list handler.
    let flat_listing = list_contract_tasks(State(state), Path(contract_id), headers);
    flat_listing.await
}

// =============================================================================
// Task Spawn Handler
// =============================================================================

/// Spawn a new task (supervisor only).
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/tasks",
    request_body = SpawnTaskRequest,
    responses(
        (status = 201, description = "Task created", body = Task),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn spawn_task(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<SpawnTaskRequest>,
) -> impl IntoResponse {
    let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Verify contract exists and get local_only flag
    let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get contract");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get contract")),
            ).into_response();
        }
    };

    // Get repository URL - either from request or from contract's repositories
    let repo_url = if let Some(url) = request.repository_url.clone() {
        if !url.trim().is_empty() {
            Some(url)
        } else {
            None
        }
    } else {
        None
    };

    // If no repo URL provided, look it up from the contract
    let repo_url = match repo_url {
        Some(url) => Some(url),
        None => {
            match repository::list_contract_repositories(pool, request.contract_id).await {
                Ok(repos) => {
                    // Prefer primary repo, fallback to first repo
                    let repo = repos.iter()
                        .find(|r| r.is_primary)
                        .or(repos.first());

                    // Use repository_url if set, otherwise use local_path
                    repo.and_then(|r| {
                        r.repository_url.clone()
                            .or_else(|| r.local_path.clone())
                    })
                }
                Err(e) => {
                    tracing::warn!(error = %e, "Failed to get contract repositories");
                    None
                }
            }
        }
    };

    // Validate that we have a repo URL
    if repo_url.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")),
        ).into_response();
    }

    // Create task request
    // Share supervisor's worktree by default; separate worktree only when explicitly requested
    let supervisor_worktree_task_id = if request.use_own_worktree { None } else { Some(supervisor_id) };

    let create_req = CreateTaskRequest {
        name: request.name.clone(),
        description: None,
        plan: request.plan.clone(),
        repository_url: repo_url.clone(),
        contract_id: Some(request.contract_id),
        parent_task_id: request.parent_task_id,
        is_supervisor: false,
        is_red_team: false,
        checkpoint_sha: request.checkpoint_sha.clone(),
        merge_mode: Some("manual".to_string()),
        priority: 0,
        base_branch: None,
        target_branch: None,
        target_repo_path: None,
        completion_action: None,
        continue_from_task_id: None,
        copy_files: None,
        branched_from_task_id: None,
        conversation_history: None,
        supervisor_worktree_task_id,
    };

    // Create task in DB
    let task = match repository::create_task_for_owner(pool, owner_id, create_req).await {
        Ok(t) => t,
        Err(e) => {
            tracing::error!(error = %e, "Failed to create task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to create task")),
            ).into_response();
        }
    };

    tracing::info!(
        supervisor_id = %supervisor_id,
        task_id = %task.id,
        task_name = %task.name,
        "Supervisor spawned new task"
    );

    // Record history event for task spawned by supervisor
    let _ = repository::record_history_event(
        pool,
        owner_id,
        task.contract_id,
        Some(task.id),
        "task",
        Some("spawned"),
        None,
        serde_json::json!({
            "name": &task.name,
            "spawnedBy": supervisor_id.to_string(),
        }),
    ).await;

    // Broadcast task creation notification to WebSocket subscribers
    state.broadcast_task_update(TaskUpdateNotification {
        task_id: task.id,
        owner_id: Some(owner_id),
        version: task.version,
        status: task.status.clone(),
        updated_fields: vec!["created".to_string()],
        updated_by: "supervisor".to_string(),
    });

    // Start task on a daemon
    // Find a daemon that belongs to this owner
    let mut updated_task = task;
    for entry in state.daemon_connections.iter() {
        let daemon = entry.value();
        if daemon.owner_id == owner_id {
            // IMPORTANT: Update database FIRST to assign daemon_id before sending command
            // This prevents race conditions where the task starts but daemon_id is not set
            let update_req = UpdateTaskRequest {
                status: Some("starting".to_string()),
                daemon_id: Some(daemon.id),
                version: Some(updated_task.version),
                ..Default::default()
            };

            match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await {
                Ok(Some(t)) => {
                    updated_task = t;
                }
                Ok(None) => {
                    tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id");
                    break;
                }
                Err(e) => {
                    tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id");
                    break;
                }
            }

            // Send spawn command to daemon
            let cmd = DaemonCommand::SpawnTask {
                task_id: updated_task.id,
                task_name: updated_task.name.clone(),
                plan: updated_task.plan.clone(),
                repo_url: repo_url.clone(),
                base_branch: updated_task.base_branch.clone(),
                target_branch: updated_task.target_branch.clone(),
                parent_task_id: updated_task.parent_task_id,
                depth: updated_task.depth,
                is_orchestrator: false,
                target_repo_path: updated_task.target_repo_path.clone(),
                completion_action: updated_task.completion_action.clone(),
                continue_from_task_id: updated_task.continue_from_task_id,
                copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
                contract_id: updated_task.contract_id,
                is_supervisor: false,
                autonomous_loop: false,
                resume_session: false,
                conversation_history: None,
                patch_data: None,
                patch_base_sha: None,
                local_only: contract.local_only,
                // Share supervisor's worktree by default; separate worktree only when explicitly requested
                supervisor_worktree_task_id: if request.use_own_worktree { None } else { Some(supervisor_id) },
            };

            if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
                tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
                // Rollback: clear daemon_id and reset status since command failed
                let rollback_req = UpdateTaskRequest {
                    status: Some("pending".to_string()),
                    clear_daemon_id: true,
                    ..Default::default()
                };
                let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await;
            } else {
                tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");

                // Broadcast task status update notification to WebSocket subscribers
                state.broadcast_task_update(TaskUpdateNotification {
                    task_id: updated_task.id,
                    owner_id: Some(owner_id),
                    version: updated_task.version,
                    status: "starting".to_string(),
                    updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
                    updated_by: "supervisor".to_string(),
                });

                // Check if we should spawn a red team task
                // Conditions:
                // 1. This is not a supervisor task
                // 2. This is not already a red team task
                // 3. Contract has red_team_enabled = true
                // 4. No red team task exists for this contract yet
                if !updated_task.is_supervisor && !updated_task.is_red_team && contract.red_team_enabled {
                    if let Some(contract_id) = updated_task.contract_id {
                        // Check if a red team task already exists
                        match repository::get_red_team_task_for_contract(pool, contract_id).await {
                            Ok(None) => {
                                // No red team task exists, spawn one
                                tracing::info!(
                                    contract_id = %contract_id,
                                    work_task_id = %updated_task.id,
                                    "Spawning red team task for contract (first work task started)"
                                );
                                match spawn_red_team_task(
                                    pool,
                                    &state,
                                    contract_id,
                                    owner_id,
                                    &contract.name,
                                    &contract.phase,
                                    contract.red_team_prompt.as_deref(),
                                ).await {
                                    Ok(red_team_task) => {
                                        tracing::info!(
                                            contract_id = %contract_id,
                                            red_team_task_id = %red_team_task.id,
                                            "Red team task spawned successfully"
                                        );
                                    }
                                    Err(e) => {
                                        // Log error but don't fail the work task spawn
                                        tracing::error!(
                                            contract_id = %contract_id,
                                            error = %e,
                                            "Failed to spawn red team task"
                                        );
                                    }
                                }
                            }
                            Ok(Some(existing)) => {
                                tracing::debug!(
                                    contract_id = %contract_id,
                                    red_team_task_id = %existing.id,
                                    "Red team task already exists for contract"
                                );
                            }
                            Err(e) => {
                                tracing::error!(
                                    contract_id = %contract_id,
                                    error = %e,
                                    "Error checking for existing red team task"
                                );
                            }
                        }
                    }
                }
            }
            break;
        }
    }

    (StatusCode::CREATED, Json(updated_task)).into_response()
}

// =============================================================================
// Wait for Task Handler
// =============================================================================

/// Wait for a task to complete.
///
/// Blocks (up to `request.timeout_seconds`) until the task identified by
/// `task_id` reaches a terminal status (`done`, `failed` or `merged`).
///
/// * If the task is already terminal, responds immediately with `completed: true`.
/// * While waiting, every 5 seconds it tries to start a pending task of the
///   same contract via `try_start_pending_task`, so queued work keeps moving.
/// * On timeout, the task's current status is read from the database and
///   returned; `completed` reflects whether that status is terminal.
///
/// The response is always 200 once the task has been found; 404 is returned
/// when the task does not exist for the authenticated owner.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait",
    params(
        ("task_id" = Uuid, Path, description = "Task ID to wait for")
    ),
    request_body = WaitForTaskRequest,
    responses(
        (status = 200, description = "Task completed or timed out", body = WaitResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn wait_for_task(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<WaitForTaskRequest>,
) -> impl IntoResponse {
    // Statuses this handler treats as "finished"; kept in one place so the
    // early check, the lag fallback and the timeout path cannot drift apart.
    fn is_terminal(status: &str) -> bool {
        matches!(status, "done" | "failed" | "merged")
    }

    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Subscribe to task completions BEFORE reading the task's status. If the
    // subscription were taken after the read, a task finishing in between
    // would be seen as "still running" by the read and its completion
    // notification would already be gone, leaving this call to block until
    // the timeout.
    let mut rx = state.task_completions.subscribe();

    // Verify task belongs to same owner
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    // Check if already done
    if is_terminal(&task.status) {
        return (
            StatusCode::OK,
            Json(WaitResponse {
                task_id,
                status: task.status,
                completed: true,
                output_summary: None,
            }),
        ).into_response();
    }

    // Get contract_id for pending task scheduling
    let contract_id = task.contract_id;

    // NOTE(review): `as u64` would wrap a negative value into a huge timeout if
    // `timeout_seconds` is a signed integer - confirm the field's type.
    let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);

    // Wait for completion or timeout, periodically trying to start pending tasks
    let result = tokio::time::timeout(timeout, async {
        let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
        pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Check for task completion notifications
                recv_result = rx.recv() => {
                    match recv_result {
                        Ok(notification) => {
                            // Notifications for other tasks are ignored
                            if notification.task_id == task_id {
                                return Some(notification);
                            }
                        }
                        Err(_) => {
                            // Channel closed or lagged - check DB directly
                            if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
                                if is_terminal(&t.status) {
                                    return Some(crate::server::state::TaskCompletionNotification {
                                        task_id: t.id,
                                        owner_id: Some(t.owner_id),
                                        contract_id: t.contract_id,
                                        parent_task_id: t.parent_task_id,
                                        status: t.status,
                                        output_summary: None,
                                        worktree_path: None,
                                        error_message: t.error_message,
                                    });
                                }
                            }
                        }
                    }
                }
                // Periodically try to start pending tasks
                _ = pending_check_interval.tick() => {
                    if let Some(cid) = contract_id {
                        match try_start_pending_task(&state, cid, owner_id).await {
                            Ok(Some(started_task)) => {
                                tracing::debug!(
                                    task_id = %started_task.id,
                                    task_name = %started_task.name,
                                    "Started pending task while waiting"
                                );
                            }
                            Ok(None) => {
                                // No pending tasks or no capacity - that's fine
                            }
                            Err(e) => {
                                tracing::warn!(error = %e, "Error trying to start pending task");
                            }
                        }
                    }
                }
            }
        }
    }).await;

    match result {
        Ok(Some(notification)) => {
            (
                StatusCode::OK,
                Json(WaitResponse {
                    task_id,
                    status: notification.status,
                    completed: true,
                    output_summary: notification.output_summary,
                }),
            ).into_response()
        }
        Ok(None) | Err(_) => {
            // Timeout - check final status; "unknown" when the task cannot be read
            let final_status = repository::get_task(pool, task_id)
                .await
                .ok()
                .flatten()
                .map(|t| t.status)
                .unwrap_or_else(|| "unknown".to_string());
            let completed = is_terminal(&final_status);

            (
                StatusCode::OK,
                Json(WaitResponse {
                    task_id,
                    status: final_status,
                    completed,
                    output_summary: None,
                }),
            ).into_response()
        }
    }
}

// =============================================================================
// Read Worktree File Handler
// =============================================================================

/// Read a file from a task's worktree.
///
/// Not implemented yet: after authenticating the supervisor and confirming
/// that the task exists for the same owner, the handler always answers
/// 501 Not Implemented. The request body is accepted but not used.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    request_body = ReadWorktreeFileRequest,
    responses(
        (status = 200, description = "File content", body = ReadFileResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 501, description = "Not implemented - read the file from the local worktree instead"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn read_worktree_file(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<ReadWorktreeFileRequest>,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get task to verify ownership
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    // TODO: Implement file reading via worktree path
    // For now, return not implemented - supervisor should use local file access via worktree
    // The tuple binding only marks both values as intentionally unused.
    let _ = (task, request);

    (
        StatusCode::NOT_IMPLEMENTED,
        Json(ApiError::new(
            "NOT_IMPLEMENTED",
            "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.",
        )),
    ).into_response()
}

// =============================================================================
// Checkpoint Handlers
// =============================================================================

/// Create a git checkpoint for a task.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{task_id}/checkpoint",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CreateCheckpointRequest,
    responses(
        (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - can only create checkpoint for own task"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 503, description = "Task has no assigned daemon"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn create_checkpoint(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<CreateCheckpointRequest>,
) -> impl IntoResponse {
    let auth = extract_auth(&state, &headers);

    let task_id_from_auth = match auth {
        AuthSource::ToolKey(tid) => tid,
        _ => {
            return (
                StatusCode::UNAUTHORIZED,
                Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
            ).into_response();
        }
    };

    // Can only create checkpoint for own task
    if task_id_from_auth != task_id {
        return (
            StatusCode::FORBIDDEN,
            Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")),
        ).into_response();
    }

    let pool = state.db_pool.as_ref().unwrap();

    // Get task and daemon_id
    let task = match repository::get_task(pool, task_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
        ).into_response();
    };

    // Send CreateCheckpoint command to daemon
    let cmd = DaemonCommand::CreateCheckpoint {
        task_id,
        message: request.message.clone(),
    };

    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send CreateCheckpoint command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    // Return accepted - the checkpoint result will be delivered via WebSocket
    // and stored in the database by the daemon message handler
    (
        StatusCode::ACCEPTED,
        Json(CheckpointResponse {
            task_id,
            checkpoint_number: 0, // Will be assigned by DB on actual creation
            commit_sha: "pending".to_string(),
            message: request.message,
        }),
    ).into_response()
}

/// List checkpoints for a task.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{task_id}/checkpoints",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "List of checkpoints", body = CheckpointListResponse),
        (status = 401, description = "Unauthorized"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn list_checkpoints(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let auth = extract_auth(&state, &headers);

    let _task_id_from_auth = match auth {
        AuthSource::ToolKey(tid) => tid,
        _ => {
            return (
                StatusCode::UNAUTHORIZED,
                Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
            ).into_response();
        }
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get checkpoints from DB
    match repository::list_task_checkpoints(pool, task_id).await {
        Ok(checkpoints) => {
            let checkpoint_list: Vec<TaskCheckpoint> = checkpoints
                .into_iter()
                .map(|c| TaskCheckpoint {
                    id: c.id,
                    task_id: c.task_id,
                    checkpoint_number: c.checkpoint_number,
                    commit_sha: c.commit_sha,
                    branch_name: c.branch_name,
                    message: c.message,
                    files_changed: c.files_changed,
                    lines_added: c.lines_added.unwrap_or(0),
                    lines_removed: c.lines_removed.unwrap_or(0),
                    created_at: c.created_at,
                })
                .collect();

            (
                StatusCode::OK,
                Json(CheckpointListResponse {
                    task_id,
                    checkpoints: checkpoint_list,
                }),
            ).into_response()
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to list checkpoints");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")),
            ).into_response()
        }
    }
}

// =============================================================================
// Git Operations - Request/Response Types
// =============================================================================

/// Request to create a new branch.
///
/// Deserialized from a JSON body with camelCase keys (`branchName`, `fromRef`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchRequest {
    /// Name of the branch to create; forwarded to the daemon unchanged.
    pub branch_name: String,
    /// Optional ref forwarded to the daemon as the branch's starting point.
    /// NOTE(review): what the daemon uses when this is omitted is not visible here - confirm.
    pub from_ref: Option<String>,
}

/// Response for branch creation.
///
/// Serialized as JSON with camelCase keys (`branchName`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchResponse {
    /// Outcome flag for the branch creation request.
    pub success: bool,
    /// Branch name this response refers to.
    pub branch_name: String,
    /// Human-readable status text accompanying `success`.
    pub message: String,
}

/// Request to merge task changes.
///
/// Deserialized from a JSON body with camelCase keys (`targetBranch`, `squash`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskRequest {
    /// Optional target branch for the merge.
    /// NOTE(review): the fallback used when omitted is chosen elsewhere - confirm.
    pub target_branch: Option<String>,
    /// Squash flag; `#[serde(default)]` makes it `false` when the key is absent.
    #[serde(default)]
    pub squash: bool,
}

/// Response for merge operation.
///
/// Serialized as JSON with camelCase keys (`taskId`, `commitSha`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskResponse {
    /// Task the merge was requested for.
    pub task_id: Uuid,
    /// Outcome flag for the merge.
    pub success: bool,
    /// Human-readable status text accompanying `success`.
    pub message: String,
    /// Commit SHA associated with the merge, when one is available.
    pub commit_sha: Option<String>,
    /// Conflict entries, when any are reported.
    /// NOTE(review): presumably conflicting file paths - confirm against the producer.
    pub conflicts: Option<Vec<String>>,
}

/// Request to create a pull request.
///
/// Deserialized from a JSON body with camelCase keys.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRRequest {
    /// Branch the pull request is created for.
    /// NOTE(review): whether this is the head or the base branch is not visible here - confirm.
    pub branch: String,
    /// Pull request title.
    pub title: String,
    /// Optional pull request description body.
    pub body: Option<String>,
}

/// Response for PR creation.
///
/// Serialized as JSON with camelCase keys (`taskId`, `prUrl`, `prNumber`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRResponse {
    /// Task the pull request was requested for.
    pub task_id: Uuid,
    /// Outcome flag for the pull request creation.
    pub success: bool,
    /// Human-readable status text accompanying `success`.
    pub message: String,
    /// URL of the pull request, when one is available.
    pub pr_url: Option<String>,
    /// Number of the pull request, when one is available.
    pub pr_number: Option<i32>,
}

/// Response for task diff.
///
/// NOTE(review): `get_task_diff` currently returns `success = true` with
/// `diff = None` and an informational message in `error`, because it only
/// dispatches the command and does not wait for the diff itself.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResponse {
    /// ID of the task the diff was requested for.
    pub task_id: Uuid,
    /// Whether the request was handled successfully.
    pub success: bool,
    /// Diff text, when available.
    pub diff: Option<String>,
    /// Error or status message, when present.
    pub error: Option<String>,
}

// =============================================================================
// Git Operations - Handlers
// =============================================================================

/// Create a new branch from supervisor's worktree.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/branches",
    request_body = CreateBranchRequest,
    responses(
        (status = 201, description = "Branch created", body = CreateBranchResponse),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn create_branch(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<CreateBranchRequest>,
) -> impl IntoResponse {
    let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    // Find daemon running supervisor
    let daemon_id = {
        let pool = state.db_pool.as_ref().unwrap();
        match repository::get_task(pool, supervisor_id).await {
            Ok(Some(task)) => task.daemon_id,
            _ => None,
        }
    };

    let Some(daemon_id) = daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
        ).into_response();
    };

    // Send CreateBranch command to daemon
    let cmd = DaemonCommand::CreateBranch {
        task_id: supervisor_id,
        branch_name: request.branch_name.clone(),
        from_ref: request.from_ref,
    };

    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send CreateBranch command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    // Note: Real implementation would wait for daemon response
    // For now, return success immediately - daemon will send response via WebSocket
    (
        StatusCode::CREATED,
        Json(CreateBranchResponse {
            success: true,
            branch_name: request.branch_name,
            message: "Branch creation command sent".to_string(),
        }),
    ).into_response()
}

/// Merge a task's changes to a target branch.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge",
    params(
        ("task_id" = Uuid, Path, description = "Task ID to merge")
    ),
    request_body = MergeTaskRequest,
    responses(
        (status = 200, description = "Merge initiated", body = MergeTaskResponse),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn merge_task(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<MergeTaskRequest>,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get the target task
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    // Get daemon running the task
    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
        ).into_response();
    };

    // Subscribe to merge results BEFORE sending the command
    let mut rx = state.merge_results.subscribe();

    // Send MergeTaskToTarget command to daemon
    let cmd = DaemonCommand::MergeTaskToTarget {
        task_id,
        target_branch: request.target_branch,
        squash: request.squash,
    };

    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send MergeTaskToTarget command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    // Wait for the merge result with a timeout (60 seconds should be plenty for a merge)
    let timeout = tokio::time::Duration::from_secs(60);
    let result = tokio::time::timeout(timeout, async {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    if notification.task_id == task_id {
                        return Some(notification);
                    }
                    // Not our task, keep waiting
                }
                Err(_) => {
                    // Channel closed or lagged
                    return None;
                }
            }
        }
    }).await;

    match result {
        Ok(Some(notification)) => {
            (
                StatusCode::OK,
                Json(MergeTaskResponse {
                    task_id,
                    success: notification.success,
                    message: notification.message,
                    commit_sha: notification.commit_sha,
                    conflicts: notification.conflicts,
                }),
            ).into_response()
        }
        Ok(None) | Err(_) => {
            // Timeout or channel error - return error status
            (
                StatusCode::GATEWAY_TIMEOUT,
                Json(MergeTaskResponse {
                    task_id,
                    success: false,
                    message: "Merge operation timed out waiting for daemon response".to_string(),
                    commit_sha: None,
                    conflicts: None,
                }),
            ).into_response()
        }
    }
}

/// Create a pull request for a task's changes.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/pr",
    request_body = CreatePRRequest,
    responses(
        (status = 201, description = "PR created", body = CreatePRResponse),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn create_pr(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<CreatePRRequest>,
) -> impl IntoResponse {
    let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get the supervisor's own task to find daemon and base_branch
    let task = match repository::get_task(pool, supervisor_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get supervisor task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
            ).into_response();
        }
    };

    // Get daemon running the supervisor
    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
        ).into_response();
    };

    // Subscribe to PR results BEFORE sending the command
    let mut rx = state.pr_results.subscribe();

    // Send CreatePR command to daemon using the supervisor's task ID
    // (the branch is in the supervisor's worktree)
    // Pass base_branch from task if available, otherwise daemon will auto-detect
    let cmd = DaemonCommand::CreatePR {
        task_id: supervisor_id,
        title: request.title.clone(),
        body: request.body.clone(),
        base_branch: task.base_branch.clone(),
        branch: request.branch.clone(),
    };

    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send CreatePR command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation)
    let timeout = tokio::time::Duration::from_secs(60);
    let result = tokio::time::timeout(timeout, async {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    if notification.task_id == supervisor_id {
                        return Some(notification);
                    }
                    // Not our task, keep waiting
                }
                Err(_) => {
                    // Channel closed or lagged
                    return None;
                }
            }
        }
    }).await;

    match result {
        Ok(Some(notification)) => {
            let status = if notification.success {
                StatusCode::CREATED
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            (
                status,
                Json(CreatePRResponse {
                    task_id: supervisor_id,
                    success: notification.success,
                    message: notification.message,
                    pr_url: notification.pr_url,
                    pr_number: notification.pr_number,
                }),
            ).into_response()
        }
        Ok(None) | Err(_) => {
            // Timeout or channel error - return error status
            (
                StatusCode::GATEWAY_TIMEOUT,
                Json(CreatePRResponse {
                    task_id: supervisor_id,
                    success: false,
                    message: "PR creation timed out waiting for daemon response".to_string(),
                    pr_url: None,
                    pr_number: None,
                }),
            ).into_response()
        }
    }
}

/// Get the diff for a task's changes.
///
/// Authenticates the caller as a supervisor, loads the task for the
/// supervisor's owner and sends a `GetTaskDiff` command to the task's daemon.
///
/// The handler does not wait for the diff: the `200` response carries
/// `diff = None` and an informational message in `error`.
///
/// # Errors (HTTP)
/// - `503 DB_UNAVAILABLE` when no database pool is configured.
/// - `404 NOT_FOUND` / `500 DB_ERROR` from the task lookup.
/// - `503 NO_DAEMON` when the task has no assigned daemon.
/// - `500 COMMAND_FAILED` when the command cannot be sent.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "Task diff", body = TaskDiffResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 503, description = "Database not configured or task has no assigned daemon"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn get_task_diff(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    // Respond with 503 instead of panicking when the server runs without a database.
    let Some(pool) = state.db_pool.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        ).into_response();
    };

    // Get the target task
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    // Get daemon running the task
    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
        ).into_response();
    };

    // Send GetTaskDiff command to daemon
    let cmd = DaemonCommand::GetTaskDiff { task_id };

    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send GetTaskDiff command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    // Only the dispatch is acknowledged here; the diff itself is not part of this response.
    (
        StatusCode::OK,
        Json(TaskDiffResponse {
            task_id,
            success: true,
            diff: None,
            error: Some("Diff command sent - response will be streamed".to_string()),
        }),
    ).into_response()
}

// =============================================================================
// Supervisor Question Handlers
// =============================================================================

/// Ask a question and wait for user feedback.
///
/// The supervisor calls this to ask a question. The endpoint will poll until
/// either the user responds or the timeout is reached.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/questions",
    request_body = AskQuestionRequest,
    responses(
        (status = 200, description = "Question answered", body = AskQuestionResponse),
        (status = 408, description = "Question timed out", body = AskQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("tool_key" = [])
    ),
    tag = "Mesh Supervisor"
)]
pub async fn ask_question(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<AskQuestionRequest>,
) -> impl IntoResponse {
    let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Get the supervisor task to find its contract
    let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get supervisor task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
            ).into_response();
        }
    };

    let Some(contract_id) = supervisor.contract_id else {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")),
        ).into_response();
    };

    // Add the question
    let question_id = state.add_supervisor_question(
        supervisor_id,
        contract_id,
        owner_id,
        request.question.clone(),
        request.choices.clone(),
        request.context.clone(),
        request.multi_select,
        request.question_type.clone(),
    );

    // Broadcast question as task output entry for the task's chat
    let question_data = serde_json::json!({
        "question_id": question_id.to_string(),
        "choices": request.choices,
        "context": request.context,
        "multi_select": request.multi_select,
        "question_type": request.question_type,
    });
    state.broadcast_task_output(TaskOutputNotification {
        task_id: supervisor_id,
        owner_id: Some(owner_id),
        message_type: "supervisor_question".to_string(),
        content: request.question.clone(),
        tool_name: None,
        tool_input: Some(question_data.clone()),
        is_error: None,
        cost_usd: None,
        duration_ms: None,
        is_partial: false,
    });

    // Persist to database so it appears when reloading the page
    // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format
    if let Some(pool) = state.db_pool.as_ref() {
        let event_data = serde_json::json!({
            "messageType": "supervisor_question",
            "content": request.question,
            "toolInput": question_data,
        });
        let _ = repository::create_task_event(
            pool,
            supervisor_id,
            "output",
            None,
            None,
            Some(event_data),
        ).await;
    }

    // If non_blocking mode, return immediately
    if request.non_blocking {
        return (
            StatusCode::OK,
            Json(AskQuestionResponse {
                question_id,
                response: None,
                timed_out: false,
            }),
        ).into_response();
    }

    // If phaseguard is enabled, pause the supervisor task and return
    // The task will be auto-resumed when a message is sent to it (e.g., when user answers)
    if request.phaseguard {
        // Pause the supervisor task
        if let Some(daemon_id) = supervisor.daemon_id {
            let cmd = DaemonCommand::PauseTask { task_id: supervisor_id };
            if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
                tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to pause supervisor for phaseguard");
            } else {
                tracing::info!(supervisor_id = %supervisor_id, "Paused supervisor for phaseguard question");
            }
        }

        // Update task status to paused in DB
        let update = crate::db::models::UpdateTaskRequest {
            status: Some("paused".to_string()),
            ..Default::default()
        };
        if let Err(e) = repository::update_task_for_owner(pool, supervisor_id, owner_id, update).await {
            tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to update task status to paused");
        }

        return (
            StatusCode::OK,
            Json(AskQuestionResponse {
                question_id,
                response: None,
                timed_out: false,
            }),
        ).into_response();
    }

    // Poll for response with timeout
    let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64);
    let start = std::time::Instant::now();
    let poll_interval = std::time::Duration::from_millis(500);

    loop {
        // Check if response has been submitted
        if let Some(response) = state.get_question_response(question_id) {
            // Clean up the response
            state.cleanup_question_response(question_id);

            return (
                StatusCode::OK,
                Json(AskQuestionResponse {
                    question_id,
                    response: Some(response.response),
                    timed_out: false,
                }),
            ).into_response();
        }

        // Check timeout
        if start.elapsed() >= timeout_duration {
            // Remove the pending question on timeout
            state.remove_pending_question(question_id);

            return (
                StatusCode::REQUEST_TIMEOUT,
                Json(AskQuestionResponse {
                    question_id,
                    response: None,
                    timed_out: true,
                }),
            ).into_response();
        }

        // Wait before polling again
        tokio::time::sleep(poll_interval).await;
    }
}

/// Get all pending questions for the current user.
///
/// Reads the pending questions held in shared state for the authenticated
/// owner and returns them as a JSON array of `PendingQuestionSummary`.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/questions",
    responses(
        (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
        (status = 401, description = "Unauthorized"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_pending_questions(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let pending = state.get_pending_questions_for_owner(auth.owner_id);

    // Copy each pending question field-by-field into the public summary shape.
    let mut summaries: Vec<PendingQuestionSummary> = Vec::new();
    for entry in pending {
        summaries.push(PendingQuestionSummary {
            question_id: entry.question_id,
            task_id: entry.task_id,
            contract_id: entry.contract_id,
            question: entry.question,
            choices: entry.choices,
            context: entry.context,
            created_at: entry.created_at,
            multi_select: entry.multi_select,
            question_type: entry.question_type,
        });
    }

    Json(summaries).into_response()
}

/// Answer a pending supervisor question.
///
/// Verifies that the question is still pending and belongs to the
/// authenticated owner, then submits the answer. When the submission succeeds
/// the answer is also forwarded to the asking task as a `SendMessage` daemon
/// command. Forwarding is best effort: failures are logged and do not change
/// the HTTP response.
///
/// # Errors (HTTP)
/// - `403 FORBIDDEN` when the question belongs to another owner.
/// - `404 NOT_FOUND` when the question does not exist or was already answered.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/questions/{question_id}/answer",
    params(
        ("question_id" = Uuid, Path, description = "Question ID")
    ),
    request_body = AnswerQuestionRequest,
    responses(
        (status = 200, description = "Question answered", body = AnswerQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Question belongs to another user"),
        (status = 404, description = "Question not found"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn answer_question(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(question_id): Path<Uuid>,
    Json(request): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
    // Verify the question exists and belongs to this owner
    let question = match state.get_pending_question(question_id) {
        Some(q) if q.owner_id == auth.owner_id => q,
        Some(_) => {
            return (
                StatusCode::FORBIDDEN,
                Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
            ).into_response();
        }
        None => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
            ).into_response();
        }
    };

    // Submit the response
    let success = state.submit_question_response(question_id, request.response.clone());

    if success {
        tracing::info!(
            question_id = %question_id,
            task_id = %question.task_id,
            "User answered supervisor question"
        );

        // Send the response to the task as a message
        // This will auto-resume the task if it was paused (phaseguard)
        // The answer is already recorded at this point, so a missing database or a
        // failed lookup is logged instead of panicking or failing the request.
        match state.db_pool.as_ref() {
            None => {
                tracing::warn!(
                    task_id = %question.task_id,
                    "Database not configured; cannot forward response message to task"
                );
            }
            Some(pool) => {
                match repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await {
                    Ok(Some(task)) => {
                        if let Some(daemon_id) = task.daemon_id {
                            // Format the response message
                            let response_msg = format!(
                                "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n",
                                question.question,
                                request.response
                            );
                            let cmd = DaemonCommand::SendMessage {
                                task_id: question.task_id,
                                message: response_msg,
                            };
                            if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
                                tracing::warn!(
                                    task_id = %question.task_id,
                                    error = %e,
                                    "Failed to send response message to task"
                                );
                            } else {
                                tracing::info!(
                                    task_id = %question.task_id,
                                    "Sent response message to task (will auto-resume if paused)"
                                );
                            }
                        }
                    }
                    // Task no longer exists for this owner: nothing to forward to.
                    Ok(None) => {}
                    Err(e) => {
                        tracing::warn!(
                            task_id = %question.task_id,
                            error = %e,
                            "Failed to load task for forwarding response message"
                        );
                    }
                }
            }
        }
    }

    Json(AnswerQuestionResponse { success }).into_response()
}

// =============================================================================
// Supervisor Resume and Conversation Rewind
// =============================================================================

/// Response for supervisor resume
///
/// Serialized with camelCase keys.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumeSupervisorResponse {
    /// ID of the supervisor task.
    pub supervisor_task_id: Uuid,
    /// Daemon associated with the supervisor task, if any.
    pub daemon_id: Option<Uuid>,
    /// Summary of the state the supervisor was resumed from.
    pub resumed_from: ResumedFromInfo,
    /// Task status string reported back to the caller.
    /// NOTE(review): the set of possible values is not visible here - confirm
    /// against `resume_supervisor`.
    pub status: String,
}

/// Snapshot describing the state a supervisor was resumed from; embedded in
/// `ResumeSupervisorResponse` and serialized with camelCase keys.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumedFromInfo {
    /// Phase name. Presumably the contract/supervisor phase at resume time -
    /// TODO confirm against where this struct is built.
    pub phase: String,
    /// UTC timestamp of the last recorded activity.
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Number of messages. Presumably the length of the stored conversation
    /// history - TODO confirm against where this struct is built.
    pub message_count: i32,
}

/// Resume interrupted supervisor with specified mode.
///
/// POST /api/v1/contracts/{id}/supervisor/resume
#[utoipa::path(
    post,
    path = "/api/v1/contracts/{id}/supervisor/resume",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    request_body = crate::db::models::ResumeSupervisorRequest,
    responses(
        (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Contract or supervisor not found", body = ApiError),
        (status = 409, description = "Supervisor is already running", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh Supervisor"
)]
pub async fn resume_supervisor(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    auth: crate::server::auth::Authenticated,
    Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
) -> impl IntoResponse {
    let crate::server::auth::Authenticated(auth_info) = auth;

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get contract and verify ownership
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get existing supervisor state
    let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(s)) => s,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new(
                    "NO_SUPERVISOR_STATE",
                    "No supervisor state found - supervisor may not have been started",
                )),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get supervisor task
    let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Check if already running - but only if daemon is actually connected
    // (daemon disconnect handler may not have updated status yet)
    if supervisor_task.status == "running" {
        let daemon_connected = supervisor_task
            .daemon_id
            .map(|d| state.is_daemon_connected(d))
            .unwrap_or(false);

        if daemon_connected {
            return (
                StatusCode::CONFLICT,
                Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
            )
                .into_response();
        }
        // Daemon not connected - allow resume (treat as interrupted)
        tracing::info!(
            supervisor_task_id = %supervisor_task.id,
            daemon_id = ?supervisor_task.daemon_id,
            "Supervisor status is 'running' but daemon is not connected, allowing resume"
        );
    }

    // Calculate message count from conversation history
    let message_count = supervisor_state
        .conversation_history
        .as_array()
        .map(|arr| arr.len() as i32)
        .unwrap_or(0);

    // Find a connected daemon for this owner
    let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) {
        Some(id) => id,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new(
                    "NO_DAEMON",
                    "No daemons connected for your account. Cannot resume supervisor.",
                )),
            )
                .into_response();
        }
    };

    // Track response values (may be updated by resume modes)
    let mut response_daemon_id = supervisor_task.daemon_id;
    let mut response_status = "pending".to_string();

    // Based on resume mode, handle differently
    match req.resume_mode.as_str() {
        "continue" => {
            // Update task status to starting and assign daemon
            if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2")
                .bind(target_daemon_id)
                .bind(supervisor_state.task_id)
                .execute(pool)
                .await
            {
                tracing::error!("Failed to update task for resume: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("DB_ERROR", e.to_string())),
                )
                    .into_response();
            }

            // Fetch latest checkpoint patch for worktree recovery
            let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await {
                Ok(Some(patch)) => {
                    tracing::info!(
                        task_id = %supervisor_state.task_id,
                        patch_size = patch.patch_size_bytes,
                        base_sha = %patch.base_commit_sha,
                        "Including checkpoint patch for worktree recovery"
                    );
                    // Encode patch as base64 for JSON transport
                    let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
                    (Some(encoded), Some(patch.base_commit_sha))
                }
                Ok(None) => {
                    tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found");
                    (None, None)
                }
                Err(e) => {
                    tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch");
                    (None, None)
                }
            };

            // Send SpawnTask with resume_session=true to use Claude's --continue
            // Include conversation_history as fallback if worktree doesn't exist on target daemon
            let command = DaemonCommand::SpawnTask {
                task_id: supervisor_state.task_id,
                task_name: supervisor_task.name.clone(),
                plan: supervisor_task.plan.clone(),
                repo_url: supervisor_task.repository_url.clone(),
                base_branch: supervisor_task.base_branch.clone(),
                target_branch: supervisor_task.target_branch.clone(),
                parent_task_id: supervisor_task.parent_task_id,
                depth: supervisor_task.depth,
                is_orchestrator: false,
                target_repo_path: supervisor_task.target_repo_path.clone(),
                completion_action: supervisor_task.completion_action.clone(),
                continue_from_task_id: supervisor_task.continue_from_task_id,
                copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
                contract_id: supervisor_task.contract_id,
                is_supervisor: true,
                autonomous_loop: false,
                resume_session: true, // Use --continue to preserve conversation
                conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
                patch_data,
                patch_base_sha,
                local_only: contract.local_only,
                supervisor_worktree_task_id: None, // Supervisor uses its own worktree
            };

            if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
                // Rollback status on failure
                let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1")
                    .bind(supervisor_state.task_id)
                    .execute(pool)
                    .await;
                tracing::error!("Failed to send SpawnTask to daemon: {}", e);
                return (
                    StatusCode::SERVICE_UNAVAILABLE,
                    Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))),
                )
                    .into_response();
            }

            tracing::info!(
                contract_id = %contract_id,
                supervisor_task_id = %supervisor_state.task_id,
                daemon_id = %target_daemon_id,
                message_count = message_count,
                "Supervisor resumed with --continue (resume_session=true)"
            );

            // Update response values for successful resume
            response_daemon_id = Some(target_daemon_id);
            response_status = "starting".to_string();
        }
        "restart_phase" => {
            // Clear conversation but keep phase progress
            if let Err(e) = repository::update_supervisor_conversation(
                pool,
                contract_id,
                serde_json::json!([]),
            )
            .await
            {
                tracing::error!("Failed to clear conversation: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("DB_ERROR", e.to_string())),
                )
                    .into_response();
            }

            if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
                .bind(supervisor_state.task_id)
                .execute(pool)
                .await
            {
                tracing::error!("Failed to update task status: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("DB_ERROR", e.to_string())),
                )
                    .into_response();
            }
        }
        "from_checkpoint" => {
            // This would require more complex handling with checkpoint system
            return (
                StatusCode::BAD_REQUEST,
                Json(ApiError::new(
                    "NOT_IMPLEMENTED",
                    "from_checkpoint mode not yet implemented",
                )),
            )
                .into_response();
        }
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(ApiError::new(
                    "INVALID_RESUME_MODE",
                    "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
                )),
            )
                .into_response();
        }
    }

    tracing::info!(
        contract_id = %contract_id,
        supervisor_task_id = %supervisor_state.task_id,
        resume_mode = %req.resume_mode,
        message_count = message_count,
        "Supervisor resume requested"
    );

    Json(ResumeSupervisorResponse {
        supervisor_task_id: supervisor_state.task_id,
        daemon_id: response_daemon_id,
        resumed_from: ResumedFromInfo {
            phase: contract.phase,
            last_activity: supervisor_state.last_activity,
            message_count,
        },
        status: response_status,
    })
    .into_response()
}

/// Response for conversation rewind
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RewindConversationResponse {
    /// Contract whose supervisor conversation was rewound.
    pub contract_id: Uuid,
    /// Number of messages dropped from the end of the conversation.
    pub messages_removed: i32,
    /// Number of messages remaining in the conversation after the rewind.
    pub new_message_count: i32,
    /// Whether code was rewound along with the conversation.
    // NOTE(review): code rewind is marked TODO in the `rewind_conversation`
    // handler — confirm what this flag reflects before relying on it.
    pub code_rewound: bool,
}

/// Rewind supervisor conversation to specified point.
///
/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
///
/// The number of messages to keep is derived from the request, in priority order:
/// 1. `by_message_count`: drop that many messages from the end.
/// 2. `to_message_id`: keep everything up to and including the message whose
///    `id` matches; an unknown id leaves the conversation unchanged.
/// 3. Neither set: drop the last message.
///
/// The kept count is always clamped to `0..=current length`, so
/// `messages_removed` is never negative. Code rewind is not implemented yet, so
/// `code_rewound` is always reported as `false`.
#[utoipa::path(
    post,
    path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    request_body = crate::db::models::RewindConversationRequest,
    responses(
        (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Contract or supervisor not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh Supervisor"
)]
pub async fn rewind_conversation(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    auth: crate::server::auth::Authenticated,
    Json(req): Json<crate::db::models::RewindConversationRequest>,
) -> impl IntoResponse {
    let crate::server::auth::Authenticated(auth_info) = auth;

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Ownership check only: the contract row itself is not needed afterwards.
    match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
        Ok(Some(_)) => {}
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    }

    // Get supervisor state
    let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(s)) => s,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // A non-array history is treated as an empty conversation.
    let conversation = supervisor_state
        .conversation_history
        .as_array()
        .cloned()
        .unwrap_or_default();

    // Saturate instead of silently truncating on an (unrealistically) huge history.
    let original_count = i32::try_from(conversation.len()).unwrap_or(i32::MAX);

    // Determine how many messages to keep. Every branch yields a value in
    // `0..=original_count`, which keeps `messages_removed` non-negative.
    let new_count = if let Some(by_count) = req.by_message_count {
        // `saturating_sub` avoids overflow on extreme inputs; the clamp stops a
        // negative `by_count` from "growing" the conversation.
        original_count
            .saturating_sub(by_count)
            .clamp(0, original_count)
    } else if let Some(ref to_id) = req.to_message_id {
        // Find message by ID and keep up to and including it
        let found = conversation
            .iter()
            .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str()));
        match found {
            Some(index) => i32::try_from(index)
                .map_or(original_count, |i| i.saturating_add(1).min(original_count)),
            None => {
                // Unknown ID: keep the whole conversation (no-op), but make it visible.
                tracing::warn!(
                    contract_id = %contract_id,
                    to_message_id = %to_id,
                    "Rewind target message not found, conversation left unchanged"
                );
                original_count
            }
        }
    } else {
        // Default to removing last message
        (original_count - 1).max(0)
    };

    let messages_removed = original_count - new_count;

    // Truncate conversation
    let new_conversation: Vec<serde_json::Value> = conversation
        .into_iter()
        .take(new_count as usize)
        .collect();

    // Update the conversation
    if let Err(e) = repository::update_supervisor_conversation(
        pool,
        contract_id,
        serde_json::Value::Array(new_conversation),
    )
    .await
    {
        tracing::error!("Failed to update conversation: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DB_ERROR", e.to_string())),
        )
            .into_response();
    }

    // TODO: implement code rewind. Until then, never claim that code was rewound.
    if req.rewind_code.unwrap_or(false) {
        tracing::warn!(
            contract_id = %contract_id,
            "Code rewind requested but not implemented; only the conversation was rewound"
        );
    }

    tracing::info!(
        contract_id = %contract_id,
        original_count = original_count,
        new_count = new_count,
        messages_removed = messages_removed,
        "Conversation rewound"
    );

    Json(RewindConversationResponse {
        contract_id,
        messages_removed,
        new_message_count: new_count,
        code_rewound: false,
    })
    .into_response()
}

// =============================================================================
// Red Team Task Spawning
// =============================================================================

/// Build the plan (system prompt) handed to a red team monitor task.
///
/// The result is a fixed Markdown briefing describing the monitor's mission,
/// notification rules and an example `makima red-team notify` invocation,
/// followed by the contract context.
///
/// * `contract_name` / `contract_phase` are interpolated verbatim into the
///   "Contract Context" section.
/// * `custom_prompt`, when present, is inserted verbatim as a
///   "Custom Review Criteria" section just before the contract context; when
///   absent, that section is omitted entirely.
pub fn generate_red_team_plan(
    contract_name: &str,
    contract_phase: &str,
    custom_prompt: Option<&str>,
) -> String {
    // Optional owner-supplied section; an empty string keeps the template intact.
    let extra_section = match custom_prompt {
        None => String::new(),
        Some(criteria) => format!(
            r#"

## Custom Review Criteria

The contract owner has specified additional review criteria:
{}
"#,
            criteria
        ),
    };

    // Placeholders, in order: custom criteria section, contract name, phase.
    format!(
        r#"# Red Team Monitor

You are an adversarial quality reviewer for a software development contract. Your role is to monitor work task outputs in real-time and flag potential issues BEFORE they compound into larger problems.

## Your Mission

Monitor all task outputs and verify:
1. **Plan Adherence**: Are tasks following the implementation plan?
2. **Code Quality**: Does the code meet repository standards?
3. **Contract Requirements**: Does the implementation match the specification?
4. **Best Practices**: Are there obvious anti-patterns or issues?

## Access Available

You have read-only access to:
- Task outputs (streamed in real-time)
- Task diffs (code changes)
- Contract specifications and plan documents
- Repository configuration files (CONTRIBUTING.md, linting configs, etc.)

## How to Monitor

1. **Subscribe to task outputs**: You'll receive outputs from all work tasks
2. **Analyze code changes**: Request diffs for completed tasks
3. **Cross-reference**: Compare outputs against the plan and specifications
4. **Report issues**: Use `makima red-team notify` when you detect problems

## When to Notify

NOTIFY the supervisor when you observe:
- **Critical**: Security vulnerabilities, data loss risks, breaking changes
- **High**: Significant deviations from the plan, major code quality issues
- **Medium**: Missing tests, suboptimal implementations, minor standard violations
- **Low**: Style inconsistencies, documentation gaps (use sparingly)

## What NOT to Do

- Do NOT nitpick minor style issues (that's what linters are for)
- Do NOT block progress for trivial concerns
- Do NOT write code or make changes yourself
- Do NOT notify for things that are already in progress and being addressed
- Do NOT create duplicate notifications for the same issue

## Notification Format

When notifying, always include:
1. A clear, concise description of the issue
2. The severity level (critical/high/medium/low)
3. The related task ID if applicable
4. The specific file or code location if known
5. Why this matters (reference to plan, spec, or standards)

## Example Notification

```
makima red-team notify "Task is implementing authentication with plaintext password storage, which contradicts the security requirements in the specification document" \
  --severity critical \
  --task <task_id> \
  --file "src/auth/user.rs" \
  --context "Specification section 3.2 requires bcrypt hashing for all passwords"
```
{}
## Contract Context

Contract: {}
Phase: {}

Focus your monitoring on outputs that relate to the active work tasks. Prioritize issues that could affect the success of the contract or introduce technical debt.
"#,
        extra_section, contract_name, contract_phase
    )
}

/// Spawn a red team task for a contract.
///
/// This creates a red team monitor task that will observe work task outputs
/// and can notify the supervisor about potential issues.
pub async fn spawn_red_team_task(
    pool: &sqlx::PgPool,
    state: &SharedState,
    contract_id: Uuid,
    owner_id: Uuid,
    contract_name: &str,
    contract_phase: &str,
    red_team_prompt: Option<&str>,
) -> Result<Task, String> {
    // Generate the red team plan/prompt
    let plan = generate_red_team_plan(contract_name, contract_phase, red_team_prompt);

    // Create task request
    let create_req = CreateTaskRequest {
        name: "Red Team Monitor".to_string(),
        description: Some("Adversarial review task monitoring work task outputs".to_string()),
        plan,
        contract_id: Some(contract_id),
        parent_task_id: None,
        is_supervisor: false,
        is_red_team: true,
        priority: 0,
        repository_url: None, // Red team doesn't need a repo
        base_branch: None,
        target_branch: None,
        merge_mode: None,
        target_repo_path: None,
        completion_action: None,
        continue_from_task_id: None,
        copy_files: None,
        checkpoint_sha: None,
        branched_from_task_id: None,
        conversation_history: None,
        supervisor_worktree_task_id: None, // Red team uses its own working area
    };

    // Create task in DB
    let task = repository::create_task_for_owner(pool, owner_id, create_req)
        .await
        .map_err(|e| format!("Failed to create red team task: {}", e))?;

    tracing::info!(
        contract_id = %contract_id,
        red_team_task_id = %task.id,
        "Created red team task for contract"
    );

    // Find a daemon to run the red team task
    for entry in state.daemon_connections.iter() {
        let daemon = entry.value();
        if daemon.owner_id == owner_id {
            // Update task with daemon assignment
            let update_req = UpdateTaskRequest {
                status: Some("starting".to_string()),
                daemon_id: Some(daemon.id),
                version: Some(task.version),
                ..Default::default()
            };

            match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
                Ok(Some(updated_task)) => {
                    // Send spawn command to daemon
                    let cmd = DaemonCommand::SpawnTask {
                        task_id: updated_task.id,
                        task_name: updated_task.name.clone(),
                        plan: updated_task.plan.clone(),
                        repo_url: None, // Red team doesn't need a repo
                        base_branch: None,
                        target_branch: None,
                        parent_task_id: None,
                        depth: 0,
                        is_orchestrator: false,
                        target_repo_path: None,
                        completion_action: None,
                        continue_from_task_id: None,
                        copy_files: None,
                        contract_id: Some(contract_id),
                        is_supervisor: false,
                        autonomous_loop: false,
                        resume_session: false,
                        conversation_history: None,
                        patch_data: None,
                        patch_base_sha: None,
                        local_only: true, // Red team is always local-only
                        supervisor_worktree_task_id: None,
                    };

                    if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
                        tracing::warn!(
                            error = %e,
                            daemon_id = %daemon.id,
                            red_team_task_id = %task.id,
                            "Failed to send red team spawn command"
                        );
                        // Rollback
                        let rollback_req = UpdateTaskRequest {
                            status: Some("pending".to_string()),
                            clear_daemon_id: true,
                            ..Default::default()
                        };
                        let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
                    } else {
                        tracing::info!(
                            red_team_task_id = %task.id,
                            daemon_id = %daemon.id,
                            "Red team task spawn command sent"
                        );
                        return Ok(updated_task);
                    }
                }
                Ok(None) => {
                    tracing::warn!(red_team_task_id = %task.id, "Red team task not found when updating daemon_id");
                }
                Err(e) => {
                    tracing::error!(red_team_task_id = %task.id, error = %e, "Failed to update red team task with daemon_id");
                }
            }
            break;
        }
    }

    // Return the task even if we couldn't start it on a daemon
    // It will remain pending and can be started later
    Ok(task)
}