// Source: makima/src/server/handlers/mesh_supervisor.rs (blob 4a9a00bebff01f2558e4056ac026f1f00c84ed21)
                                                             
   







                                                                          










                                    
                                          
                          
                                       

                                                              
                                      

                                                                                
              

                                                                                















                                                                                             
 





                                                                             
 














                                                                       
 











                                                                        

 



                                                                                


                                       
                         

                             
                                

                                                  


                                                                    

                         

                           
                                                          

                           
                                                                      





                                               


                                      
        

 
                                                  

                                  
                          
                                 
                        

                                                                 

                            

 


                                       


                         


                                     


                      




                                     

                                                     



                                                  

                           

                              

 
                                                                                
                    

                                                                                

                                                                        

               

                                               
              
                                                                                   
                                                     
                                                             
      
                                

                           
                          

                                     
                                            
                        
                                                                              





                                               



                                                                                    


                                      


                                                                   

                   
                                                                

                                                  


                                                                        


         
                                         
 








                                                                                           

      











                                                    
                








                                      
     
 








                                                                                          

      
                                                                        



                                                              

                                                                          







                                                      


                                 
 

                                                
                        






                                              


                                     
                                                       
                    






                                            


                                 
 
                                                
     

 

                                                                
               


                                                                      
              

                                                                                              
      
                                

                           
                           
                                     

                                  
                        






                                                                     
 

                                                                      
                






                                                  

                             
     
 














                                                                          
                    






                                                      


                                 
                                       
                    






                                          


                                 


                                                
 























































                                                                                              

                                      
                                                                       



                                 
                                           
                

                                                                  



                             

                                                                      
            




                                                                   
     
 

                                                                                
                                        

                                                                                


















                                                


                                   



                                                
                                                                      





                                                      
                                                                                   
                                                     
                                                             
      
                                






                                                   
                                                                              



                                           










                                                                                    

                                               









                                                                   
                                                                

                                                  
                                                                        









                                                
                                                                                      




                                 


                                                              





                                                                                       








                                             
                     




























                                                                          
//! Question + order backchannel for directive-spawned tasks.
//!
//! Originally a much larger handler that orchestrated contract-supervisor
//! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and
//! supervisor tasks have been removed; what remains is the in-memory
//! question machinery (`makima directive ask`) and order creation
//! (`makima directive create-order`).
//!
//! Module name is kept as `mesh_supervisor` for route-path stability —
//! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints.

use axum::{
    extract::{Path, State},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
    Json,
};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;

use crate::db::models::CreateOrderRequest;
use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::SharedState;

// =============================================================================
// Auth helper
// =============================================================================

/// Authenticate a backchannel request and identify the calling task.
///
/// The request must carry tool-key auth, the referenced task must exist,
/// and that task must be attached to a directive.
///
/// # Returns
///
/// `(task_id, owner_id)` of the calling task.
///
/// # Errors
///
/// A ready-to-send `(StatusCode, Json<ApiError>)` pair:
/// - `401 UNAUTHORIZED` when the auth source is not a tool key,
/// - `503 DB_UNAVAILABLE` when no database pool is configured,
/// - `500 DB_ERROR` when loading the task fails,
/// - `404 NOT_FOUND` when the task does not exist,
/// - `403 NOT_DIRECTIVE_TASK` when the task has no `directive_id`.
async fn verify_task_auth(
    state: &SharedState,
    headers: &HeaderMap,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
    // Anything other than a tool key is rejected outright.
    let task_id = if let AuthSource::ToolKey(id) = extract_auth(state, headers) {
        id
    } else {
        return Err((
            StatusCode::UNAUTHORIZED,
            Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")),
        ));
    };

    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            ));
        }
    };

    let task = match repository::get_task(pool, task_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            return Err((
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ));
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to load task");
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to load task")),
            ));
        }
    };

    // The backchannel is reserved for tasks that belong to a directive.
    if task.directive_id.is_some() {
        Ok((task_id, task.owner_id))
    } else {
        Err((
            StatusCode::FORBIDDEN,
            Json(ApiError::new(
                "NOT_DIRECTIVE_TASK",
                "Only directive-attached tasks can use these endpoints",
            )),
        ))
    }
}

