diff options
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 153 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 97 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 2 |
4 files changed, 253 insertions, 0 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index f921d50..ae84294 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -2037,6 +2037,7 @@ impl TaskManager { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.config.api_url.clone(), + api_key: self.config.api_key.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.config.checkpoint_patches.clone(), @@ -3953,6 +3954,7 @@ struct TaskManagerInner { git_user_email: Arc<RwLock<Option<String>>>, git_user_name: Arc<RwLock<Option<String>>>, api_url: String, + api_key: String, heartbeat_commit_interval_secs: u64, /// Shared contract task counts for releasing concurrency slots. contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, @@ -3991,6 +3993,103 @@ impl TaskManagerInner { } } + /// Fetch the latest checkpoint patch from the server and restore a worktree. + async fn fetch_and_restore_patch( + &self, + task_id: Uuid, + task_name: &str, + repo_source: Option<&str>, + ) -> Result<Option<std::path::PathBuf>, DaemonError> { + use crate::daemon::api::client::ApiClient; + + if self.api_key.is_empty() { + tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch"); + return Ok(None); + } + + let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) { + Ok(c) => c, + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch"); + return Ok(None); + } + }; + + let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id); + + #[derive(serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct PatchDataResponse { + patch_data: String, + base_commit_sha: String, + repository_url: Option<String>, + } + + let resp: PatchDataResponse = match client.get(&path).await { + Ok(r) => r, + Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => { + tracing::debug!(task_id = %task_id, "No checkpoint patch found on server"); + return Ok(None); + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server"); + return Ok(None); + } + }; + + // Determine repo source: prefer the one from run_task args, fall back to server response + let source = repo_source + .map(|s| s.to_string()) + .or(resp.repository_url); + + let source = match source { + Some(s) => s, + None => { + tracing::warn!(task_id = %task_id, "No repository URL available to restore patch"); + return Ok(None); + } + }; + + tracing::info!( + task_id = %task_id, + base_sha = %resp.base_commit_sha, + "Fetched checkpoint patch from server, attempting restore" + ); + + // Decode base64 patch data + let patch_bytes = match base64::Engine::decode( + &base64::engine::general_purpose::STANDARD, + &resp.patch_data, + ) { + Ok(b) => b, + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data"); + return Ok(None); + } + }; + + match self.worktree_manager.restore_from_patch( + &source, + task_id, + task_name, + &resp.base_commit_sha, + &patch_bytes, + ).await { + Ok(worktree_info) => { + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "Successfully restored worktree from fetched patch" + ); + Ok(Some(worktree_info.path)) + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch"); + Ok(None) + } + } + } + /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( @@ -4110,6 +4209,59 @@ impl TaskManagerInner { None }; + // If resuming but no local worktree and no inline patch, try fetching from server + let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session { + tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch"); + + let msg = DaemonMessage::task_output( + task_id, + "Fetching checkpoint patch from server...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await { + Ok(Some(path)) => { + // Store worktree info in tasks map + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(WorktreeInfo { + path: path.clone(), + branch: format!("task/{}", task_id), + source_repo: repo_source.clone().unwrap_or_default().into(), + }); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree restored from server patch at {}\n", path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + Some(path) + } + Ok(None) => { + tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume"); + let msg = DaemonMessage::task_output( + task_id, + "No checkpoint patch available on server, resuming with conversation history only\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + None + } + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server"); + None + } + } + } else { + restored_from_patch + }; + // Determine working directory // First check if we should share a supervisor's worktree // Track if we need to merge to supervisor on completion (cross-daemon case) @@ -5780,6 +5932,7 @@ impl Clone for TaskManagerInner { git_user_email: self.git_user_email.clone(), git_user_name: self.git_user_name.clone(), api_url: self.api_url.clone(), + api_key: self.api_key.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, contract_task_counts: self.contract_task_counts.clone(), checkpoint_patches: self.checkpoint_patches.clone(), diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 310bec8..5572d95 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -2264,6 +2264,103 @@ pub async fn list_task_patches( Json(summaries).into_response() } +/// Response containing the latest checkpoint patch data for a task. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskPatchDataResponse { + /// Task ID + pub task_id: Uuid, + /// Base64-encoded patch data (gzip-compressed git diff) + pub patch_data: String, + /// The commit SHA that the patch should be applied on top of + pub base_commit_sha: String, + /// Repository URL from the task + pub repository_url: Option<String>, +} + +/// Get the latest checkpoint patch data for a task (for worktree restoration). +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/patch-data", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Latest patch data", body = TaskPatchDataResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or patch not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_task_patch_data( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get latest checkpoint patch (with binary data) + let patch = match repository::get_latest_checkpoint_patch(pool, id).await { + Ok(Some(p)) => p, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "No checkpoint patch found for this task")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get patch for task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + let patch_data_b64 = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); + + Json(TaskPatchDataResponse { + task_id: id, + patch_data: patch_data_b64, + base_commit_sha: patch.base_commit_sha, + repository_url: task.repository_url, + }) + .into_response() +} + /// Request to check if a target directory exists. #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 9e1ee50..7a1391b 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -84,6 +84,7 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree)) .route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info)) .route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches)) + .route("/mesh/tasks/{id}/patch-data", get(mesh::get_task_patch_data)) .route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists)) .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task)) .route("/mesh/tasks/{id}/continue", post(mesh::continue_task)) diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 4e3b85b..ddc2db5 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -63,6 +63,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage mesh::get_daemon_directories, mesh::clone_worktree, mesh::check_target_exists, + mesh::get_task_patch_data, mesh::branch_task, mesh_chat::get_chat_history, mesh_chat::clear_chat_history, @@ -156,6 +157,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage DaemonListResponse, DaemonDirectoriesResponse, DaemonDirectory, + mesh::TaskPatchDataResponse, MeshChatConversation, MeshChatMessageRecord, MeshChatHistoryResponse, |
