diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 22:32:46 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 01:03:04 +0000 |
| commit | 1ed362424dafec690f919154f5116471951cda9c (patch) | |
| tree | 19c7ca9231887394a791223fe32a8ad335a687a8 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 265f8cf14fec9d7116d09af49e4b48b357faceda (diff) | |
| download | soryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz soryu-1ed362424dafec690f919154f5116471951cda9c.zip | |
Add patch checkpointing
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 6262975..65db373 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -14,6 +14,7 @@ use axum::{ http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; +use base64::Engine; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -410,6 +411,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Notification that git config was inherited GitConfigInherited { @@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re lines_removed, error, message, + patch_data, + patch_base_sha, + patch_files_count, }) => { tracing::info!( task_id = %task_id, success = success, commit_sha = ?commit_sha, + has_patch = patch_data.is_some(), "Checkpoint created notification received" ); @@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Checkpoint stored in database" ); + // Store patch if provided (for task recovery) + if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { + // Decode base64 patch data + match base64::engine::general_purpose::STANDARD.decode(patch_b64) { + Ok(patch_bytes) => { + let files_count = patch_files_count.unwrap_or(0); + // Default TTL: 7 days (168 hours) + let ttl_hours = 168i64; + match repository::create_checkpoint_patch( + pool, + task_id, + Some(checkpoint.id), + base_sha, + &patch_bytes, + files_count, + ttl_hours, + ).await { + Ok(patch) => { + tracing::info!( + task_id = %task_id, + patch_id = %patch.id, + patch_size = patch_bytes.len(), + "Checkpoint patch stored for recovery" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to store checkpoint patch" + ); + } + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to decode patch base64 data" + ); + } + } + } + // Broadcast success as task output state.broadcast_task_output(TaskOutputNotification { task_id, @@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "filesChanged": files_changed, "linesAdded": lines_added, "linesRemoved": lines_removed, + "hasPatch": patch_data.is_some(), }), ).await; } |