// =============================================================================
// Question types
// =============================================================================

/// Request body for `ask_question`: a question raised by a directive task.
///
/// Field names are camelCase on the wire (`rename_all = "camelCase"`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionRequest {
    /// Question text stored with the pending question.
    pub question: String,
    /// Choices attached to the question; empty when omitted.
    #[serde(default)]
    pub choices: Vec<String>,
    /// Optional free-form context stored alongside the question.
    pub context: Option<String>,
    /// Requested wait in seconds; defaults to 3600 when omitted.
    /// `ask_question` uses it only when phaseguard is not in effect and
    /// the directive's reconcile mode is not `auto`; values below 1 are
    /// raised to 1.
    #[serde(default = "default_question_timeout")]
    pub timeout_seconds: i32,
    /// When true the question is never dropped on timeout: once the
    /// server-side wait elapses `ask_question` answers with
    /// `still_pending` and the CLI reconnects via the poll endpoint.
    #[serde(default)]
    pub phaseguard: bool,
    /// Multi-select flag, forwarded unchanged with the question.
    #[serde(default)]
    pub multi_select: bool,
    /// Return immediately without waiting for a response.
    #[serde(default)]
    pub non_blocking: bool,
    /// Question type: general, phase_confirmation, contract_complete.
    /// Defaults to `general` when omitted.
    #[serde(default = "default_question_type")]
    pub question_type: String,
}

/// Serde default for `AskQuestionRequest::question_type`.
fn default_question_type() -> String {
    String::from("general")
}

/// Serde default for `AskQuestionRequest::timeout_seconds`: one hour.
fn default_question_timeout() -> i32 {
    60 * 60
}

/// Result of asking or polling a question.
///
/// Returned by both `ask_question` and `poll_question`; field names are
/// camelCase on the wire.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionResponse {
    /// Id assigned to the question when it was registered.
    pub question_id: Uuid,
    /// The submitted answer, or `None` when no answer is available yet.
    pub response: Option<String>,
    /// True when `ask_question` gave up waiting and removed the
    /// question (sent together with HTTP 408).
    pub timed_out: bool,
    /// Server-side timeout was reached but the question is still
    /// pending. CLI should re-poll via `/poll`.
    #[serde(default)]
    pub still_pending: bool,
}

/// Request body for `answer_question`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionRequest {
    /// Answer text submitted for the pending question.
    pub response: String,
}

/// Acknowledgement returned by `answer_question`.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionResponse {
    /// Set to `true` when the answer was accepted; failures are reported
    /// as error responses instead of `success: false`.
    pub success: bool,
}

/// One pending question as listed by `list_pending_questions`.
///
/// Every field is copied verbatim from the in-memory pending-question
/// record; field names are camelCase on the wire.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct PendingQuestionSummary {
    /// Id of the pending question.
    pub question_id: Uuid,
    /// Task that asked the question.
    pub task_id: Uuid,
    /// Directive the asking task is attached to; omitted from the JSON
    /// when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub directive_id: Option<Uuid>,
    /// Question text.
    pub question: String,
    /// Choices attached to the question.
    pub choices: Vec<String>,
    /// Optional free-form context supplied with the question.
    pub context: Option<String>,
    /// Creation timestamp of the pending question (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Multi-select flag supplied when the question was asked.
    #[serde(default)]
    pub multi_select: bool,
    /// Question type supplied when the question was asked.
    #[serde(default)]
    pub question_type: String,
}

// =============================================================================
// Question handlers
// =============================================================================

