summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers')
-rw-r--r--makima/src/server/handlers/contract_chat.rs3
-rw-r--r--makima/src/server/handlers/mesh.rs18
-rw-r--r--makima/src/server/handlers/mesh_chat.rs3
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs59
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs33
5 files changed, 116 insertions, 0 deletions
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index c94538d..e2adb72 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -1593,8 +1593,11 @@ async fn handle_contract_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = command_sender.send(command).await {
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 53e1587..240e1f7 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -685,8 +685,11 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -734,8 +737,11 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
@@ -1135,8 +1141,11 @@ pub async fn send_message(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
@@ -2273,8 +2282,11 @@ pub async fn reassign_task(
copy_files: None,
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -2597,8 +2609,11 @@ pub async fn continue_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
tracing::info!(
@@ -3490,8 +3505,11 @@ pub async fn branch_task(
copy_files: None,
contract_id: None,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: message_count > 0, // Resume if we have conversation history
conversation_history: updated_task.conversation_state.clone(),
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index 8e134bd..1ff0724 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1148,8 +1148,11 @@ async fn handle_mesh_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
match state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 6262975..65db373 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -14,6 +14,7 @@ use axum::{
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
+use base64::Engine;
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use sqlx::Row;
@@ -410,6 +411,15 @@ pub enum DaemonMessage {
error: Option<String>,
/// User-provided checkpoint message
message: String,
+ /// Base64-encoded gzip-compressed patch data for recovery
+ #[serde(rename = "patchData", skip_serializing_if = "Option::is_none")]
+ patch_data: Option<String>,
+ /// Commit SHA to apply patch on top of (for recovery)
+ #[serde(rename = "patchBaseSha", skip_serializing_if = "Option::is_none")]
+ patch_base_sha: Option<String>,
+ /// Number of files in the patch
+ #[serde(rename = "patchFilesCount", skip_serializing_if = "Option::is_none")]
+ patch_files_count: Option<i32>,
},
/// Notification that git config was inherited
GitConfigInherited {
@@ -1279,11 +1289,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
lines_removed,
error,
message,
+ patch_data,
+ patch_base_sha,
+ patch_files_count,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
commit_sha = ?commit_sha,
+ has_patch = patch_data.is_some(),
"Checkpoint created notification received"
);
@@ -1309,6 +1323,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"Checkpoint stored in database"
);
+ // Store patch if provided (for task recovery)
+ if let (Some(patch_b64), Some(base_sha)) = (&patch_data, &patch_base_sha) {
+ // Decode base64 patch data
+ match base64::engine::general_purpose::STANDARD.decode(patch_b64) {
+ Ok(patch_bytes) => {
+ let files_count = patch_files_count.unwrap_or(0);
+ // Default TTL: 7 days (168 hours)
+ let ttl_hours = 168i64;
+ match repository::create_checkpoint_patch(
+ pool,
+ task_id,
+ Some(checkpoint.id),
+ base_sha,
+ &patch_bytes,
+ files_count,
+ ttl_hours,
+ ).await {
+ Ok(patch) => {
+ tracing::info!(
+ task_id = %task_id,
+ patch_id = %patch.id,
+ patch_size = patch_bytes.len(),
+ "Checkpoint patch stored for recovery"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to store checkpoint patch"
+ );
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to decode patch base64 data"
+ );
+ }
+ }
+ }
+
// Broadcast success as task output
state.broadcast_task_output(TaskOutputNotification {
task_id,
@@ -1346,6 +1404,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"filesChanged": files_changed,
"linesAdded": lines_added,
"linesRemoved": lines_removed,
+ "hasPatch": patch_data.is_some(),
}),
).await;
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 57f3f2f..21c9515 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -9,6 +9,7 @@ use axum::{
response::IntoResponse,
Json,
};
+use base64::Engine;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
@@ -364,8 +365,11 @@ async fn try_start_pending_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -663,8 +667,11 @@ pub async fn spawn_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ autonomous_loop: false,
resume_session: false,
conversation_history: None,
+ patch_data: None,
+ patch_base_sha: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -1992,6 +1999,29 @@ pub async fn resume_supervisor(
.into_response();
}
+ // Fetch latest checkpoint patch for worktree recovery
+ let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ task_id = %supervisor_state.task_id,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ "Including checkpoint patch for worktree recovery"
+ );
+ // Encode patch as base64 for JSON transport
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch");
+ (None, None)
+ }
+ };
+
// Send SpawnTask with resume_session=true to use Claude's --continue
// Include conversation_history as fallback if worktree doesn't exist on target daemon
let command = DaemonCommand::SpawnTask {
@@ -2010,8 +2040,11 @@ pub async fn resume_supervisor(
copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: supervisor_task.contract_id,
is_supervisor: true,
+ autonomous_loop: false,
resume_session: true, // Use --continue to preserve conversation
conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
+ patch_data,
+ patch_base_sha,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {