From 579c983d3efb8f1414ffb45b9e031f741cce5f76 Mon Sep 17 00:00:00 2001 From: soryu Date: Fri, 23 Jan 2026 23:52:35 +0000 Subject: Add resume to daemon tasks --- makima/src/server/handlers/mesh_daemon.rs | 117 ++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) (limited to 'makima/src/server/handlers/mesh_daemon.rs') diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 65db373..53ee806 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -291,6 +291,19 @@ pub enum DaemonMessage { success: bool, error: Option, }, + /// Task recovery detected after daemon restart + TaskRecoveryDetected { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "previousState")] + previous_state: String, + #[serde(rename = "worktreeIntact")] + worktree_intact: bool, + #[serde(rename = "worktreePath")] + worktree_path: Option, + #[serde(rename = "needsPatch")] + needs_patch: bool, + }, /// Register a tool key for orchestrator API access RegisterToolKey { #[serde(rename = "taskId")] @@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::TaskRecoveryDetected { + task_id, + previous_state, + worktree_intact, + worktree_path, + needs_patch, + }) => { + tracing::info!( + task_id = %task_id, + previous_state = %previous_state, + worktree_intact = worktree_intact, + worktree_path = ?worktree_path, + needs_patch = needs_patch, + "Task recovery detected after daemon restart" + ); + + // Update task in database based on recovery state + if let Some(ref pool) = state.db_pool { + let pool = pool.clone(); + let state = state.clone(); + tokio::spawn(async move { + if worktree_intact { + // Worktree exists - task can be resumed on this daemon + // Update task status to 'pending' so it can be picked up + match sqlx::query( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + error_message = 'Daemon restarted - task ready for resumption', + interrupted_at = NOW(), + updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id + "#, + ) + .bind(task_id) + .bind(owner_id) + .fetch_optional(&pool) + .await + { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked as pending for resumption" + ); + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(owner_id), + version: 0, + status: "pending".into(), + updated_fields: vec![ + "status".into(), + "daemon_id".into(), + "interrupted_at".into(), + ], + updated_by: "daemon_recovery".into(), + }); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found during recovery update" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to update task during recovery" + ); + } + } + } else { + // Worktree missing - mark for retry with patch restoration + match repository::mark_task_for_retry( + &pool, + task_id, + daemon_uuid, // Mark this daemon as failed + ).await { + Ok(Some(_)) => { + tracing::info!( + task_id = %task_id, + "Task marked for retry (worktree missing)" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task_id, + "Task not found or exceeded retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } + }); + } + } Ok(DaemonMessage::Authenticate { .. }) => { // Already authenticated, ignore } -- cgit v1.2.3