/// Ask the user a question from a directive task. Blocks until the user
/// answers, the timeout fires, or `non_blocking` returns immediately.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/questions",
    request_body = AskQuestionRequest,
    responses(
        (status = 200, description = "Question asked", body = AskQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Not a directive task"),
    ),
    security(("tool_key" = [])),
    tag = "Mesh Supervisor"
)]
pub async fn ask_question(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<AskQuestionRequest>,
) -> impl IntoResponse {
    let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let pool = state.db_pool.as_ref().unwrap();

    // Pull the directive_id off the calling task so subscribers can
    // route the question to the right directive view.
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to fetch task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
            )
                .into_response();
        }
    };

    let directive_id = task.directive_id;

    // Reconcile mode controls block-vs-timeout behaviour on directive
    // tasks: semi-auto / manual block indefinitely (effectively
    // phaseguard); auto times out after 30s.
    let reconcile_mode: String = match directive_id {
        Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await {
            Ok(Some(d)) => d.reconcile_mode.clone(),
            _ => "auto".to_string(),
        },
        None => "auto".to_string(),
    };

    let question_id = state.add_supervisor_question(
        task_id,
        directive_id,
        owner_id,
        request.question.clone(),
        request.choices.clone(),
        request.context.clone(),
        request.multi_select,
        request.question_type.clone(),
    );

    if request.non_blocking {
        return (
            StatusCode::OK,
            Json(AskQuestionResponse {
                question_id,
                response: None,
                timed_out: false,
                still_pending: false,
            }),
        )
            .into_response();
    }

    // Determine block behaviour.
    let use_phaseguard =
        request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual";
    let timeout_secs = if use_phaseguard {
        300
    } else if reconcile_mode == "auto" {
        30
    } else {
        request.timeout_seconds.max(1) as u64
    };

    let timeout_duration = std::time::Duration::from_secs(timeout_secs);
    let start = std::time::Instant::now();
    let poll_interval = std::time::Duration::from_millis(500);

    loop {
        if let Some(response) = state.get_question_response(question_id) {
            state.cleanup_question_response(question_id);
            return (
                StatusCode::OK,
                Json(AskQuestionResponse {
                    question_id,
                    response: Some(response.response),
                    timed_out: false,
                    still_pending: false,
                }),
            )
                .into_response();
        }

        if start.elapsed() >= timeout_duration {
            if use_phaseguard {
                return (
                    StatusCode::OK,
                    Json(AskQuestionResponse {
                        question_id,
                        response: None,
                        timed_out: false,
                        still_pending: true,
                    }),
                )
                    .into_response();
            }
            state.remove_pending_question(question_id);
            return (
                StatusCode::REQUEST_TIMEOUT,
                Json(AskQuestionResponse {
                    question_id,
                    response: None,
                    timed_out: true,
                    still_pending: false,
                }),
            )
                .into_response();
        }

        tokio::time::sleep(poll_interval).await;
    }
}

