summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_supervisor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs309
1 files changed, 304 insertions, 5 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 0ea1a57..8c36500 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
-use crate::db::models::{CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest};
+use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
use sqlx::PgPool;
use crate::server::auth::Authenticated;
@@ -95,7 +95,7 @@ fn default_question_timeout() -> i32 {
}
/// Response from asking a question.
-#[derive(Debug, Serialize, ToSchema)]
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionResponse {
/// The question ID for tracking
@@ -104,6 +104,10 @@ pub struct AskQuestionResponse {
pub response: Option<String>,
/// Whether the question timed out
pub timed_out: bool,
+ /// Whether the question is still pending (server-side timeout reached but question not removed).
+ /// The client should poll the poll endpoint to continue waiting.
+ #[serde(default)]
+ pub still_pending: bool,
}
/// Request to answer a supervisor question.
@@ -1803,6 +1807,7 @@ pub async fn ask_question(
question_id,
response: None,
timed_out: false,
+ still_pending: false,
}),
).into_response();
}
@@ -1815,8 +1820,9 @@ pub async fn ask_question(
// - Directive tasks without reconcile mode: 30s default timeout
// - Contract tasks: use requested timeout_seconds
let timeout_secs = if use_phaseguard {
- // Block indefinitely until user responds
- u64::MAX / 2
+ // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit).
+ // The CLI will automatically reconnect via the poll endpoint.
+ 300
} else if is_directive_context && !directive_reconcile_mode {
30
} else {
@@ -1844,13 +1850,28 @@ pub async fn ask_question(
question_id,
response: Some(response.response),
timed_out: false,
+ still_pending: false,
}),
).into_response();
}
// Check timeout
if start.elapsed() >= timeout_duration {
- // Remove the pending question on timeout
+ if use_phaseguard {
+ // Phaseguard/reconcile: DON'T remove the pending question.
+ // Return still_pending so the CLI can reconnect via the poll endpoint.
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: None,
+ timed_out: false,
+ still_pending: true,
+ }),
+ ).into_response();
+ }
+
+ // Non-phaseguard: remove the pending question on timeout
state.remove_pending_question(question_id);
// Clear pending question from supervisor state on timeout (Task 3.3)
@@ -1865,6 +1886,125 @@ pub async fn ask_question(
question_id,
response: None,
timed_out: true,
+ still_pending: false,
+ }),
+ ).into_response();
+ }
+
+ // Wait before polling again
+ tokio::time::sleep(poll_interval).await;
+ }
+}
+
+/// Poll for a question response by question_id.
+///
+/// Used by the CLI to reconnect after a still_pending response from ask_question.
+/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout
+/// is reached without a response, allowing the CLI to reconnect again.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
+ params(
+ ("question_id" = Uuid, Path, description = "The question ID to poll for"),
+ ),
+ responses(
+ (status = 200, description = "Question answered or still pending", body = AskQuestionResponse),
+ (status = 404, description = "Question not found"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ ),
+ security(
+ ("tool_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn poll_question(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Path(question_id): Path<Uuid>,
+) -> impl IntoResponse {
+ let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ // Check if a response is already available
+ if let Some(response) = state.get_question_response(question_id) {
+ state.cleanup_question_response(question_id);
+
+ // Clear pending question from supervisor state for contract context
+ let pool = state.db_pool.as_ref().unwrap();
+ if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
+ if let Some(cid) = task.contract_id {
+ clear_pending_question(pool, cid, question_id).await;
+ }
+ }
+
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: Some(response.response),
+ timed_out: false,
+ still_pending: false,
+ }),
+ ).into_response();
+ }
+
+ // Check if the question exists at all (pending or response)
+ if !state.has_pending_question(question_id) {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
+ ).into_response();
+ }
+
+ // Block for up to 5 minutes polling every 500ms
+ let timeout_duration = std::time::Duration::from_secs(300);
+ let start = std::time::Instant::now();
+ let poll_interval = std::time::Duration::from_millis(500);
+
+ loop {
+ // Check if response has been submitted
+ if let Some(response) = state.get_question_response(question_id) {
+ state.cleanup_question_response(question_id);
+
+ // Clear pending question from supervisor state for contract context
+ let pool = state.db_pool.as_ref().unwrap();
+ if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
+ if let Some(cid) = task.contract_id {
+ clear_pending_question(pool, cid, question_id).await;
+ }
+ }
+
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: Some(response.response),
+ timed_out: false,
+ still_pending: false,
+ }),
+ ).into_response();
+ }
+
+ // Check if the question was removed (e.g., task deleted)
+ if !state.has_pending_question(question_id) {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Question no longer exists")),
+ ).into_response();
+ }
+
+ // Check timeout
+ if start.elapsed() >= timeout_duration {
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: None,
+ timed_out: false,
+ still_pending: true,
}),
).into_response();
}
@@ -2952,3 +3092,162 @@ pub fn generate_restoration_context_message(context: &SupervisorRestorationConte
message
}
+
+// =============================================================================
+// Order Creation from Directive Tasks
+// =============================================================================
+
+/// Request to create an order from a directive task.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateOrderForTaskRequest {
+ pub title: String,
+ #[serde(default)]
+ pub description: Option<String>,
+ #[serde(default = "default_order_priority")]
+ pub priority: String,
+ #[serde(default = "default_order_type")]
+ pub order_type: String,
+ #[serde(default = "default_order_labels")]
+ pub labels: serde_json::Value,
+ #[serde(default)]
+ pub repository_url: Option<String>,
+}
+
+fn default_order_priority() -> String {
+ "medium".to_string()
+}
+
+fn default_order_type() -> String {
+ "spike".to_string()
+}
+
+fn default_order_labels() -> serde_json::Value {
+ serde_json::json!([])
+}
+
+/// Create an order for future work from a directive task.
+///
+/// Only spike and chore order types are allowed. The order is automatically
+/// linked to the directive associated with the calling task.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/orders",
+ request_body = CreateOrderForTaskRequest,
+ responses(
+ (status = 201, description = "Order created"),
+ (status = 400, description = "Invalid order type"),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor/directive task"),
+ (status = 500, description = "Internal server error"),
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn create_order_for_task(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<CreateOrderForTaskRequest>,
+) -> impl IntoResponse {
+ let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Validate order_type is spike or chore
+ if request.order_type != "spike" && request.order_type != "chore" {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_ORDER_TYPE",
+ "Only spike and chore order types are allowed from directive tasks",
+ )),
+ )
+ .into_response();
+ }
+
+ // Get the task to find its directive_id
+ let task = match repository::get_task(pool, task_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ )
+ .into_response();
+ }
+ };
+
+ let directive_id = match task.directive_id {
+ Some(id) => id,
+ None => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NO_DIRECTIVE",
+ "Task is not associated with a directive",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Determine repository_url: use request value, or fall back to directive's repository_url
+ let repository_url = if request.repository_url.is_some() {
+ request.repository_url
+ } else {
+ // Look up directive for its repository_url
+ match repository::get_directive_for_owner(pool, owner_id, directive_id).await {
+ Ok(Some(d)) => d.repository_url,
+ _ => None,
+ }
+ };
+
+ // Create the order
+ let order_req = CreateOrderRequest {
+ title: request.title,
+ description: request.description,
+ priority: Some(request.priority),
+ status: Some("open".to_string()),
+ order_type: Some(request.order_type),
+ labels: request.labels,
+ directive_id,
+ repository_url,
+ };
+
+ match repository::create_order(pool, owner_id, order_req).await {
+ Ok(order) => (
+ StatusCode::CREATED,
+ Json(serde_json::json!({
+ "id": order.id,
+ "title": order.title,
+ "description": order.description,
+ "priority": order.priority,
+ "status": order.status,
+ "orderType": order.order_type,
+ "directiveId": order.directive_id,
+ "labels": order.labels,
+ "repositoryUrl": order.repository_url,
+ "createdAt": order.created_at,
+ })),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to create order");
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to create order")),
+ )
+ .into_response()
+ }
+ }
+}