From 1ed362424dafec690f919154f5116471951cda9c Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 22 Jan 2026 22:32:46 +0000 Subject: Add patch checkpointing --- makima/src/db/models.rs | 42 ++++++++++++++++ makima/src/db/repository.rs | 113 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 153 insertions(+), 2 deletions(-) (limited to 'makima/src/db') diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 6ede268..58f4da1 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1966,3 +1966,45 @@ pub struct ForkPoint { pub checkpoint: Option, pub timestamp: DateTime, } + +// ============================================================================ +// Checkpoint Patches (for task recovery when worktrees are lost) +// ============================================================================ + +/// A stored git patch for checkpoint recovery. +/// Enables task recovery when local worktrees are deleted or corrupted. +#[derive(Debug, Clone, FromRow, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointPatch { + pub id: Uuid, + pub task_id: Uuid, + /// Optional link to a task_checkpoint record + pub checkpoint_id: Option, + /// The commit SHA that the patch should be applied on top of + pub base_commit_sha: String, + /// Compressed git diff data (gzip) + #[sqlx(rename = "patch_data")] + #[serde(skip)] // Don't serialize binary data to JSON + pub patch_data: Vec, + /// Size of the compressed patch in bytes + pub patch_size_bytes: i32, + /// Number of files affected by this patch + pub files_count: i32, + pub created_at: DateTime, + /// When this patch expires and will be automatically deleted + pub expires_at: DateTime, +} + +/// Response for checkpoint patch (without binary data) +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointPatchInfo { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_id: Option, + pub base_commit_sha: String, + pub patch_size_bytes: i32, + pub files_count: i32, + pub created_at: DateTime, + pub expires_at: DateTime, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 84afc8d..da44899 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,8 +6,9 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, - ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, + CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, + ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, + ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, @@ -3717,3 +3718,111 @@ pub async fn cleanup_stale_anonymous_tasks( Ok(result.rows_affected() as i64) } + +// ============================================================================ +// Checkpoint Patches (for task recovery) +// ============================================================================ + +/// Create a checkpoint patch for task recovery. +pub async fn create_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option, + base_commit_sha: &str, + patch_data: &[u8], + files_count: i32, + ttl_hours: i64, +) -> Result { + sqlx::query_as::<_, CheckpointPatch>( + r#" + INSERT INTO checkpoint_patches ( + task_id, checkpoint_id, base_commit_sha, patch_data, + patch_size_bytes, files_count, expires_at + ) + VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7) + RETURNING * + "#, + ) + .bind(task_id) + .bind(checkpoint_id) + .bind(base_commit_sha) + .bind(patch_data) + .bind(patch_data.len() as i32) + .bind(files_count) + .bind(ttl_hours) + .fetch_one(pool) + .await +} + +/// Get the latest checkpoint patch for a task. +pub async fn get_latest_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + r#" + SELECT * FROM checkpoint_patches + WHERE task_id = $1 AND expires_at > NOW() + ORDER BY created_at DESC + LIMIT 1 + "#, + ) + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Get a checkpoint patch by ID. +pub async fn get_checkpoint_patch( + pool: &PgPool, + id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + "SELECT * FROM checkpoint_patches WHERE id = $1", + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List all checkpoint patches for a task (without patch data for efficiency). +pub async fn list_checkpoint_patches( + pool: &PgPool, + task_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatchInfo>( + r#" + SELECT id, task_id, checkpoint_id, base_commit_sha, + patch_size_bytes, files_count, created_at, expires_at + FROM checkpoint_patches + WHERE task_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(task_id) + .fetch_all(pool) + .await +} + +/// Delete expired checkpoint patches. +/// Returns the number of deleted patches. +pub async fn cleanup_expired_checkpoint_patches( + pool: &PgPool, +) -> Result { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE expires_at < NOW()") + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} + +/// Delete all checkpoint patches for a task. +pub async fn delete_checkpoint_patches_for_task( + pool: &PgPool, + task_id: Uuid, +) -> Result { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE task_id = $1") + .bind(task_id) + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} -- cgit v1.2.3