diff options
| author | soryu <soryu@soryu.co> | 2026-02-12 19:12:36 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-12 19:12:36 +0000 |
| commit | a6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7 (patch) | |
| tree | 9d657c2d77fca9c9a033cf86efcbf06a4d3eb8da | |
| parent | ffbd8fed748ff4b60c53ee6ac54d7cf0548a7048 (diff) | |
| download | soryu-a6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7.tar.gz soryu-a6d40de1be0f8ae1ba938acb7875ebcf2b3b18e7.zip | |
Fix PR creation task for directives to be atomic
| -rw-r--r-- | makima/src/db/repository.rs | 20 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 25 |
2 files changed, 43 insertions, 2 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 127f4cd..a79818f 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5229,7 +5229,25 @@ pub async fn get_completion_tasks_to_check( .await } -/// Assign a completion task to a directive. +/// Atomically claim a directive for completion by setting a placeholder completion_task_id. +/// Returns true if the claim was successful (no other task already claimed it). +pub async fn claim_directive_for_completion( + pool: &PgPool, + directive_id: Uuid, + task_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() + WHERE id = $1 AND completion_task_id IS NULL"#, + ) + .bind(directive_id) + .bind(task_id) + .execute(pool) + .await?; + Ok(result.rows_affected() > 0) +} + +/// Assign a completion task to a directive (unconditional update). pub async fn assign_completion_task( pool: &PgPool, directive_id: Uuid, diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 37cc5e7..d2fcbfd 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -537,6 +537,24 @@ impl DirectiveOrchestrator { let directives = repository::get_idle_directives_needing_completion(&self.pool).await?; for directive in directives { + // Atomically claim this directive for completion using a placeholder. + // This prevents a concurrent tick from also spawning a completion task. + let placeholder_id = Uuid::new_v4(); + let claimed = repository::claim_directive_for_completion( + &self.pool, + directive.id, + placeholder_id, + ) + .await?; + + if !claimed { + tracing::debug!( + directive_id = %directive.id, + "Directive already claimed for completion — skipping" + ); + continue; + } + tracing::info!( directive_id = %directive.id, title = %directive.title, @@ -545,6 +563,8 @@ impl DirectiveOrchestrator { let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?; if step_tasks.is_empty() { + // Release the claim since there's nothing to complete + let _ = repository::clear_completion_task(&self.pool, directive.id).await; continue; } @@ -603,14 +623,17 @@ impl DirectiveOrchestrator { update, ) .await; + // Replace placeholder with the real task ID repository::assign_completion_task(&self.pool, directive.id, task_id).await?; } Err(e) => { tracing::warn!( directive_id = %directive.id, error = %e, - "Failed to spawn completion task" + "Failed to spawn completion task — releasing claim" ); + // Release the claim so it can be retried on the next tick + let _ = repository::clear_completion_task(&self.pool, directive.id).await; } } } |
