From 9c92d9235a0d1258fff9f7e625b0463c4952c45f Mon Sep 17 00:00:00 2001 From: soryu Date: Mon, 9 Feb 2026 02:19:38 +0000 Subject: Resume contracts from patches --- makima/src/daemon/task/manager.rs | 153 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) (limited to 'makima/src/daemon') diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index f921d50..ae84294 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -2037,6 +2037,7 @@ impl TaskManager { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.config.api_url.clone(), + api_key: self.config.api_key.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.config.checkpoint_patches.clone(), @@ -3953,6 +3954,7 @@ struct TaskManagerInner { git_user_email: Arc>>, git_user_name: Arc>>, api_url: String, + api_key: String, heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc>>, @@ -3991,6 +3993,103 @@ impl TaskManagerInner { } } + /// Fetch the latest checkpoint patch from the server and restore a worktree. + async fn fetch_and_restore_patch( + &self, + task_id: Uuid, + task_name: &str, + repo_source: Option<&str>, + ) -> Result, DaemonError> { + use crate::daemon::api::client::ApiClient; + + if self.api_key.is_empty() { + tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch"); + return Ok(None); + } + + let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) { + Ok(c) => c, + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch"); + return Ok(None); + } + }; + + let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id); + + #[derive(serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct PatchDataResponse { + patch_data: String, + base_commit_sha: String, + repository_url: Option, + } + + let resp: PatchDataResponse = match client.get(&path).await { + Ok(r) => r, + Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => { + tracing::debug!(task_id = %task_id, "No checkpoint patch found on server"); + return Ok(None); + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server"); + return Ok(None); + } + }; + + // Determine repo source: prefer the one from run_task args, fall back to server response + let source = repo_source + .map(|s| s.to_string()) + .or(resp.repository_url); + + let source = match source { + Some(s) => s, + None => { + tracing::warn!(task_id = %task_id, "No repository URL available to restore patch"); + return Ok(None); + } + }; + + tracing::info!( + task_id = %task_id, + base_sha = %resp.base_commit_sha, + "Fetched checkpoint patch from server, attempting restore" + ); + + // Decode base64 patch data + let patch_bytes = match base64::Engine::decode( + &base64::engine::general_purpose::STANDARD, + &resp.patch_data, + ) { + Ok(b) => b, + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data"); + return Ok(None); + } + }; + + match self.worktree_manager.restore_from_patch( + &source, + task_id, + task_name, + &resp.base_commit_sha, + &patch_bytes, + ).await { + Ok(worktree_info) => { + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "Successfully restored worktree from fetched patch" + ); + Ok(Some(worktree_info.path)) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch"); + Ok(None) + } + } + } + /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( @@ -4110,6 +4209,59 @@ impl TaskManagerInner { None }; + // If resuming but no local worktree and no inline patch, try fetching from server + let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session { + tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch"); + + let msg = DaemonMessage::task_output( + task_id, + "Fetching checkpoint patch from server...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await { + Ok(Some(path)) => { + // Store worktree info in tasks map + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(WorktreeInfo { + path: path.clone(), + branch: format!("task/{}", task_id), + source_repo: repo_source.clone().unwrap_or_default().into(), + }); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree restored from server patch at {}\n", path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + Some(path) + } + Ok(None) => { + tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume"); + let msg = DaemonMessage::task_output( + task_id, + "No checkpoint patch available on server, resuming with conversation history only\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + None + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server"); + None + } + } + } else { + restored_from_patch + }; + // Determine working directory // First check if we should share a supervisor's worktree // Track if we need to merge to supervisor on completion (cross-daemon case) @@ -5780,6 +5932,7 @@ impl Clone for TaskManagerInner { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.api_url.clone(), + api_key: self.api_key.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.checkpoint_patches.clone(), -- cgit v1.2.3