From 8c23b3ab6f7fabca01b0468911bae073aa5ced32 Mon Sep 17 00:00:00 2001 From: soryu Date: Mon, 9 Feb 2026 00:11:51 +0000 Subject: Add new directive mechanism v3 --- makima/src/db/repository.rs | 415 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 414 insertions(+), 1 deletion(-) (limited to 'makima/src/db/repository.rs') diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 4ed2298..f347fc7 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -11,7 +11,9 @@ use super::models::{ ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, + DeliverableDefinition, Directive, DirectiveStep, DirectiveSummary, + CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest, + UpdateDirectiveStepRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, @@ -4912,3 +4914,414 @@ fn truncate_string(s: &str, max_len: usize) -> String { } } +// ============================================================================= +// Directive CRUD +// ============================================================================= + +/// Create a new directive for an owner. +pub async fn create_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateDirectiveRequest, +) -> Result { + sqlx::query_as::<_, Directive>( + r#" + INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.title) + .bind(&req.goal) + .bind(&req.repository_url) + .bind(&req.local_path) + .bind(&req.base_branch) + .fetch_one(pool) + .await +} + +/// Get a single directive for an owner. +pub async fn get_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Get a directive with all its steps. +pub async fn get_directive_with_steps_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result)>, sqlx::Error> { + let directive = sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + match directive { + Some(d) => { + let steps = list_directive_steps(pool, d.id).await?; + Ok(Some((d, steps))) + } + None => Ok(None), + } +} + +/// List all directives for an owner with step counts. +pub async fn list_directives_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, DirectiveSummary>( + r#" + SELECT + d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url, + d.orchestrator_task_id, d.version, d.created_at, d.updated_at, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id), 0) as total_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'completed'), 0) as completed_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'running'), 0) as running_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'failed'), 0) as failed_steps + FROM directives d + WHERE d.owner_id = $1 + ORDER BY d.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a directive with optimistic locking. +pub async fn update_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, + req: UpdateDirectiveRequest, +) -> Result, RepositoryError> { + let current = sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await + .map_err(RepositoryError::Database)?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if let Some(expected_version) = req.version { + if expected_version != current.version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: current.version, + }); + } + } + + let title = req.title.as_deref().unwrap_or(¤t.title); + let goal = req.goal.as_deref().unwrap_or(¤t.goal); + let status = req.status.as_deref().unwrap_or(¤t.status); + let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); + let local_path = req.local_path.as_deref().or(current.local_path.as_deref()); + let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref()); + let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id); + + let result = sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7, + base_branch = $8, orchestrator_task_id = $9, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(title) + .bind(goal) + .bind(status) + .bind(repository_url) + .bind(local_path) + .bind(base_branch) + .bind(orchestrator_task_id) + .fetch_optional(pool) + .await + .map_err(RepositoryError::Database)?; + + Ok(result) +} + +/// Delete a directive for an owner. +pub async fn delete_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result { + let result = sqlx::query( + r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= +// Directive Step CRUD +// ============================================================================= + +/// List all steps for a directive, ordered by order_index. +pub async fn list_directive_steps( + pool: &PgPool, + directive_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, DirectiveStep>( + r#" + SELECT * FROM directive_steps + WHERE directive_id = $1 + ORDER BY order_index, created_at + "#, + ) + .bind(directive_id) + .fetch_all(pool) + .await +} + +/// Create a single directive step. +pub async fn create_directive_step( + pool: &PgPool, + directive_id: Uuid, + req: CreateDirectiveStepRequest, +) -> Result { + let generation = req.generation.unwrap_or(1); + sqlx::query_as::<_, DirectiveStep>( + r#" + INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "#, + ) + .bind(directive_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.task_plan) + .bind(&req.depends_on) + .bind(req.order_index) + .bind(generation) + .fetch_one(pool) + .await +} + +/// Batch create multiple directive steps. +pub async fn batch_create_directive_steps( + pool: &PgPool, + directive_id: Uuid, + steps: Vec, +) -> Result, sqlx::Error> { + let mut results = Vec::with_capacity(steps.len()); + for req in steps { + let step = create_directive_step(pool, directive_id, req).await?; + results.push(step); + } + Ok(results) +} + +/// Update a directive step. +pub async fn update_directive_step( + pool: &PgPool, + step_id: Uuid, + req: UpdateDirectiveStepRequest, +) -> Result, sqlx::Error> { + let current = sqlx::query_as::<_, DirectiveStep>( + r#"SELECT * FROM directive_steps WHERE id = $1"#, + ) + .bind(step_id) + .fetch_optional(pool) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + let name = req.name.as_deref().unwrap_or(¤t.name); + let description = req.description.as_deref().or(current.description.as_deref()); + let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref()); + let depends_on = req.depends_on.as_deref().unwrap_or(¤t.depends_on); + let status = req.status.as_deref().unwrap_or(¤t.status); + let task_id = req.task_id.or(current.task_id); + let order_index = req.order_index.unwrap_or(current.order_index); + + // Set started_at when transitioning to running + let started_at = if status == "running" && current.status != "running" { + Some(Utc::now()) + } else { + current.started_at + }; + + // Set completed_at when transitioning to terminal state + let completed_at = if matches!(status, "completed" | "failed" | "skipped") + && !matches!(current.status.as_str(), "completed" | "failed" | "skipped") + { + Some(Utc::now()) + } else { + current.completed_at + }; + + sqlx::query_as::<_, DirectiveStep>( + r#" + UPDATE directive_steps + SET name = $2, description = $3, task_plan = $4, depends_on = $5, + status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(name) + .bind(description) + .bind(task_plan) + .bind(depends_on) + .bind(status) + .bind(task_id) + .bind(order_index) + .bind(started_at) + .bind(completed_at) + .fetch_optional(pool) + .await +} + +/// Delete a directive step. +pub async fn delete_directive_step( + pool: &PgPool, + step_id: Uuid, +) -> Result { + let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#) + .bind(step_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= +// Directive DAG Progression +// ============================================================================= + +/// Advance pending steps to ready if all their dependencies are in terminal states. +/// Returns the newly-ready steps. +pub async fn advance_directive_ready_steps( + pool: &PgPool, + directive_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, DirectiveStep>( + r#" + UPDATE directive_steps SET status = 'ready' + WHERE directive_id = $1 AND status = 'pending' + AND NOT EXISTS ( + SELECT 1 FROM unnest(depends_on) AS dep_id + JOIN directive_steps ds ON ds.id = dep_id + WHERE ds.status NOT IN ('completed', 'skipped') + ) + RETURNING * + "#, + ) + .bind(directive_id) + .fetch_all(pool) + .await +} + +/// Check if all steps in a directive are in terminal states. +/// If so, set the directive to 'idle' (not completed — directives are ongoing). +/// Returns true if the directive was set to idle. +pub async fn check_directive_idle( + pool: &PgPool, + directive_id: Uuid, +) -> Result { + let result = sqlx::query( + r#" + UPDATE directives SET status = 'idle', updated_at = NOW() + WHERE id = $1 AND status = 'active' + AND NOT EXISTS ( + SELECT 1 FROM directive_steps + WHERE directive_id = $1 + AND status NOT IN ('completed', 'failed', 'skipped') + ) + AND EXISTS ( + SELECT 1 FROM directive_steps WHERE directive_id = $1 + ) + "#, + ) + .bind(directive_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update a directive's goal and bump goal_updated_at. Reactivates if idle. +pub async fn update_directive_goal( + pool: &PgPool, + owner_id: Uuid, + directive_id: Uuid, + goal: &str, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET goal = $3, + goal_updated_at = NOW(), + status = CASE WHEN status = 'idle' THEN 'active' ELSE status END, + updated_at = NOW(), + version = version + 1 + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(owner_id) + .bind(goal) + .fetch_optional(pool) + .await +} + +/// Set a directive's status (used for start/pause/archive transitions). +pub async fn set_directive_status( + pool: &PgPool, + owner_id: Uuid, + directive_id: Uuid, + status: &str, +) -> Result, sqlx::Error> { + let mut query = String::from( + r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#, + ); + if status == "active" { + query.push_str(", started_at = COALESCE(started_at, NOW())"); + } + query.push_str(" WHERE id = $1 AND owner_id = $2 RETURNING *"); + + sqlx::query_as::<_, Directive>(&query) + .bind(directive_id) + .bind(owner_id) + .bind(status) + .fetch_optional(pool) + .await +} + -- cgit v1.2.3