diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 309 |
1 files changed, 304 insertions, 5 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 0ea1a57..8c36500 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; +use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; use sqlx::PgPool; use crate::server::auth::Authenticated; @@ -95,7 +95,7 @@ fn default_question_timeout() -> i32 { } /// Response from asking a question. -#[derive(Debug, Serialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionResponse { /// The question ID for tracking @@ -104,6 +104,10 @@ pub struct AskQuestionResponse { pub response: Option<String>, /// Whether the question timed out pub timed_out: bool, + /// Whether the question is still pending (server-side timeout reached but question not removed). + /// The client should poll the poll endpoint to continue waiting. + #[serde(default)] + pub still_pending: bool, } /// Request to answer a supervisor question. @@ -1803,6 +1807,7 @@ pub async fn ask_question( question_id, response: None, timed_out: false, + still_pending: false, }), ).into_response(); } @@ -1815,8 +1820,9 @@ pub async fn ask_question( // - Directive tasks without reconcile mode: 30s default timeout // - Contract tasks: use requested timeout_seconds let timeout_secs = if use_phaseguard { - // Block indefinitely until user responds - u64::MAX / 2 + // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit). + // The CLI will automatically reconnect via the poll endpoint. + 300 } else if is_directive_context && !directive_reconcile_mode { 30 } else { @@ -1844,13 +1850,28 @@ pub async fn ask_question( question_id, response: Some(response.response), timed_out: false, + still_pending: false, }), ).into_response(); } // Check timeout if start.elapsed() >= timeout_duration { - // Remove the pending question on timeout + if use_phaseguard { + // Phaseguard/reconcile: DON'T remove the pending question. + // Return still_pending so the CLI can reconnect via the poll endpoint. + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: false, + still_pending: true, + }), + ).into_response(); + } + + // Non-phaseguard: remove the pending question on timeout state.remove_pending_question(question_id); // Clear pending question from supervisor state on timeout (Task 3.3) @@ -1865,6 +1886,125 @@ pub async fn ask_question( question_id, response: None, timed_out: true, + still_pending: false, + }), + ).into_response(); + } + + // Wait before polling again + tokio::time::sleep(poll_interval).await; + } +} + +/// Poll for a question response by question_id. +/// +/// Used by the CLI to reconnect after a still_pending response from ask_question. +/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout +/// is reached without a response, allowing the CLI to reconnect again. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/questions/{question_id}/poll", + params( + ("question_id" = Uuid, Path, description = "The question ID to poll for"), + ), + responses( + (status = 200, description = "Question answered or still pending", body = AskQuestionResponse), + (status = 404, description = "Question not found"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + ), + security( + ("tool_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn poll_question( + State(state): State<SharedState>, + headers: HeaderMap, + Path(question_id): Path<Uuid>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + // Check if a response is already available + if let Some(response) = state.get_question_response(question_id) { + state.cleanup_question_response(question_id); + + // Clear pending question from supervisor state for contract context + let pool = state.db_pool.as_ref().unwrap(); + if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + if let Some(cid) = task.contract_id { + clear_pending_question(pool, cid, question_id).await; + } + } + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + still_pending: false, + }), + ).into_response(); + } + + // Check if the question exists at all (pending or response) + if !state.has_pending_question(question_id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), + ).into_response(); + } + + // Block for up to 5 minutes polling every 500ms + let timeout_duration = std::time::Duration::from_secs(300); + let start = std::time::Instant::now(); + let poll_interval = std::time::Duration::from_millis(500); + + loop { + // Check if response has been submitted + if let Some(response) = state.get_question_response(question_id) { + state.cleanup_question_response(question_id); + + // Clear pending question from supervisor state for contract context + let pool = state.db_pool.as_ref().unwrap(); + if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + if let Some(cid) = task.contract_id { + clear_pending_question(pool, cid, question_id).await; + } + } + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + still_pending: false, + }), + ).into_response(); + } + + // Check if the question was removed (e.g., task deleted) + if !state.has_pending_question(question_id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question no longer exists")), + ).into_response(); + } + + // Check timeout + if start.elapsed() >= timeout_duration { + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: false, + still_pending: true, }), ).into_response(); } @@ -2952,3 +3092,162 @@ pub fn generate_restoration_context_message(context: &SupervisorRestorationConte message } + +// ============================================================================= +// Order Creation from Directive Tasks +// ============================================================================= + +/// Request to create an order from a directive task. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateOrderForTaskRequest { + pub title: String, + #[serde(default)] + pub description: Option<String>, + #[serde(default = "default_order_priority")] + pub priority: String, + #[serde(default = "default_order_type")] + pub order_type: String, + #[serde(default = "default_order_labels")] + pub labels: serde_json::Value, + #[serde(default)] + pub repository_url: Option<String>, +} + +fn default_order_priority() -> String { + "medium".to_string() +} + +fn default_order_type() -> String { + "spike".to_string() +} + +fn default_order_labels() -> serde_json::Value { + serde_json::json!([]) +} + +/// Create an order for future work from a directive task. +/// +/// Only spike and chore order types are allowed. The order is automatically +/// linked to the directive associated with the calling task. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/orders", + request_body = CreateOrderForTaskRequest, + responses( + (status = 201, description = "Order created"), + (status = 400, description = "Invalid order type"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor/directive task"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_order_for_task( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<CreateOrderForTaskRequest>, +) -> impl IntoResponse { + let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Validate order_type is spike or chore + if request.order_type != "spike" && request.order_type != "chore" { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_ORDER_TYPE", + "Only spike and chore order types are allowed from directive tasks", + )), + ) + .into_response(); + } + + // Get the task to find its directive_id + let task = match repository::get_task(pool, task_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ) + .into_response(); + } + }; + + let directive_id = match task.directive_id { + Some(id) => id, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_DIRECTIVE", + "Task is not associated with a directive", + )), + ) + .into_response(); + } + }; + + // Determine repository_url: use request value, or fall back to directive's repository_url + let repository_url = if request.repository_url.is_some() { + request.repository_url + } else { + // Look up directive for its repository_url + match repository::get_directive_for_owner(pool, owner_id, directive_id).await { + Ok(Some(d)) => d.repository_url, + _ => None, + } + }; + + // Create the order + let order_req = CreateOrderRequest { + title: request.title, + description: request.description, + priority: Some(request.priority), + status: Some("open".to_string()), + order_type: Some(request.order_type), + labels: request.labels, + directive_id, + repository_url, + }; + + match repository::create_order(pool, owner_id, order_req).await { + Ok(order) => ( + StatusCode::CREATED, + Json(serde_json::json!({ + "id": order.id, + "title": order.title, + "description": order.description, + "priority": order.priority, + "status": order.status, + "orderType": order.order_type, + "directiveId": order.directive_id, + "labels": order.labels, + "repositoryUrl": order.repository_url, + "createdAt": order.created_at, + })), + ) + .into_response(), + Err(e) => { + tracing::error!(error = %e, "Failed to create order"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to create order")), + ) + .into_response() + } + } +} |
