summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_supervisor.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 22:33:47 +0000
committerGitHub <noreply@github.com>2026-01-15 22:33:47 +0000
commit6ee2e75834bff187b8c262e0798ef365bc21cd59 (patch)
treed4bd61c7740835acfc9a9dff952d1d088ff6d535 /makima/src/server/handlers/mesh_supervisor.rs
parent908973b5c08a8b7b624880843c512e8bddf37896 (diff)
downloadsoryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.tar.gz
soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.zip
Add resume and history system for makima (#1)
This PR implements a comprehensive resume and history system that enables: 1. **History Viewing** - View complete conversation history for contracts across all phases - View conversation history for individual tasks - View task output/tool call history with timestamps - View checkpoint history - Timeline view showing all activities 2. **Resume System** - Resume interrupted supervisor conversations with full context - Resume interrupted task conversations - Resume from specific checkpoints - Continue tasks from previous task state (worktree inheritance) 3. **Rewind/Restore Features** - Rewind code to any checkpoint (git restore) - Rewind conversation to any point - Create new branches from historical points - Fork tasks from any point in history - New migration: 20250117000000_history_tables.sql - conversation_snapshots table for storing conversation state - history_events table for unified timeline - Added forking fields to tasks table - Added conversation_snapshot_id to task_checkpoints - ConversationSnapshot, HistoryEvent, ConversationMessage - Request/response types for resume and rewind operations - Query filter types for history endpoints - CRUD functions for conversation_snapshots - CRUD functions for history_events - Task conversation retrieval from task_events - GET /api/v1/contracts/{id}/history - GET /api/v1/contracts/{id}/supervisor/conversation - GET /api/v1/mesh/tasks/{id}/conversation - GET /api/v1/timeline - POST /api/v1/contracts/{id}/supervisor/resume - POST /api/v1/mesh/tasks/{id}/rewind - POST /api/v1/mesh/tasks/{id}/fork - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch - POST /api/v1/contracts/{id}/supervisor/conversation/rewind - task-history: View task conversation history - task-checkpoints: List task checkpoints - resume: Resume supervisor after interruption - task-resume-from: Resume task from checkpoint - task-rewind: Rewind task code to checkpoint - task-fork: Fork task from historical point - rewind-conversation: Rewind supervisor conversation
Diffstat (limited to 'makima/src/server/handlers/mesh_supervisor.rs')
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs381
1 files changed, 381 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 278d0f5..b45dda5 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1504,3 +1504,384 @@ pub async fn answer_question(
Json(AnswerQuestionResponse { success }).into_response()
}
+
+// =============================================================================
+// Supervisor Resume and Conversation Rewind
+// =============================================================================
+
+/// Response for supervisor resume
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumeSupervisorResponse {
+ pub supervisor_task_id: Uuid,
+ pub daemon_id: Option<Uuid>,
+ pub resumed_from: ResumedFromInfo,
+ pub status: String,
+}
+
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumedFromInfo {
+ pub phase: String,
+ pub last_activity: chrono::DateTime<chrono::Utc>,
+ pub message_count: i32,
+}
+
+/// Resume interrupted supervisor with specified mode.
+///
+/// POST /api/v1/contracts/{id}/supervisor/resume
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/resume",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::ResumeSupervisorRequest,
+ responses(
+ (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 409, description = "Supervisor is already running", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn resume_supervisor(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get existing supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new(
+ "NO_SUPERVISOR_STATE",
+ "No supervisor state found - supervisor may not have been started",
+ )),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor task
+ let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if already running
+ if supervisor_task.status == "running" {
+ return (
+ StatusCode::CONFLICT,
+ Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
+ )
+ .into_response();
+ }
+
+ // Calculate message count from conversation history
+ let message_count = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| arr.len() as i32)
+ .unwrap_or(0);
+
+ // Based on resume mode, handle differently
+ match req.resume_mode.as_str() {
+ "continue" => {
+ // Mark task for reassignment with existing conversation context
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "restart_phase" => {
+ // Clear conversation but keep phase progress
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::json!([]),
+ )
+ .await
+ {
+ tracing::error!("Failed to clear conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "from_checkpoint" => {
+ // This would require more complex handling with checkpoint system
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NOT_IMPLEMENTED",
+ "from_checkpoint mode not yet implemented",
+ )),
+ )
+ .into_response();
+ }
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_RESUME_MODE",
+ "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
+ )),
+ )
+ .into_response();
+ }
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ supervisor_task_id = %supervisor_state.task_id,
+ resume_mode = %req.resume_mode,
+ message_count = message_count,
+ "Supervisor resume requested"
+ );
+
+ Json(ResumeSupervisorResponse {
+ supervisor_task_id: supervisor_state.task_id,
+ daemon_id: supervisor_task.daemon_id,
+ resumed_from: ResumedFromInfo {
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ message_count,
+ },
+ status: "pending".to_string(),
+ })
+ .into_response()
+}
+
+/// Response for conversation rewind
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindConversationResponse {
+ pub contract_id: Uuid,
+ pub messages_removed: i32,
+ pub new_message_count: i32,
+ pub code_rewound: bool,
+}
+
+/// Rewind supervisor conversation to specified point.
+///
+/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::RewindConversationRequest,
+ responses(
+ (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn rewind_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::RewindConversationRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ let conversation = supervisor_state
+ .conversation_history
+ .as_array()
+ .cloned()
+ .unwrap_or_default();
+
+ let original_count = conversation.len() as i32;
+
+ // Determine how many messages to keep
+ let new_count = if let Some(by_count) = req.by_message_count {
+ (original_count - by_count).max(0)
+ } else if let Some(to_index) = req.to_message_index {
+ // Keep messages up to and including the specified index
+ (to_index + 1).min(original_count).max(0)
+ } else {
+ // Default to removing last message
+ (original_count - 1).max(0)
+ };
+
+ // Truncate conversation
+ let new_conversation: Vec<serde_json::Value> = conversation
+ .into_iter()
+ .take(new_count as usize)
+ .collect();
+
+ // Update the conversation
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::Value::Array(new_conversation),
+ )
+ .await
+ {
+ tracing::error!("Failed to update conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ original_count = original_count,
+ new_count = new_count,
+ messages_removed = original_count - new_count,
+ "Conversation rewound"
+ );
+
+ Json(RewindConversationResponse {
+ contract_id,
+ messages_removed: original_count - new_count,
+ new_message_count: new_count,
+ code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind
+ })
+ .into_response()
+}