/// Re-poll a question by id.
///
/// Used by the CLI to reconnect after `still_pending` from
/// `ask_question`. If an answer is already stored it is returned (and
/// cleaned up) immediately; if the question is neither answered nor
/// pending the reply is `404`. Otherwise the handler polls the in-memory
/// store every 500 ms for up to 5 minutes and then reports
/// `still_pending = true` again.
///
/// Authentication failures are returned exactly as produced by
/// `verify_task_auth` (401 / 403 / 404 / 500 / 503), matching the other
/// tool-key handlers in this module.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
    params(("question_id" = Uuid, Path, description = "Question id")),
    responses(
        (status = 200, description = "Answered or still pending", body = AskQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Not a directive task"),
        (status = 404, description = "Not found"),
    ),
    security(("tool_key" = [])),
    tag = "Mesh Supervisor"
)]
pub async fn poll_question(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Path(question_id): Path<Uuid>,
) -> impl IntoResponse {
    // Surface the helper's own status/code instead of flattening every
    // failure (missing DB, unknown task, non-directive task) into 401.
    if let Err(e) = verify_task_auth(&state, &headers).await {
        return e.into_response();
    }

    // NOTE(review): nothing visible here ties `question_id` to the calling
    // task or its owner, so any authenticated directive task can poll (and
    // consume) any question id it knows — confirm whether that is intended.

    // Single place that shapes every 200 reply for this question.
    let reply = move |response: Option<String>, still_pending: bool| {
        (
            StatusCode::OK,
            Json(AskQuestionResponse {
                question_id,
                response,
                timed_out: false,
                still_pending,
            }),
        )
            .into_response()
    };

    // An answer may already be waiting; check before the pending lookup.
    if let Some(response) = state.get_question_response(question_id) {
        state.cleanup_question_response(question_id);
        return reply(Some(response.response), false);
    }

    if state.get_pending_question(question_id).is_none() {
        return (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Question not found")),
        )
            .into_response();
    }

    let timeout = std::time::Duration::from_secs(300);
    let start = std::time::Instant::now();
    let poll_interval = std::time::Duration::from_millis(500);

    loop {
        if let Some(response) = state.get_question_response(question_id) {
            state.cleanup_question_response(question_id);
            return reply(Some(response.response), false);
        }
        if start.elapsed() >= timeout {
            // Still unanswered: tell the caller to poll again.
            return reply(None, true);
        }
        tokio::time::sleep(poll_interval).await;
    }
}

/// List the questions still awaiting an answer for the authenticated owner.
///
/// Reads the in-memory pending-question store and returns one
/// `PendingQuestionSummary` per entry as a JSON array.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/questions",
    responses(
        (status = 200, description = "Pending questions", body = Vec<PendingQuestionSummary>),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Mesh"
)]
pub async fn list_pending_questions(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let mut summaries: Vec<PendingQuestionSummary> = Vec::new();

    // Field-for-field copy of each stored record into its wire shape.
    for pending in state.get_pending_questions_for_owner(auth.owner_id) {
        summaries.push(PendingQuestionSummary {
            question_id: pending.question_id,
            task_id: pending.task_id,
            directive_id: pending.directive_id,
            question: pending.question,
            choices: pending.choices,
            context: pending.context,
            created_at: pending.created_at,
            multi_select: pending.multi_select,
            question_type: pending.question_type,
        });
    }

    Json(summaries).into_response()
}

/// Submit the user's answer to a pending question.
///
/// Replies `404` when the question is not pending (or the store refuses
/// the submission), `403` when the question belongs to a different owner,
/// and `{ "success": true }` once the answer has been accepted.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/questions/{question_id}/answer",
    params(("question_id" = Uuid, Path, description = "Question id")),
    request_body = AnswerQuestionRequest,
    responses(
        (status = 200, description = "Answered", body = AnswerQuestionResponse),
        (status = 404, description = "Not found"),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Mesh"
)]
pub async fn answer_question(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(question_id): Path<Uuid>,
    Json(req): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
    // Both "unknown question" outcomes share one payload.
    let not_found = || {
        (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Question not found")),
        )
            .into_response()
    };

    let pending = match state.get_pending_question(question_id) {
        Some(found) => found,
        None => return not_found(),
    };

    // Only the owner of the question may answer it.
    if pending.owner_id != auth.owner_id {
        return (
            StatusCode::FORBIDDEN,
            Json(ApiError::new("FORBIDDEN", "Not your question")),
        )
            .into_response();
    }

    let accepted = state.submit_question_response(question_id, req.response);
    if !accepted {
        return not_found();
    }

    Json(AnswerQuestionResponse { success: true }).into_response()
}

// =============================================================================
// Order creation (from directive tasks)
// =============================================================================

