diff options
| author | soryu <soryu@soryu.co> | 2026-02-24 23:37:44 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-24 23:37:44 +0000 |
| commit | 5d1fbed2733e93cc2be2e1a89ca022d88bef613f (patch) | |
| tree | 48f66f56c2557b5101a49775e0d964ccd94b516a /makima/src | |
| parent | b9bc33ab69d094d97fd1398aaa39e8e435547d17 (diff) | |
| download | soryu-5d1fbed2733e93cc2be2e1a89ca022d88bef613f.tar.gz soryu-5d1fbed2733e93cc2be2e1a89ca022d88bef613f.zip | |
feat: non-blocking reconcile polling, directive CLI orders & cleanup (#82)v0.3.1
* feat: soryu-co/soryu - makima: Remove aarch64-unknown-linux-gnu from release workflow
* WIP: heartbeat checkpoint
* WIP: heartbeat checkpoint
* WIP: heartbeat checkpoint
* feat: soryu-co/soryu - makima: Implement non-blocking ask with client-side polling for reconcile mode
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/bin/makima.rs | 52 | ||||
| -rw-r--r-- | makima/src/daemon/api/supervisor.rs | 31 | ||||
| -rw-r--r-- | makima/src/daemon/cli/directive.rs | 27 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 3 | ||||
| -rw-r--r-- | makima/src/daemon/skills/directive.md | 27 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 20 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 309 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 5 |
9 files changed, 470 insertions, 7 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index aaf5a08..c4183f3 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -810,6 +810,58 @@ async fn run_directive( let result = client .supervisor_ask(&args.question, choices, args.context, args.timeout, args.phaseguard, args.multi_select, args.non_blocking, args.question_type) .await?; + let mut response_value = result.0; + + // If the server returned still_pending, poll until we get a real response + while response_value.get("stillPending").and_then(|v| v.as_bool()).unwrap_or(false) { + // Extract question_id for polling + let question_id_str = response_value.get("questionId") + .and_then(|v| v.as_str()) + .ok_or_else(|| { + Box::<dyn std::error::Error + Send + Sync>::from( + "Missing questionId in still_pending response" + ) + })?; + let question_id: uuid::Uuid = question_id_str.parse().map_err(|e| { + Box::<dyn std::error::Error + Send + Sync>::from( + format!("Invalid questionId: {}", e) + ) + })?; + + eprintln!("Waiting for user response (polling)..."); + let poll_result = client.supervisor_poll_question(question_id).await?; + response_value = poll_result.0; + } + + println!("{}", serde_json::to_string(&response_value)?); + } + DirectiveCommand::CreateOrder(args) => { + // Validate order_type is spike or chore + if args.order_type != "spike" && args.order_type != "chore" { + eprintln!("Error: Only 'spike' and 'chore' order types are allowed. Got: '{}'", args.order_type); + std::process::exit(1); + } + let client = ApiClient::new(args.common.api_url.clone(), args.common.api_key.clone())?; + eprintln!("Creating order: {}...", args.title); + let labels = args + .labels + .map(|l| { + serde_json::Value::Array( + l.split(',') + .map(|s| serde_json::Value::String(s.trim().to_string())) + .collect(), + ) + }) + .unwrap_or_else(|| serde_json::json!([])); + let req = makima::daemon::api::supervisor::CreateOrderRequest { + title: args.title, + description: args.description, + priority: args.priority, + order_type: args.order_type, + labels, + repository_url: None, + }; + let result = client.create_order(&req).await?; println!("{}", serde_json::to_string(&result.0)?); } } diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs index c67c9ca..675b0bb 100644 --- a/makima/src/daemon/api/supervisor.rs +++ b/makima/src/daemon/api/supervisor.rs @@ -83,6 +83,20 @@ pub struct AskQuestionRequest { pub question_type: String, } +/// Request to create an order for future work. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateOrderRequest { + pub title: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option<String>, + pub priority: String, + pub order_type: String, + pub labels: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + pub repository_url: Option<String>, +} + // Generic response type for JSON output #[derive(Deserialize, Serialize)] pub struct JsonValue(pub serde_json::Value); @@ -228,6 +242,18 @@ impl ApiClient { self.post("/api/v1/mesh/supervisor/questions", &req).await } + /// Poll for a question response by question_id. + /// + /// Used after a still_pending response from supervisor_ask. Blocks for up to + /// 5 minutes on the server side. Returns still_pending if no response yet. + pub async fn supervisor_poll_question( + &self, + question_id: Uuid, + ) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/supervisor/questions/{}/poll", question_id)) + .await + } + /// Advance contract to a new phase. /// /// When `confirmed` is false and phase_guard is enabled, returns a response with @@ -312,6 +338,11 @@ impl ApiClient { .await } + /// Create an order for future work from a directive task. + pub async fn create_order(&self, req: &CreateOrderRequest) -> Result<JsonValue, ApiError> { + self.post("/api/v1/mesh/supervisor/orders", req).await + } + /// Delete a task. pub async fn delete_task(&self, task_id: Uuid) -> Result<(), ApiError> { self.delete(&format!("/api/v1/mesh/tasks/{}", task_id)).await diff --git a/makima/src/daemon/cli/directive.rs b/makima/src/daemon/cli/directive.rs index 7c8451f..cc7b224 100644 --- a/makima/src/daemon/cli/directive.rs +++ b/makima/src/daemon/cli/directive.rs @@ -150,6 +150,33 @@ pub struct AskArgs { pub question_type: String, } +/// Arguments for create-order command. +#[derive(Args, Debug)] +pub struct CreateOrderArgs { + #[command(flatten)] + pub common: DirectiveArgs, + + /// Order title + #[arg(long)] + pub title: String, + + /// Order description + #[arg(long)] + pub description: Option<String>, + + /// Priority: critical, high, medium, low, none + #[arg(long, default_value = "medium")] + pub priority: String, + + /// Order type: spike or chore only + #[arg(long, default_value = "spike")] + pub order_type: String, + + /// Comma-separated labels + #[arg(long)] + pub labels: Option<String>, +} + /// Arguments for update command. #[derive(Args, Debug)] pub struct UpdateArgs { diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index 8063541..af6f885 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -252,6 +252,9 @@ pub enum DirectiveCommand { /// Ask a question and wait for user feedback Ask(directive::AskArgs), + + /// Create an order for future work (spike or chore only) + CreateOrder(directive::CreateOrderArgs), } impl Cli { diff --git a/makima/src/daemon/skills/directive.md b/makima/src/daemon/skills/directive.md index 02de836..dc1bce2 100644 --- a/makima/src/daemon/skills/directive.md +++ b/makima/src/daemon/skills/directive.md @@ -92,7 +92,7 @@ Options: - `--choices "opt1,opt2,opt3"` - Provide choices for the user to select from - `--context "<context>"` - Additional context to help the user understand the question - `--timeout <seconds>` - Wait timeout (default: 3600 = 1 hour) -- `--phaseguard` - Block indefinitely until the user responds (no timeout). Recommended for critical decisions during planning. +- `--phaseguard` - Wait indefinitely until the user responds (no timeout). The CLI automatically reconnects via polling every ~5 minutes to avoid HTTP timeout limits. Recommended for critical decisions during planning. - `--multi-select` - Allow the user to select multiple choices - `--non-blocking` - Return immediately without waiting for a response - `--question-type <general|phase_confirmation|contract_complete>` - Question type @@ -108,6 +108,31 @@ Options: makima directive ask "Should we use REST or GraphQL for the new API?" --choices "REST,GraphQL" --context "The existing codebase uses REST but the frontend team prefers GraphQL" --phaseguard ``` +### Create an Order (Future Work) +```bash +makima directive create-order --title "Order title" --order-type spike +``` +Creates an order for future work that is automatically linked to the current directive. Only `spike` and `chore` order types are allowed. + +Options: +- `--title "<title>"` - (Required) Title of the order +- `--description "<description>"` - Optional description of the work +- `--priority <critical|high|medium|low|none>` - Priority level (default: medium) +- `--order-type <spike|chore>` - Type of work (default: spike). Only spike and chore are allowed. +- `--labels "label1,label2"` - Optional comma-separated labels + +**When to use:** +- When you discover work that is out of scope for the current directive step but should be tracked +- When a spike reveals follow-up tasks that need to be done later +- When you identify technical debt or maintenance work during implementation +- When a chore (e.g., dependency update, config change) is needed but not part of the current goal + +**Example:** +```bash +makima directive create-order --title "Investigate flaky test in auth module" --order-type spike --priority high --description "The auth module tests intermittently fail on CI. Needs investigation." --labels "testing,auth" +makima directive create-order --title "Update deprecated serde API usage" --order-type chore --priority low --labels "tech-debt" +``` + ## Memory Commands Directives have an optional key-value memory system that persists across steps and planning cycles. Use memory to share context, decisions, and learned information between steps — so downstream tasks don't need to re-discover what earlier steps already figured out. diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index b91781c..98690bb 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -1371,8 +1371,9 @@ If you need clarification from the user before finalizing the plan, you can ask makima directive ask "Confirm this approach?" --context "Additional context here" --phaseguard Use --phaseguard for questions that block progress (the question will wait indefinitely for a response). +The CLI automatically reconnects via polling every ~5 minutes to avoid HTTP timeout limits. Without --phaseguard, questions timeout based on the directive's reconcile mode: -- Reconcile ON: questions block indefinitely until answered +- Reconcile ON: questions wait indefinitely (with automatic reconnecting polls every ~5 min) - Reconcile OFF: questions timeout after 30 seconds with no response When to ask: @@ -1385,6 +1386,23 @@ Do NOT ask questions for: - Implementation details you can determine from the codebase - Standard engineering decisions with clear best practices - Trivial choices that do not significantly affect the outcome + +CREATING ORDERS FOR FUTURE WORK: +If you discover work that is out of scope for the current plan but should be tracked, create an order: + makima directive create-order --title "Order title" --order-type spike + makima directive create-order --title "Fix flaky test" --order-type chore --priority high --description "Details..." + +Only spike and chore types are allowed. The order is automatically linked to this directive. + +When to create orders: +- You discover follow-up work that is outside the current goal +- A spike reveals additional tasks that need future attention +- You identify technical debt or maintenance chores during planning +- Something needs investigation but is not blocking the current plan + +Do NOT create orders for: +- Work that should be a step in the current plan +- Tasks that are part of the current goal "#, title = directive.title, goal = directive.goal, diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 0ea1a57..8c36500 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; +use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; use crate::db::repository; use sqlx::PgPool; use crate::server::auth::Authenticated; @@ -95,7 +95,7 @@ fn default_question_timeout() -> i32 { } /// Response from asking a question. -#[derive(Debug, Serialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct AskQuestionResponse { /// The question ID for tracking @@ -104,6 +104,10 @@ pub struct AskQuestionResponse { pub response: Option<String>, /// Whether the question timed out pub timed_out: bool, + /// Whether the question is still pending (server-side timeout reached but question not removed). + /// The client should poll the poll endpoint to continue waiting. + #[serde(default)] + pub still_pending: bool, } /// Request to answer a supervisor question. @@ -1803,6 +1807,7 @@ pub async fn ask_question( question_id, response: None, timed_out: false, + still_pending: false, }), ).into_response(); } @@ -1815,8 +1820,9 @@ pub async fn ask_question( // - Directive tasks without reconcile mode: 30s default timeout // - Contract tasks: use requested timeout_seconds let timeout_secs = if use_phaseguard { - // Block indefinitely until user responds - u64::MAX / 2 + // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit). + // The CLI will automatically reconnect via the poll endpoint. + 300 } else if is_directive_context && !directive_reconcile_mode { 30 } else { @@ -1844,13 +1850,28 @@ pub async fn ask_question( question_id, response: Some(response.response), timed_out: false, + still_pending: false, }), ).into_response(); } // Check timeout if start.elapsed() >= timeout_duration { - // Remove the pending question on timeout + if use_phaseguard { + // Phaseguard/reconcile: DON'T remove the pending question. + // Return still_pending so the CLI can reconnect via the poll endpoint. + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: false, + still_pending: true, + }), + ).into_response(); + } + + // Non-phaseguard: remove the pending question on timeout state.remove_pending_question(question_id); // Clear pending question from supervisor state on timeout (Task 3.3) @@ -1865,6 +1886,125 @@ pub async fn ask_question( question_id, response: None, timed_out: true, + still_pending: false, + }), + ).into_response(); + } + + // Wait before polling again + tokio::time::sleep(poll_interval).await; + } +} + +/// Poll for a question response by question_id. +/// +/// Used by the CLI to reconnect after a still_pending response from ask_question. +/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout +/// is reached without a response, allowing the CLI to reconnect again. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/questions/{question_id}/poll", + params( + ("question_id" = Uuid, Path, description = "The question ID to poll for"), + ), + responses( + (status = 200, description = "Question answered or still pending", body = AskQuestionResponse), + (status = 404, description = "Question not found"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + ), + security( + ("tool_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn poll_question( + State(state): State<SharedState>, + headers: HeaderMap, + Path(question_id): Path<Uuid>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + // Check if a response is already available + if let Some(response) = state.get_question_response(question_id) { + state.cleanup_question_response(question_id); + + // Clear pending question from supervisor state for contract context + let pool = state.db_pool.as_ref().unwrap(); + if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + if let Some(cid) = task.contract_id { + clear_pending_question(pool, cid, question_id).await; + } + } + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + still_pending: false, + }), + ).into_response(); + } + + // Check if the question exists at all (pending or response) + if !state.has_pending_question(question_id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), + ).into_response(); + } + + // Block for up to 5 minutes polling every 500ms + let timeout_duration = std::time::Duration::from_secs(300); + let start = std::time::Instant::now(); + let poll_interval = std::time::Duration::from_millis(500); + + loop { + // Check if response has been submitted + if let Some(response) = state.get_question_response(question_id) { + state.cleanup_question_response(question_id); + + // Clear pending question from supervisor state for contract context + let pool = state.db_pool.as_ref().unwrap(); + if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + if let Some(cid) = task.contract_id { + clear_pending_question(pool, cid, question_id).await; + } + } + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + still_pending: false, + }), + ).into_response(); + } + + // Check if the question was removed (e.g., task deleted) + if !state.has_pending_question(question_id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question no longer exists")), + ).into_response(); + } + + // Check timeout + if start.elapsed() >= timeout_duration { + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: false, + still_pending: true, }), ).into_response(); } @@ -2952,3 +3092,162 @@ pub fn generate_restoration_context_message(context: &SupervisorRestorationConte message } + +// ============================================================================= +// Order Creation from Directive Tasks +// ============================================================================= + +/// Request to create an order from a directive task. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateOrderForTaskRequest { + pub title: String, + #[serde(default)] + pub description: Option<String>, + #[serde(default = "default_order_priority")] + pub priority: String, + #[serde(default = "default_order_type")] + pub order_type: String, + #[serde(default = "default_order_labels")] + pub labels: serde_json::Value, + #[serde(default)] + pub repository_url: Option<String>, +} + +fn default_order_priority() -> String { + "medium".to_string() +} + +fn default_order_type() -> String { + "spike".to_string() +} + +fn default_order_labels() -> serde_json::Value { + serde_json::json!([]) +} + +/// Create an order for future work from a directive task. +/// +/// Only spike and chore order types are allowed. The order is automatically +/// linked to the directive associated with the calling task. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/orders", + request_body = CreateOrderForTaskRequest, + responses( + (status = 201, description = "Order created"), + (status = 400, description = "Invalid order type"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor/directive task"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_order_for_task( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<CreateOrderForTaskRequest>, +) -> impl IntoResponse { + let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Validate order_type is spike or chore + if request.order_type != "spike" && request.order_type != "chore" { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_ORDER_TYPE", + "Only spike and chore order types are allowed from directive tasks", + )), + ) + .into_response(); + } + + // Get the task to find its directive_id + let task = match repository::get_task(pool, task_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ) + .into_response(); + } + }; + + let directive_id = match task.directive_id { + Some(id) => id, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_DIRECTIVE", + "Task is not associated with a directive", + )), + ) + .into_response(); + } + }; + + // Determine repository_url: use request value, or fall back to directive's repository_url + let repository_url = if request.repository_url.is_some() { + request.repository_url + } else { + // Look up directive for its repository_url + match repository::get_directive_for_owner(pool, owner_id, directive_id).await { + Ok(Some(d)) => d.repository_url, + _ => None, + } + }; + + // Create the order + let order_req = CreateOrderRequest { + title: request.title, + description: request.description, + priority: Some(request.priority), + status: Some("open".to_string()), + order_type: Some(request.order_type), + labels: request.labels, + directive_id, + repository_url, + }; + + match repository::create_order(pool, owner_id, order_req).await { + Ok(order) => ( + StatusCode::CREATED, + Json(serde_json::json!({ + "id": order.id, + "title": order.title, + "description": order.description, + "priority": order.priority, + "status": order.status, + "orderType": order.order_type, + "directiveId": order.directive_id, + "labels": order.labels, + "repositoryUrl": order.repository_url, + "createdAt": order.created_at, + })), + ) + .into_response(), + Err(e) => { + tracing::error!(error = %e, "Failed to create order"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to create order")), + ) + .into_response() + } + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 1ad3a8d..e0f8e7d 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -129,8 +129,11 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task)) .route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff)) .route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr)) + // Supervisor order creation endpoint + .route("/mesh/supervisor/orders", post(mesh_supervisor::create_order_for_task)) // Supervisor question endpoints .route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question)) + .route("/mesh/supervisor/questions/{question_id}/poll", get(mesh_supervisor::poll_question)) .route("/mesh/questions", get(mesh_supervisor::list_pending_questions)) .route("/mesh/questions/{question_id}/answer", post(mesh_supervisor::answer_question)) // Mesh WebSocket endpoints diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 41c336e..15fec6b 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -910,6 +910,11 @@ impl AppState { self.pending_questions.get(&question_id).map(|entry| entry.value().clone()) } + /// Check if a pending question exists (either still pending or has a response ready). + pub fn has_pending_question(&self, question_id: Uuid) -> bool { + self.pending_questions.contains_key(&question_id) || self.question_responses.contains_key(&question_id) + } + /// Submit a response to a supervisor question. pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool { // Check if the question exists |
