//! Question + order backchannel for directive-spawned tasks.
//!
//! Originally a much larger handler that orchestrated contract-supervisor
//! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and
//! supervisor tasks have been removed; what remains is the in-memory
//! question machinery (`makima directive ask`) and order creation
//! (`makima directive create-order`).
//!
//! Module name is kept as `mesh_supervisor` for route-path stability —
//! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::db::models::CreateOrderRequest;
use crate::db::repository;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::SharedState;
// =============================================================================
// Auth helper
// =============================================================================
/// Verify the request comes from a directive task (tool-key auth) and
/// return the calling task id + owner id.
async fn verify_task_auth(
state: &SharedState,
headers: &HeaderMap,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
let auth = extract_auth(state, headers);
let task_id = match auth {
AuthSource::ToolKey(task_id) => task_id,
_ => {
return Err((
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")),
));
}
};
let pool = state.db_pool.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
})?;
let task = repository::get_task(pool, task_id)
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to load task");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to load task")),
)
})?
.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
})?;
// Only directive-attached tasks may use this backchannel.
if task.directive_id.is_none() {
return Err((
StatusCode::FORBIDDEN,
Json(ApiError::new(
"NOT_DIRECTIVE_TASK",
"Only directive-attached tasks can use these endpoints",
)),
));
}
Ok((task_id, task.owner_id))
}
// =============================================================================
// Question types
// =============================================================================
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionRequest {
pub question: String,
#[serde(default)]
pub choices: Vec<String>,
pub context: Option<String>,
#[serde(default = "default_question_timeout")]
pub timeout_seconds: i32,
/// When true the request blocks until the user responds (no
/// timeout) — the CLI reconnects via the poll endpoint if the
/// server-side timeout is reached.
#[serde(default)]
pub phaseguard: bool,
#[serde(default)]
pub multi_select: bool,
/// Return immediately without waiting for a response.
#[serde(default)]
pub non_blocking: bool,
/// Question type: general, phase_confirmation, contract_complete.
#[serde(default = "default_question_type")]
pub question_type: String,
}
fn default_question_type() -> String {
"general".to_string()
}
fn default_question_timeout() -> i32 {
3600
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionResponse {
pub question_id: Uuid,
pub response: Option<String>,
pub timed_out: bool,
/// Server-side timeout was reached but the question is still
/// pending. CLI should re-poll via `/poll`.
#[serde(default)]
pub still_pending: bool,
}
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionRequest {
pub response: String,
}
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionResponse {
pub success: bool,
}
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct PendingQuestionSummary {
pub question_id: Uuid,
pub task_id: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub directive_id: Option<Uuid>,
pub question: String,
pub choices: Vec<String>,
pub context: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(default)]
pub multi_select: bool,
#[serde(default)]
pub question_type: String,
}
// =============================================================================
// Question handlers
// =============================================================================
/// Ask the user a question from a directive task. Blocks until the user
/// answers, the timeout fires, or `non_blocking` returns immediately.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/questions",
request_body = AskQuestionRequest,
responses(
(status = 200, description = "Question asked", body = AskQuestionResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Not a directive task"),
),
security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn ask_question(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<AskQuestionRequest>,
) -> impl IntoResponse {
let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Pull the directive_id off the calling task so subscribers can
// route the question to the right directive view.
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to fetch task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
)
.into_response();
}
};
let directive_id = task.directive_id;
// Reconcile mode controls block-vs-timeout behaviour on directive
// tasks: semi-auto / manual block indefinitely (effectively
// phaseguard); auto times out after 30s.
let reconcile_mode: String = match directive_id {
Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await {
Ok(Some(d)) => d.reconcile_mode.clone(),
_ => "auto".to_string(),
},
None => "auto".to_string(),
};
let question_id = state.add_supervisor_question(
task_id,
directive_id,
owner_id,
request.question.clone(),
request.choices.clone(),
request.context.clone(),
request.multi_select,
request.question_type.clone(),
);
if request.non_blocking {
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: false,
}),
)
.into_response();
}
// Determine block behaviour.
let use_phaseguard =
request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual";
let timeout_secs = if use_phaseguard {
300
} else if reconcile_mode == "auto" {
30
} else {
request.timeout_seconds.max(1) as u64
};
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
)
.into_response();
}
if start.elapsed() >= timeout_duration {
if use_phaseguard {
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: true,
}),
)
.into_response();
}
state.remove_pending_question(question_id);
return (
StatusCode::REQUEST_TIMEOUT,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: true,
still_pending: false,
}),
)
.into_response();
}
tokio::time::sleep(poll_interval).await;
}
}
/// Re-poll a question by id. Used by the CLI to reconnect after
/// `still_pending` from `ask_question`. Blocks up to 5 minutes.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
params(("question_id" = Uuid, Path, description = "Question id")),
responses(
(status = 200, description = "Answered or still pending", body = AskQuestionResponse),
(status = 404, description = "Not found"),
),
security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn poll_question(
State(state): State<SharedState>,
headers: HeaderMap,
Path(question_id): Path<Uuid>,
) -> impl IntoResponse {
if verify_task_auth(&state, &headers).await.is_err() {
return (
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
)
.into_response();
}
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
)
.into_response();
}
if state.get_pending_question(question_id).is_none() {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Question not found")),
)
.into_response();
}
let timeout = std::time::Duration::from_secs(300);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
)
.into_response();
}
if start.elapsed() >= timeout {
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: true,
}),
)
.into_response();
}
tokio::time::sleep(poll_interval).await;
}
}
/// List currently-pending questions for the caller.
#[utoipa::path(
get,
path = "/api/v1/mesh/questions",
responses(
(status = 200, description = "Pending questions", body = Vec<PendingQuestionSummary>),
),
security(("bearer_auth" = []), ("api_key" = [])),
tag = "Mesh"
)]
pub async fn list_pending_questions(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let questions: Vec<PendingQuestionSummary> = state
.get_pending_questions_for_owner(auth.owner_id)
.into_iter()
.map(|q| PendingQuestionSummary {
question_id: q.question_id,
task_id: q.task_id,
directive_id: q.directive_id,
question: q.question,
choices: q.choices,
context: q.context,
created_at: q.created_at,
multi_select: q.multi_select,
question_type: q.question_type,
})
.collect();
Json(questions).into_response()
}
/// Answer a pending question.
#[utoipa::path(
post,
path = "/api/v1/mesh/questions/{question_id}/answer",
params(("question_id" = Uuid, Path, description = "Question id")),
request_body = AnswerQuestionRequest,
responses(
(status = 200, description = "Answered", body = AnswerQuestionResponse),
(status = 404, description = "Not found"),
),
security(("bearer_auth" = []), ("api_key" = [])),
tag = "Mesh"
)]
pub async fn answer_question(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(question_id): Path<Uuid>,
Json(req): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
// Ownership check: only the owner of the question can answer it.
let question = match state.get_pending_question(question_id) {
Some(q) => q,
None => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Question not found")),
)
.into_response();
}
};
if question.owner_id != auth.owner_id {
return (
StatusCode::FORBIDDEN,
Json(ApiError::new("FORBIDDEN", "Not your question")),
)
.into_response();
}
if state.submit_question_response(question_id, req.response) {
Json(AnswerQuestionResponse { success: true }).into_response()
} else {
(
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Question not found")),
)
.into_response()
}
}
// =============================================================================
// Order creation (from directive tasks)
// =============================================================================
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrderForTaskRequest {
pub title: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default = "default_order_priority")]
pub priority: String,
#[serde(default = "default_order_type")]
pub order_type: String,
#[serde(default = "default_order_labels")]
pub labels: serde_json::Value,
#[serde(default)]
pub repository_url: Option<String>,
}
fn default_order_priority() -> String {
"medium".to_string()
}
fn default_order_type() -> String {
"spike".to_string()
}
fn default_order_labels() -> serde_json::Value {
serde_json::json!([])
}
/// Create a follow-up order from a directive task (spike/chore only).
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/orders",
request_body = CreateOrderForTaskRequest,
responses(
(status = 201, description = "Order created"),
(status = 400, description = "Invalid order type or no directive context"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Not a directive task"),
),
security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn create_order_for_task(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreateOrderForTaskRequest>,
) -> impl IntoResponse {
let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
if request.order_type != "spike" && request.order_type != "chore" {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_ORDER_TYPE",
"Only spike and chore order types are allowed from directive tasks",
)),
)
.into_response();
}
let pool = state.db_pool.as_ref().unwrap();
let task = match repository::get_task(pool, task_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to fetch task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
)
.into_response();
}
};
let directive_id = match task.directive_id {
Some(id) => id,
None => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")),
)
.into_response();
}
};
let repository_url = if request.repository_url.is_some() {
request.repository_url
} else {
match repository::get_directive_for_owner(pool, owner_id, directive_id).await {
Ok(Some(d)) => d.repository_url,
_ => None,
}
};
let order_req = CreateOrderRequest {
title: request.title,
description: request.description,
priority: Some(request.priority),
status: Some("open".to_string()),
order_type: Some(request.order_type),
labels: request.labels,
directive_id,
repository_url,
dog_id: None,
};
match repository::create_order(pool, owner_id, order_req).await {
Ok(order) => (
StatusCode::CREATED,
Json(serde_json::json!({
"id": order.id,
"title": order.title,
"description": order.description,
"priority": order.priority,
"status": order.status,
"orderType": order.order_type,
"directiveId": order.directive_id,
"labels": order.labels,
"repositoryUrl": order.repository_url,
"createdAt": order.created_at,
})),
)
.into_response(),
Err(e) => {
tracing::error!(error = %e, "Failed to create order");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to create order")),
)
.into_response()
}
}
}