diff options
| author | soryu <soryu@soryu.co> | 2026-01-23 23:52:35 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 23:52:35 +0000 |
| commit | 579c983d3efb8f1414ffb45b9e031f741cce5f76 (patch) | |
| tree | 1a0060f19a4f4eea8fb9cff9eb52a46cedcdc152 /makima/src/server | |
| parent | f6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff) | |
| download | soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.tar.gz soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.zip | |
Add resume to daemon tasks
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 29 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 117 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 39 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 61 |
4 files changed, 239 insertions, 7 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 240e1f7..3d05f35 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -6,6 +6,7 @@ use axum::{ response::IntoResponse, Json, }; +use base64::Engine; use uuid::Uuid; use crate::db::models::{ @@ -2265,6 +2266,30 @@ pub async fn reassign_task( } }; + // Fetch latest checkpoint patch for worktree recovery during reassignment + let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await { + Ok(Some(patch)) => { + tracing::info!( + old_task_id = %id, + new_task_id = %new_task.id, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + files_count = patch.files_count, + "Including checkpoint patch for task reassignment recovery" + ); + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment"); + (None, None) + } + Err(e) => { + tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment"); + (None, None) + } + }; + // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { task_id: new_task.id, @@ -2285,8 +2310,8 @@ pub async fn reassign_task( autonomous_loop: false, resume_session: false, conversation_history: None, - patch_data: None, - patch_base_sha: None, + patch_data, + patch_base_sha, }; tracing::info!( diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 65db373..53ee806 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -291,6 +291,19 @@ pub enum DaemonMessage { success: bool, error: Option<String>, }, + /// Task recovery detected after daemon restart + TaskRecoveryDetected { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "previousState")] + previous_state: String, + #[serde(rename = "worktreeIntact")] + worktree_intact: bool, + #[serde(rename = "worktreePath")] + worktree_path: Option<String>, + #[serde(rename = "needsPatch")] + needs_patch: bool, + }, /// Register a tool key for orchestrator API access RegisterToolKey { #[serde(rename = "taskId")] @@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::TaskRecoveryDetected { + task_id, + previous_state, + worktree_intact, + worktree_path, + needs_patch, + }) => { + tracing::info!( + task_id = %task_id, + previous_state = %previous_state, + worktree_intact = worktree_intact, + worktree_path = ?worktree_path, + needs_patch = needs_patch, + "Task recovery detected after daemon restart" + ); + + // Update task in database based on recovery state + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let state = state.clone(); + tokio::spawn(async move { + if worktree_intact { + // Worktree exists - task can be resumed on this daemon + // Update task status to 'pending' so it can be picked up + match sqlx::query( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + error_message = 'Daemon restarted - task ready for resumption', + interrupted_at = NOW(), + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id + "#, + ) + .bind(task_id) + .bind(owner_id) + .fetch_optional(&pool) + .await + { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked as pending for resumption" + ); + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(owner_id), + version: 0, + status: "pending".into(), + updated_fields: vec![ + "status".into(), + "daemon_id".into(), + "interrupted_at".into(), + ], + updated_by: "daemon_recovery".into(), + }); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found during recovery update" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to update task during recovery" + ); + } + } + } else { + // Worktree missing - mark for retry with patch restoration + match repository::mark_task_for_retry( + &pool, + task_id, + daemon_uuid, // Mark this daemon as failed + ).await { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked for retry (worktree missing)" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found or exceeded retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } + }); + } + } Ok(DaemonMessage::Authenticate { .. }) => { // Already authenticated, ignore } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 21c9515..1b5e376 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -279,8 +279,9 @@ async fn verify_supervisor_auth( /// Try to start a pending task on an available daemon. /// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. -/// For retried tasks, excludes daemons that previously failed the task. -async fn try_start_pending_task( +/// For retried tasks, excludes daemons that previously failed the task and includes +/// checkpoint patch data for worktree recovery. +pub async fn try_start_pending_task( state: &SharedState, contract_id: Uuid, owner_id: Uuid, @@ -348,6 +349,34 @@ async fn try_start_pending_task( } }; + // For retried tasks, fetch checkpoint patch for worktree recovery + let (patch_data, patch_base_sha) = if task.retry_count > 0 { + // This is a retry - try to restore from checkpoint + match repository::get_latest_checkpoint_patch(pool, task.id).await { + Ok(Some(patch)) => { + tracing::info!( + task_id = %task.id, + retry_count = task.retry_count, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + "Including checkpoint patch for task retry recovery" + ); + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry"); + (None, None) + } + Err(e) => { + tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry"); + (None, None) + } + } + } else { + (None, None) + }; + // Send spawn command let cmd = DaemonCommand::SpawnTask { task_id: updated_task.id, @@ -366,10 +395,10 @@ async fn try_start_pending_task( contract_id: updated_task.contract_id, is_supervisor: false, autonomous_loop: false, - resume_session: false, + resume_session: task.retry_count > 0, // Use --continue for retried tasks conversation_history: None, - patch_data: None, - patch_base_sha: None, + patch_data, + patch_base_sha, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 3a27513..de20569 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; /// Interval for checkpoint patch cleanup (hourly) const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; +// Retry orchestrator checks for pending tasks every 30 seconds +const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments @@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Clone state and pool for retry orchestrator + let retry_pool = pool.clone(); + let retry_state = state.clone(); + + // Spawn retry orchestrator - periodically retries pending tasks on available daemons + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS) + ); + loop { + interval.tick().await; + + // Get all contracts with pending tasks awaiting retry + match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await { + Ok(contract_owners) => { + for (contract_id, owner_id) in contract_owners { + // Try to start a pending task for this contract + match handlers::mesh_supervisor::try_start_pending_task( + &retry_state, + contract_id, + owner_id, + ).await { + Ok(Some(task)) => { + tracing::info!( + task_id = %task.id, + contract_id = %contract_id, + retry_count = task.retry_count, + "Retry orchestrator started pending task" + ); + } + Ok(None) => { + // No tasks could be started (no available daemons, etc.) + } + Err(e) => { + tracing::warn!( + contract_id = %contract_id, + error = %e, + "Retry orchestrator failed to start pending task" + ); + } + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + "Retry orchestrator failed to query pending task contracts" + ); + } + } + } + }); + + tracing::info!( + "Retry orchestrator started (interval: {}s)", + RETRY_ORCHESTRATOR_INTERVAL_SECS + ); } let app = make_router(state); |
