summaryrefslogtreecommitdiff
path: root/makima/src/daemon/task
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/daemon/task')
-rw-r--r--makima/src/daemon/task/manager.rs153
1 files changed, 153 insertions, 0 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index f921d50..ae84294 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -2037,6 +2037,7 @@ impl TaskManager {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.config.api_url.clone(),
+ api_key: self.config.api_key.clone(),
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.config.checkpoint_patches.clone(),
@@ -3953,6 +3954,7 @@ struct TaskManagerInner {
git_user_email: Arc<RwLock<Option<String>>>,
git_user_name: Arc<RwLock<Option<String>>>,
api_url: String,
+ api_key: String,
heartbeat_commit_interval_secs: u64,
/// Shared contract task counts for releasing concurrency slots.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
@@ -3991,6 +3993,103 @@ impl TaskManagerInner {
}
}
+ /// Fetch the latest checkpoint patch from the server and restore a worktree.
+ async fn fetch_and_restore_patch(
+ &self,
+ task_id: Uuid,
+ task_name: &str,
+ repo_source: Option<&str>,
+ ) -> Result<Option<std::path::PathBuf>, DaemonError> {
+ use crate::daemon::api::client::ApiClient;
+
+ if self.api_key.is_empty() {
+ tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch");
+ return Ok(None);
+ }
+
+ let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch");
+ return Ok(None);
+ }
+ };
+
+ let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id);
+
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ struct PatchDataResponse {
+ patch_data: String,
+ base_commit_sha: String,
+ repository_url: Option<String>,
+ }
+
+ let resp: PatchDataResponse = match client.get(&path).await {
+ Ok(r) => r,
+ Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => {
+ tracing::debug!(task_id = %task_id, "No checkpoint patch found on server");
+ return Ok(None);
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server");
+ return Ok(None);
+ }
+ };
+
+ // Determine repo source: prefer the one from run_task args, fall back to server response
+ let source = repo_source
+ .map(|s| s.to_string())
+ .or(resp.repository_url);
+
+ let source = match source {
+ Some(s) => s,
+ None => {
+ tracing::warn!(task_id = %task_id, "No repository URL available to restore patch");
+ return Ok(None);
+ }
+ };
+
+ tracing::info!(
+ task_id = %task_id,
+ base_sha = %resp.base_commit_sha,
+ "Fetched checkpoint patch from server, attempting restore"
+ );
+
+ // Decode base64 patch data
+ let patch_bytes = match base64::Engine::decode(
+ &base64::engine::general_purpose::STANDARD,
+ &resp.patch_data,
+ ) {
+ Ok(b) => b,
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data");
+ return Ok(None);
+ }
+ };
+
+ match self.worktree_manager.restore_from_patch(
+ &source,
+ task_id,
+ task_name,
+ &resp.base_commit_sha,
+ &patch_bytes,
+ ).await {
+ Ok(worktree_info) => {
+ tracing::info!(
+ task_id = %task_id,
+ path = %worktree_info.path.display(),
+ "Successfully restored worktree from fetched patch"
+ );
+ Ok(Some(worktree_info.path))
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch");
+ Ok(None)
+ }
+ }
+ }
+
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
@@ -4110,6 +4209,59 @@ impl TaskManagerInner {
None
};
+ // If resuming but no local worktree and no inline patch, try fetching from server
+ let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session {
+ tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch");
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Fetching checkpoint patch from server...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await {
+ Ok(Some(path)) => {
+ // Store worktree info in tasks map
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(WorktreeInfo {
+ path: path.clone(),
+ branch: format!("task/{}", task_id),
+ source_repo: repo_source.clone().unwrap_or_default().into(),
+ });
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Worktree restored from server patch at {}\n", path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ Some(path)
+ }
+ Ok(None) => {
+ tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "No checkpoint patch available on server, resuming with conversation history only\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ None
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server");
+ None
+ }
+ }
+ } else {
+ restored_from_patch
+ };
+
// Determine working directory
// First check if we should share a supervisor's worktree
// Track if we need to merge to supervisor on completion (cross-daemon case)
@@ -5780,6 +5932,7 @@ impl Clone for TaskManagerInner {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.api_url.clone(),
+ api_key: self.api_key.clone(),
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.checkpoint_patches.clone(),