summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/db/models.rs10
-rw-r--r--makima/src/db/repository.rs104
-rw-r--r--makima/src/server/handlers/mesh.rs51
-rw-r--r--makima/src/server/mod.rs61
4 files changed, 216 insertions, 10 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 291fad7..1eaaf58 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -441,6 +441,12 @@ pub struct Task {
#[serde(default)]
pub is_supervisor: bool,
+ // Anonymous task flag
+ /// True for ephemeral one-off tasks that don't belong to a contract.
+ /// Anonymous tasks are automatically cleaned up after a period of inactivity.
+ #[serde(default)]
+ pub is_anonymous: bool,
+
// Daemon/container info
pub daemon_id: Option<Uuid>,
pub container_id: Option<String>,
@@ -558,6 +564,9 @@ pub struct TaskSummary {
/// True for contract supervisor tasks
#[serde(default)]
pub is_supervisor: bool,
+ /// True for ephemeral anonymous tasks
+ #[serde(default)]
+ pub is_anonymous: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -580,6 +589,7 @@ impl From<Task> for TaskSummary {
subtask_count: 0, // Would need separate query
version: task.version,
is_supervisor: task.is_supervisor,
+ is_anonymous: task.is_anonymous,
created_at: task.created_at,
updated_at: task.updated_at,
}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 536bc9b..d7f2e69 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -739,7 +739,7 @@ pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error>
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL
@@ -760,7 +760,7 @@ pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSum
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1
@@ -1135,7 +1135,7 @@ pub async fn list_tasks_for_owner(
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id IS NULL
@@ -1161,7 +1161,7 @@ pub async fn list_subtasks_for_owner(
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.owner_id = $1 AND t.parent_task_id = $2
@@ -1679,7 +1679,7 @@ pub async fn list_sibling_tasks(
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id = $1 AND t.id != $2
@@ -1701,7 +1701,7 @@ pub async fn list_sibling_tasks(
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.parent_task_id IS NULL AND t.id != $1
@@ -2679,7 +2679,7 @@ pub async fn list_tasks_in_contract(
t.parent_task_id, t.depth, t.name, t.status, t.priority,
t.progress_summary,
(SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
- t.version, t.is_supervisor, t.created_at, t.updated_at
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
FROM tasks t
LEFT JOIN contracts c ON t.contract_id = c.id
WHERE t.contract_id = $1 AND t.owner_id = $2
@@ -3678,3 +3678,93 @@ pub async fn get_supervisor_conversation_full(
) -> Result<Option<SupervisorState>, sqlx::Error> {
get_supervisor_state(pool, contract_id).await
}
+
+// =============================================================================
+// Anonymous Task Cleanup Functions
+// =============================================================================
+
+/// Get anonymous tasks that are completed/failed and older than the specified threshold.
+/// Used by the background cleanup job to find stale anonymous tasks.
+pub async fn get_stale_anonymous_tasks(
+ pool: &PgPool,
+ older_than_days: i32,
+) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT * FROM tasks
+ WHERE is_anonymous = TRUE
+ AND status IN ('done', 'failed', 'merged')
+ AND completed_at < NOW() - INTERVAL '1 day' * $1
+ ORDER BY completed_at ASC
+ "#,
+ )
+ .bind(older_than_days)
+ .fetch_all(pool)
+ .await
+}
+
+/// Delete a task and its associated data (cascade delete).
+/// Related records (task_events, conversation_snapshots, etc.) are automatically
+/// deleted via ON DELETE CASCADE foreign key constraints.
+pub async fn delete_task_cascade(
+ pool: &PgPool,
+ task_id: Uuid,
+) -> Result<bool, sqlx::Error> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM tasks
+ WHERE id = $1
+ "#,
+ )
+ .bind(task_id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// List anonymous tasks for a specific owner.
+/// Used by the API endpoint to let users view their ephemeral tasks.
+pub async fn list_anonymous_tasks_for_owner(
+ pool: &PgPool,
+ owner_id: Uuid,
+) -> Result<Vec<TaskSummary>, sqlx::Error> {
+ sqlx::query_as::<_, TaskSummary>(
+ r#"
+ SELECT
+ t.id, t.contract_id, NULL as contract_name, NULL as contract_phase,
+ NULL as contract_status,
+ t.parent_task_id, t.depth, t.name, t.status, t.priority,
+ t.progress_summary,
+ (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
+ t.version, t.is_supervisor, t.is_anonymous, t.created_at, t.updated_at
+ FROM tasks t
+ WHERE t.owner_id = $1 AND t.is_anonymous = TRUE
+ ORDER BY t.created_at DESC
+ "#,
+ )
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
+/// Count anonymous tasks that will be cleaned up.
+/// Useful for monitoring and logging purposes.
+pub async fn count_stale_anonymous_tasks(
+ pool: &PgPool,
+ older_than_days: i32,
+) -> Result<i64, sqlx::Error> {
+ let result: (i64,) = sqlx::query_as(
+ r#"
+ SELECT COUNT(*) FROM tasks
+ WHERE is_anonymous = TRUE
+ AND status IN ('done', 'failed', 'merged')
+ AND completed_at < NOW() - INTERVAL '1 day' * $1
+ "#,
+ )
+ .bind(older_than_days)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(result.0)
+}
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 275dc3c..21416c1 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -3304,3 +3304,54 @@ pub async fn branch_from_checkpoint(
)
.into_response()
}
+
+// =============================================================================
+// Anonymous Task Handlers
+// =============================================================================
+
+/// List anonymous tasks for the current owner.
+///
+/// Anonymous tasks are ephemeral tasks that don't belong to a contract.
+/// They are automatically cleaned up after a period of inactivity (default: 7 days).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/anonymous",
+ responses(
+ (status = 200, description = "List of anonymous tasks", body = TaskListResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_anonymous_tasks(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ match repository::list_anonymous_tasks_for_owner(pool, auth.owner_id).await {
+ Ok(tasks) => {
+ let total = tasks.len() as i64;
+ Json(TaskListResponse { tasks, total }).into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to list anonymous tasks: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 7e31285..256082d 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -67,6 +67,7 @@ pub fn make_router(state: SharedState) -> Router {
"/mesh/tasks",
get(mesh::list_tasks).post(mesh::create_task),
)
+ .route("/mesh/tasks/anonymous", get(mesh::list_anonymous_tasks))
.route(
"/mesh/tasks/{id}",
get(mesh::get_task)
@@ -241,13 +242,18 @@ const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60;
/// Daemon heartbeat timeout in seconds (delete daemons older than this)
const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120;
+/// Anonymous task cleanup interval in hours (check every 24 hours)
+const ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS: u64 = 24;
+/// Anonymous task age threshold in days (clean up tasks older than this)
+const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
+
/// Run the HTTP server with graceful shutdown support.
///
/// # Arguments
/// * `state` - Shared application state containing ML models
/// * `addr` - Address to bind to (e.g., "0.0.0.0:8080")
pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
- // Start background daemon cleanup task if database is available
+ // Start background cleanup tasks if database is available
if let Some(pool) = state.db_pool.clone() {
// Initial cleanup of any stale daemons from previous server run
match crate::db::repository::delete_stale_daemons(&pool, 0).await {
@@ -263,7 +269,8 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
_ => {}
}
- // Spawn periodic cleanup task
+ // Spawn periodic daemon cleanup task
+ let daemon_pool = pool.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(
std::time::Duration::from_secs(DAEMON_CLEANUP_INTERVAL_SECS)
@@ -271,7 +278,7 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
loop {
interval.tick().await;
match crate::db::repository::delete_stale_daemons(
- &pool,
+ &daemon_pool,
DAEMON_HEARTBEAT_TIMEOUT_SECS,
).await {
Ok(deleted) if deleted > 0 => {
@@ -288,6 +295,54 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
}
}
});
+
+ // Spawn periodic anonymous task cleanup job
+ let task_pool = pool.clone();
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(
+ std::time::Duration::from_secs(ANONYMOUS_TASK_CLEANUP_INTERVAL_HOURS * 3600)
+ );
+ loop {
+ interval.tick().await;
+ match crate::db::repository::get_stale_anonymous_tasks(
+ &task_pool,
+ ANONYMOUS_TASK_MAX_AGE_DAYS,
+ ).await {
+ Ok(tasks) => {
+ if !tasks.is_empty() {
+ tracing::info!(
+ count = tasks.len(),
+ max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS,
+ "Starting cleanup of stale anonymous tasks"
+ );
+ }
+ for task in tasks {
+ tracing::info!(
+ task_id = %task.id,
+ task_name = %task.name,
+ "Cleaning up stale anonymous task"
+ );
+
+ // TODO: Also clean up worktree if it exists on daemon
+ // For now, we just delete the database records
+ if let Err(e) = crate::db::repository::delete_task_cascade(
+ &task_pool,
+ task.id,
+ ).await {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to delete anonymous task"
+ );
+ }
+ }
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to query stale anonymous tasks");
+ }
+ }
+ }
+ });
}
let app = make_router(state);