diff options
Diffstat (limited to 'makima/src/db/repository.rs')
| -rw-r--r-- | makima/src/db/repository.rs | 113 |
1 files changed, 111 insertions, 2 deletions
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 84afc8d..da44899 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -6,8 +6,9 @@ use sqlx::PgPool; use uuid::Uuid; use super::models::{ - Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, - ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, + CheckpointPatch, CheckpointPatchInfo, Contract, ContractChatConversation, + ContractChatMessageRecord, ContractEvent, ContractRepository, ContractSummary, + ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, @@ -3717,3 +3718,111 @@ pub async fn cleanup_stale_anonymous_tasks( Ok(result.rows_affected() as i64) } + +// ============================================================================ +// Checkpoint Patches (for task recovery) +// ============================================================================ + +/// Create a checkpoint patch for task recovery. +pub async fn create_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option<Uuid>, + base_commit_sha: &str, + patch_data: &[u8], + files_count: i32, + ttl_hours: i64, +) -> Result<CheckpointPatch, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + r#" + INSERT INTO checkpoint_patches ( + task_id, checkpoint_id, base_commit_sha, patch_data, + patch_size_bytes, files_count, expires_at + ) + VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7) + RETURNING * + "#, + ) + .bind(task_id) + .bind(checkpoint_id) + .bind(base_commit_sha) + .bind(patch_data) + .bind(patch_data.len() as i32) + .bind(files_count) + .bind(ttl_hours) + .fetch_one(pool) + .await +} + +/// Get the latest checkpoint patch for a task. +pub async fn get_latest_checkpoint_patch( + pool: &PgPool, + task_id: Uuid, +) -> Result<Option<CheckpointPatch>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + r#" + SELECT * FROM checkpoint_patches + WHERE task_id = $1 AND expires_at > NOW() + ORDER BY created_at DESC + LIMIT 1 + "#, + ) + .bind(task_id) + .fetch_optional(pool) + .await +} + +/// Get a checkpoint patch by ID. +pub async fn get_checkpoint_patch( + pool: &PgPool, + id: Uuid, +) -> Result<Option<CheckpointPatch>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatch>( + "SELECT * FROM checkpoint_patches WHERE id = $1", + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// List all checkpoint patches for a task (without patch data for efficiency). +pub async fn list_checkpoint_patches( + pool: &PgPool, + task_id: Uuid, +) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> { + sqlx::query_as::<_, CheckpointPatchInfo>( + r#" + SELECT id, task_id, checkpoint_id, base_commit_sha, + patch_size_bytes, files_count, created_at, expires_at + FROM checkpoint_patches + WHERE task_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(task_id) + .fetch_all(pool) + .await +} + +/// Delete expired checkpoint patches. +/// Returns the number of deleted patches. +pub async fn cleanup_expired_checkpoint_patches( + pool: &PgPool, +) -> Result<i64, sqlx::Error> { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE expires_at < NOW()") + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} + +/// Delete all checkpoint patches for a task. +pub async fn delete_checkpoint_patches_for_task( + pool: &PgPool, + task_id: Uuid, +) -> Result<i64, sqlx::Error> { + let result = sqlx::query("DELETE FROM checkpoint_patches WHERE task_id = $1") + .bind(task_id) + .execute(pool) + .await?; + Ok(result.rows_affected() as i64) +} |
