diff options
| -rw-r--r-- | makima/src/bin/makima.rs | 10 | ||||
| -rw-r--r-- | makima/src/daemon/api/directive.rs | 20 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 3 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 7 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 20 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 258 | ||||
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 61 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 4 |
9 files changed, 278 insertions, 106 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 92fdae6..8115387 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -787,6 +787,16 @@ async fn run_directive( .await?; println!("{}", serde_json::to_string(&result.0)?); } + DirectiveCommand::SubmitPlan(args) => { + let client = ApiClient::new(args.api_url, args.api_key)?; + // Read plan JSON from stdin + let mut plan_json = String::new(); + io::stdin().read_to_string(&mut plan_json)?; + let result = client + .directive_submit_plan(args.directive_id, &plan_json) + .await?; + println!("{}", serde_json::to_string(&result.0)?); + } } Ok(()) diff --git a/makima/src/daemon/api/directive.rs b/makima/src/daemon/api/directive.rs index 3589e78..c51882b 100644 --- a/makima/src/daemon/api/directive.rs +++ b/makima/src/daemon/api/directive.rs @@ -71,6 +71,26 @@ impl ApiClient { .await } + /// Submit a chain plan for a directive. + pub async fn directive_submit_plan( + &self, + directive_id: Uuid, + plan_json: &str, + ) -> Result<JsonValue, ApiError> { + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct SubmitPlanBody { + plan: String, + } + self.post( + &format!("/api/v1/directives/{}/submit-plan", directive_id), + &SubmitPlanBody { + plan: plan_json.to_string(), + }, + ) + .await + } + /// List evaluations for a step. pub async fn directive_evaluations( &self, diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index c9a8c6f..954f219 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -231,6 +231,9 @@ pub enum DirectiveCommand { /// List evaluation history for a step Evaluations(directive::EvaluationsArgs), + + /// Submit a chain plan for a directive (reads JSON from stdin) + SubmitPlan(DirectiveArgs), } impl Cli { diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index eff2df0..6045c7d 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2824,6 +2824,13 @@ pub struct CreateDirectiveRequest { pub base_branch: Option<String>, } +/// Request to submit a chain plan for a directive. +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SubmitPlanRequest { + pub plan: String, +} + /// Request to update an existing directive. #[derive(Debug, Clone, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 177aed3..ed56fff 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5279,6 +5279,26 @@ pub async fn set_directive_current_chain( .await } +/// Increment the chain_generation_count on a directive (without setting current_chain_id). +pub async fn increment_chain_generation_count( + pool: &PgPool, + directive_id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET chain_generation_count = chain_generation_count + 1, + version = version + 1, + updated_at = NOW() + WHERE id = $1 + RETURNING * + "#, + ) + .bind(directive_id) + .fetch_optional(pool) + .await +} + /// Create a new directive chain. pub async fn create_directive_chain( pool: &PgPool, diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 0dbdbf3..80e2a8b 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -182,6 +182,62 @@ pub async fn init_directive( Ok(updated) } +/// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction). +pub async fn submit_plan( + pool: &PgPool, + state: &SharedState, + owner_id: Uuid, + directive_id: Uuid, + plan_json: &str, +) -> Result<Directive, String> { + // 1. Get directive, verify status + let directive = repository::get_directive_for_owner(pool, directive_id, owner_id) + .await + .map_err(|e| format!("Failed to get directive: {}", e))? + .ok_or("Directive not found")?; + + if directive.status != "planning" { + return Err(format!( + "Directive must be in 'planning' status to submit a plan, current status: '{}'", + directive.status + )); + } + + // 2. Idempotency: if current_chain_id already set, return existing directive + if directive.current_chain_id.is_some() { + tracing::info!( + directive_id = %directive_id, + "Plan already submitted (current_chain_id set), returning existing directive" + ); + return Ok(directive); + } + + // 3. Parse the plan JSON + let chain_plan: ChainPlan = serde_json::from_str(plan_json) + .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?; + + if chain_plan.steps.is_empty() { + return Err("Chain plan has no steps".to_string()); + } + + // 4. Create chain and steps, transition to active + create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?; + + // 5. Re-fetch and return the updated directive + let updated = repository::get_directive(pool, directive_id) + .await + .map_err(|e| format!("Failed to re-fetch directive: {}", e))? + .ok_or("Directive not found after plan submission")?; + + tracing::info!( + directive_id = %directive_id, + step_count = chain_plan.steps.len(), + "Plan submitted via API, directive now active" + ); + + Ok(updated) +} + /// Called when any task completes — checks if it's directive-related and advances. /// Called when a contract's status is updated to "completed" via the API. /// This is the primary entry point for directive orchestration because supervisor @@ -206,9 +262,9 @@ pub async fn on_contract_completed( tracing::info!( directive_id = %directive.id, contract_id = %contract.id, - "Directive orchestrator contract completed, processing plan" + "Directive orchestrator contract completed, handling planning completion" ); - process_planning_result(pool, state, &directive, contract.id, owner_id).await?; + handle_planning_completion(pool, state, &directive, owner_id).await?; } else { tracing::warn!( contract_id = %contract.id, @@ -347,120 +403,113 @@ async fn on_planning_completed( return Ok(()); } - let Some(contract_id) = task.contract_id else { - return Ok(()); - }; - - process_planning_result(pool, state, directive, contract_id, owner_id).await + handle_planning_completion(pool, state, directive, owner_id).await } -/// Core logic: read plan from contract files, create chain and steps, advance. -/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path). -async fn process_planning_result( +/// Handle planning contract/task completion. +/// Checks if a plan was submitted via the CLI; if not, retries or fails. +async fn handle_planning_completion( pool: &PgPool, - state: &SharedState, + _state: &SharedState, directive: &Directive, - contract_id: Uuid, owner_id: Uuid, ) -> Result<(), String> { - // Idempotency guard: only process if directive is still in "planning" status. - // Both the contract-completion and task-completion paths can fire concurrently. + // Re-fetch directive to check latest state let current = repository::get_directive(pool, directive.id) .await - .map_err(|e| format!("Failed to re-fetch directive: {}", e))?; - if let Some(ref d) = current { - if d.status != "planning" { - tracing::info!( - directive_id = %directive.id, - status = %d.status, - "Skipping process_planning_result: directive no longer in planning status" - ); - return Ok(()); - } - } - - // Get contract files to find the chain plan - let files = repository::list_files_in_contract(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to list contract files: {}", e))?; - - // Find the chain plan file - let plan_file = files.iter().find(|f| { - let name_lower = f.name.to_lowercase(); - name_lower.contains("chain") || name_lower.contains("plan") - }); - - let plan_file = plan_file.or_else(|| files.first()); + .map_err(|e| format!("Failed to re-fetch directive: {}", e))? + .ok_or("Directive not found")?; - let Some(plan_file) = plan_file else { - tracing::warn!( + // Idempotency: only process if still in "planning" status + if current.status != "planning" { + tracing::info!( directive_id = %directive.id, - "No plan file found in planning contract, marking directive failed" + status = %current.status, + "Skipping handle_planning_completion: directive no longer in planning status" ); - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; return Ok(()); - }; - - // Read the full file to get the body content - let full_file = repository::get_file(pool, plan_file.id) - .await - .map_err(|e| format!("Failed to get plan file: {}", e))? - .ok_or("Plan file not found")?; + } - // Extract JSON from the file body elements - let plan_json = extract_plan_json(&full_file.body); + // If plan was already submitted via CLI (current_chain_id is set), nothing to do + if current.current_chain_id.is_some() { + tracing::info!( + directive_id = %directive.id, + "Plan already submitted via CLI, skipping handle_planning_completion" + ); + return Ok(()); + } - let Some(plan_json) = plan_json else { + // No plan was submitted — check retry budget + let max_regenerations = current.max_chain_regenerations.unwrap_or(2); + if current.chain_generation_count < max_regenerations { tracing::warn!( directive_id = %directive.id, - "Could not extract chain plan JSON from file body, marking directive failed" + attempt = current.chain_generation_count + 1, + max = max_regenerations, + "Planning completed without plan submission, retrying" ); - repository::update_directive_status(pool, directive.id, "failed") + + let _ = repository::create_directive_event( + pool, + directive.id, + None, + None, + "planning_retry", + "warn", + Some(&serde_json::json!({ + "attempt": current.chain_generation_count + 1, + "maxRegenerations": max_regenerations, + "reason": "Planning contract completed without submitting a plan" + })), + "system", + None, + ) + .await; + + // Increment generation count + repository::increment_chain_generation_count(pool, directive.id) .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - }; + .map_err(|e| format!("Failed to increment chain generation count: {}", e))?; - let chain_plan: ChainPlan = match serde_json::from_str(&plan_json) { - Ok(plan) => plan, - Err(e) => { - tracing::warn!( - directive_id = %directive.id, - error = %e, - "Failed to parse chain plan JSON, marking directive failed" - ); - repository::update_directive_status(pool, directive.id, "failed") - .await - .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - } - }; + // Reset to draft so init_directive can be called again + repository::update_directive_status(pool, directive.id, "draft") + .await + .map_err(|e| format!("Failed to reset directive status: {}", e))?; - if chain_plan.steps.is_empty() { - tracing::warn!( + // Re-init planning + init_directive(pool, _state, owner_id, directive.id).await?; + + Ok(()) + } else { + tracing::error!( directive_id = %directive.id, - "Chain plan has no steps, marking directive failed" + attempts = current.chain_generation_count, + max = max_regenerations, + "Planning failed: max regeneration attempts exhausted without plan submission" ); + + let _ = repository::create_directive_event( + pool, + directive.id, + None, + None, + "planning_failed", + "error", + Some(&serde_json::json!({ + "attempts": current.chain_generation_count, + "maxRegenerations": max_regenerations, + "reason": "Max chain regeneration attempts exhausted without plan submission" + })), + "system", + None, + ) + .await; + repository::update_directive_status(pool, directive.id, "failed") .await .map_err(|e| format!("Failed to update directive status: {}", e))?; - return Ok(()); - } - // Create chain and steps — if anything fails, mark directive as failed - match create_chain_and_steps(pool, state, directive, &chain_plan, owner_id).await { - Ok(()) => Ok(()), - Err(e) => { - tracing::error!( - directive_id = %directive.id, - error = %e, - "Failed to create chain/steps, marking directive failed" - ); - let _ = repository::update_directive_status(pool, directive.id, "failed").await; - Err(e) - } + Ok(()) } } @@ -848,18 +897,15 @@ fn build_planning_prompt(directive: &Directive) -> String { format!( r#"You are planning the execution of a directive. -DIRECTIVE: {} -GOAL: {} -REQUIREMENTS: {} -ACCEPTANCE CRITERIA: {} -CONSTRAINTS: {} +DIRECTIVE: {title} +GOAL: {goal} +REQUIREMENTS: {requirements} +ACCEPTANCE CRITERIA: {acceptance_criteria} +CONSTRAINTS: {constraints} Your job is to decompose this goal into a sequence of executable steps. Each step will become a separate contract with its own supervisor. -Write a JSON plan to a contract file named "chain-plan" using: - makima contract create-file "chain-plan" < plan.json - The JSON format: {{ "steps": [ @@ -880,13 +926,17 @@ Rules: - Be specific in task_plan — include file paths, function names, and acceptance criteria where possible. - Keep the number of steps reasonable (3-10 typically). -After writing the plan file, mark the contract as complete using: +Submit your plan by piping the JSON to stdin: + echo '<your_json_plan>' | makima directive submit-plan --directive-id {directive_id} + +After submitting the plan, mark the contract as complete: makima supervisor complete"#, - directive.title, - directive.goal, - serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), - serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), - serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + title = directive.title, + goal = directive.goal, + requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(), + acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(), + constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(), + directive_id = directive.id, ) } diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 65f32d5..3f62a33 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use crate::db::models::{ ChainStep, ChainStepWithContract, ChainWithSteps, CreateDirectiveRequest, Directive, DirectiveChain, DirectiveListResponse, DirectiveWithChains, EvaluationListResponse, - StepContractSummary, UpdateDirectiveRequest, + StepContractSummary, SubmitPlanRequest, UpdateDirectiveRequest, }; use crate::db::repository::{self, RepositoryError}; use crate::orchestration; @@ -724,3 +724,62 @@ pub async fn list_evaluations( } } } + +/// Submit a chain plan for a directive. +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/submit-plan", + params( + ("id" = Uuid, Path, description = "Directive ID") + ), + request_body = SubmitPlanRequest, + responses( + (status = 200, description = "Plan submitted, directive active", body = Directive), + (status = 400, description = "Invalid request or status", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Directive not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Directives" +)] +pub async fn submit_plan( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(req): Json<SubmitPlanRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match orchestration::directive::submit_plan(pool, &state, auth.owner_id, id, &req.plan).await { + Ok(directive) => Json(directive).into_response(), + Err(e) if e.contains("not found") || e.contains("Not found") => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", e)), + ) + .into_response(), + Err(e) if e.contains("must be in 'planning'") || e.contains("no steps") => ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_REQUEST", e)), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to submit plan for directive {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("SUBMIT_PLAN_FAILED", e)), + ) + .into_response() + } + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index c8242ae..0cad050 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -184,6 +184,7 @@ pub fn make_router(state: SharedState) -> Router { .route("/directives/{id}/start", post(directives::start_directive)) .route("/directives/{id}/chains", get(directives::list_chains)) .route("/directives/{id}/chains/{chain_id}", get(directives::get_chain)) + .route("/directives/{id}/submit-plan", post(directives::submit_plan)) .route("/directives/{id}/steps/{step_id}/evaluate", post(directives::evaluate_step)) .route("/directives/{id}/steps/{step_id}/evaluations", get(directives::list_evaluations)) // Contract supervisor resume endpoints diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index e680c07..888269f 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -17,7 +17,7 @@ use crate::db::models::{ MergeSkipRequest, MergeStartRequest, MergeStatusResponse, MeshChatConversation, MeshChatHistoryResponse, MeshChatMessageRecord, RepositoryHistoryEntry, RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest, - StepContractSummary, Task, + StepContractSummary, SubmitPlanRequest, Task, TaskEventListResponse, TaskListResponse, TaskSummary, TaskWithSubtasks, TranscriptEntry, UpdateContractRequest, UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest, }; @@ -119,6 +119,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage directives::get_chain, directives::evaluate_step, directives::list_evaluations, + directives::submit_plan, ), components( schemas( @@ -215,6 +216,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage StepContractSummary, CreateDirectiveRequest, UpdateDirectiveRequest, + SubmitPlanRequest, DirectiveEvaluation, DirectiveEvent, EvaluationListResponse, |
