summaryrefslogtreecommitdiff
path: root/makima/src/db
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/db')
-rw-r--r--makima/src/db/models.rs151
-rw-r--r--makima/src/db/repository.rs415
2 files changed, 565 insertions, 1 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index d0a0bd6..9159fd5 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -531,6 +531,14 @@ pub struct Task {
/// Standalone completed tasks can be dismissed by the user.
#[serde(default)]
pub hidden: bool,
+
+ // Directive association
+ /// Directive this task belongs to (for directive-driven tasks)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub directive_id: Option<Uuid>,
+ /// Directive step this task executes
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub directive_step_id: Option<Uuid>,
}
impl Task {
@@ -656,6 +664,10 @@ pub struct CreateTaskRequest {
/// Task ID whose worktree this task shares. When set, this task reuses the supervisor's
/// worktree instead of creating its own, and should NOT have its worktree deleted during cleanup.
pub supervisor_worktree_task_id: Option<Uuid>,
+ /// Directive this task belongs to (for directive-driven tasks)
+ pub directive_id: Option<Uuid>,
+ /// Directive step this task executes
+ pub directive_step_id: Option<Uuid>,
}
/// Request payload for updating a task
@@ -2682,3 +2694,142 @@ mod tests {
}
// =============================================================================
+// Directive Types
+// =============================================================================
+
+/// A directive — a long-lived top-level entity for managing projects via a DAG of steps.
+#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Directive {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub title: String,
+ pub goal: String,
+ /// Status: draft, active, idle, paused, archived
+ pub status: String,
+ pub repository_url: Option<String>,
+ pub local_path: Option<String>,
+ pub base_branch: Option<String>,
+ pub orchestrator_task_id: Option<Uuid>,
+ pub goal_updated_at: DateTime<Utc>,
+ pub started_at: Option<DateTime<Utc>>,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// A step in a directive's DAG.
+#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectiveStep {
+ pub id: Uuid,
+ pub directive_id: Uuid,
+ pub name: String,
+ pub description: Option<String>,
+ pub task_plan: Option<String>,
+ pub depends_on: Vec<Uuid>,
+ /// Status: pending, ready, running, completed, failed, skipped
+ pub status: String,
+ pub task_id: Option<Uuid>,
+ pub order_index: i32,
+ pub generation: i32,
+ pub started_at: Option<DateTime<Utc>>,
+ pub completed_at: Option<DateTime<Utc>>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Directive with its steps for detail view.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectiveWithSteps {
+ #[serde(flatten)]
+ pub directive: Directive,
+ pub steps: Vec<DirectiveStep>,
+}
+
+/// Summary for directive list views.
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectiveSummary {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub title: String,
+ pub goal: String,
+ pub status: String,
+ pub repository_url: Option<String>,
+ pub orchestrator_task_id: Option<Uuid>,
+ pub version: i32,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+ pub total_steps: i64,
+ pub completed_steps: i64,
+ pub running_steps: i64,
+ pub failed_steps: i64,
+}
+
+/// List response for directives.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectiveListResponse {
+ pub directives: Vec<DirectiveSummary>,
+ pub total: i64,
+}
+
+/// Request to create a new directive.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateDirectiveRequest {
+ pub title: String,
+ pub goal: String,
+ pub repository_url: Option<String>,
+ pub local_path: Option<String>,
+ pub base_branch: Option<String>,
+}
+
+/// Request to update a directive.
+#[derive(Debug, Default, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateDirectiveRequest {
+ pub title: Option<String>,
+ pub goal: Option<String>,
+ pub status: Option<String>,
+ pub repository_url: Option<String>,
+ pub local_path: Option<String>,
+ pub base_branch: Option<String>,
+ pub orchestrator_task_id: Option<Uuid>,
+ pub version: Option<i32>,
+}
+
+/// Request to update a directive's goal (triggers re-planning).
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateGoalRequest {
+ pub goal: String,
+}
+
+/// Request to create a directive step.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateDirectiveStepRequest {
+ pub name: String,
+ pub description: Option<String>,
+ pub task_plan: Option<String>,
+ #[serde(default)]
+ pub depends_on: Vec<Uuid>,
+ #[serde(default)]
+ pub order_index: i32,
+ pub generation: Option<i32>,
+}
+
+/// Request to update a directive step.
+#[derive(Debug, Default, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateDirectiveStepRequest {
+ pub name: Option<String>,
+ pub description: Option<String>,
+ pub task_plan: Option<String>,
+ pub depends_on: Option<Vec<Uuid>>,
+ pub status: Option<String>,
+ pub task_id: Option<Uuid>,
+ pub order_index: Option<i32>,
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 4ed2298..f347fc7 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -11,7 +11,9 @@ use super::models::{
ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot,
CreateContractRequest, CreateFileRequest, CreateTaskRequest,
CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity,
- DeliverableDefinition,
+ DeliverableDefinition, Directive, DirectiveStep, DirectiveSummary,
+ CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest,
+ UpdateDirectiveStepRequest,
File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig,
PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState,
@@ -4912,3 +4914,414 @@ fn truncate_string(s: &str, max_len: usize) -> String {
}
}
+// =============================================================================
+// Directive CRUD
+// =============================================================================
+
+/// Create a new directive for an owner.
+pub async fn create_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ req: CreateDirectiveRequest,
+) -> Result<Directive, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ RETURNING *
+ "#,
+ )
+ .bind(owner_id)
+ .bind(&req.title)
+ .bind(&req.goal)
+ .bind(&req.repository_url)
+ .bind(&req.local_path)
+ .bind(&req.base_branch)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a single directive for an owner.
+pub async fn get_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get a directive with all its steps.
+pub async fn get_directive_with_steps_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<Option<(Directive, Vec<DirectiveStep>)>, sqlx::Error> {
+ let directive = sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await?;
+
+ match directive {
+ Some(d) => {
+ let steps = list_directive_steps(pool, d.id).await?;
+ Ok(Some((d, steps)))
+ }
+ None => Ok(None),
+ }
+}
+
+/// List all directives for an owner with step counts.
+pub async fn list_directives_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveSummary>(
+ r#"
+ SELECT
+ d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url,
+ d.orchestrator_task_id, d.version, d.created_at, d.updated_at,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id), 0) as total_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'completed'), 0) as completed_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'running'), 0) as running_steps,
+ COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'failed'), 0) as failed_steps
+ FROM directives d
+ WHERE d.owner_id = $1
+ ORDER BY d.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Update a directive with optimistic locking.
+pub async fn update_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+ req: UpdateDirectiveRequest,
+) -> Result<Option<Directive>, RepositoryError> {
+ let current = sqlx::query_as::<_, Directive>(
+ r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .fetch_optional(pool)
+ .await
+ .map_err(RepositoryError::Database)?;
+
+ let current = match current {
+ Some(c) => c,
+ None => return Ok(None),
+ };
+
+ if let Some(expected_version) = req.version {
+ if expected_version != current.version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: current.version,
+ });
+ }
+ }
+
+ let title = req.title.as_deref().unwrap_or(&current.title);
+ let goal = req.goal.as_deref().unwrap_or(&current.goal);
+ let status = req.status.as_deref().unwrap_or(&current.status);
+ let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref());
+ let local_path = req.local_path.as_deref().or(current.local_path.as_deref());
+ let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref());
+ let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id);
+
+ let result = sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7,
+ base_branch = $8, orchestrator_task_id = $9, version = version + 1, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .bind(title)
+ .bind(goal)
+ .bind(status)
+ .bind(repository_url)
+ .bind(local_path)
+ .bind(base_branch)
+ .bind(orchestrator_task_id)
+ .fetch_optional(pool)
+ .await
+ .map_err(RepositoryError::Database)?;
+
+ Ok(result)
+}
+
+/// Delete a directive for an owner.
+pub async fn delete_directive_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+ id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#,
+ )
+ .bind(id)
+ .bind(owner_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
+// Directive Step CRUD
+// =============================================================================
+
+/// List all steps for a directive, ordered by order_index.
+pub async fn list_directive_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ SELECT * FROM directive_steps
+ WHERE directive_id = $1
+ ORDER BY order_index, created_at
+ "#,
+ )
+ .bind(directive_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Create a single directive step.
+pub async fn create_directive_step(
+ pool: &PgPool,
+ directive_id: Uuid,
+ req: CreateDirectiveStepRequest,
+) -> Result<DirectiveStep, sqlx::Error> {
+ let generation = req.generation.unwrap_or(1);
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(&req.name)
+ .bind(&req.description)
+ .bind(&req.task_plan)
+ .bind(&req.depends_on)
+ .bind(req.order_index)
+ .bind(generation)
+ .fetch_one(pool)
+ .await
+}
+
+/// Batch create multiple directive steps.
+pub async fn batch_create_directive_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+ steps: Vec<CreateDirectiveStepRequest>,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ let mut results = Vec::with_capacity(steps.len());
+ for req in steps {
+ let step = create_directive_step(pool, directive_id, req).await?;
+ results.push(step);
+ }
+ Ok(results)
+}
+
+/// Update a directive step.
+pub async fn update_directive_step(
+ pool: &PgPool,
+ step_id: Uuid,
+ req: UpdateDirectiveStepRequest,
+) -> Result<Option<DirectiveStep>, sqlx::Error> {
+ let current = sqlx::query_as::<_, DirectiveStep>(
+ r#"SELECT * FROM directive_steps WHERE id = $1"#,
+ )
+ .bind(step_id)
+ .fetch_optional(pool)
+ .await?;
+
+ let current = match current {
+ Some(c) => c,
+ None => return Ok(None),
+ };
+
+ let name = req.name.as_deref().unwrap_or(&current.name);
+ let description = req.description.as_deref().or(current.description.as_deref());
+ let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref());
+ let depends_on = req.depends_on.as_deref().unwrap_or(&current.depends_on);
+ let status = req.status.as_deref().unwrap_or(&current.status);
+ let task_id = req.task_id.or(current.task_id);
+ let order_index = req.order_index.unwrap_or(current.order_index);
+
+ // Set started_at when transitioning to running
+ let started_at = if status == "running" && current.status != "running" {
+ Some(Utc::now())
+ } else {
+ current.started_at
+ };
+
+ // Set completed_at when transitioning to terminal state
+ let completed_at = if matches!(status, "completed" | "failed" | "skipped")
+ && !matches!(current.status.as_str(), "completed" | "failed" | "skipped")
+ {
+ Some(Utc::now())
+ } else {
+ current.completed_at
+ };
+
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ UPDATE directive_steps
+ SET name = $2, description = $3, task_plan = $4, depends_on = $5,
+ status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10
+ WHERE id = $1
+ RETURNING *
+ "#,
+ )
+ .bind(step_id)
+ .bind(name)
+ .bind(description)
+ .bind(task_plan)
+ .bind(depends_on)
+ .bind(status)
+ .bind(task_id)
+ .bind(order_index)
+ .bind(started_at)
+ .bind(completed_at)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Delete a directive step.
+pub async fn delete_directive_step(
+ pool: &PgPool,
+ step_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#)
+ .bind(step_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+// =============================================================================
+// Directive DAG Progression
+// =============================================================================
+
+/// Advance pending steps to ready if all their dependencies are in terminal states.
+/// Returns the newly-ready steps.
+pub async fn advance_directive_ready_steps(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<Vec<DirectiveStep>, sqlx::Error> {
+ sqlx::query_as::<_, DirectiveStep>(
+ r#"
+ UPDATE directive_steps SET status = 'ready'
+ WHERE directive_id = $1 AND status = 'pending'
+ AND NOT EXISTS (
+ SELECT 1 FROM unnest(depends_on) AS dep_id
+ JOIN directive_steps ds ON ds.id = dep_id
+ WHERE ds.status NOT IN ('completed', 'skipped')
+ )
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Check if all steps in a directive are in terminal states.
+/// If so, set the directive to 'idle' (not completed — directives are ongoing).
+/// Returns true if the directive was set to idle.
+pub async fn check_directive_idle(
+ pool: &PgPool,
+ directive_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ UPDATE directives SET status = 'idle', updated_at = NOW()
+ WHERE id = $1 AND status = 'active'
+ AND NOT EXISTS (
+ SELECT 1 FROM directive_steps
+ WHERE directive_id = $1
+ AND status NOT IN ('completed', 'failed', 'skipped')
+ )
+ AND EXISTS (
+ SELECT 1 FROM directive_steps WHERE directive_id = $1
+ )
+ "#,
+ )
+ .bind(directive_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Update a directive's goal and bump goal_updated_at. Reactivates if idle.
+pub async fn update_directive_goal(
+ pool: &PgPool,
+ owner_id: Uuid,
+ directive_id: Uuid,
+ goal: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ sqlx::query_as::<_, Directive>(
+ r#"
+ UPDATE directives
+ SET goal = $3,
+ goal_updated_at = NOW(),
+ status = CASE WHEN status = 'idle' THEN 'active' ELSE status END,
+ updated_at = NOW(),
+ version = version + 1
+ WHERE id = $1 AND owner_id = $2
+ RETURNING *
+ "#,
+ )
+ .bind(directive_id)
+ .bind(owner_id)
+ .bind(goal)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Set a directive's status (used for start/pause/archive transitions).
+pub async fn set_directive_status(
+ pool: &PgPool,
+ owner_id: Uuid,
+ directive_id: Uuid,
+ status: &str,
+) -> Result<Option<Directive>, sqlx::Error> {
+ let mut query = String::from(
+ r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#,
+ );
+ if status == "active" {
+ query.push_str(", started_at = COALESCE(started_at, NOW())");
+ }
+ query.push_str(" WHERE id = $1 AND owner_id = $2 RETURNING *");
+
+ sqlx::query_as::<_, Directive>(&query)
+ .bind(directive_id)
+ .bind(owner_id)
+ .bind(status)
+ .fetch_optional(pool)
+ .await
+}
+