summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--makima/src/bin/makima.rs10
-rw-r--r--makima/src/daemon/api/directive.rs20
-rw-r--r--makima/src/daemon/cli/mod.rs3
-rw-r--r--makima/src/db/models.rs7
-rw-r--r--makima/src/db/repository.rs20
-rw-r--r--makima/src/orchestration/directive.rs258
-rw-r--r--makima/src/server/handlers/directives.rs61
-rw-r--r--makima/src/server/mod.rs1
-rw-r--r--makima/src/server/openapi.rs4
9 files changed, 278 insertions, 106 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 92fdae6..8115387 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -787,6 +787,16 @@ async fn run_directive(
.await?;
println!("{}", serde_json::to_string(&result.0)?);
}
+ DirectiveCommand::SubmitPlan(args) => {
+ let client = ApiClient::new(args.api_url, args.api_key)?;
+ // Read plan JSON from stdin
+ let mut plan_json = String::new();
+ io::stdin().read_to_string(&mut plan_json)?;
+ let result = client
+ .directive_submit_plan(args.directive_id, &plan_json)
+ .await?;
+ println!("{}", serde_json::to_string(&result.0)?);
+ }
}
Ok(())
diff --git a/makima/src/daemon/api/directive.rs b/makima/src/daemon/api/directive.rs
index 3589e78..c51882b 100644
--- a/makima/src/daemon/api/directive.rs
+++ b/makima/src/daemon/api/directive.rs
@@ -71,6 +71,26 @@ impl ApiClient {
.await
}
+ /// Submit a chain plan for a directive.
+ pub async fn directive_submit_plan(
+ &self,
+ directive_id: Uuid,
+ plan_json: &str,
+ ) -> Result<JsonValue, ApiError> {
+ #[derive(serde::Serialize)]
+ #[serde(rename_all = "camelCase")]
+ struct SubmitPlanBody {
+ plan: String,
+ }
+ self.post(
+ &format!("/api/v1/directives/{}/submit-plan", directive_id),
+ &SubmitPlanBody {
+ plan: plan_json.to_string(),
+ },
+ )
+ .await
+ }
+
/// List evaluations for a step.
pub async fn directive_evaluations(
&self,
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
index c9a8c6f..954f219 100644
--- a/makima/src/daemon/cli/mod.rs
+++ b/makima/src/daemon/cli/mod.rs
@@ -231,6 +231,9 @@ pub enum DirectiveCommand {
/// List evaluation history for a step
Evaluations(directive::EvaluationsArgs),
+
+ /// Submit a chain plan for a directive (reads JSON from stdin)
+ SubmitPlan(DirectiveArgs),
}
impl Cli {
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index eff2df0..6045c7d 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -2824,6 +2824,13 @@ pub struct CreateDirectiveRequest {
pub base_branch: Option<String>,
}
+/// Request to submit a chain plan for a directive.
+#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SubmitPlanRequest {
+ pub plan: String,
+}
+
/// Request to update an existing directive.
#[derive(Debug, Clone, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 177aed3..ed56fff 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5279,6 +5279,26 @@ pub async fn set_directive_current_chain(
.await
}
+/// Increment the chain_generation_count on a directive (without setting current_chain_id).
+pub async fn increment_chain_generation_count(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET chain_generation_count = chain_generation_count + 1,
+ version = version + 1,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .fetch_optional(pool)
+ .await
+}
+
/// Create a new directive chain.
pub async fn create_directive_chain(
pool: &PgPool,
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 0dbdbf3..80e2a8b 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -182,6 +182,62 @@ pub async fn init_directive(
Ok(updated)
}
+/// Submit a chain plan for a directive via the CLI/API (instead of file-based extraction).
+pub async fn submit_plan(
+ pool: &PgPool,
+ state: &SharedState,
+ owner_id: Uuid,
+ directive_id: Uuid,
+ plan_json: &str,
+) -> Result<Directive, String> {
+ // 1. Get directive, verify status
+ let directive = repository::get_directive_for_owner(pool, directive_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get directive: {}", e))?
+ .ok_or("Directive not found")?;
+
+ if directive.status != "planning" {
+ return Err(format!(
+ "Directive must be in 'planning' status to submit a plan, current status: '{}'",
+ directive.status
+ ));
+ }
+
+ // 2. Idempotency: if current_chain_id already set, return existing directive
+ if directive.current_chain_id.is_some() {
+ tracing::info!(
+ directive_id = %directive_id,
+ "Plan already submitted (current_chain_id set), returning existing directive"
+ );
+ return Ok(directive);
+ }
+
+ // 3. Parse the plan JSON
+ let chain_plan: ChainPlan = serde_json::from_str(plan_json)
+ .map_err(|e| format!("Failed to parse chain plan JSON: {}", e))?;
+
+ if chain_plan.steps.is_empty() {
+ return Err("Chain plan has no steps".to_string());
+ }
+
+ // 4. Create chain and steps, transition to active
+ create_chain_and_steps(pool, state, &directive, &chain_plan, owner_id).await?;
+
+ // 5. Re-fetch and return the updated directive
+ let updated = repository::get_directive(pool, directive_id)
+ .await
+ .map_err(|e| format!("Failed to re-fetch directive: {}", e))?
+ .ok_or("Directive not found after plan submission")?;
+
+ tracing::info!(
+ directive_id = %directive_id,
+ step_count = chain_plan.steps.len(),
+ "Plan submitted via API, directive now active"
+ );
+
+ Ok(updated)
+}
+
/// Called when any task completes — checks if it's directive-related and advances.
/// Called when a contract's status is updated to "completed" via the API.
/// This is the primary entry point for directive orchestration because supervisor
@@ -206,9 +262,9 @@ pub async fn on_contract_completed(
tracing::info!(
directive_id = %directive.id,
contract_id = %contract.id,
- "Directive orchestrator contract completed, processing plan"
+ "Directive orchestrator contract completed, handling planning completion"
);
- process_planning_result(pool, state, &directive, contract.id, owner_id).await?;
+ handle_planning_completion(pool, state, &directive, owner_id).await?;
} else {
tracing::warn!(
contract_id = %contract.id,
@@ -347,120 +403,113 @@ async fn on_planning_completed(
return Ok(());
}
- let Some(contract_id) = task.contract_id else {
- return Ok(());
- };
-
- process_planning_result(pool, state, directive, contract_id, owner_id).await
+ handle_planning_completion(pool, state, directive, owner_id).await
}
-/// Core logic: read plan from contract files, create chain and steps, advance.
-/// Called from both `on_planning_completed` (task path) and `on_contract_completed` (API path).
-async fn process_planning_result(
+/// Handle planning contract/task completion.
+/// Checks if a plan was submitted via the CLI; if not, retries or fails.
+async fn handle_planning_completion(
pool: &PgPool,
- state: &SharedState,
+ _state: &SharedState,
directive: &Directive,
- contract_id: Uuid,
owner_id: Uuid,
) -> Result<(), String> {
- // Idempotency guard: only process if directive is still in "planning" status.
- // Both the contract-completion and task-completion paths can fire concurrently.
+ // Re-fetch directive to check latest state
let current = repository::get_directive(pool, directive.id)
.await
- .map_err(|e| format!("Failed to re-fetch directive: {}", e))?;
- if let Some(ref d) = current {
- if d.status != "planning" {
- tracing::info!(
- directive_id = %directive.id,
- status = %d.status,
- "Skipping process_planning_result: directive no longer in planning status"
- );
- return Ok(());
- }
- }
-
- // Get contract files to find the chain plan
- let files = repository::list_files_in_contract(pool, contract_id, owner_id)
- .await
- .map_err(|e| format!("Failed to list contract files: {}", e))?;
-
- // Find the chain plan file
- let plan_file = files.iter().find(|f| {
- let name_lower = f.name.to_lowercase();
- name_lower.contains("chain") || name_lower.contains("plan")
- });
-
- let plan_file = plan_file.or_else(|| files.first());
+ .map_err(|e| format!("Failed to re-fetch directive: {}", e))?
+ .ok_or("Directive not found")?;
- let Some(plan_file) = plan_file else {
- tracing::warn!(
+ // Idempotency: only process if still in "planning" status
+ if current.status != "planning" {
+ tracing::info!(
directive_id = %directive.id,
- "No plan file found in planning contract, marking directive failed"
+ status = %current.status,
+ "Skipping handle_planning_completion: directive no longer in planning status"
);
- repository::update_directive_status(pool, directive.id, "failed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
return Ok(());
- };
-
- // Read the full file to get the body content
- let full_file = repository::get_file(pool, plan_file.id)
- .await
- .map_err(|e| format!("Failed to get plan file: {}", e))?
- .ok_or("Plan file not found")?;
+ }
- // Extract JSON from the file body elements
- let plan_json = extract_plan_json(&full_file.body);
+ // If plan was already submitted via CLI (current_chain_id is set), nothing to do
+ if current.current_chain_id.is_some() {
+ tracing::info!(
+ directive_id = %directive.id,
+ "Plan already submitted via CLI, skipping handle_planning_completion"
+ );
+ return Ok(());
+ }
- let Some(plan_json) = plan_json else {
+ // No plan was submitted — check retry budget
+ let max_regenerations = current.max_chain_regenerations.unwrap_or(2);
+ if current.chain_generation_count < max_regenerations {
tracing::warn!(
directive_id = %directive.id,
- "Could not extract chain plan JSON from file body, marking directive failed"
+ attempt = current.chain_generation_count + 1,
+ max = max_regenerations,
+ "Planning completed without plan submission, retrying"
);
- repository::update_directive_status(pool, directive.id, "failed")
+
+ let _ = repository::create_directive_event(
+ pool,
+ directive.id,
+ None,
+ None,
+ "planning_retry",
+ "warn",
+ Some(&serde_json::json!({
+ "attempt": current.chain_generation_count + 1,
+ "maxRegenerations": max_regenerations,
+ "reason": "Planning contract completed without submitting a plan"
+ })),
+ "system",
+ None,
+ )
+ .await;
+
+ // Increment generation count
+ repository::increment_chain_generation_count(pool, directive.id)
.await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
- return Ok(());
- };
+ .map_err(|e| format!("Failed to increment chain generation count: {}", e))?;
- let chain_plan: ChainPlan = match serde_json::from_str(&plan_json) {
- Ok(plan) => plan,
- Err(e) => {
- tracing::warn!(
- directive_id = %directive.id,
- error = %e,
- "Failed to parse chain plan JSON, marking directive failed"
- );
- repository::update_directive_status(pool, directive.id, "failed")
- .await
- .map_err(|e| format!("Failed to update directive status: {}", e))?;
- return Ok(());
- }
- };
+ // Reset to draft so init_directive can be called again
+ repository::update_directive_status(pool, directive.id, "draft")
+ .await
+ .map_err(|e| format!("Failed to reset directive status: {}", e))?;
- if chain_plan.steps.is_empty() {
- tracing::warn!(
+ // Re-init planning
+ init_directive(pool, _state, owner_id, directive.id).await?;
+
+ Ok(())
+ } else {
+ tracing::error!(
directive_id = %directive.id,
- "Chain plan has no steps, marking directive failed"
+ attempts = current.chain_generation_count,
+ max = max_regenerations,
+ "Planning failed: max regeneration attempts exhausted without plan submission"
);
+
+ let _ = repository::create_directive_event(
+ pool,
+ directive.id,
+ None,
+ None,
+ "planning_failed",
+ "error",
+ Some(&serde_json::json!({
+ "attempts": current.chain_generation_count,
+ "maxRegenerations": max_regenerations,
+ "reason": "Max chain regeneration attempts exhausted without plan submission"
+ })),
+ "system",
+ None,
+ )
+ .await;
+
repository::update_directive_status(pool, directive.id, "failed")
.await
.map_err(|e| format!("Failed to update directive status: {}", e))?;
- return Ok(());
- }
- // Create chain and steps — if anything fails, mark directive as failed
- match create_chain_and_steps(pool, state, directive, &chain_plan, owner_id).await {
- Ok(()) => Ok(()),
- Err(e) => {
- tracing::error!(
- directive_id = %directive.id,
- error = %e,
- "Failed to create chain/steps, marking directive failed"
- );
- let _ = repository::update_directive_status(pool, directive.id, "failed").await;
- Err(e)
- }
+ Ok(())
}
}
@@ -848,18 +897,15 @@ fn build_planning_prompt(directive: &Directive) -> String {
format!(
r#"You are planning the execution of a directive.
-DIRECTIVE: {}
-GOAL: {}
-REQUIREMENTS: {}
-ACCEPTANCE CRITERIA: {}
-CONSTRAINTS: {}
+DIRECTIVE: {title}
+GOAL: {goal}
+REQUIREMENTS: {requirements}
+ACCEPTANCE CRITERIA: {acceptance_criteria}
+CONSTRAINTS: {constraints}
Your job is to decompose this goal into a sequence of executable steps.
Each step will become a separate contract with its own supervisor.
-Write a JSON plan to a contract file named "chain-plan" using:
- makima contract create-file "chain-plan" < plan.json
-
The JSON format:
{{
"steps": [
@@ -880,13 +926,17 @@ Rules:
- Be specific in task_plan — include file paths, function names, and acceptance criteria where possible.
- Keep the number of steps reasonable (3-10 typically).
-After writing the plan file, mark the contract as complete using:
+Submit your plan by piping the JSON to stdin:
+ echo '<your_json_plan>' | makima directive submit-plan --directive-id {directive_id}
+
+After submitting the plan, mark the contract as complete:
makima supervisor complete"#,
- directive.title,
- directive.goal,
- serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
- serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
- serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
+ title = directive.title,
+ goal = directive.goal,
+ requirements = serde_json::to_string_pretty(&directive.requirements).unwrap_or_default(),
+ acceptance_criteria = serde_json::to_string_pretty(&directive.acceptance_criteria).unwrap_or_default(),
+ constraints = serde_json::to_string_pretty(&directive.constraints).unwrap_or_default(),
+ directive_id = directive.id,
)
}
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs
index 65f32d5..3f62a33 100644
--- a/makima/src/server/handlers/directives.rs
+++ b/makima/src/server/handlers/directives.rs
@@ -13,7 +13,7 @@ use std::collections::HashMap;
use crate::db::models::{
ChainStep, ChainStepWithContract, ChainWithSteps, CreateDirectiveRequest, Directive,
DirectiveChain, DirectiveListResponse, DirectiveWithChains, EvaluationListResponse,
- StepContractSummary, UpdateDirectiveRequest,
+ StepContractSummary, SubmitPlanRequest, UpdateDirectiveRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::orchestration;
@@ -724,3 +724,62 @@ pub async fn list_evaluations(
}
}
}
+
+/// Submit a chain plan for a directive.
+#[utoipa::path(
+ post,
+ path = "/api/v1/directives/{id}/submit-plan",
+ params(
+ ("id" = Uuid, Path, description = "Directive ID")
+ ),
+ request_body = SubmitPlanRequest,
+ responses(
+ (status = 200, description = "Plan submitted, directive active", body = Directive),
+ (status = 400, description = "Invalid request or status", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Directive not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Directives"
+)]
+pub async fn submit_plan(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(req): Json<SubmitPlanRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match orchestration::directive::submit_plan(pool, &state, auth.owner_id, id, &req.plan).await {
+ Ok(directive) => Json(directive).into_response(),
+ Err(e) if e.contains("not found") || e.contains("Not found") => (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", e)),
+ )
+ .into_response(),
+ Err(e) if e.contains("must be in 'planning'") || e.contains("no steps") => (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_REQUEST", e)),
+ )
+ .into_response(),
+ Err(e) => {
+ tracing::error!("Failed to submit plan for directive {}: {}", id, e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("SUBMIT_PLAN_FAILED", e)),
+ )
+ .into_response()
+ }
+ }
+}
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index c8242ae..0cad050 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -184,6 +184,7 @@ pub fn make_router(state: SharedState) -> Router {
.route("/directives/{id}/start", post(directives::start_directive))
.route("/directives/{id}/chains", get(directives::list_chains))
.route("/directives/{id}/chains/{chain_id}", get(directives::get_chain))
+ .route("/directives/{id}/submit-plan", post(directives::submit_plan))
.route("/directives/{id}/steps/{step_id}/evaluate", post(directives::evaluate_step))
.route("/directives/{id}/steps/{step_id}/evaluations", get(directives::list_evaluations))
// Contract supervisor resume endpoints
diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs
index e680c07..888269f 100644
--- a/makima/src/server/openapi.rs
+++ b/makima/src/server/openapi.rs
@@ -17,7 +17,7 @@ use crate::db::models::{
MergeSkipRequest, MergeStartRequest, MergeStatusResponse, MeshChatConversation,
MeshChatHistoryResponse, MeshChatMessageRecord, RepositoryHistoryEntry,
RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest,
- StepContractSummary, Task,
+ StepContractSummary, SubmitPlanRequest, Task,
TaskEventListResponse, TaskListResponse, TaskSummary, TaskWithSubtasks, TranscriptEntry,
UpdateContractRequest, UpdateDirectiveRequest, UpdateFileRequest, UpdateTaskRequest,
};
@@ -119,6 +119,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage
directives::get_chain,
directives::evaluate_step,
directives::list_evaluations,
+ directives::submit_plan,
),
components(
schemas(
@@ -215,6 +216,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage
StepContractSummary,
CreateDirectiveRequest,
UpdateDirectiveRequest,
+ SubmitPlanRequest,
DirectiveEvaluation,
DirectiveEvent,
EvaluationListResponse,