summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-09 02:19:38 +0000
committersoryu <soryu@soryu.co>2026-02-09 02:19:38 +0000
commit9c92d9235a0d1258fff9f7e625b0463c4952c45f (patch)
tree408bb739c7e58e282d762631c7a812e4c96fd6e8
parent9eb28de59b3b3815fc0eb15b37efcb07d51b8e96 (diff)
downloadsoryu-9c92d9235a0d1258fff9f7e625b0463c4952c45f.tar.gz
soryu-9c92d9235a0d1258fff9f7e625b0463c4952c45f.zip
Resume contracts from patches
-rw-r--r--makima/src/daemon/task/manager.rs153
-rw-r--r--makima/src/server/handlers/mesh.rs97
-rw-r--r--makima/src/server/mod.rs1
-rw-r--r--makima/src/server/openapi.rs2
4 files changed, 253 insertions, 0 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index f921d50..ae84294 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -2037,6 +2037,7 @@ impl TaskManager {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.config.api_url.clone(),
+ api_key: self.config.api_key.clone(),
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.config.checkpoint_patches.clone(),
@@ -3953,6 +3954,7 @@ struct TaskManagerInner {
git_user_email: Arc<RwLock<Option<String>>>,
git_user_name: Arc<RwLock<Option<String>>>,
api_url: String,
+ api_key: String,
heartbeat_commit_interval_secs: u64,
/// Shared contract task counts for releasing concurrency slots.
contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
@@ -3991,6 +3993,103 @@ impl TaskManagerInner {
}
}
+ /// Fetch the latest checkpoint patch from the server and restore a worktree.
+ async fn fetch_and_restore_patch(
+ &self,
+ task_id: Uuid,
+ task_name: &str,
+ repo_source: Option<&str>,
+ ) -> Result<Option<std::path::PathBuf>, DaemonError> {
+ use crate::daemon::api::client::ApiClient;
+
+ if self.api_key.is_empty() {
+ tracing::debug!(task_id = %task_id, "No API key configured, skipping patch fetch");
+ return Ok(None);
+ }
+
+ let client = match ApiClient::new(self.api_url.clone(), self.api_key.clone()) {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to create API client for patch fetch");
+ return Ok(None);
+ }
+ };
+
+ let path = format!("/api/v1/mesh/tasks/{}/patch-data", task_id);
+
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ struct PatchDataResponse {
+ patch_data: String,
+ base_commit_sha: String,
+ repository_url: Option<String>,
+ }
+
+ let resp: PatchDataResponse = match client.get(&path).await {
+ Ok(r) => r,
+ Err(crate::daemon::api::client::ApiError::Api { status: 404, .. }) => {
+ tracing::debug!(task_id = %task_id, "No checkpoint patch found on server");
+ return Ok(None);
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch patch data from server");
+ return Ok(None);
+ }
+ };
+
+ // Determine repo source: prefer the one from run_task args, fall back to server response
+ let source = repo_source
+ .map(|s| s.to_string())
+ .or(resp.repository_url);
+
+ let source = match source {
+ Some(s) => s,
+ None => {
+ tracing::warn!(task_id = %task_id, "No repository URL available to restore patch");
+ return Ok(None);
+ }
+ };
+
+ tracing::info!(
+ task_id = %task_id,
+ base_sha = %resp.base_commit_sha,
+ "Fetched checkpoint patch from server, attempting restore"
+ );
+
+ // Decode base64 patch data
+ let patch_bytes = match base64::Engine::decode(
+ &base64::engine::general_purpose::STANDARD,
+ &resp.patch_data,
+ ) {
+ Ok(b) => b,
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to decode fetched patch data");
+ return Ok(None);
+ }
+ };
+
+ match self.worktree_manager.restore_from_patch(
+ &source,
+ task_id,
+ task_name,
+ &resp.base_commit_sha,
+ &patch_bytes,
+ ).await {
+ Ok(worktree_info) => {
+ tracing::info!(
+ task_id = %task_id,
+ path = %worktree_info.path.display(),
+ "Successfully restored worktree from fetched patch"
+ );
+ Ok(Some(worktree_info.path))
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to restore worktree from fetched patch");
+ Ok(None)
+ }
+ }
+ }
+
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
@@ -4110,6 +4209,59 @@ impl TaskManagerInner {
None
};
+ // If resuming but no local worktree and no inline patch, try fetching from server
+ let restored_from_patch = if restored_from_patch.is_none() && existing_worktree.is_none() && resume_session {
+ tracing::info!(task_id = %task_id, "No local worktree or inline patch for resume, trying server fetch");
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Fetching checkpoint patch from server...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ match self.fetch_and_restore_patch(task_id, &task_name, repo_source.as_deref()).await {
+ Ok(Some(path)) => {
+ // Store worktree info in tasks map
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(WorktreeInfo {
+ path: path.clone(),
+ branch: format!("task/{}", task_id),
+ source_repo: repo_source.clone().unwrap_or_default().into(),
+ });
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Worktree restored from server patch at {}\n", path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ Some(path)
+ }
+ Ok(None) => {
+ tracing::info!(task_id = %task_id, "No server patch available, falling through to conversation-only resume");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "No checkpoint patch available on server, resuming with conversation history only\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ None
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to fetch/restore patch from server");
+ None
+ }
+ }
+ } else {
+ restored_from_patch
+ };
+
// Determine working directory
// First check if we should share a supervisor's worktree
// Track if we need to merge to supervisor on completion (cross-daemon case)
@@ -5780,6 +5932,7 @@ impl Clone for TaskManagerInner {
git_user_email: self.git_user_email.clone(),
git_user_name: self.git_user_name.clone(),
api_url: self.api_url.clone(),
+ api_key: self.api_key.clone(),
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
contract_task_counts: self.contract_task_counts.clone(),
checkpoint_patches: self.checkpoint_patches.clone(),
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 310bec8..5572d95 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -2264,6 +2264,103 @@ pub async fn list_task_patches(
Json(summaries).into_response()
}
+/// Response containing the latest checkpoint patch data for a task.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskPatchDataResponse {
+ /// Task ID
+ pub task_id: Uuid,
+ /// Base64-encoded patch data (gzip-compressed git diff)
+ pub patch_data: String,
+ /// The commit SHA that the patch should be applied on top of
+ pub base_commit_sha: String,
+ /// Repository URL from the task
+ pub repository_url: Option<String>,
+}
+
+/// Get the latest checkpoint patch data for a task (for worktree restoration).
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/patch-data",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Latest patch data", body = TaskPatchDataResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or patch not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_task_patch_data(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get latest checkpoint patch (with binary data)
+ let patch = match repository::get_latest_checkpoint_patch(pool, id).await {
+ Ok(Some(p)) => p,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "No checkpoint patch found for this task")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get patch for task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ let patch_data_b64 = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+
+ Json(TaskPatchDataResponse {
+ task_id: id,
+ patch_data: patch_data_b64,
+ base_commit_sha: patch.base_commit_sha,
+ repository_url: task.repository_url,
+ })
+ .into_response()
+}
+
/// Request to check if a target directory exists.
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 9e1ee50..7a1391b 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -84,6 +84,7 @@ pub fn make_router(state: SharedState) -> Router {
.route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree))
.route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info))
.route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches))
+ .route("/mesh/tasks/{id}/patch-data", get(mesh::get_task_patch_data))
.route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists))
.route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task))
.route("/mesh/tasks/{id}/continue", post(mesh::continue_task))
diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs
index 4e3b85b..ddc2db5 100644
--- a/makima/src/server/openapi.rs
+++ b/makima/src/server/openapi.rs
@@ -63,6 +63,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage
mesh::get_daemon_directories,
mesh::clone_worktree,
mesh::check_target_exists,
+ mesh::get_task_patch_data,
mesh::branch_task,
mesh_chat::get_chat_history,
mesh_chat::clear_chat_history,
@@ -156,6 +157,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage
DaemonListResponse,
DaemonDirectoriesResponse,
DaemonDirectory,
+ mesh::TaskPatchDataResponse,
MeshChatConversation,
MeshChatMessageRecord,
MeshChatHistoryResponse,