diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 22:33:47 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-15 22:33:47 +0000 |
| commit | 6ee2e75834bff187b8c262e0798ef365bc21cd59 (patch) | |
| tree | d4bd61c7740835acfc9a9dff952d1d088ff6d535 /makima/src/server/handlers/mesh.rs | |
| parent | 908973b5c08a8b7b624880843c512e8bddf37896 (diff) | |
| download | soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.tar.gz soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.zip | |
Add resume and history system for makima (#1)
This PR implements a comprehensive resume and history system that enables:
1. **History Viewing**
- View complete conversation history for contracts across all phases
- View conversation history for individual tasks
- View task output/tool call history with timestamps
- View checkpoint history
- Timeline view showing all activities
2. **Resume System**
- Resume interrupted supervisor conversations with full context
- Resume interrupted task conversations
- Resume from specific checkpoints
- Continue tasks from previous task state (worktree inheritance)
3. **Rewind/Restore Features**
- Rewind code to any checkpoint (git restore)
- Rewind conversation to any point
- Create new branches from historical points
- Fork tasks from any point in history
- New migration: 20250117000000_history_tables.sql
- conversation_snapshots table for storing conversation state
- history_events table for unified timeline
- Added forking fields to tasks table
- Added conversation_snapshot_id to task_checkpoints
- ConversationSnapshot, HistoryEvent, ConversationMessage
- Request/response types for resume and rewind operations
- Query filter types for history endpoints
- CRUD functions for conversation_snapshots
- CRUD functions for history_events
- Task conversation retrieval from task_events
- GET /api/v1/contracts/{id}/history
- GET /api/v1/contracts/{id}/supervisor/conversation
- GET /api/v1/mesh/tasks/{id}/conversation
- GET /api/v1/timeline
- POST /api/v1/contracts/{id}/supervisor/resume
- POST /api/v1/mesh/tasks/{id}/rewind
- POST /api/v1/mesh/tasks/{id}/fork
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
- POST /api/v1/contracts/{id}/supervisor/conversation/rewind
- task-history: View task conversation history
- task-checkpoints: List task checkpoints
- resume: Resume supervisor after interruption
- task-resume-from: Resume task from checkpoint
- task-rewind: Rewind task code to checkpoint
- task-fork: Fork task from historical point
- rewind-conversation: Rewind supervisor conversation
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 664 |
1 files changed, 664 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 3da6fd5..b5ade53 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -2459,3 +2459,667 @@ pub async fn continue_task( }) .into_response() } + +// ============================================================================= +// Task Rewind and Fork (Resume and History System) +// ============================================================================= + +/// Rewind task code to specified checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/rewind +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/rewind", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::RewindTaskRequest, + responses( + (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 409, description = "Cannot rewind a running task", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn rewind_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(task_id): Path<Uuid>, + Json(req): Json<crate::db::models::RewindTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Task cannot be running during rewind + if task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")), + ) + .into_response(); + } + + // Get checkpoint info + let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id { + match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else if let Some(ref sha) = req.checkpoint_sha { + match repository::get_task_checkpoint_by_sha(pool, sha).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint by SHA: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "MISSING_CHECKPOINT", + "Must provide checkpoint_id or checkpoint_sha", + )), + ) + .into_response(); + }; + + // Verify checkpoint belongs to this task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // TODO: Send rewind command to daemon when daemon integration is complete + // For now, return a success response with checkpoint info + + tracing::info!( + task_id = %task_id, + checkpoint_number = checkpoint.checkpoint_number, + commit_sha = %checkpoint.commit_sha, + "Task rewind requested" + ); + + Json(crate::db::models::RewindTaskResponse { + task_id, + rewinded_to: crate::db::models::CheckpointInfo { + checkpoint_number: checkpoint.checkpoint_number, + sha: checkpoint.commit_sha.clone(), + message: checkpoint.message, + }, + preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState { + state_type: "branch".to_string(), + reference: name, + }), + }) + .into_response() +} + +/// Fork task from historical point. +/// +/// POST /api/v1/mesh/tasks/{id}/fork +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/fork", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::ForkTaskRequest, + responses( + (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn fork_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(task_id): Path<Uuid>, + Json(req): Json<crate::db::models::ForkTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Find the checkpoint to fork from + let checkpoint = match req.fork_from_type.as_str() { + "checkpoint" => { + // fork_from_value is checkpoint number + let checkpoint_num: i32 = match req.fork_from_value.parse() { + Ok(n) => n, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")), + ) + .into_response(); + } + }; + + let checkpoints = match repository::list_task_checkpoints(pool, task_id).await { + Ok(c) => c, + Err(e) => { + tracing::error!("Failed to list checkpoints: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + match checkpoints + .into_iter() + .find(|c| c.checkpoint_number == checkpoint_num) + { + Some(c) => c, + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + } + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "UNSUPPORTED_FORK_TYPE", + "Only 'checkpoint' fork type is currently supported", + )), + ) + .into_response(); + } + }; + + // Create the new forked task + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: req.new_task_name.clone(), + description: task.description.clone(), + plan: req.new_task_plan.clone(), + parent_task_id: None, // Forked tasks are independent + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for forked work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create forked task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_number = checkpoint.checkpoint_number, + "Task forked from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(crate::db::models::ForkTaskResponse { + new_task_id: new_task.id, + source_task_id: task_id, + fork_point: crate::db::models::ForkPoint { + fork_type: "checkpoint".to_string(), + checkpoint: Some(checkpoint.clone()), + timestamp: checkpoint.created_at, + }, + branch_name: req.branch_name, + conversation_included: req.include_conversation.unwrap_or(false), + message_count: None, + }), + ) + .into_response() +} + +/// Create new task starting from specific checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = crate::db::models::ResumeFromCheckpointRequest, + responses( + (status = 201, description = "Task created from checkpoint", body = Task), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn resume_from_checkpoint( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Create the new task that will start from checkpoint + let task_name = req.task_name.unwrap_or_else(|| { + format!( + "{} (resumed from checkpoint {})", + task.name, checkpoint.checkpoint_number + ) + }); + + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: task_name, + description: task.description.clone(), + plan: req.plan, + parent_task_id: None, + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for resumed work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(task_id), // Copy worktree from original task + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create resumed task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_id = %checkpoint_id, + checkpoint_number = checkpoint.checkpoint_number, + "Task resumed from checkpoint" + ); + + (StatusCode::CREATED, Json(new_task)).into_response() +} + +/// Request to create branch from checkpoint. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchFromCheckpointRequest { + pub branch_name: String, + #[serde(default)] + pub checkout: bool, +} + +/// Response for branch creation from checkpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchCreatedResponse { + pub branch_name: String, + pub commit_sha: String, + pub task_id: Uuid, + pub checkpoint_number: i32, +} + +/// Create git branch from checkpoint without starting task. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = CreateBranchFromCheckpointRequest, + responses( + (status = 201, description = "Branch created", body = BranchCreatedResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn branch_from_checkpoint( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json<CreateBranchFromCheckpointRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Find a daemon to execute the branch creation + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + // Check if the original daemon is still connected + if state + .daemon_connections + .iter() + .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) + { + daemon_id + } else { + // Find any connected daemon for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + } + } else { + // No daemon assigned - use any available for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + }; + + // Send CreateBranch command to daemon + let cmd = DaemonCommand::CreateBranch { + task_id, + branch_name: req.branch_name.clone(), + from_ref: Some(checkpoint.commit_sha.clone()), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await { + tracing::error!("Failed to send CreateBranch command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + task_id = %task_id, + checkpoint_id = %checkpoint_id, + branch_name = %req.branch_name, + commit_sha = %checkpoint.commit_sha, + "Branch creation requested from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(BranchCreatedResponse { + branch_name: req.branch_name, + commit_sha: checkpoint.commit_sha, + task_id, + checkpoint_number: checkpoint.checkpoint_number, + }), + ) + .into_response() +} |
