diff options
| author | soryu <soryu@soryu.co> | 2026-02-12 03:18:27 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-12 03:18:27 +0000 |
| commit | e03ac942b97255f01cb98f3a6c927da18e001b18 (patch) | |
| tree | 8e747a6a82bd6f91185eabf591de2d8eb77d263f /makima/src | |
| parent | 355f10964c4dbec24a244a00caba5c17ed23fc65 (diff) | |
| download | soryu-e03ac942b97255f01cb98f3a6c927da18e001b18.tar.gz soryu-e03ac942b97255f01cb98f3a6c927da18e001b18.zip | |
Add depends_on for directive tasks
# Conflicts:
# makima/src/orchestration/directive.rs
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/db/models.rs | 8 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 103 | ||||
| -rw-r--r-- | makima/src/orchestration/directive.rs | 55 | ||||
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 72 |
4 files changed, 225 insertions, 13 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 169f468..66c0a30 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2717,7 +2717,6 @@ pub struct Directive { pub memory_enabled: bool, pub goal_updated_at: DateTime<Utc>, pub started_at: Option<DateTime<Utc>>, - pub memory_enabled: bool, pub version: i32, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, @@ -2820,6 +2819,13 @@ pub struct UpdateGoalRequest { pub goal: String, } +/// Response for cleanup_directive_tasks. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CleanupTasksResponse { + pub deleted: i64, +} + /// Request to create a directive step. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 95460f7..127f4cd 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5105,6 +5105,68 @@ pub async fn delete_directive_for_owner( Ok(result.rows_affected() > 0) } +/// Clean up terminal tasks associated with a directive. +/// +/// Deletes tasks in terminal states (completed, failed, merged, done, interrupted) +/// that belong to this directive, excluding tasks currently referenced by +/// `completion_task_id` or `orchestrator_task_id` on the directive. +/// NULLs out `task_id` on directive_steps for deleted tasks. +pub async fn cleanup_directive_tasks( + pool: &PgPool, + owner_id: Uuid, + directive_id: Uuid, +) -> Result<i64, sqlx::Error> { + // NULL out task_id on steps that reference terminal tasks we're about to delete + sqlx::query( + r#" + UPDATE directive_steps + SET task_id = NULL + WHERE directive_id = $1 + AND task_id IS NOT NULL + AND task_id IN ( + SELECT t.id FROM tasks t + WHERE t.directive_id = $1 + AND t.owner_id = $2 + AND t.status IN ('completed', 'failed', 'merged', 'done', 'interrupted') + AND t.id NOT IN ( + SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000') + FROM directives d WHERE d.id = $1 + UNION + SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000') + FROM directives d WHERE d.id = $1 + ) + ) + "#, + ) + .bind(directive_id) + .bind(owner_id) + .execute(pool) + .await?; + + // Delete terminal tasks not currently referenced by the directive + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE directive_id = $1 + AND owner_id = $2 + AND status IN ('completed', 'failed', 'merged', 'done', 'interrupted') + AND id NOT IN ( + SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000') + FROM directives d WHERE d.id = $1 + UNION + SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000') + FROM directives d WHERE d.id = $1 + ) + "#, + ) + .bind(directive_id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() as i64) +} + // ============================================================================= // Directive Completion Helpers // ============================================================================= @@ -5499,6 +5561,7 @@ pub struct StepForDispatch { pub task_plan: Option<String>, pub order_index: i32, pub generation: i32, + pub depends_on: Vec<Uuid>, // Directive fields pub owner_id: Uuid, pub directive_title: String, @@ -5521,6 +5584,7 @@ pub async fn get_ready_steps_for_dispatch( ds.task_plan, ds.order_index, ds.generation, + ds.depends_on, d.owner_id, d.title AS directive_title, d.repository_url, @@ -5538,6 +5602,45 @@ pub async fn get_ready_steps_for_dispatch( .await } +/// Task info for a dependency step (step → linked task). +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct DependencyTaskInfo { + pub step_id: Uuid, + pub task_id: Uuid, + pub task_name: String, +} + +/// Resolve dependency step UUIDs to their linked task IDs and names. +/// Returns results in the same order as the input `depends_on` slice. +pub async fn get_step_dependency_tasks( + pool: &PgPool, + depends_on: &[Uuid], +) -> Result<Vec<DependencyTaskInfo>, sqlx::Error> { + if depends_on.is_empty() { + return Ok(vec![]); + } + let rows = sqlx::query_as::<_, DependencyTaskInfo>( + r#" + SELECT ds.id AS step_id, t.id AS task_id, t.name AS task_name + FROM directive_steps ds + JOIN tasks t ON t.id = ds.task_id + WHERE ds.id = ANY($1) + "#, + ) + .bind(depends_on) + .fetch_all(pool) + .await?; + + // Re-order to match input ordering + let mut ordered = Vec::with_capacity(depends_on.len()); + for dep_id in depends_on { + if let Some(row) = rows.iter().find(|r| r.step_id == *dep_id) { + ordered.push(row.clone()); + } + } + Ok(ordered) +} + /// A running step joined with its task's current status. #[derive(Debug, Clone, sqlx::FromRow)] pub struct RunningStepWithTask { diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index cb3983a..37cc5e7 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -46,7 +46,7 @@ impl DirectiveOrchestrator { // Load memories if memory is enabled for this directive let memories = if directive.memory_enabled { - match repository::list_directive_memories(&self.pool, directive.id).await { + match repository::list_directive_memories(&self.pool, directive.id, None).await { Ok(m) => m, Err(e) => { tracing::warn!( @@ -98,6 +98,11 @@ impl DirectiveOrchestrator { "Dispatching execution task for ready step" ); + // Resolve dependency steps to their task IDs for worktree continuation + let dep_tasks = + repository::get_step_dependency_tasks(&self.pool, &step.depends_on).await?; + let continue_from_task_id = dep_tasks.first().map(|d| d.task_id); + let task_plan = step .task_plan .as_deref() @@ -105,7 +110,7 @@ impl DirectiveOrchestrator { // Load memories if memory is enabled for this directive let memory_context = if step.memory_enabled { - match repository::list_directive_memories(&self.pool, step.directive_id).await { + match repository::list_directive_memories(&self.pool, step.directive_id, None).await { Ok(memories) if !memories.is_empty() => { format!("\n\nMEMORY CONTEXT (from previous planning/execution cycles):\n{}\n", format_memories_for_prompt(&memories)) @@ -124,10 +129,37 @@ impl DirectiveOrchestrator { String::new() }; + // Build merge instructions for additional dependencies (beyond the first) + let merge_preamble = if dep_tasks.len() > 1 { + use crate::daemon::worktree::{sanitize_name, short_uuid}; + let merge_lines: Vec<String> = dep_tasks[1..] + .iter() + .map(|d| { + let branch = format!( + "makima/task-{}-{}", + sanitize_name(&d.task_name), + short_uuid(d.task_id) + ); + format!("git merge origin/{} --no-edit", branch) + }) + .collect(); + format!( + "IMPORTANT — MERGE DEPENDENCY BRANCHES FIRST:\n\ + This step continues from one dependency's worktree, but also depends on \ + additional branches. Before starting work, run:\n\ + ```\ngit fetch origin\n{}\n```\n\ + Resolve any merge conflicts sensibly, then proceed.\n\n", + merge_lines.join("\n"), + ) + } else { + String::new() + }; + let plan = format!( "You are executing a step in directive \"{directive_title}\".\n\n\ STEP: {step_name}\n\ DESCRIPTION: {description}\n\n\ + {merge_preamble}\ INSTRUCTIONS:\n{task_plan}\n\ {memory_context}\ When done, the system will automatically mark this step as completed.\n\ @@ -135,6 +167,7 @@ impl DirectiveOrchestrator { directive_title = step.directive_title, step_name = step.step_name, description = step.step_description.as_deref().unwrap_or("(none)"), + merge_preamble = merge_preamble, task_plan = task_plan, memory_context = memory_context, ); @@ -148,6 +181,7 @@ impl DirectiveOrchestrator { plan, step.repository_url.as_deref(), step.base_branch.as_deref(), + continue_from_task_id, ) .await { @@ -281,7 +315,7 @@ impl DirectiveOrchestrator { // Load memories if memory is enabled for this directive let memories = if directive.memory_enabled { - match repository::list_directive_memories(&self.pool, directive.id).await { + match repository::list_directive_memories(&self.pool, directive.id, None).await { Ok(m) => m, Err(e) => { tracing::warn!( @@ -374,6 +408,7 @@ impl DirectiveOrchestrator { plan: String, repo_url: Option<&str>, base_branch: Option<&str>, + continue_from_task_id: Option<Uuid>, ) -> Result<(), anyhow::Error> { let req = CreateTaskRequest { contract_id: None, @@ -389,7 +424,7 @@ impl DirectiveOrchestrator { merge_mode: None, target_repo_path: None, completion_action: Some("branch".to_string()), - continue_from_task_id: None, + continue_from_task_id, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, @@ -454,7 +489,7 @@ impl DirectiveOrchestrator { is_orchestrator: false, target_repo_path: None, completion_action: updated_task.completion_action.clone(), - continue_from_task_id: None, + continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, contract_id: None, is_supervisor: false, @@ -658,9 +693,10 @@ impl DirectiveOrchestrator { fn format_memories_for_prompt(memories: &[DirectiveMemory]) -> String { let mut out = String::new(); for memory in memories { + let cat = memory.category.as_deref().unwrap_or("other"); out.push_str(&format!( - "- [{}] ({}): {}\n", - memory.category, memory.source, memory.content + "- [{}] {}: {}\n", + cat, memory.key, memory.value )); } out @@ -729,6 +765,11 @@ Submit steps: Or batch: makima directive batch-add-steps --json '[{{"name":"...","description":"...","taskPlan":"...","dependsOn":[],"orderIndex":0}}]' +DEPENDENCY WORKTREE CONTINUATION: +When a step has dependsOn, it automatically continues from the first dependency's worktree (inheriting +committed and uncommitted changes). If there are multiple dependencies, the first provides the base worktree +and additional dependency branches are merged in before work starts. Use this for incremental work chains. + IMPORTANT: Each step's taskPlan must be self-contained. The executing instance won't have your planning context. "#, title = directive.title, diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index f624d82..585899e 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -10,10 +10,10 @@ use serde::Deserialize; use uuid::Uuid; use crate::db::models::{ - BatchSetDirectiveMemoryRequest, CreateDirectiveRequest, CreateDirectiveStepRequest, - Directive, DirectiveListResponse, DirectiveMemory, DirectiveMemoryListResponse, - DirectiveStep, DirectiveWithSteps, SetDirectiveMemoryRequest, UpdateDirectiveRequest, - UpdateDirectiveStepRequest, UpdateGoalRequest, + BatchSetDirectiveMemoryRequest, CleanupTasksResponse, CreateDirectiveRequest, + CreateDirectiveStepRequest, Directive, DirectiveListResponse, DirectiveMemory, + DirectiveMemoryListResponse, DirectiveStep, DirectiveWithSteps, SetDirectiveMemoryRequest, + UpdateDirectiveRequest, UpdateDirectiveStepRequest, UpdateGoalRequest, }; use crate::db::repository; use crate::server::auth::Authenticated; @@ -1090,7 +1090,7 @@ pub async fn batch_set_memories( } } - match repository::batch_set_directive_memories(pool, id, &req.memories).await { + match repository::batch_set_directive_memories(pool, id, &req.entries).await { Ok(memories) => Json(memories).into_response(), Err(e) => { tracing::error!("Failed to batch set memories: {}", e); @@ -1226,3 +1226,65 @@ pub async fn clear_memories( } } } + +// ============================================================================= +// Task Cleanup +// ============================================================================= + +/// Clean up terminal tasks associated with a directive. +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/cleanup-tasks", + params(("id" = Uuid, Path, description = "Directive ID")), + responses( + (status = 200, description = "Tasks cleaned up", body = CleanupTasksResponse), + (status = 404, description = "Not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directives" +)] +pub async fn cleanup_tasks( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify directive ownership + match repository::get_directive_for_owner(pool, auth.owner_id, id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("GET_FAILED", &e.to_string())), + ) + .into_response(); + } + } + + match repository::cleanup_directive_tasks(pool, auth.owner_id, id).await { + Ok(deleted) => Json(CleanupTasksResponse { deleted }).into_response(), + Err(e) => { + tracing::error!("Failed to cleanup directive tasks: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("CLEANUP_FAILED", &e.to_string())), + ) + .into_response() + } + } +} |
