diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 22:32:46 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-23 01:03:04 +0000 |
| commit | 1ed362424dafec690f919154f5116471951cda9c (patch) | |
| tree | 19c7ca9231887394a791223fe32a8ad335a687a8 /makima/src/server | |
| parent | 265f8cf14fec9d7116d09af49e4b48b357faceda (diff) | |
| download | soryu-1ed362424dafec690f919154f5116471951cda9c.tar.gz soryu-1ed362424dafec690f919154f5116471951cda9c.zip | |
Add patch checkpointing
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 18 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 59 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 33 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 43 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 9 |
7 files changed, 168 insertions, 0 deletions
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index c94538d..e2adb72 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1593,8 +1593,11 @@ async fn handle_contract_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = command_sender.send(command).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 53e1587..240e1f7 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -685,8 +685,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -734,8 +737,11 @@ pub async fn start_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() { @@ -1135,8 +1141,11 @@ pub async fn send_message( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: updated_task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() { @@ -2273,8 +2282,11 @@ pub async fn reassign_task( copy_files: None, contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -2597,8 +2609,11 @@ pub async fn continue_task( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; tracing::info!( @@ -3490,8 +3505,11 @@ pub async fn branch_task( copy_files: None, contract_id: None, is_supervisor: false, + autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 8e134bd..1ff0724 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1148,8 +1148,11 @@ async fn handle_mesh_request( copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; match state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 6262975..65db373 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -14,6 +14,7 @@ use axum::{ http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; +use base64::Engine; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use sqlx::Row; @@ -410,6 +411,15 @@ pub enum DaemonMessage { error: Option<String>, /// User-provided checkpoint message message: String, + /// Base64-encoded gzip-compressed patch data for recovery + #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply patch on top of (for recovery) + #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, + /// Number of files in the patch + #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")] + patch_files_count: Option<i32>, }, /// Notification that git config was inherited GitConfigInherited { @@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re lines_removed, error, message, + patch_data, + patch_base_sha, + patch_files_count, }) => { tracing::info!( task_id = %task_id, success = success, commit_sha = ?commit_sha, + has_patch = patch_data.is_some(), "Checkpoint created notification received" ); @@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Checkpoint stored in database" ); + // Store patch if provided (for task recovery) + if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) { + // Decode base64 patch data + match base64::engine::general_purpose::STANDARD.decode(patch_b64) { + Ok(patch_bytes) => { + let files_count = patch_files_count.unwrap_or(0); + // Default TTL: 7 days (168 hours) + let ttl_hours = 168i64; + match repository::create_checkpoint_patch( + pool, + task_id, + Some(checkpoint.id), + base_sha, + &patch_bytes, + files_count, + ttl_hours, + ).await { + Ok(patch) => { + tracing::info!( + task_id = %task_id, + patch_id = %patch.id, + patch_size = patch_bytes.len(), + "Checkpoint patch stored for recovery" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to store checkpoint patch" + ); + } + } + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to decode patch base64 data" + ); + } + } + } + // Broadcast success as task output state.broadcast_task_output(TaskOutputNotification { task_id, @@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "filesChanged": files_changed, "linesAdded": lines_added, "linesRemoved": lines_removed, + "hasPatch": patch_data.is_some(), }), ).await; } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 57f3f2f..21c9515 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -9,6 +9,7 @@ use axum::{ response::IntoResponse, Json, }; +use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; @@ -364,8 +365,11 @@ async fn try_start_pending_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -663,8 +667,11 @@ pub async fn spawn_task( copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: updated_task.contract_id, is_supervisor: false, + autonomous_loop: false, resume_session: false, conversation_history: None, + patch_data: None, + patch_base_sha: None, }; if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { @@ -1992,6 +1999,29 @@ pub async fn resume_supervisor( .into_response(); } + // Fetch latest checkpoint patch for worktree recovery + let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { + Ok(Some(patch)) => { + tracing::info!( + task_id = %supervisor_state.task_id, + patch_size = patch.patch_size_bytes, + base_sha = %patch.base_commit_sha, + "Including checkpoint patch for worktree recovery" + ); + // Encode patch as base64 for JSON transport + let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + (Some(encoded), Some(patch.base_commit_sha)) + } + Ok(None) => { + tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); + (None, None) + } + Err(e) => { + tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); + (None, None) + } + }; + // Send SpawnTask with resume_session=true to use Claude's --continue // Include conversation_history as fallback if worktree doesn't exist on target daemon let command = DaemonCommand::SpawnTask { @@ -2010,8 +2040,11 @@ pub async fn resume_supervisor( copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: supervisor_task.contract_id, is_supervisor: true, + autonomous_loop: false, resume_session: true, // Use --continue to preserve conversation conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing + patch_data, + patch_base_sha, }; if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 0bc1b92..3a27513 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -248,6 +248,8 @@ const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60; /// Maximum age in days for anonymous tasks before cleanup const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; +/// Interval for checkpoint patch cleanup (hourly) +const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; /// Run the HTTP server with graceful shutdown support. /// @@ -344,6 +346,47 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Clone pool for checkpoint patch cleanup + let checkpoint_patch_cleanup_pool = pool.clone(); + + // Initial cleanup of any expired checkpoint patches + match crate::db::repository::cleanup_expired_checkpoint_patches(&pool).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + "Cleaned up expired checkpoint patches on startup" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches on startup"); + } + _ => {} + } + + // Spawn periodic checkpoint patch cleanup task (runs hourly) + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS) + ); + loop { + interval.tick().await; + match crate::db::repository::cleanup_expired_checkpoint_patches( + &checkpoint_patch_cleanup_pool, + ).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + "Cleaned up expired checkpoint patches" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches"); + } + _ => {} + } + } + }); } let app = make_router(state); diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 28d65d0..5b75281 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -208,12 +208,21 @@ pub enum DaemonCommand { /// Whether this task is a supervisor (long-running contract orchestrator) #[serde(rename = "isSupervisor")] is_supervisor: bool, + /// Whether to run in autonomous loop mode + #[serde(rename = "autonomousLoop", default)] + autonomous_loop: bool, /// Whether to resume from a previous session using --continue flag #[serde(rename = "resumeSession", default)] resume_session: bool, /// Conversation history for fallback when worktree doesn't exist #[serde(rename = "conversationHistory", default)] conversation_history: Option<serde_json::Value>, + /// Base64-encoded gzip-compressed patch for worktree recovery + #[serde(rename = "patchData", default, skip_serializing_if = "Option::is_none")] + patch_data: Option<String>, + /// Commit SHA to apply the patch on top of + #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] + patch_base_sha: Option<String>, }, /// Pause a running task PauseTask { |
