diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 22:33:47 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-15 22:33:47 +0000 |
| commit | 6ee2e75834bff187b8c262e0798ef365bc21cd59 (patch) | |
| tree | d4bd61c7740835acfc9a9dff952d1d088ff6d535 /makima/src/server/handlers/history.rs | |
| parent | 908973b5c08a8b7b624880843c512e8bddf37896 (diff) | |
| download | soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.tar.gz soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.zip | |
Add resume and history system for makima (#1)
This PR implements a comprehensive resume and history system that enables:
1. **History Viewing**
- View complete conversation history for contracts across all phases
- View conversation history for individual tasks
- View task output/tool call history with timestamps
- View checkpoint history
- Timeline view showing all activities
2. **Resume System**
- Resume interrupted supervisor conversations with full context
- Resume interrupted task conversations
- Resume from specific checkpoints
- Continue tasks from previous task state (worktree inheritance)
3. **Rewind/Restore Features**
- Rewind code to any checkpoint (git restore)
- Rewind conversation to any point
- Create new branches from historical points
- Fork tasks from any point in history
- New migration: 20250117000000_history_tables.sql
- conversation_snapshots table for storing conversation state
- history_events table for unified timeline
- Added forking fields to tasks table
- Added conversation_snapshot_id to task_checkpoints
- ConversationSnapshot, HistoryEvent, ConversationMessage
- Request/response types for resume and rewind operations
- Query filter types for history endpoints
- CRUD functions for conversation_snapshots
- CRUD functions for history_events
- Task conversation retrieval from task_events
- GET /api/v1/contracts/{id}/history
- GET /api/v1/contracts/{id}/supervisor/conversation
- GET /api/v1/mesh/tasks/{id}/conversation
- GET /api/v1/timeline
- POST /api/v1/contracts/{id}/supervisor/resume
- POST /api/v1/mesh/tasks/{id}/rewind
- POST /api/v1/mesh/tasks/{id}/fork
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
- POST /api/v1/contracts/{id}/supervisor/conversation/rewind
- task-history: View task conversation history
- task-checkpoints: List task checkpoints
- resume: Resume supervisor after interruption
- task-resume-from: Resume task from checkpoint
- task-rewind: Rewind task code to checkpoint
- task-fork: Fork task from historical point
- rewind-conversation: Rewind supervisor conversation
Diffstat (limited to 'makima/src/server/handlers/history.rs')
| -rw-r--r-- | makima/src/server/handlers/history.rs | 438 |
1 files changed, 438 insertions, 0 deletions
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs new file mode 100644 index 0000000..b3dec97 --- /dev/null +++ b/makima/src/server/handlers/history.rs @@ -0,0 +1,438 @@ +//! HTTP handlers for history and conversation APIs. + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::{ + db::{ + models::{ + ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, + SupervisorConversationResponse, TaskConversationResponse, TaskReference, + }, + repository, + }, + server::{auth::Authenticated, messages::ApiError, state::SharedState}, +}; + +/// Query parameters for task conversation +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskConversationParams { + pub include_tool_calls: Option<bool>, + pub include_tool_results: Option<bool>, + pub limit: Option<i32>, +} + +/// Query parameters for timeline +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TimelineQueryFilters { + pub contract_id: Option<Uuid>, + pub task_id: Option<Uuid>, + pub include_subtasks: Option<bool>, + pub from: Option<chrono::DateTime<chrono::Utc>>, + pub to: Option<chrono::DateTime<chrono::Utc>>, + pub limit: Option<i32>, +} + +/// GET /api/v1/contracts/{id}/history +/// Returns contract history timeline with filtering and pagination +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/history", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("phase" = Option<String>, Query, description = "Filter by phase"), + ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"), + ("from" = Option<String>, Query, description = "Start date filter"), + ("to" = Option<String>, Query, description = "End date filter"), + ("limit" = Option<i32>, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Contract history", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_contract_history( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + Query(filters): Query<HistoryQueryFilters>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and user has access + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get history events + match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id, + entries: events, + total_count, + cursor: None, // TODO: implement cursor pagination + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get contract history: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// GET /api/v1/contracts/{id}/supervisor/conversation +/// Returns full supervisor conversation with spawned task references +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/conversation", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_supervisor_conversation( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract for phase info and ownership check + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get the supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Parse conversation history from JSONB + let messages: Vec<ConversationMessage> = supervisor_state + .conversation_history + .as_array() + .map(|arr| { + arr.iter() + .enumerate() + .map(|(i, v)| ConversationMessage { + id: i.to_string(), + role: v + .get("role") + .and_then(|r| r.as_str()) + .unwrap_or("user") + .to_string(), + content: v + .get("content") + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(), + timestamp: supervisor_state.last_activity, + tool_calls: None, + tool_name: None, + tool_input: None, + tool_result: None, + is_error: None, + token_count: None, + cost_usd: None, + }) + .collect() + }) + .unwrap_or_default(); + + // Get spawned tasks + let tasks = match repository::list_contract_tasks(pool, contract_id).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e); + Vec::new() + } + }; + + let spawned_tasks: Vec<TaskReference> = tasks + .into_iter() + .filter(|t| !t.is_supervisor) + .map(|t| TaskReference { + task_id: t.id, + task_name: t.name, + status: t.status, + created_at: t.created_at, + completed_at: t.completed_at, + }) + .collect(); + + Json(SupervisorConversationResponse { + contract_id, + supervisor_task_id: supervisor_state.task_id, + phase: contract.phase, + last_activity: supervisor_state.last_activity, + pending_task_ids: supervisor_state.pending_task_ids, + messages, + spawned_tasks, + }) + .into_response() +} + +/// GET /api/v1/mesh/tasks/{id}/conversation +/// Returns task conversation history +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/conversation", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"), + ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"), + ("limit" = Option<i32>, Query, description = "Limit messages"), + ), + responses( + (status = 200, description = "Task conversation", body = TaskConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_task_conversation( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + Query(params): Query<TaskConversationParams>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get conversation messages + let messages = match repository::get_task_conversation( + pool, + task_id, + params.include_tool_calls.unwrap_or(true), + params.include_tool_results.unwrap_or(true), + params.limit, + ) + .await + { + Ok(m) => m, + Err(e) => { + tracing::error!("Failed to get task conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Calculate totals + let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum(); + + Json(TaskConversationResponse { + task_id, + task_name: task.name, + status: task.status, + messages, + total_tokens: None, + total_cost: if total_cost > 0.0 { + Some(total_cost) + } else { + None + }, + }) + .into_response() +} + +/// GET /api/v1/timeline +/// Returns unified timeline for authenticated user +#[utoipa::path( + get, + path = "/api/v1/timeline", + params( + ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"), + ("task_id" = Option<Uuid>, Query, description = "Filter by task"), + ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"), + ("from" = Option<String>, Query, description = "Start date filter"), + ("to" = Option<String>, Query, description = "End date filter"), + ("limit" = Option<i32>, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Timeline events", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_timeline( + State(state): State<SharedState>, + Query(filters): Query<TimelineQueryFilters>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + let history_filters = HistoryQueryFilters { + phase: None, + event_types: None, + from: filters.from, + to: filters.to, + limit: filters.limit, + cursor: None, + }; + + let result = if let Some(contract_id) = filters.contract_id { + repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await + } else if let Some(task_id) = filters.task_id { + repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await + } else { + repository::get_timeline(pool, auth.owner_id, &history_filters).await + }; + + match result { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id: filters.contract_id.unwrap_or_default(), + entries: events, + total_count, + cursor: None, + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get timeline: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} |
