summaryrefslogtreecommitdiff
path: root/makima
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-07 23:48:51 +0000
committersoryu <soryu@soryu.co>2026-02-07 23:48:51 +0000
commitf0a92073702d614302deff5e83c14ffec3ae6db0 (patch)
tree4bb8bf86342deeeea6b801a3df9c1048c20f48d5 /makima
parent89ccd1375c2eaddd1f2b74a42b2d152c83b346e8 (diff)
downloadsoryu-f0a92073702d614302deff5e83c14ffec3ae6db0.tar.gz
soryu-f0a92073702d614302deff5e83c14ffec3ae6db0.zip
Fixes for directive chain creation
Diffstat (limited to 'makima')
-rw-r--r--makima/src/db/repository.rs56
-rw-r--r--makima/src/orchestration/directive.rs109
2 files changed, 158 insertions, 7 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index d50ef61..177aed3 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5299,7 +5299,7 @@ pub async fn create_directive_chain(
sqlx::query_as::<_, DirectiveChain>(
r#"
INSERT INTO directive_chains (directive_id, generation, name, description, rationale, total_steps, status)
- VALUES ($1, $2, $3, $4, $5, $6, 'active')
+ VALUES ($1, $2, $3, $4, $5, $6, 'running')
RETURNING *
"#,
)
@@ -5346,6 +5346,47 @@ pub async fn create_chain_step(
.await
}
+/// Get a single chain step by ID.
+pub async fn get_chain_step(
+ pool: &PgPool,
+ step_id: Uuid,
+) -> Result<Option<ChainStep>, sqlx::Error> {
+ sqlx::query_as::<_, ChainStep>(
+ "SELECT * FROM chain_steps WHERE id = $1",
+ )
+ .bind(step_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Increment completed_steps counter on a directive chain.
+pub async fn increment_chain_completed_steps(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ "UPDATE directive_chains SET completed_steps = completed_steps + 1, updated_at = NOW() WHERE id = $1",
+ )
+ .bind(chain_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
+/// Increment failed_steps counter on a directive chain.
+pub async fn increment_chain_failed_steps(
+ pool: &PgPool,
+ chain_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ "UPDATE directive_chains SET failed_steps = failed_steps + 1, updated_at = NOW() WHERE id = $1",
+ )
+ .bind(chain_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
/// Update a chain step's status with automatic timestamp management.
pub async fn update_step_status(
pool: &PgPool,
@@ -5574,6 +5615,19 @@ pub async fn list_evaluations_for_step(
.await
}
+/// Get a single directive evaluation by ID.
+pub async fn get_directive_evaluation(
+ pool: &PgPool,
+ evaluation_id: Uuid,
+) -> Result<Option<DirectiveEvaluation>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveEvaluation>(
+ "SELECT * FROM directive_evaluations WHERE id = $1",
+ )
+ .bind(evaluation_id)
+ .fetch_optional(pool)
+ .await
+}
+
/// Create a directive event.
pub async fn create_directive_event(
pool: &PgPool,
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 4b75b4a..0dbdbf3 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -13,11 +13,13 @@ use crate::server::state::SharedState;
/// A single step in the chain plan produced by the planning supervisor.
#[derive(Debug, Deserialize)]
+#[serde(rename_all = "snake_case")]
struct ChainPlanStep {
name: String,
description: String,
+ #[serde(alias = "taskPlan")]
task_plan: String,
- #[serde(default)]
+ #[serde(default, alias = "dependsOn")]
depends_on: Vec<String>, // names of steps this depends on
}
@@ -361,6 +363,22 @@ async fn process_planning_result(
contract_id: Uuid,
owner_id: Uuid,
) -> Result<(), String> {
+ // Idempotency guard: only process if directive is still in "planning" status.
+ // Both the contract-completion and task-completion paths can fire concurrently.
+ let current = repository::get_directive(pool, directive.id)
+ .await
+ .map_err(|e| format!("Failed to re-fetch directive: {}", e))?;
+ if let Some(ref d) = current {
+ if d.status != "planning" {
+ tracing::info!(
+ directive_id = %directive.id,
+ status = %d.status,
+ "Skipping process_planning_result: directive no longer in planning status"
+ );
+ return Ok(());
+ }
+ }
+
// Get contract files to find the chain plan
let files = repository::list_files_in_contract(pool, contract_id, owner_id)
.await
@@ -405,9 +423,20 @@ async fn process_planning_result(
return Ok(());
};
- let chain_plan: ChainPlan = serde_json::from_str(&plan_json).map_err(|e| {
- format!("Failed to parse chain plan JSON: {}", e)
- })?;
+ let chain_plan: ChainPlan = match serde_json::from_str(&plan_json) {
+ Ok(plan) => plan,
+ Err(e) => {
+ tracing::warn!(
+ directive_id = %directive.id,
+ error = %e,
+ "Failed to parse chain plan JSON, marking directive failed"
+ );
+ repository::update_directive_status(pool, directive.id, "failed")
+ .await
+ .map_err(|e| format!("Failed to update directive status: {}", e))?;
+ return Ok(());
+ }
+ };
if chain_plan.steps.is_empty() {
tracing::warn!(
@@ -420,6 +449,30 @@ async fn process_planning_result(
return Ok(());
}
+ // Create chain and steps — if anything fails, mark directive as failed
+ match create_chain_and_steps(pool, state, directive, &chain_plan, owner_id).await {
+ Ok(()) => Ok(()),
+ Err(e) => {
+ tracing::error!(
+ directive_id = %directive.id,
+ error = %e,
+ "Failed to create chain/steps, marking directive failed"
+ );
+ let _ = repository::update_directive_status(pool, directive.id, "failed").await;
+ Err(e)
+ }
+ }
+}
+
+/// Inner helper: create chain, steps, set current chain, transition to active, and advance.
+/// Extracted so that `process_planning_result` can catch errors and mark the directive failed.
+async fn create_chain_and_steps(
+ pool: &PgPool,
+ state: &SharedState,
+ directive: &Directive,
+ chain_plan: &ChainPlan,
+ owner_id: Uuid,
+) -> Result<(), String> {
// Create chain
let chain = repository::create_directive_chain(
pool,
@@ -571,6 +624,8 @@ async fn on_step_completed(
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
+ let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await;
+
tracing::info!(
directive_id = %directive_id,
step_id = %step.id,
@@ -678,12 +733,31 @@ async fn dispatch_step(
.await
.map_err(|e| format!("Failed to set contract directive fields: {}", e))?;
- // Build the task plan
- let task_plan = step
+ // Build the task plan, prepending rework instructions if this is a rework cycle
+ let mut task_plan = step
.task_plan
.clone()
.unwrap_or_else(|| format!("Execute step: {}", step.name));
+ if let Some(eval_id) = step.last_evaluation_id {
+ if let Ok(Some(evaluation)) = repository::get_directive_evaluation(pool, eval_id).await {
+ if let Some(ref rework) = evaluation.rework_instructions {
+ task_plan = format!(
+ "IMPORTANT — REWORK REQUIRED (attempt #{}):\n\
+ The previous attempt was evaluated and did NOT pass.\n\
+ Feedback: {}\n\
+ Rework instructions: {}\n\n\
+ ---\n\n\
+ Original task plan:\n{}",
+ step.rework_count + 1,
+ evaluation.summary_feedback,
+ rework,
+ task_plan,
+ );
+ }
+ }
+ }
+
// Create supervisor task
let supervisor_task = repository::create_task_for_owner(
pool,
@@ -1115,6 +1189,8 @@ async fn on_monitoring_completed(
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
+ let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
+
let _ = repository::create_directive_event(
pool,
directive_id,
@@ -1151,6 +1227,21 @@ async fn process_monitoring_result(
return Ok(());
};
+ // Idempotency guard: re-fetch step and only process if still "evaluating".
+ let current_step = repository::get_chain_step(pool, step.id)
+ .await
+ .map_err(|e| format!("Failed to re-fetch step: {}", e))?;
+ if let Some(ref s) = current_step {
+ if s.status != "evaluating" {
+ tracing::info!(
+ step_id = %step.id,
+ status = %s.status,
+ "Skipping process_monitoring_result: step no longer in evaluating status"
+ );
+ return Ok(());
+ }
+ }
+
let directive = repository::get_directive(pool, directive_id)
.await
.map_err(|e| format!("Failed to get directive: {}", e))?
@@ -1195,6 +1286,8 @@ async fn process_monitoring_result(
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
+ let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
+
let _ = repository::create_directive_event(
pool,
directive_id,
@@ -1276,6 +1369,8 @@ async fn process_monitoring_result(
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
+ let _ = repository::increment_chain_completed_steps(pool, step.chain_id).await;
+
advance_chain(pool, state, &directive, owner_id).await
} else {
// Evaluation failed — check rework budget
@@ -1294,6 +1389,8 @@ async fn process_monitoring_result(
.await
.map_err(|e| format!("Failed to update step status: {}", e))?;
+ let _ = repository::increment_chain_failed_steps(pool, step.chain_id).await;
+
advance_chain(pool, state, &directive, owner_id).await
} else {
tracing::info!(