diff options
Diffstat (limited to 'makima/src/orchestration/directive.rs')
| -rw-r--r-- | makima/src/orchestration/directive.rs | 94 |
1 files changed, 51 insertions, 43 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 736715d..020c2e4 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -580,20 +580,11 @@ impl DirectiveOrchestrator { for directive in directives { if let Err(e) = async { - // Atomically claim this directive for completion using a placeholder. - // This prevents a concurrent tick from also spawning a completion task. - let placeholder_id = Uuid::new_v4(); - let claimed = repository::claim_directive_for_completion( - &self.pool, - directive.id, - placeholder_id, - ) - .await?; - - if !claimed { + // Skip if already claimed (completion_task_id is set) + if directive.completion_task_id.is_some() { tracing::debug!( directive_id = %directive.id, - "Directive already claimed for completion — skipping" + "Directive already has a completion task — skipping" ); return Ok::<(), anyhow::Error>(()); } @@ -606,7 +597,6 @@ impl DirectiveOrchestrator { let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?; if step_tasks.is_empty() { - let _ = repository::clear_completion_task(&self.pool, directive.id).await; return Ok(()); } @@ -640,6 +630,7 @@ impl DirectiveOrchestrator { base_branch, ); + // Create the task FIRST so we have a real task ID for the FK match self .spawn_completion_task( directive.id, @@ -652,6 +643,22 @@ impl DirectiveOrchestrator { .await { Ok(task_id) => { + // Atomically claim with the REAL task ID (satisfies FK constraint) + let claimed = repository::claim_directive_for_completion( + &self.pool, + directive.id, + task_id, + ) + .await?; + + if !claimed { + tracing::debug!( + directive_id = %directive.id, + "Directive already claimed for completion — task will be orphaned" + ); + return Ok(()); + } + let update = crate::db::models::UpdateDirectiveRequest { pr_branch: Some(directive_branch.clone()), ..Default::default() @@ -663,15 +670,13 @@ impl DirectiveOrchestrator { update, ) .await; - repository::assign_completion_task(&self.pool, directive.id, task_id).await?; } Err(e) => { tracing::warn!( directive_id = %directive.id, error = %e, - "Failed to spawn completion task — releasing claim" + "Failed to spawn completion task" ); - let _ = repository::clear_completion_task(&self.pool, directive.id).await; } } Ok(()) @@ -773,15 +778,8 @@ impl DirectiveOrchestrator { for directive in verify_directives { if let Err(e) = async { - let placeholder_id = Uuid::new_v4(); - let claimed = repository::claim_directive_for_completion( - &self.pool, - directive.id, - placeholder_id, - ) - .await?; - - if !claimed { + // Skip if already claimed + if directive.completion_task_id.is_some() { return Ok::<(), anyhow::Error>(()); } @@ -795,6 +793,7 @@ impl DirectiveOrchestrator { let base_branch = directive.base_branch.as_deref().unwrap_or("main"); let prompt = build_verification_prompt(&directive, pr_branch, base_branch); + // Create the task FIRST so we have a real task ID for the FK match self .spawn_completion_task( directive.id, @@ -807,15 +806,27 @@ impl DirectiveOrchestrator { .await { Ok(task_id) => { - repository::assign_completion_task(&self.pool, directive.id, task_id).await?; + // Atomically claim with the REAL task ID (satisfies FK constraint) + let claimed = repository::claim_directive_for_completion( + &self.pool, + directive.id, + task_id, + ) + .await?; + + if !claimed { + tracing::debug!( + directive_id = %directive.id, + "Directive already claimed for verification — task will be orphaned" + ); + } } Err(e) => { tracing::warn!( directive_id = %directive.id, error = %e, - "Failed to spawn verification task — releasing claim" + "Failed to spawn verification task" ); - let _ = repository::clear_completion_task(&self.pool, directive.id).await; } } Ok(()) @@ -906,9 +917,9 @@ impl DirectiveOrchestrator { /// This is the public entry point used by both the orchestrator tick and the /// manual "create PR" API handler. It encapsulates the full flow: /// 1. Validate the directive has completed step tasks -/// 2. Claim the directive for completion (returns error if already claimed) -/// 3. Build branch names and prompt -/// 4. Spawn the completion task and assign it +/// 2. Create the completion task (so we have a real task ID) +/// 3. Atomically claim the directive for completion with the real task ID +/// 4. Dispatch the task to a daemon /// /// Returns the created task ID on success. pub async fn trigger_completion_task( @@ -931,14 +942,6 @@ pub async fn trigger_completion_task( anyhow::bail!("No completed steps with tasks found"); } - // Claim for completion - let placeholder_id = Uuid::new_v4(); - let claimed = - repository::claim_directive_for_completion(pool, directive_id, placeholder_id).await?; - if !claimed { - anyhow::bail!("Directive already claimed for completion"); - } - let base_branch = directive.base_branch.as_deref().unwrap_or("main"); let directive_branch = format!( @@ -970,7 +973,7 @@ pub async fn trigger_completion_task( format!("PR: {}", directive.title) }; - // Create the completion task + // Create the completion task FIRST so we have a real task ID for the FK let req = CreateTaskRequest { contract_id: None, name: task_name, @@ -997,6 +1000,14 @@ pub async fn trigger_completion_task( let task = repository::create_task_for_owner(pool, owner_id, req).await?; + // Atomically claim the directive with the REAL task ID (satisfies FK constraint). + // This prevents concurrent ticks from also spawning a completion task. + let claimed = + repository::claim_directive_for_completion(pool, directive_id, task.id).await?; + if !claimed { + anyhow::bail!("Directive already claimed for completion"); + } + // Update pr_branch on the directive let update = crate::db::models::UpdateDirectiveRequest { pr_branch: Some(directive_branch), @@ -1004,9 +1015,6 @@ pub async fn trigger_completion_task( }; let _ = repository::update_directive_for_owner(pool, owner_id, directive_id, update).await; - // Assign the real task as the completion task - repository::assign_completion_task(pool, directive_id, task.id).await?; - // Try to dispatch to a daemon if let Some(daemon_id) = state.find_alternative_daemon(owner_id, &[]) { let update_req = crate::db::models::UpdateTaskRequest { |
