summaryrefslogtreecommitdiff
path: root/makima/src/db/repository.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-09 00:11:51 +0000
committersoryu <soryu@soryu.co>2026-02-09 00:11:51 +0000
commit8c23b3ab6f7fabca01b0468911bae073aa5ced32 (patch)
treef50159aee13b13f0b55618ac09e9be1f89a41bb2 /makima/src/db/repository.rs
parent3662b334dfd68cfdf00ed44ae88927c2e1b2aabe (diff)
downloadsoryu-8c23b3ab6f7fabca01b0468911bae073aa5ced32.tar.gz
soryu-8c23b3ab6f7fabca01b0468911bae073aa5ced32.zip
Add new directive mechanism v3
Diffstat (limited to 'makima/src/db/repository.rs')
-rw-r--r--makima/src/db/repository.rs415
1 files changed, 414 insertions, 1 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 4ed2298..f347fc7 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -11,7 +11,9 @@ use super::models::{
ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
CreateContractRequest, CreateFileRequest, CreateTaskRequest,
CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition,
+ DeliverableDefinition, Directive, DirectiveStep, DirectiveSummary,
+ CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest,
+ UpdateDirectiveStepRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
@@ -4912,3 +4914,414 @@ fn truncate_string(s: &str, max_len: usize) -> String {
}
}
+// =============================================================================
+// Directive CRUD
+// =============================================================================
+
+/// Create a new directive for an owner.
+pub async fn create_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateDirectiveRequest,
+) -> Result<Directive, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&req.title)
+ .bind(&req.goal)
+ .bind(&req.repository_url)
+ .bind(&req.local_path)
+ .bind(&req.base_branch)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a single directive for an owner.
+pub async fn get_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a directive with all its steps.
+pub async fn get_directive_with_steps_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<Option<(Directive, Vec<DirectiveStep>)>, sqlx::Error> {
+ let directive = sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ match directive {
+ Some(d) => {
+ let steps = list_directive_steps(pool, d.id).await?;
+ Ok(Some((d, steps)))
+ }
+ None => Ok(None),
+ }
+}
+
+/// List all directives for an owner with step counts.
+pub async fn list_directives_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveSummary>(
+ r#"
+ SELECT
+ d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url,
+ d.orchestrator_task_id, d.version, d.created_at, d.updated_at,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id), 0) as total_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'completed'), 0) as completed_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'running'), 0) as running_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'failed'), 0) as failed_steps
+ FROM directives d
+ WHERE d.owner_id = $1
+ ORDER BY d.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a directive with optimistic locking.
+pub async fn update_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+ req: UpdateDirectiveRequest,
+) -> Result<Option<Directive>, RepositoryError> {
+ let current = sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+ .map_err(RepositoryError::Database)?;
+
+ let current = match current {
+ Some(c) => c,
+ None => return Ok(None),
+ };
+
+ if let Some(expected_version) = req.version {
+ if expected_version != current.version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: current.version,
+ });
+ }
+ }
+
+ let title = req.title.as_deref().unwrap_or(&current.title);
+ let goal = req.goal.as_deref().unwrap_or(&current.goal);
+ let status = req.status.as_deref().unwrap_or(&current.status);
+ let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref());
+ let local_path = req.local_path.as_deref().or(current.local_path.as_deref());
+ let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref());
+ let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id);
+
+ let result = sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7,
+ base_branch = $8, orchestrator_task_id = $9, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(title)
+ .bind(goal)
+ .bind(status)
+ .bind(repository_url)
+ .bind(local_path)
+ .bind(base_branch)
+ .bind(orchestrator_task_id)
+ .fetch_optional(pool)
+ .await
+ .map_err(RepositoryError::Database)?;
+
+ Ok(result)
+}
+
+/// Delete a directive for an owner.
+pub async fn delete_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
+// Directive Step CRUD
+// =============================================================================
+
+/// List all steps for a directive, ordered by order_index.
+pub async fn list_directive_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ SELECT * FROM directive_steps
+ WHERE directive_id = $1
+ ORDER BY order_index, created_at
+ "#,
+ )
+ .bind(directive_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Create a single directive step.
+pub async fn create_directive_step(
+ pool: &PgPool,
+ directive_id: Uuid,
+ req: CreateDirectiveStepRequest,
+) -> Result<DirectiveStep, sqlx::Error> {
+ let generation = req.generation.unwrap_or(1);
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.task_plan)
+ .bind(&req.depends_on)
+ .bind(req.order_index)
+ .bind(generation)
+ .fetch_one(pool)
+ .await
+}
+
+/// Batch create multiple directive steps.
+pub async fn batch_create_directive_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+ steps: Vec<CreateDirectiveStepRequest>,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ let mut results = Vec::with_capacity(steps.len());
+ for req in steps {
+ let step = create_directive_step(pool, directive_id, req).await?;
+ results.push(step);
+ }
+ Ok(results)
+}
+
+/// Update a directive step.
+pub async fn update_directive_step(
+ pool: &PgPool,
+ step_id: Uuid,
+ req: UpdateDirectiveStepRequest,
+) -> Result<Option<DirectiveStep>, sqlx::Error> {
+ let current = sqlx::query_as::<_, DirectiveStep>(
+ r#"SELECT * FROM directive_steps WHERE id = $1"#,
+ )
+ .bind(step_id)
+ .fetch_optional(pool)
+ .await?;
+
+ let current = match current {
+ Some(c) => c,
+ None => return Ok(None),
+ };
+
+ let name = req.name.as_deref().unwrap_or(&current.name);
+ let description = req.description.as_deref().or(current.description.as_deref());
+ let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref());
+ let depends_on = req.depends_on.as_deref().unwrap_or(&current.depends_on);
+ let status = req.status.as_deref().unwrap_or(&current.status);
+ let task_id = req.task_id.or(current.task_id);
+ let order_index = req.order_index.unwrap_or(current.order_index);
+
+ // Set started_at when transitioning to running
+ let started_at = if status == "running" && current.status != "running" {
+ Some(Utc::now())
+ } else {
+ current.started_at
+ };
+
+ // Set completed_at when transitioning to terminal state
+ let completed_at = if matches!(status, "completed" | "failed" | "skipped")
+ && !matches!(current.status.as_str(), "completed" | "failed" | "skipped")
+ {
+ Some(Utc::now())
+ } else {
+ current.completed_at
+ };
+
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ UPDATE directive_steps
+ SET name = $2, description = $3, task_plan = $4, depends_on = $5,
+ status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(name)
+ .bind(description)
+ .bind(task_plan)
+ .bind(depends_on)
+ .bind(status)
+ .bind(task_id)
+ .bind(order_index)
+ .bind(started_at)
+ .bind(completed_at)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Delete a directive step.
+pub async fn delete_directive_step(
+ pool: &PgPool,
+ step_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#)
+ .bind(step_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
+// Directive DAG Progression
+// =============================================================================
+
+/// Advance pending steps to ready if all their dependencies are in terminal states.
+/// Returns the newly-ready steps.
+pub async fn advance_directive_ready_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ UPDATE directive_steps SET status = 'ready'
+ WHERE directive_id = $1 AND status = 'pending'
+ AND NOT EXISTS (
+ SELECT 1 FROM unnest(depends_on) AS dep_id
+ JOIN directive_steps ds ON ds.id = dep_id
+ WHERE ds.status NOT IN ('completed', 'skipped')
+ )
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Check if all steps in a directive are in terminal states.
+/// If so, set the directive to 'idle' (not completed — directives are ongoing).
+/// Returns true if the directive was set to idle.
+pub async fn check_directive_idle(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE directives SET status = 'idle', updated_at = NOW()
+ WHERE id = $1 AND status = 'active'
+ AND NOT EXISTS (
+ SELECT 1 FROM directive_steps
+ WHERE directive_id = $1
+ AND status NOT IN ('completed', 'failed', 'skipped')
+ )
+ AND EXISTS (
+ SELECT 1 FROM directive_steps WHERE directive_id = $1
+ )
+ "#,
+ )
+ .bind(directive_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update a directive's goal and bump goal_updated_at. Reactivates if idle.
+pub async fn update_directive_goal(
+ pool: &PgPool,
+ owner_id: Uuid,
+ directive_id: Uuid,
+ goal: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET goal = $3,
+ goal_updated_at = NOW(),
+ status = CASE WHEN status = 'idle' THEN 'active' ELSE status END,
+ updated_at = NOW(),
+ version = version + 1
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(owner_id)
+ .bind(goal)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set a directive's status (used for start/pause/archive transitions).
+pub async fn set_directive_status(
+ pool: &PgPool,
+ owner_id: Uuid,
+ directive_id: Uuid,
+ status: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ let mut query = String::from(
+ r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#,
+ );
+ if status == "active" {
+ query.push_str(", started_at = COALESCE(started_at, NOW())");
+ }
+ query.push_str(" WHERE id = $1 AND owner_id = $2 RETURNING *");
+
+ sqlx::query_as::<_, Directive>(&query)
+ .bind(directive_id)
+ .bind(owner_id)
+ .bind(status)
+ .fetch_optional(pool)
+ .await
+}
+