diff options
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 51 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 61 |
2 files changed, 109 insertions, 3 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 275dc3c..21416c1 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -3304,3 +3304,54 @@ pub async fn branch_from_checkpoint( ) .into_response() } + +// ============================================================================= +// Anonymous Task Handlers +// ============================================================================= + +/// List anonymous tasks for the current owner. +/// +/// Anonymous tasks are ephemeral tasks that don't belong to a contract. +/// They are automatically cleaned up after a period of inactivity (default: 7 days). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/anonymous", + responses( + (status = 200, description = "List of anonymous tasks", body = TaskListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_anonymous_tasks( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_anonymous_tasks_for_owner(pool, auth.owner_id).await { + Ok(tasks) => { + let total = tasks.len() as i64; + Json(TaskListResponse { tasks, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list anonymous tasks: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 7e31285..256082d 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -67,6 +67,7 @@ pub fn make_router(state: SharedState) -> Router { "/mesh/tasks", get(mesh::list_tasks).post(mesh::create_task), ) + .route("/mesh/tasks/anonymous", get(mesh::list_anonymous_tasks)) .route( "/mesh/tasks/{id}", get(mesh::get_task) @@ -241,13 +242,18 @@ const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60; /// Daemon heartbeat timeout in seconds (delete daemons older than this) const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; +/// Anonymous task cleanup interval in hours (check every 24 hours) +const ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS: u64 = 24; +/// Anonymous task age threshold in days (clean up tasks older than this) +const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments /// * `state` - Shared application state containing ML models /// * `addr` - Address to bind to (e.g., "0.0.0.0:8080") pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { - // Start background daemon cleanup task if database is available + // Start background cleanup tasks if database is available if let Some(pool) = state.db_pool.clone() { // Initial cleanup of any stale daemons from previous server run match crate::db::repository::delete_stale_daemons(&pool, 0).await { @@ -263,7 +269,8 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { _ => {} } - // Spawn periodic cleanup task + // Spawn periodic daemon cleanup task + let daemon_pool = pool.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(DAEMON_CLEANUP_INTERVAL_SECS) @@ -271,7 +278,7 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { loop { interval.tick().await; match crate::db::repository::delete_stale_daemons( - &pool, + &daemon_pool, DAEMON_HEARTBEAT_TIMEOUT_SECS, ).await { Ok(deleted) if deleted > 0 => { @@ -288,6 +295,54 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Spawn periodic anonymous task cleanup job + let task_pool = pool.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS * 3600) + ); + loop { + interval.tick().await; + match crate::db::repository::get_stale_anonymous_tasks( + &task_pool, + ANONYMOUS_TASK_MAX_AGE_DAYS, + ).await { + Ok(tasks) => { + if !tasks.is_empty() { + tracing::info!( + count = tasks.len(), + max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, + "Starting cleanup of stale anonymous tasks" + ); + } + for task in tasks { + tracing::info!( + task_id = %task.id, + task_name = %task.name, + "Cleaning up stale anonymous task" + ); + + // TODO: Also clean up worktree if it exists on daemon + // For now, we just delete the database records + if let Err(e) = crate::db::repository::delete_task_cascade( + &task_pool, + task.id, + ).await { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to delete anonymous task" + ); + } + } + } + Err(e) => { + tracing::error!(error = %e, "Failed to query stale anonymous tasks"); + } + } + } + }); } let app = make_router(state); |