/// Request body for `create_order_for_task`.
///
/// Only `title` is required; every other field has a default. Field
/// names are camelCase on the wire.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrderForTaskRequest {
    /// Title of the order to create.
    pub title: String,
    /// Optional description of the order.
    #[serde(default)]
    pub description: Option<String>,
    /// Order priority; defaults to `medium`.
    #[serde(default = "default_order_priority")]
    pub priority: String,
    /// Order type; defaults to `spike`. The handler accepts only
    /// `spike` and `chore`.
    #[serde(default = "default_order_type")]
    pub order_type: String,
    /// Labels as arbitrary JSON; defaults to an empty array.
    #[serde(default = "default_order_labels")]
    pub labels: serde_json::Value,
    /// Repository URL for the order. When omitted the handler falls
    /// back to the repository URL of the task's directive.
    #[serde(default)]
    pub repository_url: Option<String>,
}

/// Serde default for `CreateOrderForTaskRequest::priority`.
fn default_order_priority() -> String {
    String::from("medium")
}
/// Serde default for `CreateOrderForTaskRequest::order_type`.
fn default_order_type() -> String {
    String::from("spike")
}
/// Serde default for `CreateOrderForTaskRequest::labels`: an empty JSON array.
fn default_order_labels() -> serde_json::Value {
    serde_json::Value::Array(Vec::new())
}

/// Create a follow-up order from a directive task (spike/chore only).
///
/// The order is created with status `open`, attached to the calling
/// task's directive and owned by the task's owner. When the request
/// carries no `repository_url`, the directive's repository URL is used
/// as a best-effort fallback. On success the reply is `201` with a JSON
/// summary of the created order.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/orders",
    request_body = CreateOrderForTaskRequest,
    responses(
        (status = 201, description = "Order created"),
        (status = 400, description = "Invalid order type or no directive context"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Not a directive task"),
    ),
    security(("tool_key" = [])),
    tag = "Mesh Supervisor"
)]
pub async fn create_order_for_task(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Json(request): Json<CreateOrderForTaskRequest>,
) -> impl IntoResponse {
    let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    if !matches!(request.order_type.as_str(), "spike" | "chore") {
        return (
            StatusCode::BAD_REQUEST,
            Json(ApiError::new(
                "INVALID_ORDER_TYPE",
                "Only spike and chore order types are allowed from directive tasks",
            )),
        )
            .into_response();
    }

    // `verify_task_auth` already rejects requests when no pool is
    // configured; answer 503 instead of panicking should that invariant
    // ever stop holding.
    let pool = match state.db_pool.as_ref() {
        Some(pool) => pool,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
            )
                .into_response();
        }
    };

    // `verify_task_auth` only hands back ids, so the task is loaded again
    // here to read its directive.
    let task = match repository::get_task(pool, task_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to fetch task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
            )
                .into_response();
        }
    };

    let directive_id = match task.directive_id {
        Some(id) => id,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")),
            )
                .into_response();
        }
    };

    // An explicit URL wins; otherwise inherit the directive's. The
    // fallback stays best-effort, but a lookup failure is logged instead
    // of being silently discarded.
    let repository_url = match request.repository_url {
        Some(url) => Some(url),
        None => match repository::get_directive_for_owner(pool, owner_id, directive_id).await {
            Ok(Some(d)) => d.repository_url,
            Ok(None) => None,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    directive_id = %directive_id,
                    "Failed to load directive; creating order without a repository URL"
                );
                None
            }
        },
    };

    let order_req = CreateOrderRequest {
        title: request.title,
        description: request.description,
        priority: Some(request.priority),
        status: Some("open".to_string()),
        order_type: Some(request.order_type),
        labels: request.labels,
        directive_id,
        repository_url,
        dog_id: None,
    };

    match repository::create_order(pool, owner_id, order_req).await {
        Ok(order) => (
            StatusCode::CREATED,
            Json(serde_json::json!({
                "id": order.id,
                "title": order.title,
                "description": order.description,
                "priority": order.priority,
                "status": order.status,
                "orderType": order.order_type,
                "directiveId": order.directive_id,
                "labels": order.labels,
                "repositoryUrl": order.repository_url,
                "createdAt": order.created_at,
            })),
        )
            .into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to create order");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to create order")),
            )
                .into_response()
        }
    }
}