diff options
Diffstat (limited to 'makima/src/db')
| -rw-r--r-- | makima/src/db/models.rs | 151 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 415 |
2 files changed, 565 insertions, 1 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index d0a0bd6..9159fd5 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -531,6 +531,14 @@ pub struct Task { /// Standalone completed tasks can be dismissed by the user. #[serde(default)] pub hidden: bool, + + // Directive association + /// Directive this task belongs to (for directive-driven tasks) + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_id: Option<Uuid>, + /// Directive step this task executes + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_step_id: Option<Uuid>, } impl Task { @@ -656,6 +664,10 @@ pub struct CreateTaskRequest { /// Task ID whose worktree this task shares. When set, this task reuses the supervisor's /// worktree instead of creating its own, and should NOT have its worktree deleted during cleanup. pub supervisor_worktree_task_id: Option<Uuid>, + /// Directive this task belongs to (for directive-driven tasks) + pub directive_id: Option<Uuid>, + /// Directive step this task executes + pub directive_step_id: Option<Uuid>, } /// Request payload for updating a task @@ -2682,3 +2694,142 @@ mod tests { } // ============================================================================= +// Directive Types +// ============================================================================= + +/// A directive — a long-lived top-level entity for managing projects via a DAG of steps. +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Directive { + pub id: Uuid, + pub owner_id: Uuid, + pub title: String, + pub goal: String, + /// Status: draft, active, idle, paused, archived + pub status: String, + pub repository_url: Option<String>, + pub local_path: Option<String>, + pub base_branch: Option<String>, + pub orchestrator_task_id: Option<Uuid>, + pub goal_updated_at: DateTime<Utc>, + pub started_at: Option<DateTime<Utc>>, + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +/// A step in a directive's DAG. +#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveStep { + pub id: Uuid, + pub directive_id: Uuid, + pub name: String, + pub description: Option<String>, + pub task_plan: Option<String>, + pub depends_on: Vec<Uuid>, + /// Status: pending, ready, running, completed, failed, skipped + pub status: String, + pub task_id: Option<Uuid>, + pub order_index: i32, + pub generation: i32, + pub started_at: Option<DateTime<Utc>>, + pub completed_at: Option<DateTime<Utc>>, + pub created_at: DateTime<Utc>, +} + +/// Directive with its steps for detail view. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveWithSteps { + #[serde(flatten)] + pub directive: Directive, + pub steps: Vec<DirectiveStep>, +} + +/// Summary for directive list views. +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveSummary { + pub id: Uuid, + pub owner_id: Uuid, + pub title: String, + pub goal: String, + pub status: String, + pub repository_url: Option<String>, + pub orchestrator_task_id: Option<Uuid>, + pub version: i32, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, + pub total_steps: i64, + pub completed_steps: i64, + pub running_steps: i64, + pub failed_steps: i64, +} + +/// List response for directives. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveListResponse { + pub directives: Vec<DirectiveSummary>, + pub total: i64, +} + +/// Request to create a new directive. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateDirectiveRequest { + pub title: String, + pub goal: String, + pub repository_url: Option<String>, + pub local_path: Option<String>, + pub base_branch: Option<String>, +} + +/// Request to update a directive. +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateDirectiveRequest { + pub title: Option<String>, + pub goal: Option<String>, + pub status: Option<String>, + pub repository_url: Option<String>, + pub local_path: Option<String>, + pub base_branch: Option<String>, + pub orchestrator_task_id: Option<Uuid>, + pub version: Option<i32>, +} + +/// Request to update a directive's goal (triggers re-planning). +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateGoalRequest { + pub goal: String, +} + +/// Request to create a directive step. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateDirectiveStepRequest { + pub name: String, + pub description: Option<String>, + pub task_plan: Option<String>, + #[serde(default)] + pub depends_on: Vec<Uuid>, + #[serde(default)] + pub order_index: i32, + pub generation: Option<i32>, +} + +/// Request to update a directive step. +#[derive(Debug, Default, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateDirectiveStepRequest { + pub name: Option<String>, + pub description: Option<String>, + pub task_plan: Option<String>, + pub depends_on: Option<Vec<Uuid>>, + pub status: Option<String>, + pub task_id: Option<Uuid>, + pub order_index: Option<i32>, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 4ed2298..f347fc7 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -11,7 +11,9 @@ use super::models::{ ContractTypeTemplateRecord, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, CreateTemplateRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, - DeliverableDefinition, + DeliverableDefinition, Directive, DirectiveStep, DirectiveSummary, + CreateDirectiveRequest, CreateDirectiveStepRequest, UpdateDirectiveRequest, + UpdateDirectiveStepRequest, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, PhaseChangeResult, PhaseConfig, PhaseDefinition, SupervisorHeartbeatRecord, SupervisorState, @@ -4912,3 +4914,414 @@ fn truncate_string(s: &str, max_len: usize) -> String { } } +// ============================================================================= +// Directive CRUD +// ============================================================================= + +/// Create a new directive for an owner. +pub async fn create_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + req: CreateDirectiveRequest, +) -> Result<Directive, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING * + "#, + ) + .bind(owner_id) + .bind(&req.title) + .bind(&req.goal) + .bind(&req.repository_url) + .bind(&req.local_path) + .bind(&req.base_branch) + .fetch_one(pool) + .await +} + +/// Get a single directive for an owner. +pub async fn get_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await +} + +/// Get a directive with all its steps. +pub async fn get_directive_with_steps_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result<Option<(Directive, Vec<DirectiveStep>)>, sqlx::Error> { + let directive = sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await?; + + match directive { + Some(d) => { + let steps = list_directive_steps(pool, d.id).await?; + Ok(Some((d, steps))) + } + None => Ok(None), + } +} + +/// List all directives for an owner with step counts. +pub async fn list_directives_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<DirectiveSummary>, sqlx::Error> { + sqlx::query_as::<_, DirectiveSummary>( + r#" + SELECT + d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url, + d.orchestrator_task_id, d.version, d.created_at, d.updated_at, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id), 0) as total_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'completed'), 0) as completed_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'running'), 0) as running_steps, + COALESCE((SELECT COUNT(*) FROM directive_steps WHERE directive_id = d.id AND status = 'failed'), 0) as failed_steps + FROM directives d + WHERE d.owner_id = $1 + ORDER BY d.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Update a directive with optimistic locking. +pub async fn update_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, + req: UpdateDirectiveRequest, +) -> Result<Option<Directive>, RepositoryError> { + let current = sqlx::query_as::<_, Directive>( + r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .fetch_optional(pool) + .await + .map_err(RepositoryError::Database)?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + if let Some(expected_version) = req.version { + if expected_version != current.version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: current.version, + }); + } + } + + let title = req.title.as_deref().unwrap_or(¤t.title); + let goal = req.goal.as_deref().unwrap_or(¤t.goal); + let status = req.status.as_deref().unwrap_or(¤t.status); + let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref()); + let local_path = req.local_path.as_deref().or(current.local_path.as_deref()); + let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref()); + let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id); + + let result = sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7, + base_branch = $8, orchestrator_task_id = $9, version = version + 1, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(id) + .bind(owner_id) + .bind(title) + .bind(goal) + .bind(status) + .bind(repository_url) + .bind(local_path) + .bind(base_branch) + .bind(orchestrator_task_id) + .fetch_optional(pool) + .await + .map_err(RepositoryError::Database)?; + + Ok(result) +} + +/// Delete a directive for an owner. +pub async fn delete_directive_for_owner( + pool: &PgPool, + owner_id: Uuid, + id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#, + ) + .bind(id) + .bind(owner_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= +// Directive Step CRUD +// ============================================================================= + +/// List all steps for a directive, ordered by order_index. +pub async fn list_directive_steps( + pool: &PgPool, + directive_id: Uuid, +) -> Result<Vec<DirectiveStep>, sqlx::Error> { + sqlx::query_as::<_, DirectiveStep>( + r#" + SELECT * FROM directive_steps + WHERE directive_id = $1 + ORDER BY order_index, created_at + "#, + ) + .bind(directive_id) + .fetch_all(pool) + .await +} + +/// Create a single directive step. +pub async fn create_directive_step( + pool: &PgPool, + directive_id: Uuid, + req: CreateDirectiveStepRequest, +) -> Result<DirectiveStep, sqlx::Error> { + let generation = req.generation.unwrap_or(1); + sqlx::query_as::<_, DirectiveStep>( + r#" + INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "#, + ) + .bind(directive_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.task_plan) + .bind(&req.depends_on) + .bind(req.order_index) + .bind(generation) + .fetch_one(pool) + .await +} + +/// Batch create multiple directive steps. +pub async fn batch_create_directive_steps( + pool: &PgPool, + directive_id: Uuid, + steps: Vec<CreateDirectiveStepRequest>, +) -> Result<Vec<DirectiveStep>, sqlx::Error> { + let mut results = Vec::with_capacity(steps.len()); + for req in steps { + let step = create_directive_step(pool, directive_id, req).await?; + results.push(step); + } + Ok(results) +} + +/// Update a directive step. +pub async fn update_directive_step( + pool: &PgPool, + step_id: Uuid, + req: UpdateDirectiveStepRequest, +) -> Result<Option<DirectiveStep>, sqlx::Error> { + let current = sqlx::query_as::<_, DirectiveStep>( + r#"SELECT * FROM directive_steps WHERE id = $1"#, + ) + .bind(step_id) + .fetch_optional(pool) + .await?; + + let current = match current { + Some(c) => c, + None => return Ok(None), + }; + + let name = req.name.as_deref().unwrap_or(¤t.name); + let description = req.description.as_deref().or(current.description.as_deref()); + let task_plan = req.task_plan.as_deref().or(current.task_plan.as_deref()); + let depends_on = req.depends_on.as_deref().unwrap_or(¤t.depends_on); + let status = req.status.as_deref().unwrap_or(¤t.status); + let task_id = req.task_id.or(current.task_id); + let order_index = req.order_index.unwrap_or(current.order_index); + + // Set started_at when transitioning to running + let started_at = if status == "running" && current.status != "running" { + Some(Utc::now()) + } else { + current.started_at + }; + + // Set completed_at when transitioning to terminal state + let completed_at = if matches!(status, "completed" | "failed" | "skipped") + && !matches!(current.status.as_str(), "completed" | "failed" | "skipped") + { + Some(Utc::now()) + } else { + current.completed_at + }; + + sqlx::query_as::<_, DirectiveStep>( + r#" + UPDATE directive_steps + SET name = $2, description = $3, task_plan = $4, depends_on = $5, + status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10 + WHERE id = $1 + RETURNING * + "#, + ) + .bind(step_id) + .bind(name) + .bind(description) + .bind(task_plan) + .bind(depends_on) + .bind(status) + .bind(task_id) + .bind(order_index) + .bind(started_at) + .bind(completed_at) + .fetch_optional(pool) + .await +} + +/// Delete a directive step. +pub async fn delete_directive_step( + pool: &PgPool, + step_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#) + .bind(step_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +// ============================================================================= +// Directive DAG Progression +// ============================================================================= + +/// Advance pending steps to ready if all their dependencies are in terminal states. +/// Returns the newly-ready steps. +pub async fn advance_directive_ready_steps( + pool: &PgPool, + directive_id: Uuid, +) -> Result<Vec<DirectiveStep>, sqlx::Error> { + sqlx::query_as::<_, DirectiveStep>( + r#" + UPDATE directive_steps SET status = 'ready' + WHERE directive_id = $1 AND status = 'pending' + AND NOT EXISTS ( + SELECT 1 FROM unnest(depends_on) AS dep_id + JOIN directive_steps ds ON ds.id = dep_id + WHERE ds.status NOT IN ('completed', 'skipped') + ) + RETURNING * + "#, + ) + .bind(directive_id) + .fetch_all(pool) + .await +} + +/// Check if all steps in a directive are in terminal states. +/// If so, set the directive to 'idle' (not completed — directives are ongoing). +/// Returns true if the directive was set to idle. +pub async fn check_directive_idle( + pool: &PgPool, + directive_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + UPDATE directives SET status = 'idle', updated_at = NOW() + WHERE id = $1 AND status = 'active' + AND NOT EXISTS ( + SELECT 1 FROM directive_steps + WHERE directive_id = $1 + AND status NOT IN ('completed', 'failed', 'skipped') + ) + AND EXISTS ( + SELECT 1 FROM directive_steps WHERE directive_id = $1 + ) + "#, + ) + .bind(directive_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Update a directive's goal and bump goal_updated_at. Reactivates if idle. +pub async fn update_directive_goal( + pool: &PgPool, + owner_id: Uuid, + directive_id: Uuid, + goal: &str, +) -> Result<Option<Directive>, sqlx::Error> { + sqlx::query_as::<_, Directive>( + r#" + UPDATE directives + SET goal = $3, + goal_updated_at = NOW(), + status = CASE WHEN status = 'idle' THEN 'active' ELSE status END, + updated_at = NOW(), + version = version + 1 + WHERE id = $1 AND owner_id = $2 + RETURNING * + "#, + ) + .bind(directive_id) + .bind(owner_id) + .bind(goal) + .fetch_optional(pool) + .await +} + +/// Set a directive's status (used for start/pause/archive transitions). +pub async fn set_directive_status( + pool: &PgPool, + owner_id: Uuid, + directive_id: Uuid, + status: &str, +) -> Result<Option<Directive>, sqlx::Error> { + let mut query = String::from( + r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#, + ); + if status == "active" { + query.push_str(", started_at = COALESCE(started_at, NOW())"); + } + query.push_str(" WHERE id = $1 AND owner_id = $2 RETURNING *"); + + sqlx::query_as::<_, Directive>(&query) + .bind(directive_id) + .bind(owner_id) + .bind(status) + .fetch_optional(pool) + .await +} + |
