diff options
Diffstat (limited to 'makima/src/server/handlers')
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 18 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 59 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 33 |
5 files changed, 116 insertions, 0 deletions
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index c94538d..e2adb72 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1593,8 +1593,11 @@ async fn handle_contract_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = command_sender.send(command).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 53e1587..240e1f7 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -685,8 +685,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -734,8 +737,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { @@ -1135,8 +1141,11 @@ pub async fn send_message( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { @@ -2273,8 +2282,11 @@ pub async fn reassign_task( copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -2597,8 +2609,11 @@ pub async fn continue_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -3490,8 +3505,11 @@ pub async fn branch_task( copy_files: None, contract_id: None, is_supervisor: false, + autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 8e134bd..1ff0724 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1148,8 +1148,11 @@ async fn handle_mesh_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; match state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 6262975..65db373 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -14,6 +14,7 @@ use axum::{ http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; +use base64::Engine; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -410,6 +411,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Notification that git config was inherited GitConfigInherited { @@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re lines_removed, error, message, + patch_data, + patch_base_sha, + patch_files_count, }) => { tracing::info!( task_id = %task_id, success = success, commit_sha = ?commit_sha, + has_patch = patch_data.is_some(), "Checkpoint created notification received" ); @@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Checkpoint stored in database" ); + // Store patch if provided (for task recovery) + if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { + // Decode base64 patch data + match base64::engine::general_purpose::STANDARD.decode(patch_b64) { + Ok(patch_bytes) => { + let files_count = patch_files_count.unwrap_or(0); + // Default TTL: 7 days (168 hours) + let ttl_hours = 168i64; + match repository::create_checkpoint_patch( + pool, + task_id, + Some(checkpoint.id), + base_sha, + &patch_bytes, + files_count, + ttl_hours, + ).await { + Ok(patch) => { + tracing::info!( + task_id = %task_id, + patch_id = %patch.id, + patch_size = patch_bytes.len(), + "Checkpoint patch stored for recovery" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to store checkpoint patch" + ); + } + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to decode patch base64 data" + ); + } + } + } + // Broadcast success as task output state.broadcast_task_output(TaskOutputNotification { task_id, @@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "filesChanged": files_changed, "linesAdded": lines_added, "linesRemoved": lines_removed, + "hasPatch": patch_data.is_some(), }), ).await; } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 57f3f2f..21c9515 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -9,6 +9,7 @@ use axum::{ response::IntoResponse, Json, }; +use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; @@ -364,8 +365,11 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -663,8 +667,11 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1992,6 +1999,29 @@ pub async fn resume_supervisor( .into_response(); } + // Fetch latest checkpoint patch for worktree recovery + let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { + Ok(Some(patch)) => { + tracing::info!( + task_id = %supervisor_state.task_id, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + "Including checkpoint patch for worktree recovery" + ); + // Encode patch as base64 for JSON transport + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); + (None, None) + } + Err(e) => { + tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); + (None, None) + } + }; + // Send SpawnTask with resume_session=true to use Claude's --continue // Include conversation_history as fallback if worktree doesn't exist on target daemon let command = DaemonCommand::SpawnTask { @@ -2010,8 +2040,11 @@ pub async fn resume_supervisor( copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: supervisor_task.contract_id, is_supervisor: true, + autonomous_loop: false, resume_session: true, // Use --continue to preserve conversation conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + patch_data, + patch_base_sha, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { |
