From 6ee2e75834bff187b8c262e0798ef365bc21cd59 Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 15 Jan 2026 22:33:47 +0000 Subject: Add resume and history system for makima (#1) This PR implements a comprehensive resume and history system that enables: 1. **History Viewing** - View complete conversation history for contracts across all phases - View conversation history for individual tasks - View task output/tool call history with timestamps - View checkpoint history - Timeline view showing all activities 2. **Resume System** - Resume interrupted supervisor conversations with full context - Resume interrupted task conversations - Resume from specific checkpoints - Continue tasks from previous task state (worktree inheritance) 3. **Rewind/Restore Features** - Rewind code to any checkpoint (git restore) - Rewind conversation to any point - Create new branches from historical points - Fork tasks from any point in history - New migration: 20250117000000_history_tables.sql - conversation_snapshots table for storing conversation state - history_events table for unified timeline - Added forking fields to tasks table - Added conversation_snapshot_id to task_checkpoints - ConversationSnapshot, HistoryEvent, ConversationMessage - Request/response types for resume and rewind operations - Query filter types for history endpoints - CRUD functions for conversation_snapshots - CRUD functions for history_events - Task conversation retrieval from task_events - GET /api/v1/contracts/{id}/history - GET /api/v1/contracts/{id}/supervisor/conversation - GET /api/v1/mesh/tasks/{id}/conversation - GET /api/v1/timeline - POST /api/v1/contracts/{id}/supervisor/resume - POST /api/v1/mesh/tasks/{id}/rewind - POST /api/v1/mesh/tasks/{id}/fork - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch - POST /api/v1/contracts/{id}/supervisor/conversation/rewind - task-history: View task conversation history - task-checkpoints: List task 
checkpoints - resume: Resume supervisor after interruption - task-resume-from: Resume task from checkpoint - task-rewind: Rewind task code to checkpoint - task-fork: Fork task from historical point - rewind-conversation: Rewind supervisor conversation --- makima/src/server/handlers/mesh_supervisor.rs | 381 ++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) (limited to 'makima/src/server/handlers/mesh_supervisor.rs') diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 278d0f5..b45dda5 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1504,3 +1504,384 @@ pub async fn answer_question( Json(AnswerQuestionResponse { success }).into_response() }

// =============================================================================
// Supervisor Resume and Conversation Rewind
// =============================================================================

/// Response for supervisor resume.
///
/// Returned by `resume_supervisor`; serialized with camelCase field names.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumeSupervisorResponse {
    /// Task ID taken from the contract's stored supervisor state.
    pub supervisor_task_id: Uuid,
    /// `daemon_id` copied from the supervisor task row.
    // NOTE(review): the type argument was missing from this line in the patch
    // text; `Uuid` is assumed — confirm against the task model's `daemon_id`.
    pub daemon_id: Option<Uuid>,
    /// Snapshot of where the supervisor was when the resume was requested.
    pub resumed_from: ResumedFromInfo,
    /// Task status after the request; the handler sets this to `"pending"`.
    pub status: String,
}

/// Describes the supervisor's position at the time a resume was requested.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumedFromInfo {
    /// The contract's `phase` value.
    pub phase: String,
    /// `last_activity` copied from the stored supervisor state.
    // NOTE(review): the type argument was missing from this line in the patch
    // text; `chrono::Utc` is assumed — confirm against the supervisor state model.
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Number of entries in the stored conversation history array
    /// (0 when the stored value is not an array).
    pub message_count: i32,
}

/// Resume interrupted supervisor with specified mode.
+/// +/// POST /api/v1/contracts/{id}/supervisor/resume +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/resume", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::ResumeSupervisorRequest, + responses( + (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 409, description = "Supervisor is already running", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn resume_supervisor( + State(state): State, + Path(contract_id): Path, + auth: crate::server::auth::Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get existing supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + 
Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new( + "NO_SUPERVISOR_STATE", + "No supervisor state found - supervisor may not have been started", + )), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor task + let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if already running + if supervisor_task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), + ) + .into_response(); + } + + // Calculate message count from conversation history + let message_count = supervisor_state + .conversation_history + .as_array() + .map(|arr| arr.len() as i32) + .unwrap_or(0); + + // Based on resume mode, handle differently + match req.resume_mode.as_str() { + "continue" => { + // Mark task for reassignment with existing conversation context + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "restart_phase" => { + // Clear conversation but keep phase progress + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + 
serde_json::json!([]), + ) + .await + { + tracing::error!("Failed to clear conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "from_checkpoint" => { + // This would require more complex handling with checkpoint system + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "from_checkpoint mode not yet implemented", + )), + ) + .into_response(); + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_RESUME_MODE", + "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", + )), + ) + .into_response(); + } + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + resume_mode = %req.resume_mode, + message_count = message_count, + "Supervisor resume requested" + ); + + Json(ResumeSupervisorResponse { + supervisor_task_id: supervisor_state.task_id, + daemon_id: supervisor_task.daemon_id, + resumed_from: ResumedFromInfo { + phase: contract.phase, + last_activity: supervisor_state.last_activity, + message_count, + }, + status: "pending".to_string(), + }) + .into_response() +} + +/// Response for conversation rewind +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationResponse { + pub contract_id: Uuid, + pub messages_removed: i32, + pub new_message_count: i32, + pub code_rewound: bool, +} + +/// Rewind supervisor conversation to specified point. 
+/// +/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::RewindConversationRequest, + responses( + (status = 200, description = "Conversation rewound", body = RewindConversationResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn rewind_conversation( + State(state): State, + Path(contract_id): Path, + auth: crate::server::auth::Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + 
Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + let conversation = supervisor_state + .conversation_history + .as_array() + .cloned() + .unwrap_or_default(); + + let original_count = conversation.len() as i32; + + // Determine how many messages to keep + let new_count = if let Some(by_count) = req.by_message_count { + (original_count - by_count).max(0) + } else if let Some(to_index) = req.to_message_index { + // Keep messages up to and including the specified index + (to_index + 1).min(original_count).max(0) + } else { + // Default to removing last message + (original_count - 1).max(0) + }; + + // Truncate conversation + let new_conversation: Vec = conversation + .into_iter() + .take(new_count as usize) + .collect(); + + // Update the conversation + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + serde_json::Value::Array(new_conversation), + ) + .await + { + tracing::error!("Failed to update conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + original_count = original_count, + new_count = new_count, + messages_removed = original_count - new_count, + "Conversation rewound" + ); + + Json(RewindConversationResponse { + contract_id, + messages_removed: original_count - new_count, + new_message_count: new_count, + code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind + }) + .into_response() +} -- cgit v1.2.3