summaryrefslogtreecommitdiff
path: root/makima/src/orchestration/directive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/orchestration/directive.rs')
-rw-r--r--makima/src/orchestration/directive.rs94
1 files changed, 51 insertions, 43 deletions
diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs
index 736715d..020c2e4 100644
--- a/makima/src/orchestration/directive.rs
+++ b/makima/src/orchestration/directive.rs
@@ -580,20 +580,11 @@ impl DirectiveOrchestrator {
for directive in directives {
if let Err(e) = async {
- // Atomically claim this directive for completion using a placeholder.
- // This prevents a concurrent tick from also spawning a completion task.
- let placeholder_id = Uuid::new_v4();
- let claimed = repository::claim_directive_for_completion(
- &self.pool,
- directive.id,
- placeholder_id,
- )
- .await?;
-
- if !claimed {
+ // Skip if already claimed (completion_task_id is set)
+ if directive.completion_task_id.is_some() {
tracing::debug!(
directive_id = %directive.id,
- "Directive already claimed for completion — skipping"
+ "Directive already has a completion task — skipping"
);
return Ok::<(), anyhow::Error>(());
}
@@ -606,7 +597,6 @@ impl DirectiveOrchestrator {
let step_tasks = repository::get_completed_step_tasks(&self.pool, directive.id).await?;
if step_tasks.is_empty() {
- let _ = repository::clear_completion_task(&self.pool, directive.id).await;
return Ok(());
}
@@ -640,6 +630,7 @@ impl DirectiveOrchestrator {
base_branch,
);
+ // Create the task FIRST so we have a real task ID for the FK
match self
.spawn_completion_task(
directive.id,
@@ -652,6 +643,22 @@ impl DirectiveOrchestrator {
.await
{
Ok(task_id) => {
+ // Atomically claim with the REAL task ID (satisfies FK constraint)
+ let claimed = repository::claim_directive_for_completion(
+ &self.pool,
+ directive.id,
+ task_id,
+ )
+ .await?;
+
+ if !claimed {
+ tracing::debug!(
+ directive_id = %directive.id,
+ "Directive already claimed for completion — task will be orphaned"
+ );
+ return Ok(());
+ }
+
let update = crate::db::models::UpdateDirectiveRequest {
pr_branch: Some(directive_branch.clone()),
..Default::default()
@@ -663,15 +670,13 @@ impl DirectiveOrchestrator {
update,
)
.await;
- repository::assign_completion_task(&self.pool, directive.id, task_id).await?;
}
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
- "Failed to spawn completion task — releasing claim"
+ "Failed to spawn completion task"
);
- let _ = repository::clear_completion_task(&self.pool, directive.id).await;
}
}
Ok(())
@@ -773,15 +778,8 @@ impl DirectiveOrchestrator {
for directive in verify_directives {
if let Err(e) = async {
- let placeholder_id = Uuid::new_v4();
- let claimed = repository::claim_directive_for_completion(
- &self.pool,
- directive.id,
- placeholder_id,
- )
- .await?;
-
- if !claimed {
+ // Skip if already claimed
+ if directive.completion_task_id.is_some() {
return Ok::<(), anyhow::Error>(());
}
@@ -795,6 +793,7 @@ impl DirectiveOrchestrator {
let base_branch = directive.base_branch.as_deref().unwrap_or("main");
let prompt = build_verification_prompt(&directive, pr_branch, base_branch);
+ // Create the task FIRST so we have a real task ID for the FK
match self
.spawn_completion_task(
directive.id,
@@ -807,15 +806,27 @@ impl DirectiveOrchestrator {
.await
{
Ok(task_id) => {
- repository::assign_completion_task(&self.pool, directive.id, task_id).await?;
+ // Atomically claim with the REAL task ID (satisfies FK constraint)
+ let claimed = repository::claim_directive_for_completion(
+ &self.pool,
+ directive.id,
+ task_id,
+ )
+ .await?;
+
+ if !claimed {
+ tracing::debug!(
+ directive_id = %directive.id,
+ "Directive already claimed for verification — task will be orphaned"
+ );
+ }
}
Err(e) => {
tracing::warn!(
directive_id = %directive.id,
error = %e,
- "Failed to spawn verification task — releasing claim"
+ "Failed to spawn verification task"
);
- let _ = repository::clear_completion_task(&self.pool, directive.id).await;
}
}
Ok(())
@@ -906,9 +917,9 @@ impl DirectiveOrchestrator {
/// This is the public entry point used by both the orchestrator tick and the
/// manual "create PR" API handler. It encapsulates the full flow:
/// 1. Validate the directive has completed step tasks
-/// 2. Claim the directive for completion (returns error if already claimed)
-/// 3. Build branch names and prompt
-/// 4. Spawn the completion task and assign it
+/// 2. Create the completion task (so we have a real task ID)
+/// 3. Atomically claim the directive for completion with the real task ID
+/// 4. Dispatch the task to a daemon
///
/// Returns the created task ID on success.
pub async fn trigger_completion_task(
@@ -931,14 +942,6 @@ pub async fn trigger_completion_task(
anyhow::bail!("No completed steps with tasks found");
}
- // Claim for completion
- let placeholder_id = Uuid::new_v4();
- let claimed =
- repository::claim_directive_for_completion(pool, directive_id, placeholder_id).await?;
- if !claimed {
- anyhow::bail!("Directive already claimed for completion");
- }
-
let base_branch = directive.base_branch.as_deref().unwrap_or("main");
let directive_branch = format!(
@@ -970,7 +973,7 @@ pub async fn trigger_completion_task(
format!("PR: {}", directive.title)
};
- // Create the completion task
+ // Create the completion task FIRST so we have a real task ID for the FK
let req = CreateTaskRequest {
contract_id: None,
name: task_name,
@@ -997,6 +1000,14 @@ pub async fn trigger_completion_task(
let task = repository::create_task_for_owner(pool, owner_id, req).await?;
+ // Atomically claim the directive with the REAL task ID (satisfies FK constraint).
+ // This prevents concurrent ticks from also spawning a completion task.
+ let claimed =
+ repository::claim_directive_for_completion(pool, directive_id, task.id).await?;
+ if !claimed {
+ anyhow::bail!("Directive already claimed for completion");
+ }
+
// Update pr_branch on the directive
let update = crate::db::models::UpdateDirectiveRequest {
pr_branch: Some(directive_branch),
@@ -1004,9 +1015,6 @@ pub async fn trigger_completion_task(
};
let _ = repository::update_directive_for_owner(pool, owner_id, directive_id, update).await;
- // Assign the real task as the completion task
- repository::assign_completion_task(pool, directive_id, task.id).await?;
-
// Try to dispatch to a daemon
if let Some(daemon_id) = state.find_alternative_daemon(owner_id, &[]) {
let update_req = crate::db::models::UpdateTaskRequest {