summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-23 23:52:35 +0000
committersoryu <soryu@soryu.co>2026-01-23 23:52:35 +0000
commit579c983d3efb8f1414ffb45b9e031f741cce5f76 (patch)
tree1a0060f19a4f4eea8fb9cff9eb52a46cedcdc152 /makima/src/server/handlers/mesh_daemon.rs
parentf6f0790217d4098ffb6d2b3df08b0cf83ff61727 (diff)
downloadsoryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.tar.gz
soryu-579c983d3efb8f1414ffb45b9e031f741cce5f76.zip
Add resume to daemon tasks
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs117
1 files changed, 117 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 65db373..53ee806 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -291,6 +291,19 @@ pub enum DaemonMessage {
success: bool,
error: Option<String>,
},
+ /// Task recovery detected after daemon restart
+ TaskRecoveryDetected {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "previousState")]
+ previous_state: String,
+ #[serde(rename = "worktreeIntact")]
+ worktree_intact: bool,
+ #[serde(rename = "worktreePath")]
+ worktree_path: Option<String>,
+ #[serde(rename = "needsPatch")]
+ needs_patch: bool,
+ },
/// Register a tool key for orchestrator API access
RegisterToolKey {
#[serde(rename = "taskId")]
@@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::TaskRecoveryDetected {
+ task_id,
+ previous_state,
+ worktree_intact,
+ worktree_path,
+ needs_patch,
+ }) => {
+ tracing::info!(
+ task_id = %task_id,
+ previous_state = %previous_state,
+ worktree_intact = worktree_intact,
+ worktree_path = ?worktree_path,
+ needs_patch = needs_patch,
+ "Task recovery detected after daemon restart"
+ );
+
+ // Update task in database based on recovery state
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let state = state.clone();
+ tokio::spawn(async move {
+ if worktree_intact {
+ // Worktree exists - task can be resumed on this daemon
+ // Update task status to 'pending' so it can be picked up
+ match sqlx::query(
+ r#"
+ UPDATE tasks
+ SET status = 'pending',
+ daemon_id = NULL,
+ error_message = 'Daemon restarted - task ready for resumption',
+ interrupted_at = NOW(),
+ updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id
+ "#,
+ )
+ .bind(task_id)
+ .bind(owner_id)
+ .fetch_optional(&pool)
+ .await
+ {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked as pending for resumption"
+ );
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ version: 0,
+ status: "pending".into(),
+ updated_fields: vec![
+ "status".into(),
+ "daemon_id".into(),
+ "interrupted_at".into(),
+ ],
+ updated_by: "daemon_recovery".into(),
+ });
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found during recovery update"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to update task during recovery"
+ );
+ }
+ }
+ } else {
+ // Worktree missing - mark for retry with patch restoration
+ match repository::mark_task_for_retry(
+ &pool,
+ task_id,
+ daemon_uuid, // Mark this daemon as failed
+ ).await {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked for retry (worktree missing)"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found or exceeded retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::Authenticate { .. }) => {
// Already authenticated, ignore
}