summaryrefslogtreecommitdiff
path: root/makima/src/server/mod.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-23 23:52:35 +0000
committersoryu <soryu@soryu.co>2026-01-23 23:52:35 +0000
commit579c983d3efb8f1414ffb45b9e031f741cce5f76 (patch)
tree1a0060f19a4f4eea8fb9cff9eb52a46cedcdc152 /makima/src/server/mod.rs
parentf6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff)
downloadsoryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.tar.gz
soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.zip
Add resume to daemon tasks
Diffstat (limited to 'makima/src/server/mod.rs')
-rw-r--r--makima/src/server/mod.rs61
1 files changed, 61 insertions, 0 deletions
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 3a27513..de20569 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
/// Interval for checkpoint patch cleanup (hourly)
const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600;
+// Retry orchestrator checks for pending tasks every 30 seconds
+const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30;
+
/// Run the HTTP server with graceful shutdown support.
///
/// # Arguments
@@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
}
}
});
+
+ // Clone state and pool for retry orchestrator
+ let retry_pool = pool.clone();
+ let retry_state = state.clone();
+
+ // Spawn retry orchestrator - periodically retries pending tasks on available daemons
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(
+ std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS)
+ );
+ loop {
+ interval.tick().await;
+
+ // Get all contracts with pending tasks awaiting retry
+ match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await {
+ Ok(contract_owners) => {
+ for (contract_id, owner_id) in contract_owners {
+ // Try to start a pending task for this contract
+ match handlers::mesh_supervisor::try_start_pending_task(
+ &retry_state,
+ contract_id,
+ owner_id,
+ ).await {
+ Ok(Some(task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ contract_id = %contract_id,
+ retry_count = task.retry_count,
+ "Retry orchestrator started pending task"
+ );
+ }
+ Ok(None) => {
+ // No tasks could be started (no available daemons, etc.)
+ }
+ Err(e) => {
+ tracing::warn!(
+ contract_id = %contract_id,
+ error = %e,
+ "Retry orchestrator failed to start pending task"
+ );
+ }
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ error = %e,
+ "Retry orchestrator failed to query pending task contracts"
+ );
+ }
+ }
+ }
+ });
+
+ tracing::info!(
+ "Retry orchestrator started (interval: {}s)",
+ RETRY_ORCHESTRATOR_INTERVAL_SECS
+ );
}
let app = make_router(state);