diff options
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/db/models.rs | 10 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 104 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 51 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 61 |
4 files changed, 216 insertions, 10 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 291fad7..1eaaf58 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -441,6 +441,12 @@ pub struct Task { #[serde(default)] pub is_supervisor: bool, + // Anonymous task flag + /// True for ephemeral one-off tasks that don't belong to a contract. + /// Anonymous tasks are automatically cleaned up after a period of inactivity. + #[serde(default)] + pub is_anonymous: bool, + // Daemon/container info pub daemon_id: Option<Uuid>, pub container_id: Option<String>, @@ -558,6 +564,9 @@ pub struct TaskSummary { /// True for contract supervisor tasks #[serde(default)] pub is_supervisor: bool, + /// True for ephemeral anonymous tasks + #[serde(default)] + pub is_anonymous: bool, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } @@ -580,6 +589,7 @@ impl From<Task> for TaskSummary { subtask_count: 0, // Would need separate query version: task.version, is_supervisor: task.is_supervisor, + is_anonymous: task.is_anonymous, created_at: task.created_at, updated_at: task.updated_at, } diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 536bc9b..d7f2e69 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -739,7 +739,7 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL @@ -760,7 +760,7 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 @@ -1135,7 +1135,7 @@ pub async fn list_tasks_for_owner( t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id IS NULL @@ -1161,7 +1161,7 @@ pub async fn list_subtasks_for_owner( t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.owner_id = $1 AND t.parent_task_id = $2 @@ -1679,7 +1679,7 @@ pub async fn list_sibling_tasks( t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id = $1 AND t.id != $2 @@ -1701,7 +1701,7 @@ pub async fn list_sibling_tasks( t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.parent_task_id IS NULL AND t.id != $1 @@ -2679,7 +2679,7 @@ pub async fn list_tasks_in_contract( t.parent_task_id, t.depth, t.name, t.status, t.priority, t.progress_summary, (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, - t.version, t.is_supervisor, t.created_at, t.updated_at + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at FROM tasks t LEFT JOIN contracts c ON t.contract_id = c.id WHERE t.contract_id = $1 AND t.owner_id = $2 @@ -3678,3 +3678,93 @@ pub async fn get_supervisor_conversation_full( ) -> Result<Option<SupervisorState>, sqlx::Error> { get_supervisor_state(pool, contract_id).await } + +// ============================================================================= +// Anonymous Task Cleanup Functions +// ============================================================================= + +/// Get anonymous tasks that are completed/failed and older than the specified threshold. +/// Used by the background cleanup job to find stale anonymous tasks. +pub async fn get_stale_anonymous_tasks( + pool: &PgPool, + older_than_days: i32, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * FROM tasks + WHERE is_anonymous = TRUE + AND status IN ('done', 'failed', 'merged') + AND completed_at < NOW() - INTERVAL '1 day' * $1 + ORDER BY completed_at ASC + "#, + ) + .bind(older_than_days) + .fetch_all(pool) + .await +} + +/// Delete a task and its associated data (cascade delete). +/// Related records (task_events, conversation_snapshots, etc.) are automatically +/// deleted via ON DELETE CASCADE foreign key constraints. +pub async fn delete_task_cascade( + pool: &PgPool, + task_id: Uuid, +) -> Result<bool, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE id = $1 + "#, + ) + .bind(task_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// List anonymous tasks for a specific owner. +/// Used by the API endpoint to let users view their ephemeral tasks. +pub async fn list_anonymous_tasks_for_owner( + pool: &PgPool, + owner_id: Uuid, +) -> Result<Vec<TaskSummary>, sqlx::Error> { + sqlx::query_as::<_, TaskSummary>( + r#" + SELECT + t.id, t.contract_id, NULL as contract_name, NULL as contract_phase, + NULL as contract_status, + t.parent_task_id, t.depth, t.name, t.status, t.priority, + t.progress_summary, + (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count, + t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at + FROM tasks t + WHERE t.owner_id = $1 AND t.is_anonymous = TRUE + ORDER BY t.created_at DESC + "#, + ) + .bind(owner_id) + .fetch_all(pool) + .await +} + +/// Count anonymous tasks that will be cleaned up. +/// Useful for monitoring and logging purposes. +pub async fn count_stale_anonymous_tasks( + pool: &PgPool, + older_than_days: i32, +) -> Result<i64, sqlx::Error> { + let result: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) FROM tasks + WHERE is_anonymous = TRUE + AND status IN ('done', 'failed', 'merged') + AND completed_at < NOW() - INTERVAL '1 day' * $1 + "#, + ) + .bind(older_than_days) + .fetch_one(pool) + .await?; + + Ok(result.0) +} diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 275dc3c..21416c1 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -3304,3 +3304,54 @@ pub async fn branch_from_checkpoint( ) .into_response() } + +// ============================================================================= +// Anonymous Task Handlers +// ============================================================================= + +/// List anonymous tasks for the current owner. +/// +/// Anonymous tasks are ephemeral tasks that don't belong to a contract. +/// They are automatically cleaned up after a period of inactivity (default: 7 days). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/anonymous", + responses( + (status = 200, description = "List of anonymous tasks", body = TaskListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_anonymous_tasks( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_anonymous_tasks_for_owner(pool, auth.owner_id).await { + Ok(tasks) => { + let total = tasks.len() as i64; + Json(TaskListResponse { tasks, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list anonymous tasks: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 7e31285..256082d 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -67,6 +67,7 @@ pub fn make_router(state: SharedState) -> Router { "/mesh/tasks", get(mesh::list_tasks).post(mesh::create_task), ) + .route("/mesh/tasks/anonymous", get(mesh::list_anonymous_tasks)) .route( "/mesh/tasks/{id}", get(mesh::get_task) @@ -241,13 +242,18 @@ const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60; /// Daemon heartbeat timeout in seconds (delete daemons older than this) const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; +/// Anonymous task cleanup interval in hours (check every 24 hours) +const ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS: u64 = 24; +/// Anonymous task age threshold in days (clean up tasks older than this) +const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments /// * `state` - Shared application state containing ML models /// * `addr` - Address to bind to (e.g., "0.0.0.0:8080") pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { - // Start background daemon cleanup task if database is available + // Start background cleanup tasks if database is available if let Some(pool) = state.db_pool.clone() { // Initial cleanup of any stale daemons from previous server run match crate::db::repository::delete_stale_daemons(&pool, 0).await { @@ -263,7 +269,8 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { _ => {} } - // Spawn periodic cleanup task + // Spawn periodic daemon cleanup task + let daemon_pool = pool.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(DAEMON_CLEANUP_INTERVAL_SECS) @@ -271,7 +278,7 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { loop { interval.tick().await; match crate::db::repository::delete_stale_daemons( - &pool, + &daemon_pool, DAEMON_HEARTBEAT_TIMEOUT_SECS, ).await { Ok(deleted) if deleted > 0 => { @@ -288,6 +295,54 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Spawn periodic anonymous task cleanup job + let task_pool = pool.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS * 3600) + ); + loop { + interval.tick().await; + match crate::db::repository::get_stale_anonymous_tasks( + &task_pool, + ANONYMOUS_TASK_MAX_AGE_DAYS, + ).await { + Ok(tasks) => { + if !tasks.is_empty() { + tracing::info!( + count = tasks.len(), + max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, + "Starting cleanup of stale anonymous tasks" + ); + } + for task in tasks { + tracing::info!( + task_id = %task.id, + task_name = %task.name, + "Cleaning up stale anonymous task" + ); + + // TODO: Also clean up worktree if it exists on daemon + // For now, we just delete the database records + if let Err(e) = crate::db::repository::delete_task_cascade( + &task_pool, + task.id, + ).await { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to delete anonymous task" + ); + } + } + } + Err(e) => { + tracing::error!(error = %e, "Failed to query stale anonymous tasks"); + } + } + } + }); } let app = make_router(state); |
