//! Question + order backchannel for directive-spawned tasks. //! //! Originally a much larger handler that orchestrated contract-supervisor //! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and //! supervisor tasks have been removed; what remains is the in-memory //! question machinery (`makima directive ask`) and order creation //! (`makima directive create-order`). //! //! Module name is kept as `mesh_supervisor` for route-path stability — //! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints. use axum::{ extract::{Path, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; use crate::db::models::CreateOrderRequest; use crate::db::repository; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; use crate::server::state::SharedState; // ============================================================================= // Auth helper // ============================================================================= /// Verify the request comes from a directive task (tool-key auth) and /// return the calling task id + owner id. async fn verify_task_auth( state: &SharedState, headers: &HeaderMap, ) -> Result<(Uuid, Uuid), (StatusCode, Json)> { let auth = extract_auth(state, headers); let task_id = match auth { AuthSource::ToolKey(task_id) => task_id, _ => { return Err(( StatusCode::UNAUTHORIZED, Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")), )); } }; let pool = state.db_pool.as_ref().ok_or_else(|| { ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) })?; let task = repository::get_task(pool, task_id) .await .map_err(|e| { tracing::error!(error = %e, "Failed to load task"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to load task")), ) })? 
.ok_or_else(|| { ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) })?; // Only directive-attached tasks may use this backchannel. if task.directive_id.is_none() { return Err(( StatusCode::FORBIDDEN, Json(ApiError::new( "NOT_DIRECTIVE_TASK", "Only directive-attached tasks can use these endpoints", )), )); } Ok((task_id, task.owner_id)) } // ============================================================================= // Question types // ============================================================================= #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionRequest { pub question: String, #[serde(default)] pub choices: Vec, pub context: Option, #[serde(default = "default_question_timeout")] pub timeout_seconds: i32, /// When true the request blocks until the user responds (no /// timeout) — the CLI reconnects via the poll endpoint if the /// server-side timeout is reached. #[serde(default)] pub phaseguard: bool, #[serde(default)] pub multi_select: bool, /// Return immediately without waiting for a response. #[serde(default)] pub non_blocking: bool, /// Question type: general, phase_confirmation, contract_complete. #[serde(default = "default_question_type")] pub question_type: String, } fn default_question_type() -> String { "general".to_string() } fn default_question_timeout() -> i32 { 3600 } #[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionResponse { pub question_id: Uuid, pub response: Option, pub timed_out: bool, /// Server-side timeout was reached but the question is still /// pending. CLI should re-poll via `/poll`. 
#[serde(default)] pub still_pending: bool, } #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AnswerQuestionRequest { pub response: String, } #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AnswerQuestionResponse { pub success: bool, } #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct PendingQuestionSummary { pub question_id: Uuid, pub task_id: Uuid, #[serde(skip_serializing_if = "Option::is_none")] pub directive_id: Option, pub question: String, pub choices: Vec, pub context: Option, pub created_at: chrono::DateTime, #[serde(default)] pub multi_select: bool, #[serde(default)] pub question_type: String, } // ============================================================================= // Question handlers // ============================================================================= /// Ask the user a question from a directive task. Blocks until the user /// answers, the timeout fires, or `non_blocking` returns immediately. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/questions", request_body = AskQuestionRequest, responses( (status = 200, description = "Question asked", body = AskQuestionResponse), (status = 401, description = "Unauthorized"), (status = 403, description = "Not a directive task"), ), security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn ask_question( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); // Pull the directive_id off the calling task so subscribers can // route the question to the right directive view. 
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to fetch task")), ) .into_response(); } }; let directive_id = task.directive_id; // Reconcile mode controls block-vs-timeout behaviour on directive // tasks: semi-auto / manual block indefinitely (effectively // phaseguard); auto times out after 30s. let reconcile_mode: String = match directive_id { Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await { Ok(Some(d)) => d.reconcile_mode.clone(), _ => "auto".to_string(), }, None => "auto".to_string(), }; let question_id = state.add_supervisor_question( task_id, directive_id, owner_id, request.question.clone(), request.choices.clone(), request.context.clone(), request.multi_select, request.question_type.clone(), ); if request.non_blocking { return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: None, timed_out: false, still_pending: false, }), ) .into_response(); } // Determine block behaviour. 
let use_phaseguard = request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual"; let timeout_secs = if use_phaseguard { 300 } else if reconcile_mode == "auto" { 30 } else { request.timeout_seconds.max(1) as u64 }; let timeout_duration = std::time::Duration::from_secs(timeout_secs); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: Some(response.response), timed_out: false, still_pending: false, }), ) .into_response(); } if start.elapsed() >= timeout_duration { if use_phaseguard { return ( StatusCode::OK, Json(AskQuestionResponse { question_id, response: None, timed_out: false, still_pending: true, }), ) .into_response(); } state.remove_pending_question(question_id); return ( StatusCode::REQUEST_TIMEOUT, Json(AskQuestionResponse { question_id, response: None, timed_out: true, still_pending: false, }), ) .into_response(); } tokio::time::sleep(poll_interval).await; } } /// Re-poll a question by id. Used by the CLI to reconnect after /// `still_pending` from `ask_question`. Blocks up to 5 minutes. 
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
    params(("question_id" = Uuid, Path, description = "Question id")),
    responses(
        (status = 200, description = "Answered or still pending", body = AskQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Not a directive task"),
        (status = 404, description = "Not found"),
    ),
    security(("tool_key" = [])),
    tag = "Mesh Supervisor"
)]
pub async fn poll_question(
    State(state): State<SharedState>,
    headers: HeaderMap,
    Path(question_id): Path<Uuid>,
) -> impl IntoResponse {
    // Forward the precise rejection (401 / 403 / 404 / 500 / 503) from the
    // auth helper instead of collapsing every failure into a 401, so this
    // endpoint reports errors the same way as the other tool-key handlers.
    if let Err(rejection) = verify_task_auth(&state, &headers).await {
        return rejection.into_response();
    }

    // Builds the 200 reply carrying the user's answer.
    let answered = |text| {
        (
            StatusCode::OK,
            Json(AskQuestionResponse {
                question_id,
                response: Some(text),
                timed_out: false,
                still_pending: false,
            }),
        )
            .into_response()
    };

    // An answer may already be waiting; deliver it and drop the stored copy.
    if let Some(answer) = state.get_question_response(question_id) {
        state.cleanup_question_response(question_id);
        return answered(answer.response);
    }

    // Neither answered nor pending: the id is unknown to the store.
    if state.get_pending_question(question_id).is_none() {
        return (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Question not found")),
        )
            .into_response();
    }

    let timeout = std::time::Duration::from_secs(300);
    let poll_interval = std::time::Duration::from_millis(500);
    let start = std::time::Instant::now();

    loop {
        if let Some(answer) = state.get_question_response(question_id) {
            state.cleanup_question_response(question_id);
            return answered(answer.response);
        }

        if start.elapsed() >= timeout {
            // Window exhausted without an answer; the caller should poll again.
            return (
                StatusCode::OK,
                Json(AskQuestionResponse {
                    question_id,
                    response: None,
                    timed_out: false,
                    still_pending: true,
                }),
            )
                .into_response();
        }

        tokio::time::sleep(poll_interval).await;
    }
}

