diff options
| author | soryu <soryu@soryu.co> | 2026-01-23 23:52:35 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 23:52:35 +0000 |
| commit | 579c983d3efb8f1414ffb45b9e031f741cce5f76 (patch) | |
| tree | 1a0060f19a4f4eea8fb9cff9eb52a46cedcdc152 /makima/src/server/mod.rs | |
| parent | f6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff) | |
| download | soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.tar.gz soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.zip | |
Add resume to daemon tasks
Diffstat (limited to 'makima/src/server/mod.rs')
| -rw-r--r-- | makima/src/server/mod.rs | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 3a27513..de20569 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; /// Interval for checkpoint patch cleanup (hourly) const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; +// Retry orchestrator checks for pending tasks every 30 seconds +const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments @@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Clone state and pool for retry orchestrator + let retry_pool = pool.clone(); + let retry_state = state.clone(); + + // Spawn retry orchestrator - periodically retries pending tasks on available daemons + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS) + ); + loop { + interval.tick().await; + + // Get all contracts with pending tasks awaiting retry + match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await { + Ok(contract_owners) => { + for (contract_id, owner_id) in contract_owners { + // Try to start a pending task for this contract + match handlers::mesh_supervisor::try_start_pending_task( + &retry_state, + contract_id, + owner_id, + ).await { + Ok(Some(task)) => { + tracing::info!( + task_id = %task.id, + contract_id = %contract_id, + retry_count = task.retry_count, + "Retry orchestrator started pending task" + ); + } + Ok(None) => { + // No tasks could be started (no available daemons, etc.) + } + Err(e) => { + tracing::warn!( + contract_id = %contract_id, + error = %e, + "Retry orchestrator failed to start pending task" + ); + } + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + "Retry orchestrator failed to query pending task contracts" + ); + } + } + } + }); + + tracing::info!( + "Retry orchestrator started (interval: {}s)", + RETRY_ORCHESTRATOR_INTERVAL_SECS + ); } let app = make_router(state); |
