summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-12 19:12:36 +0000
committersoryu <soryu@soryu.co>2026-02-12 19:12:36 +0000
commita6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7 (patch)
tree9d657c2d77fca9c9a033cf86efcbf06a4d3eb8da
parentffbd8fed748ff4b60c53ee6ac54d7cf0548a7048 (diff)
downloadsoryu-a6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7.tar.gz
soryu-a6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7.zip
Fix PR creation task for directives to be atomic
-rw-r--r--makima/src/db/repository.rs20
-rw-r--r--makima/src/orchestration/directive.rs25
2 files changed, 43 insertions, 2 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 127f4cd..a79818f 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -5229,7 +5229,25 @@ pub async fn get_completion_tasks_to_check(
.await
}
-/// Assign a completion task to a directive.
+/// Atomically claim a directive for completion by setting a placeholder completion_task_id.
+/// Returns true if the claim was successful (no other task already claimed it).
+pub async fn claim_directive_for_completion(
+ pool: &PgPool,
+ directive_id: Uuid,
+ task_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW()
+ WHERE id = $1 AND completion_task_id IS NULL"#,
+ )
+ .bind(directive_id)
+ .bind(task_id)
+ .execute(pool)
+ .await?;
+ Ok(result.rows_affected() > 0)
+}
+
+/// Assign a completion task to a directive (unconditional update).
pub async fn assign_completion_task(
pool: &PgPool,
directive_id: Uuid,
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 37cc5e7..d2fcbfd 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -537,6 +537,24 @@ impl DirectiveOrchestrator {
let directives = repository::get_idle_directives_needing_completion(&self.pool).await?;
for directive in directives {
+ // Atomically claim this directive for completion using a placeholder.
+ // This prevents a concurrent tick from also spawning a completion task.
+ let placeholder_id = Uuid::new_v4();
+ let claimed = repository::claim_directive_for_completion(
+ &self.pool,
+ directive.id,
+ placeholder_id,
+ )
+ .await?;
+
+ if !claimed {
+ tracing::debug!(
+ directive_id = %directive.id,
+ "Directive already claimed for completion — skipping"
+ );
+ continue;
+ }
+
tracing::info!(
directive_id = %directive.id,
title = %directive.title,
@@ -545,6 +563,8 @@ impl DirectiveOrchestrator {
let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?;
if step_tasks.is_empty() {
+ // Release the claim since there's nothing to complete
+ let _ = repository::clear_completion_task(&self.pool, directive.id).await;
continue;
}
@@ -603,14 +623,17 @@ impl DirectiveOrchestrator {
update,
)
.await;
+ // Replace placeholder with the real task ID
repository::assign_completion_task(&self.pool, directive.id, task_id).await?;
}
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
- "Failed to spawn completion task"
+ "Failed to spawn completion task — releasing claim"
);
+ // Release the claim so it can be retried on the next tick
+ let _ = repository::clear_completion_task(&self.pool, directive.id).await;
}
}
}