/// List currently-pending questions for the caller.
#[utoipa::path( get, path = "/api/v1/mesh/questions", responses( (status = 200, description = "Pending questions", body = Vec), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn list_pending_questions( State(state): State, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let questions: Vec = state .get_pending_questions_for_owner(auth.owner_id) .into_iter() .map(|q| PendingQuestionSummary { question_id: q.question_id, task_id: q.task_id, directive_id: q.directive_id, question: q.question, choices: q.choices, context: q.context, created_at: q.created_at, multi_select: q.multi_select, question_type: q.question_type, }) .collect(); Json(questions).into_response() } /// Answer a pending question. #[utoipa::path( post, path = "/api/v1/mesh/questions/{question_id}/answer", params(("question_id" = Uuid, Path, description = "Question id")), request_body = AnswerQuestionRequest, responses( (status = 200, description = "Answered", body = AnswerQuestionResponse), (status = 404, description = "Not found"), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn answer_question( State(state): State, Authenticated(auth): Authenticated, Path(question_id): Path, Json(req): Json, ) -> impl IntoResponse { // Ownership check: only the owner of the question can answer it. 
let question = match state.get_pending_question(question_id) { Some(q) => q, None => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Question not found")), ) .into_response(); } }; if question.owner_id != auth.owner_id { return ( StatusCode::FORBIDDEN, Json(ApiError::new("FORBIDDEN", "Not your question")), ) .into_response(); } if state.submit_question_response(question_id, req.response) { Json(AnswerQuestionResponse { success: true }).into_response() } else { ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Question not found")), ) .into_response() } } // ============================================================================= // Order creation (from directive tasks) // ============================================================================= #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateOrderForTaskRequest { pub title: String, #[serde(default)] pub description: Option, #[serde(default = "default_order_priority")] pub priority: String, #[serde(default = "default_order_type")] pub order_type: String, #[serde(default = "default_order_labels")] pub labels: serde_json::Value, #[serde(default)] pub repository_url: Option, } fn default_order_priority() -> String { "medium".to_string() } fn default_order_type() -> String { "spike".to_string() } fn default_order_labels() -> serde_json::Value { serde_json::json!([]) } /// Create a follow-up order from a directive task (spike/chore only). 
#[utoipa::path( post, path = "/api/v1/mesh/supervisor/orders", request_body = CreateOrderForTaskRequest, responses( (status = 201, description = "Order created"), (status = 400, description = "Invalid order type or no directive context"), (status = 401, description = "Unauthorized"), (status = 403, description = "Not a directive task"), ), security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn create_order_for_task( State(state): State, headers: HeaderMap, Json(request): Json, ) -> impl IntoResponse { let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; if request.order_type != "spike" && request.order_type != "chore" { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_ORDER_TYPE", "Only spike and chore order types are allowed from directive tasks", )), ) .into_response(); } let pool = state.db_pool.as_ref().unwrap(); let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to fetch task")), ) .into_response(); } }; let directive_id = match task.directive_id { Some(id) => id, None => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")), ) .into_response(); } }; let repository_url = if request.repository_url.is_some() { request.repository_url } else { match repository::get_directive_for_owner(pool, owner_id, directive_id).await { Ok(Some(d)) => d.repository_url, _ => None, } }; let order_req = CreateOrderRequest { title: request.title, description: request.description, priority: Some(request.priority), status: Some("open".to_string()), order_type: Some(request.order_type), labels: request.labels, directive_id, repository_url, 
dog_id: None, }; match repository::create_order(pool, owner_id, order_req).await { Ok(order) => ( StatusCode::CREATED, Json(serde_json::json!({ "id": order.id, "title": order.title, "description": order.description, "priority": order.priority, "status": order.status, "orderType": order.order_type, "directiveId": order.directive_id, "labels": order.labels, "repositoryUrl": order.repository_url, "createdAt": order.created_at, })), ) .into_response(), Err(e) => { tracing::error!(error = %e, "Failed to create order"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", "Failed to create order")), ) .into_response() } } }