diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 22:33:47 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-15 22:33:47 +0000 |
| commit | 6ee2e75834bff187b8c262e0798ef365bc21cd59 (patch) | |
| tree | d4bd61c7740835acfc9a9dff952d1d088ff6d535 | |
| parent | 908973b5c08a8b7b624880843c512e8bddf37896 (diff) | |
| download | soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.tar.gz soryu-6ee2e75834bff187b8c262e0798ef365bc21cd59.zip | |
Add resume and history system for makima (#1)
This PR implements a comprehensive resume and history system that enables:
1. **History Viewing**
- View complete conversation history for contracts across all phases
- View conversation history for individual tasks
- View task output/tool call history with timestamps
- View checkpoint history
- Timeline view showing all activities
2. **Resume System**
- Resume interrupted supervisor conversations with full context
- Resume interrupted task conversations
- Resume from specific checkpoints
- Continue tasks from previous task state (worktree inheritance)
3. **Rewind/Restore Features**
- Rewind code to any checkpoint (git restore)
- Rewind conversation to any point
- Create new branches from historical points
- Fork tasks from any point in history
- New migration: 20250117000000_history_tables.sql
- conversation_snapshots table for storing conversation state
- history_events table for unified timeline
- Added forking fields to tasks table
- Added conversation_snapshot_id to task_checkpoints
- ConversationSnapshot, HistoryEvent, ConversationMessage
- Request/response types for resume and rewind operations
- Query filter types for history endpoints
- CRUD functions for conversation_snapshots
- CRUD functions for history_events
- Task conversation retrieval from task_events
- GET /api/v1/contracts/{id}/history
- GET /api/v1/contracts/{id}/supervisor/conversation
- GET /api/v1/mesh/tasks/{id}/conversation
- GET /api/v1/timeline
- POST /api/v1/contracts/{id}/supervisor/resume
- POST /api/v1/mesh/tasks/{id}/rewind
- POST /api/v1/mesh/tasks/{id}/fork
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
- POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
- POST /api/v1/contracts/{id}/supervisor/conversation/rewind
- task-history: View task conversation history
- task-checkpoints: List task checkpoints
- resume: Resume supervisor after interruption
- task-resume-from: Resume task from checkpoint
- task-rewind: Rewind task code to checkpoint
- task-fork: Fork task from historical point
- rewind-conversation: Rewind supervisor conversation
| -rw-r--r-- | makima/docs/PLAN-resume-history-system.md | 1316 | ||||
| -rw-r--r-- | makima/docs/REQUIREMENTS-resume-history-system.md | 232 | ||||
| -rw-r--r-- | makima/docs/SPEC-resume-history-system.md | 1000 | ||||
| -rw-r--r-- | makima/docs/USER-STORIES-resume-history-system.md | 240 | ||||
| -rw-r--r-- | makima/migrations/20250117000000_history_tables.sql | 55 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 21 | ||||
| -rw-r--r-- | makima/src/daemon/cli/supervisor.rs | 161 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 233 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 360 | ||||
| -rw-r--r-- | makima/src/server/handlers/history.rs | 438 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 664 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 381 | ||||
| -rw-r--r-- | makima/src/server/handlers/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 8 |
14 files changed, 5106 insertions, 4 deletions
diff --git a/makima/docs/PLAN-resume-history-system.md b/makima/docs/PLAN-resume-history-system.md new file mode 100644 index 0000000..9e81c93 --- /dev/null +++ b/makima/docs/PLAN-resume-history-system.md @@ -0,0 +1,1316 @@ +# Resume and History System - Implementation Plan + +## Overview + +This document provides a detailed, actionable implementation plan for the Resume and History System. The system enables users to view historical conversation data, resume interrupted work, and rewind/restore to previous states in the Makima platform. + +**Key Reference Documents:** +- Specification: Resume and History System Specification +- Requirements: Requirements Document +- User Stories: User Stories Document + +--- + +## Phase 1: Database Schema + +**Objective:** Create the foundational database structures for storing conversation snapshots, history events, and supporting task forking. + +### Task 1.1: Create Database Migrations + +**Files to Create:** +- `makima/migrations/20250117000000_history_tables.sql` + +**Schema Changes:** + +```sql +-- 1. Conversation Snapshots table +-- Stores conversation state at specific points for rewind capability +CREATE TABLE IF NOT EXISTS conversation_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE SET NULL, + snapshot_type VARCHAR(50) NOT NULL, -- 'auto', 'manual', 'checkpoint' + message_count INTEGER NOT NULL, + conversation_state JSONB NOT NULL, -- Full conversation at this point + metadata JSONB, -- Additional context (token count, cost, etc.) + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_conversation_snapshots_task ON conversation_snapshots(task_id); +CREATE INDEX idx_conversation_snapshots_checkpoint ON conversation_snapshots(checkpoint_id); +CREATE INDEX idx_conversation_snapshots_created ON conversation_snapshots(created_at DESC); + +-- 2. History Events table +-- Unified event stream for timeline views +CREATE TABLE IF NOT EXISTS history_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + contract_id UUID REFERENCES contracts(id) ON DELETE CASCADE, + task_id UUID REFERENCES tasks(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, -- 'task', 'chat', 'checkpoint', 'phase', 'file' + event_subtype VARCHAR(50), -- Specific event: 'created', 'completed', 'message', etc. + phase VARCHAR(50), -- Contract phase when event occurred + event_data JSONB NOT NULL, -- Event-specific data + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_history_events_contract ON history_events(contract_id, created_at DESC); +CREATE INDEX idx_history_events_task ON history_events(task_id, created_at DESC); +CREATE INDEX idx_history_events_owner ON history_events(owner_id, created_at DESC); +CREATE INDEX idx_history_events_type ON history_events(event_type, created_at DESC); + +-- 3. Alter task_checkpoints - add conversation snapshot reference +ALTER TABLE task_checkpoints + ADD COLUMN conversation_snapshot_id UUID REFERENCES conversation_snapshots(id) ON DELETE SET NULL; + +-- 4. Alter tasks - add forking fields +ALTER TABLE tasks + ADD COLUMN forked_from_task_id UUID REFERENCES tasks(id) ON DELETE SET NULL, + ADD COLUMN forked_at_checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE SET NULL; + +CREATE INDEX idx_tasks_forked_from ON tasks(forked_from_task_id) WHERE forked_from_task_id IS NOT NULL; + +-- Comments for documentation +COMMENT ON TABLE conversation_snapshots IS 'Stores conversation state at specific points for rewind/resume capability'; +COMMENT ON TABLE history_events IS 'Unified event stream for timeline views across contracts and tasks'; +COMMENT ON COLUMN conversation_snapshots.snapshot_type IS 'Type: auto (periodic), manual (user-triggered), checkpoint (at git checkpoint)'; +COMMENT ON COLUMN history_events.event_type IS 'Category: task, chat, checkpoint, phase, file'; +``` + +**Complexity:** Medium +**Dependencies:** None +**Estimated Time:** 2-3 hours + +--- + +## Phase 2: Repository Layer + +**Objective:** Implement database access functions for conversation snapshots, history events, and enhanced checkpoint operations. + +### Task 2.1: Core Models + +**Files to Modify:** +- `makima/src/db/models.rs` + +**New Types to Add:** + +```rust +// ConversationSnapshot model +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationSnapshot { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_id: Option<Uuid>, + pub snapshot_type: String, // 'auto', 'manual', 'checkpoint' + pub message_count: i32, + #[sqlx(json)] + pub conversation_state: serde_json::Value, + #[sqlx(json)] + pub metadata: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +// HistoryEvent model +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HistoryEvent { + pub id: Uuid, + pub owner_id: Uuid, + pub contract_id: Option<Uuid>, + pub task_id: Option<Uuid>, + pub event_type: String, + pub event_subtype: Option<String>, + pub phase: Option<String>, + #[sqlx(json)] + pub event_data: serde_json::Value, + pub created_at: DateTime<Utc>, +} + +// Unified ConversationMessage for API responses +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationMessage { + pub id: String, + pub role: String, // 'user', 'assistant', 'system', 'tool' + pub content: String, + pub timestamp: DateTime<Utc>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_calls: Option<Vec<ToolCallInfo>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_name: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_input: Option<serde_json::Value>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_result: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub is_error: Option<bool>, + #[serde(skip_serializing_if = "Option::is_none")] + pub token_count: Option<i32>, + #[serde(skip_serializing_if = "Option::is_none")] + pub cost_usd: Option<f64>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ToolCallInfo { + pub id: String, + pub name: String, + pub input: serde_json::Value, +} + +// Query filters for history +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HistoryQueryFilters { + pub phase: Option<String>, + pub event_types: Option<Vec<String>>, + pub from: Option<DateTime<Utc>>, + pub to: Option<DateTime<Utc>>, + pub limit: Option<i32>, + pub cursor: Option<String>, +} + +// Resume request types +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeSupervisorRequest { + pub target_daemon_id: Option<Uuid>, + pub resume_mode: String, // 'continue', 'restart_phase', 'from_checkpoint' + pub checkpoint_id: Option<Uuid>, + pub additional_context: Option<String>, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeFromCheckpointRequest { + pub task_name: Option<String>, + pub plan: String, + pub include_conversation: Option<bool>, + pub target_daemon_id: Option<Uuid>, +} + +// Rewind request types +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindTaskRequest { + pub checkpoint_id: Option<Uuid>, + pub checkpoint_sha: Option<String>, + pub preserve_mode: String, // 'discard', 'create_branch', 'stash' + pub branch_name: Option<String>, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationRequest { + pub to_message_id: Option<String>, + pub to_timestamp: Option<DateTime<Utc>>, + pub by_message_count: Option<i32>, + pub rewind_code: Option<bool>, +} + +// Fork request type +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkTaskRequest { + pub fork_from_type: String, // 'checkpoint', 'timestamp', 'message_id' + pub fork_from_value: String, + pub new_task_name: String, + pub new_task_plan: String, + pub include_conversation: Option<bool>, + pub create_branch: Option<bool>, + pub branch_name: Option<String>, +} +``` + +**Complexity:** Medium +**Dependencies:** Phase 1 complete +**Estimated Time:** 2-3 hours + +### Task 2.2: Repository Functions + +**Files to Modify:** +- `makima/src/db/repository.rs` + +**Functions to Add:** + +```rust +// ============================================================================ +// Conversation Snapshots +// ============================================================================ + +/// Create a new conversation snapshot +pub async fn create_conversation_snapshot( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option<Uuid>, + snapshot_type: &str, + message_count: i32, + conversation_state: serde_json::Value, + metadata: Option<serde_json::Value>, +) -> Result<ConversationSnapshot, sqlx::Error> + +/// Get a conversation snapshot by ID +pub async fn get_conversation_snapshot( + pool: &PgPool, + id: Uuid, +) -> Result<Option<ConversationSnapshot>, sqlx::Error> + +/// Get conversation snapshot at a specific checkpoint +pub async fn get_conversation_at_checkpoint( + pool: &PgPool, + checkpoint_id: Uuid, +) -> Result<Option<ConversationSnapshot>, sqlx::Error> + +/// List conversation snapshots for a task +pub async fn list_conversation_snapshots( + pool: &PgPool, + task_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<ConversationSnapshot>, sqlx::Error> + +/// Delete conversation snapshots older than retention period +pub async fn cleanup_old_snapshots( + pool: &PgPool, + retention_days: i32, +) -> Result<u64, sqlx::Error> + +// ============================================================================ +// History Events +// ============================================================================ + +/// Record a new history event +pub async fn record_history_event( + pool: &PgPool, + owner_id: Uuid, + contract_id: Option<Uuid>, + task_id: Option<Uuid>, + event_type: &str, + event_subtype: Option<&str>, + phase: Option<&str>, + event_data: serde_json::Value, +) -> Result<HistoryEvent, sqlx::Error> + +/// Get contract history timeline +pub async fn get_contract_history( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> + +/// Get task history +pub async fn get_task_history( + pool: &PgPool, + task_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> + +/// Get unified timeline for an owner +pub async fn get_timeline( + pool: &PgPool, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> + +// ============================================================================ +// Task Conversation Retrieval +// ============================================================================ + +/// Get task conversation messages (reconstructed from task_events) +pub async fn get_task_conversation( + pool: &PgPool, + task_id: Uuid, + include_tool_calls: bool, + include_tool_results: bool, + limit: Option<i32>, +) -> Result<Vec<ConversationMessage>, sqlx::Error> + +/// Get supervisor conversation (from supervisor_states) +pub async fn get_supervisor_conversation( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<(SupervisorState, Vec<TaskSummary>)>, sqlx::Error> + +// ============================================================================ +// Checkpoint Operations +// ============================================================================ + +/// Create checkpoint with conversation snapshot +pub async fn create_checkpoint_with_snapshot( + pool: &PgPool, + task_id: Uuid, + checkpoint_number: i32, + commit_sha: &str, + branch_name: &str, + message: &str, + files_changed: Option<serde_json::Value>, + lines_added: Option<i32>, + lines_removed: Option<i32>, + conversation_state: serde_json::Value, +) -> Result<(TaskCheckpoint, ConversationSnapshot), sqlx::Error> + +/// Get checkpoint diff information (requires daemon interaction for actual diff) +pub async fn get_checkpoint_info( + pool: &PgPool, + checkpoint_id: Uuid, +) -> Result<Option<(TaskCheckpoint, Option<TaskCheckpoint>)>, sqlx::Error> + +// ============================================================================ +// Fork Operations +// ============================================================================ + +/// Create forked task +pub async fn create_forked_task( + pool: &PgPool, + owner_id: Uuid, + source_task_id: Uuid, + checkpoint_id: Option<Uuid>, + req: &CreateTaskRequest, +) -> Result<Task, sqlx::Error> +``` + +**Complexity:** Complex +**Dependencies:** Task 2.1 complete +**Estimated Time:** 4-5 hours + +--- + +## Phase 3: API Endpoints + +**Objective:** Implement REST API endpoints for history viewing, resume operations, and rewind/fork functionality. + +### Task 3.1: History Endpoints + +**Files to Create/Modify:** +- `makima/src/server/handlers/history.rs` (new file) +- `makima/src/server/handlers/mod.rs` (add module) +- `makima/src/server/mod.rs` (add routes) + +**Endpoints to Implement:** + +| Method | Path | Handler Function | +|--------|------|------------------| +| GET | `/api/v1/contracts/{id}/history` | `get_contract_history` | +| GET | `/api/v1/contracts/{id}/supervisor/conversation` | `get_supervisor_conversation` | +| GET | `/api/v1/mesh/tasks/{id}/conversation` | `get_task_conversation` | +| GET | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/diff` | `get_checkpoint_diff` | +| GET | `/api/v1/timeline` | `get_timeline` | + +**Implementation Details:** + +```rust +// makima/src/server/handlers/history.rs + +/// GET /api/v1/contracts/{id}/history +/// Returns contract history timeline with filtering and pagination +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/history", + params( + ("id" = Uuid, Path, description = "Contract ID"), + HistoryQueryFilters + ), + responses( + (status = 200, body = ContractHistoryResponse), + (status = 404, description = "Contract not found"), + ), + tag = "history" +)] +pub async fn get_contract_history( + State(state): State<AppState>, + Path(contract_id): Path<Uuid>, + Query(filters): Query<HistoryQueryFilters>, + auth: AuthenticatedUser, +) -> Result<Json<ContractHistoryResponse>, ApiError> + +/// GET /api/v1/contracts/{id}/supervisor/conversation +/// Returns full supervisor conversation with spawned task references +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/conversation", + responses( + (status = 200, body = SupervisorConversationResponse), + (status = 404, description = "Supervisor not found"), + ), + tag = "history" +)] +pub async fn get_supervisor_conversation( + State(state): State<AppState>, + Path(contract_id): Path<Uuid>, + auth: AuthenticatedUser, +) -> Result<Json<SupervisorConversationResponse>, ApiError> + +/// GET /api/v1/mesh/tasks/{id}/conversation +/// Returns task conversation history +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/conversation", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("include_tool_calls" = Option<bool>, Query), + ("include_tool_results" = Option<bool>, Query), + ("limit" = Option<i32>, Query), + ), + responses( + (status = 200, body = TaskConversationResponse), + (status = 404, description = "Task not found"), + ), + tag = "history" +)] +pub async fn get_task_conversation( + State(state): State<AppState>, + Path(task_id): Path<Uuid>, + Query(params): Query<TaskConversationParams>, + auth: AuthenticatedUser, +) -> Result<Json<TaskConversationResponse>, ApiError> + +/// GET /api/v1/mesh/tasks/{id}/checkpoints/{cid}/diff +/// Returns checkpoint diff (delegates to daemon for git diff) +pub async fn get_checkpoint_diff( + State(state): State<AppState>, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + auth: AuthenticatedUser, +) -> Result<Json<CheckpointDiffResponse>, ApiError> + +/// GET /api/v1/timeline +/// Returns unified timeline for authenticated user +pub async fn get_timeline( + State(state): State<AppState>, + Query(filters): Query<TimelineQueryFilters>, + auth: AuthenticatedUser, +) -> Result<Json<TimelineResponse>, ApiError> +``` + +**Complexity:** Medium +**Dependencies:** Phase 2 complete +**Estimated Time:** 4-5 hours + +### Task 3.2: Resume Endpoints + +**Files to Modify:** +- `makima/src/server/handlers/mesh_supervisor.rs` +- `makima/src/server/handlers/mesh.rs` + +**Endpoints to Implement:** + +| Method | Path | Handler Function | +|--------|------|------------------| +| POST | `/api/v1/contracts/{id}/supervisor/resume` | `resume_supervisor` | +| POST | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume` | `resume_from_checkpoint` | +| POST | `/api/v1/mesh/tasks/{id}/continue` | `continue_task` (enhanced) | + +**Implementation Details:** + +```rust +// makima/src/server/handlers/mesh_supervisor.rs + +/// POST /api/v1/contracts/{id}/supervisor/resume +/// Resume interrupted supervisor with specified mode +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/resume", + request_body = ResumeSupervisorRequest, + responses( + (status = 200, body = ResumeSupervisorResponse), + (status = 404, description = "Contract/supervisor not found"), + (status = 409, description = "Supervisor already running"), + ), + tag = "supervisor" +)] +pub async fn resume_supervisor( + State(state): State<AppState>, + Path(contract_id): Path<Uuid>, + auth: AuthenticatedUser, + Json(req): Json<ResumeSupervisorRequest>, +) -> Result<Json<ResumeSupervisorResponse>, ApiError> + +// makima/src/server/handlers/mesh.rs + +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume +/// Create new task starting from specific checkpoint +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume", + request_body = ResumeFromCheckpointRequest, + responses( + (status = 201, body = ResumeFromCheckpointResponse), + (status = 404, description = "Task/checkpoint not found"), + ), + tag = "mesh" +)] +pub async fn resume_from_checkpoint( + State(state): State<AppState>, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + auth: AuthenticatedUser, + Json(req): Json<ResumeFromCheckpointRequest>, +) -> Result<Json<ResumeFromCheckpointResponse>, ApiError> + +/// POST /api/v1/mesh/tasks/{id}/continue (enhanced) +/// Enhanced with resume mode and context options +pub async fn continue_task( + State(state): State<AppState>, + Path(task_id): Path<Uuid>, + auth: AuthenticatedUser, + Json(req): Json<ContinueTaskRequest>, // Enhanced request type +) -> Result<Json<Task>, ApiError> +``` + +**Complexity:** Complex +**Dependencies:** Task 3.1 complete +**Estimated Time:** 5-6 hours + +### Task 3.3: Rewind and Fork Endpoints + +**Files to Modify:** +- `makima/src/server/handlers/mesh.rs` +- `makima/src/server/handlers/mesh_supervisor.rs` + +**Endpoints to Implement:** + +| Method | Path | Handler Function | +|--------|------|------------------| +| POST | `/api/v1/mesh/tasks/{id}/rewind` | `rewind_task` | +| POST | `/api/v1/contracts/{id}/supervisor/conversation/rewind` | `rewind_conversation` | +| POST | `/api/v1/mesh/tasks/{id}/fork` | `fork_task` | +| POST | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch` | `branch_from_checkpoint` | + +**Implementation Details:** + +```rust +// makima/src/server/handlers/mesh.rs + +/// POST /api/v1/mesh/tasks/{id}/rewind +/// Rewind task code to specified checkpoint +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/rewind", + request_body = RewindTaskRequest, + responses( + (status = 200, body = RewindTaskResponse), + (status = 404, description = "Task/checkpoint not found"), + (status = 409, description = "Task is running"), + ), + tag = "mesh" +)] +pub async fn rewind_task( + State(state): State<AppState>, + Path(task_id): Path<Uuid>, + auth: AuthenticatedUser, + Json(req): Json<RewindTaskRequest>, +) -> Result<Json<RewindTaskResponse>, ApiError> + +/// POST /api/v1/mesh/tasks/{id}/fork +/// Fork task from historical point +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/fork", + request_body = ForkTaskRequest, + responses( + (status = 201, body = ForkTaskResponse), + (status = 404, description = "Task not found"), + ), + tag = "mesh" +)] +pub async fn fork_task( + State(state): State<AppState>, + Path(task_id): Path<Uuid>, + auth: AuthenticatedUser, + Json(req): Json<ForkTaskRequest>, +) -> Result<Json<ForkTaskResponse>, ApiError> + +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch +/// Create git branch from checkpoint without starting task +pub async fn branch_from_checkpoint( + State(state): State<AppState>, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + auth: AuthenticatedUser, + Json(req): Json<CreateBranchFromCheckpointRequest>, +) -> Result<Json<BranchCreatedResponse>, ApiError> + +// makima/src/server/handlers/mesh_supervisor.rs + +/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind +/// Rewind supervisor conversation to specified point +pub async fn rewind_conversation( + State(state): State<AppState>, + Path(contract_id): Path<Uuid>, + auth: AuthenticatedUser, + Json(req): Json<RewindConversationRequest>, +) -> Result<Json<RewindConversationResponse>, ApiError> +``` + +**Complexity:** Complex +**Dependencies:** Task 3.2 complete +**Estimated Time:** 5-6 hours + +--- + +## Phase 4: CLI Commands + +**Objective:** Implement command-line interface for history viewing, resume, and rewind operations. + +### Task 4.1: History Commands + +**Files to Modify:** +- `makima/src/daemon/cli/mod.rs` +- `makima/src/daemon/cli/contract.rs` +- `makima/src/daemon/cli/supervisor.rs` + +**New Commands:** + +```rust +// makima/src/daemon/cli/mod.rs + +/// Contract subcommands - add history commands +#[derive(Subcommand, Debug)] +pub enum ContractCommand { + // ... existing commands ... + + /// View contract history timeline + History(contract::HistoryArgs), +} + +/// Supervisor subcommands - add history and resume commands +#[derive(Subcommand, Debug)] +pub enum SupervisorCommand { + // ... existing commands ... + + /// View task conversation history + TaskHistory(supervisor::TaskHistoryArgs), + + /// List checkpoints with details + CheckpointList(supervisor::CheckpointListArgs), + + /// View checkpoint diff + CheckpointDiff(supervisor::CheckpointDiffArgs), + + /// Resume supervisor after interruption + Resume(supervisor::ResumeArgs), +} +``` + +**Implementation - Contract History:** + +```rust +// makima/src/daemon/cli/contract.rs + +#[derive(Args, Debug)] +pub struct HistoryArgs { + /// Filter by phase (research, specify, plan, execute, review) + #[arg(long)] + pub phase: Option<String>, + + /// Filter from date (ISO 8601 format) + #[arg(long)] + pub from: Option<String>, + + /// Filter to date (ISO 8601 format) + #[arg(long)] + pub to: Option<String>, + + /// Maximum entries to return + #[arg(long, default_value = "50")] + pub limit: i32, + + /// Output format (table, json) + #[arg(long, default_value = "table")] + pub format: String, +} + +pub async fn handle_history(args: &HistoryArgs) -> Result<()> { + // 1. Get contract context from environment + // 2. Call /api/v1/contracts/{id}/history with filters + // 3. Format and display results +} +``` + +**Implementation - Task History:** + +```rust +// makima/src/daemon/cli/supervisor.rs + +#[derive(Args, Debug)] +pub struct TaskHistoryArgs { + /// Task ID to view history for + pub task_id: Uuid, + + /// Include tool calls in output + #[arg(long, default_value = "true")] + pub tool_calls: bool, + + /// Maximum messages to return + #[arg(long)] + pub limit: Option<i32>, + + /// Output format (table, json, chat) + #[arg(long, default_value = "chat")] + pub format: String, +} + +pub async fn handle_task_history(args: &TaskHistoryArgs) -> Result<()> { + // 1. Call /api/v1/mesh/tasks/{id}/conversation + // 2. Format as chat-style output or JSON +} + +#[derive(Args, Debug)] +pub struct CheckpointListArgs { + /// Task ID to list checkpoints for + pub task_id: Uuid, + + /// Include diff summary + #[arg(long)] + pub with_diff: bool, +} + +#[derive(Args, Debug)] +pub struct CheckpointDiffArgs { + /// Task ID + pub task_id: Uuid, + + /// Checkpoint number + pub checkpoint_number: i32, +} +``` + +**Complexity:** Medium +**Dependencies:** Phase 3 complete +**Estimated Time:** 3-4 hours + +### Task 4.2: Resume and Rewind Commands + +**Files to Modify:** +- `makima/src/daemon/cli/supervisor.rs` +- `makima/src/daemon/cli/contract.rs` + +**New Commands:** + +```rust +// makima/src/daemon/cli/supervisor.rs + +#[derive(Args, Debug)] +pub struct ResumeArgs { + /// Resume mode: continue, restart_phase, from_checkpoint + #[arg(long, default_value = "continue")] + pub mode: String, + + /// Checkpoint ID (required for from_checkpoint mode) + #[arg(long)] + pub checkpoint: Option<Uuid>, + + /// Additional context to inject + #[arg(long)] + pub context: Option<String>, +} + +pub async fn handle_resume(args: &ResumeArgs) -> Result<()> { + // 1. Get contract context + // 2. Call /api/v1/contracts/{id}/supervisor/resume + // 3. Display result and new supervisor task info +} + +#[derive(Args, Debug)] +pub struct TaskResumeArgs { + /// Task ID to resume + pub task_id: Uuid, + + /// Resume mode: with_context, clean_restart, from_checkpoint + #[arg(long, default_value = "with_context")] + pub mode: String, + + /// Checkpoint SHA (for from_checkpoint mode) + #[arg(long)] + pub checkpoint: Option<String>, +} + +#[derive(Args, Debug)] +pub struct TaskResumeFromArgs { + /// Source task ID + pub task_id: Uuid, + + /// Checkpoint number to resume from + #[arg(long)] + pub checkpoint: i32, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Name for the new task + #[arg(long)] + pub name: Option<String>, +} + +#[derive(Args, Debug)] +pub struct TaskRewindArgs { + /// Task ID to rewind + pub task_id: Uuid, + + /// Checkpoint number to rewind to + #[arg(long)] + pub checkpoint: i32, + + /// Preserve mode: discard, create_branch, stash + #[arg(long, default_value = "create_branch")] + pub preserve: String, + + /// Branch name (for create_branch mode) + #[arg(long)] + pub branch_name: Option<String>, +} + +#[derive(Args, Debug)] +pub struct TaskForkArgs { + /// Source task ID + pub task_id: Uuid, + + /// Checkpoint number to fork from + #[arg(long)] + pub checkpoint: i32, + + /// Name for the new task + #[arg(long)] + pub name: String, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Include conversation history + #[arg(long, default_value = "true")] + pub include_conversation: bool, +} + +#[derive(Args, Debug)] +pub struct ConversationRewindArgs { + /// Number of messages to rewind + #[arg(long)] + pub by_messages: Option<i32>, + + /// Message ID to rewind to + #[arg(long)] + pub to_message: Option<String>, + + /// Also rewind code to matching checkpoint + #[arg(long)] + pub rewind_code: bool, +} +``` + +**Update CLI Commands enum:** + +```rust +// makima/src/daemon/cli/mod.rs + +#[derive(Subcommand, Debug)] +pub enum SupervisorCommand { + // ... existing commands ... + + /// Resume supervisor after interruption + Resume(supervisor::ResumeArgs), + + /// Resume task with context + TaskResume(supervisor::TaskResumeArgs), + + /// Resume from specific checkpoint + TaskResumeFrom(supervisor::TaskResumeFromArgs), + + /// Rewind task code to checkpoint + TaskRewind(supervisor::TaskRewindArgs), + + /// Fork task from historical point + TaskFork(supervisor::TaskForkArgs), + + /// Rewind supervisor conversation + RewindConversation(supervisor::ConversationRewindArgs), +} +``` + +**Complexity:** Medium +**Dependencies:** Task 4.1 complete +**Estimated Time:** 4-5 hours + +--- + +## Phase 5: Daemon Integration + +**Objective:** Extend daemon protocol and handlers to support rewind operations and conversation snapshots. + +### Task 5.1: Protocol Extensions + +**Files to Modify:** +- `makima/src/daemon/ws/protocol.rs` +- `makima/src/server/protocol.rs` (if separate) + +**New Commands to Add:** + +```rust +// makima/src/daemon/ws/protocol.rs + +/// Command from server to daemon +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonCommand { + // ... existing commands ... + + /// Rewind task worktree to a specific checkpoint + RewindToCheckpoint { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "checkpointSha")] + checkpoint_sha: String, + /// How to preserve current state: 'discard', 'create_branch', 'stash' + #[serde(rename = "preserveMode")] + preserve_mode: String, + /// Branch name for create_branch mode + #[serde(rename = "branchName")] + branch_name: Option<String>, + }, + + /// Create a conversation snapshot for a task + CreateConversationSnapshot { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Get git diff between two commits + GetCheckpointDiff { + #[serde(rename = "taskId")] + task_id: Uuid, + /// SHA of the checkpoint to diff + #[serde(rename = "checkpointSha")] + checkpoint_sha: String, + /// SHA of the previous checkpoint (for comparison) + #[serde(rename = "previousSha")] + previous_sha: Option<String>, + }, +} + +/// Message from daemon to server +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonMessage { + // ... existing messages ... + + /// Response to RewindToCheckpoint command + RewindResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + /// Reference to preserved state (branch name or stash ref) + #[serde(rename = "preservedAs")] + preserved_as: Option<PreservedState>, + }, + + /// Response to CreateConversationSnapshot command + ConversationSnapshotCreated { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "snapshotId")] + snapshot_id: Uuid, + #[serde(rename = "messageCount")] + message_count: i32, + }, + + /// Response to GetCheckpointDiff command + CheckpointDiffResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + diff: Option<GitDiffInfo>, + error: Option<String>, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PreservedState { + /// Type: 'branch' or 'stash' + #[serde(rename = "type")] + pub state_type: String, + /// Branch name or stash reference + pub reference: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GitDiffInfo { + pub files: Vec<FileDiffInfo>, + pub stats: DiffStats, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FileDiffInfo { + pub path: String, + /// Action: 'added', 'modified', 'deleted' + pub action: String, + pub additions: i32, + pub deletions: i32, + /// Diff hunks (optional, may be truncated for large files) + pub hunks: Option<Vec<DiffHunk>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DiffHunk { + pub header: String, + pub lines: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DiffStats { + pub files_changed: i32, + pub insertions: i32, + pub deletions: i32, +} +``` + +**Complexity:** Medium +**Dependencies:** None (can be done in parallel with Phase 3-4) +**Estimated Time:** 2-3 hours + +### Task 5.2: Daemon Handlers + +**Files to Modify:** +- `makima/src/daemon/task/manager.rs` +- `makima/src/daemon/worktree/manager.rs` + +**Handlers to Implement:** + +```rust +// makima/src/daemon/task/manager.rs + +impl TaskManager { + /// Handle rewind to checkpoint command + pub async fn handle_rewind_to_checkpoint( + &mut self, + task_id: Uuid, + checkpoint_sha: &str, + preserve_mode: &str, + branch_name: Option<&str>, + ) -> Result<RewindResult, DaemonError> { + // 1. Validate task exists and is not running + // 2. Get worktree path for task + // 3. Based on preserve_mode: + // - 'discard': git reset --hard {sha} + // - 'create_branch': git branch {name} && git reset --hard {sha} + // - 'stash': git stash && git reset --hard {sha} + // 4. Return result with preserved reference + } + + /// Handle conversation snapshot creation + pub async fn handle_create_conversation_snapshot( + &mut self, + task_id: Uuid, + ) -> Result<SnapshotResult, DaemonError> { + // 1. Get task state including conversation + // 2. Send to server for storage + // 3. Return snapshot ID + } + + /// Handle checkpoint diff request + pub async fn handle_get_checkpoint_diff( + &mut self, + task_id: Uuid, + checkpoint_sha: &str, + previous_sha: Option<&str>, + ) -> Result<GitDiffInfo, DaemonError> { + // 1. Get worktree path for task + // 2. Run git diff command + // 3. Parse and return diff info + } +} +``` + +**Files to Modify:** +- `makima/src/daemon/worktree/manager.rs` + +**Helper Functions:** + +```rust +// makima/src/daemon/worktree/manager.rs + +impl WorktreeManager { + /// Reset worktree to specific commit + pub async fn reset_to_commit( + &self, + worktree_path: &Path, + commit_sha: &str, + ) -> Result<(), WorktreeError> + + /// Create branch from current state + pub async fn create_branch( + &self, + worktree_path: &Path, + branch_name: &str, + ) -> Result<(), WorktreeError> + + /// Stash current changes + pub async fn stash_changes( + &self, + worktree_path: &Path, + ) -> Result<String, WorktreeError> // Returns stash ref + + /// Get diff between two commits + pub async fn get_diff( + &self, + worktree_path: &Path, + from_sha: &str, + to_sha: &str, + ) -> Result<String, WorktreeError> + + /// Parse git diff output into structured format + pub fn parse_diff(diff_output: &str) -> Result<GitDiffInfo, ParseError> +} +``` + +**Complexity:** Complex +**Dependencies:** Task 5.1 complete +**Estimated Time:** 4-5 hours + +### Task 5.3: Checkpoint Enhancement + +**Files to Modify:** +- `makima/src/daemon/task/manager.rs` +- `makima/src/server/handlers/mesh_daemon.rs` + +**Changes:** + +1. **Update checkpoint creation to include conversation state:** + +```rust +// When creating a checkpoint, also capture current conversation +pub async fn create_checkpoint_with_conversation( + &mut self, + task_id: Uuid, + message: &str, +) -> Result<(TaskCheckpoint, ConversationSnapshot), DaemonError> { + // 1. Create git commit (existing logic) + // 2. Capture current conversation state + // 3. Create checkpoint record with snapshot reference +} +``` + +2. **Update task spawning to handle fork/resume scenarios:** + +```rust +// When spawning task with forked_from_task_id or checkpoint_sha: +// - If checkpoint_sha: create worktree at that specific commit +// - If include_conversation: inject conversation history into plan +``` + +**Complexity:** Medium +**Dependencies:** Task 5.2 complete +**Estimated Time:** 3-4 hours + +--- + +## Implementation Summary + +### Phase Dependencies + +``` +Phase 1 (Database) + └─> Phase 2 (Repository) + └─> Phase 3 (API Endpoints) + └─> Phase 4 (CLI Commands) + +Phase 5 (Daemon) can be done in parallel with Phases 3-4 +``` + +### Estimated Total Time + +| Phase | Tasks | Estimated Hours | +|-------|-------|-----------------| +| Phase 1: Database Schema | 1 task | 2-3 hours | +| Phase 2: Repository Layer | 2 tasks | 6-8 hours | +| Phase 3: API Endpoints | 3 tasks | 14-17 hours | +| Phase 4: CLI Commands | 2 tasks | 7-9 hours | +| Phase 5: Daemon Integration | 3 tasks | 9-12 hours | +| **Total** | **11 tasks** | **38-49 hours** | + +### Task Breakdown by Complexity + +| Complexity | Count | Tasks | +|------------|-------|-------| +| Simple | 1 | Phase 1 Migration | +| Medium | 6 | Models, History Endpoints, CLI History, CLI Resume, Protocol, Checkpoint Enhancement | +| Complex | 4 | Repository Functions, Resume Endpoints, Rewind/Fork Endpoints, Daemon Handlers | + +### Files to Create + +1. `makima/migrations/20250117000000_history_tables.sql` +2. `makima/src/server/handlers/history.rs` + +### Files to Modify + +**Database Layer:** +- `makima/src/db/models.rs` +- `makima/src/db/repository.rs` + +**API Layer:** +- `makima/src/server/handlers/mod.rs` +- `makima/src/server/handlers/mesh.rs` +- `makima/src/server/handlers/mesh_supervisor.rs` +- `makima/src/server/mod.rs` + +**CLI Layer:** +- `makima/src/daemon/cli/mod.rs` +- `makima/src/daemon/cli/contract.rs` +- `makima/src/daemon/cli/supervisor.rs` + +**Daemon Layer:** +- `makima/src/daemon/ws/protocol.rs` +- `makima/src/daemon/task/manager.rs` +- `makima/src/daemon/worktree/manager.rs` +- `makima/src/server/handlers/mesh_daemon.rs` + +--- + +## Testing Strategy + +### Unit Tests +- Repository functions: CRUD operations for snapshots and events +- Model serialization/deserialization +- Diff parsing functions + +### Integration Tests +- API endpoint tests with mock database +- CLI command execution tests +- Daemon command handling tests + +### End-to-End Tests +- Complete workflow: Create task -> Checkpoint -> Rewind -> Resume +- Fork workflow: Create task -> Checkpoint -> Fork -> Verify state +- History viewing across contract lifecycle + +### Manual Testing Checklist +- [ ] Create conversation snapshot at checkpoint +- [ ] View contract history timeline with filters +- [ ] View task conversation history +- [ ] Resume interrupted supervisor (all modes) +- [ ] Resume task from specific checkpoint +- [ ] Rewind code with branch preservation +- [ ] Rewind conversation to message +- [ ] Fork task from checkpoint +- [ ] CLI commands output correctly formatted + +--- + +## Security Considerations + +1. **Owner verification**: All operations verify owner_id authorization +2. **Tool key authentication**: Supervisor operations require valid tool keys +3. **Data isolation**: Users cannot access other users' history +4. **Audit logging**: Log all rewind/fork operations for traceability + +## Performance Considerations + +1. **Pagination**: All list endpoints support cursor-based pagination +2. **Index usage**: Queries use appropriate indexes (contract_id, task_id, created_at) +3. **Snapshot cleanup**: Implement retention policy for old snapshots +4. **Response limits**: Cap response sizes for conversation history diff --git a/makima/docs/REQUIREMENTS-resume-history-system.md b/makima/docs/REQUIREMENTS-resume-history-system.md new file mode 100644 index 0000000..3a7cac9 --- /dev/null +++ b/makima/docs/REQUIREMENTS-resume-history-system.md @@ -0,0 +1,232 @@ +# Resume and History System - Requirements Document + +## 1. Executive Summary + +The Resume and History System enables users to view historical conversations, resume interrupted work, and rewind/restore to previous states within the Makima platform. This system is essential for recovering from interruptions, exploring alternative approaches, and maintaining visibility into AI-assisted development activities. + +## 2. Functional Requirements + +### 2.1 History Viewing (FR-HV) + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-HV-01 | System shall display complete conversation history for contracts across all phases | Must Have | +| FR-HV-02 | System shall display conversation history for individual tasks | Must Have | +| FR-HV-03 | System shall display task output/tool call history with timestamps | Must Have | +| FR-HV-04 | System shall display checkpoint history with git diffs | Must Have | +| FR-HV-05 | System shall provide a unified timeline view of all activities | Should Have | +| FR-HV-06 | System shall support filtering history by phase, event type, and time range | Should Have | +| FR-HV-07 | System shall support pagination for large history datasets | Must Have | +| FR-HV-08 | System shall display supervisor conversation history including spawned tasks | Must Have | + +### 2.2 Resume System (FR-RS) + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-RS-01 | System shall resume interrupted supervisor conversations with full context | Must Have | +| FR-RS-02 | System shall resume interrupted task conversations with context | Must Have | +| FR-RS-03 | System shall allow resuming from specific checkpoints | Must Have | +| FR-RS-04 | System shall allow continuing tasks from previous task state (worktree inheritance) | Must Have | +| FR-RS-05 | System shall support multiple resume modes: continue, restart-phase, from-checkpoint | Should Have | +| FR-RS-06 | System shall preserve conversation context during resume operations | Must Have | +| FR-RS-07 | System shall support resuming on different daemons after daemon failures | Must Have | + +### 2.3 Rewind/Restore Features (FR-RW) + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-RW-01 | System shall rewind code to any checkpoint (git restore) | Must Have | +| FR-RW-02 | System shall rewind conversation to any point in history | Should Have | +| FR-RW-03 | System shall create new branches from historical points | Must Have | +| FR-RW-04 | System shall fork tasks from any point in history | Should Have | +| FR-RW-05 | System shall preserve current state when rewinding (create branch, stash) | Should Have | +| FR-RW-06 | System shall support rewinding both code and conversation together | Should Have | + +### 2.4 Integration Requirements (FR-INT) + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-INT-01 | System shall provide CLI commands for all history/resume/rewind operations | Must Have | +| FR-INT-02 | System shall provide REST API endpoints for frontend integration | Must Have | +| FR-INT-03 | System shall integrate with existing task_checkpoints table | Must Have | +| FR-INT-04 | System shall integrate with existing task_events table | Must Have | +| FR-INT-05 | System shall integrate with existing supervisor_states table | Must Have | +| FR-INT-06 | System shall integrate with existing conversation state in tasks | Must Have | + +## 3. Non-Functional Requirements + +### 3.1 Performance (NFR-P) + +| ID | Requirement | Target | +|----|-------------|--------| +| NFR-P-01 | History queries shall complete within 500ms for up to 1000 entries | 500ms | +| NFR-P-02 | Resume operations shall complete within 5 seconds | 5s | +| NFR-P-03 | Rewind operations shall complete within 10 seconds | 10s | +| NFR-P-04 | Checkpoint diff generation shall complete within 2 seconds | 2s | + +### 3.2 Scalability (NFR-S) + +| ID | Requirement | Target | +|----|-------------|--------| +| NFR-S-01 | System shall support contracts with 100+ tasks | 100+ tasks | +| NFR-S-02 | System shall support tasks with 1000+ conversation messages | 1000+ messages | +| NFR-S-03 | System shall support checkpoints with 500+ file changes | 500+ files | + +### 3.3 Security (NFR-SEC) + +| ID | Requirement | +|----|-------------| +| NFR-SEC-01 | All operations shall verify owner_id authorization | +| NFR-SEC-02 | Supervisor operations shall require tool key authentication | +| NFR-SEC-03 | Users shall not access other users' history data | +| NFR-SEC-04 | All rewind operations shall be logged for audit | + +### 3.4 Reliability (NFR-R) + +| ID | Requirement | +|----|-------------| +| NFR-R-01 | Resume operations shall not lose conversation context | +| NFR-R-02 | Rewind operations shall be atomic (all-or-nothing) | +| NFR-R-03 | System shall handle daemon disconnects gracefully | + +## 4. Data Requirements + +### 4.1 New Data Entities + +#### Conversation Snapshots +- Store conversation state at specific points +- Link to checkpoints for combined code+conversation restore +- Support manual and automatic snapshot creation + +#### History Events +- Unified event stream for timeline views +- Include task events, chat messages, checkpoints, phase changes +- Support efficient querying by contract, task, or owner + +### 4.2 Data Retention + +| Data Type | Retention Period | Notes | +|-----------|-----------------|-------| +| Conversation history | Indefinite | Core to resume functionality | +| Conversation snapshots | 90 days | Configurable by user | +| History events | 1 year | Configurable by user | +| Checkpoint diffs | Indefinite | Git-based, low overhead | + +## 5. API Requirements + +### 5.1 New Endpoints Required + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/contracts/{id}/history` | Contract history timeline | +| GET | `/contracts/{id}/supervisor/conversation` | Supervisor conversation | +| POST | `/contracts/{id}/supervisor/resume` | Resume supervisor | +| POST | `/contracts/{id}/supervisor/conversation/rewind` | Rewind conversation | +| GET | `/mesh/tasks/{id}/conversation` | Task conversation | +| GET | `/mesh/tasks/{id}/checkpoints/{cid}/diff` | Checkpoint diff | +| POST | `/mesh/tasks/{id}/checkpoints/{cid}/resume` | Resume from checkpoint | +| POST | `/mesh/tasks/{id}/checkpoints/{cid}/branch` | Branch from checkpoint | +| POST | `/mesh/tasks/{id}/rewind` | Rewind task code | +| POST | `/mesh/tasks/{id}/fork` | Fork from history | +| GET | `/timeline` | Unified timeline | + +### 5.2 Enhanced Existing Endpoints + +| Method | Path | Enhancement | +|--------|------|-------------| +| POST | `/mesh/tasks/{id}/continue` | Add resumeMode and contextOptions | +| POST | `/mesh/tasks` | Document checkpoint_sha and continueFromTaskId usage | + +## 6. CLI Requirements + +### 6.1 New Commands + +| Command | Description | +|---------|-------------| +| `makima contract history` | View contract history timeline | +| `makima task history` | View task conversation history | +| `makima task checkpoints` | List task checkpoints | +| `makima task checkpoint-diff` | View checkpoint diff | +| `makima supervisor resume` | Resume interrupted supervisor | +| `makima task resume` | Resume interrupted task | +| `makima task resume-from` | Resume from specific checkpoint | +| `makima task rewind` | Rewind code to checkpoint | +| `makima supervisor rewind-conversation` | Rewind supervisor conversation | +| `makima task fork` | Fork task from historical point | + +## 7. User Interface Requirements + +### 7.1 Frontend Components + +| Component | Description | +|-----------|-------------| +| ContractTimeline | Visual timeline of contract activities | +| ConversationViewer | Display conversation history | +| CheckpointBrowser | Browse and select checkpoints | +| DiffViewer | Display git diffs | +| RewindControls | UI for rewind operations | +| ForkDialog | Modal for forking from history | +| ResumeDialog | Modal for resume options | + +### 7.2 Frontend Routes + +| Route | Description | +|-------|-------------| +| `/contracts/:id/history` | Contract history page | +| `/contracts/:id/supervisor` | Supervisor conversation view | +| `/tasks/:id/conversation` | Task conversation view | +| `/tasks/:id/checkpoints` | Checkpoint browser | +| `/tasks/:id/checkpoints/:cid` | Checkpoint detail with diff | + +## 8. Dependencies + +### 8.1 Existing System Dependencies + +| Dependency | Usage | +|------------|-------| +| supervisor_states table | Store/retrieve supervisor conversation | +| task_checkpoints table | Git checkpoint tracking | +| task_events table | Task output history | +| tasks.conversation_state | Task conversation context | +| tasks.continue_from_task_id | Worktree inheritance | + +### 8.2 External Dependencies + +| Dependency | Usage | +|------------|-------| +| Git | Checkpoint restore, branching, diffing | +| PostgreSQL JSONB | Conversation storage and querying | + +## 9. Acceptance Criteria Summary + +### 9.1 History Viewing +- [ ] User can view complete contract history across all phases +- [ ] User can view task conversation with tool calls +- [ ] User can view checkpoint history with diffs +- [ ] Timeline view shows all activities chronologically +- [ ] Pagination works for large datasets + +### 9.2 Resume System +- [ ] Supervisor can be resumed after daemon disconnect +- [ ] Task can be resumed with full conversation context +- [ ] Resume from specific checkpoint works correctly +- [ ] Task continuation preserves worktree state + +### 9.3 Rewind/Restore +- [ ] Code can be rewound to any checkpoint +- [ ] Conversation can be rewound to any point +- [ ] Branch can be created from any checkpoint +- [ ] Fork creates new task with correct state +- [ ] Original state can be preserved during rewind + +## 10. Out of Scope + +The following are explicitly out of scope for the initial implementation: + +1. Full-text search within conversation history +2. History comparison/diffing between two points +3. Interactive replay mode +4. History export functionality +5. AI-powered history summaries +6. Cross-user history sharing +7. Real-time history synchronization diff --git a/makima/docs/SPEC-resume-history-system.md b/makima/docs/SPEC-resume-history-system.md new file mode 100644 index 0000000..7a19a90 --- /dev/null +++ b/makima/docs/SPEC-resume-history-system.md @@ -0,0 +1,1000 @@ +# Resume and History System Specification + +## Overview + +This specification defines a comprehensive system for viewing historical conversation data, resuming interrupted work, and rewinding/restoring to previous states in the Makima platform. The system integrates with the existing contract, task, supervisor, and checkpoint infrastructure. + +## Table of Contents + +1. [Current Infrastructure](#1-current-infrastructure) +2. [History Viewing Features](#2-history-viewing-features) +3. [Resume System](#3-resume-system) +4. [Rewind/Restore Features](#4-rewindrestore-features) +5. [Integration Points](#5-integration-points) +6. [Implementation Plan](#6-implementation-plan) + +--- + +## 1. Current Infrastructure + +### 1.1 Existing Database Tables + +The system builds upon these existing tables: + +#### `supervisor_states` +```sql +- id: UUID +- contract_id: UUID +- task_id: UUID +- conversation_history: JSONB -- Full Claude conversation history +- last_checkpoint_id: UUID +- pending_task_ids: UUID[] +- phase: VARCHAR +- last_activity: TIMESTAMP +- created_at: TIMESTAMP +- updated_at: TIMESTAMP +``` + +#### `task_checkpoints` +```sql +- id: UUID +- task_id: UUID +- checkpoint_number: INT +- commit_sha: VARCHAR +- branch_name: VARCHAR +- message: VARCHAR +- files_changed: JSONB -- [{path, action: 'A'|'M'|'D'}] +- lines_added: INT +- lines_removed: INT +- created_at: TIMESTAMP +``` + +#### `task_events` +```sql +- id: UUID +- task_id: UUID +- event_type: VARCHAR -- 'output', 'status_change', etc. +- previous_status: VARCHAR +- new_status: VARCHAR +- event_data: JSONB -- Contains full output data +- created_at: TIMESTAMP +``` + +#### `tasks` (relevant fields) +```sql +- conversation_state: JSONB -- Saved conversation context +- continue_from_task_id: UUID -- For worktree inheritance +- last_checkpoint_sha: VARCHAR +- checkpoint_count: INT +- checkpoint_message: VARCHAR +``` + +#### `mesh_chat_conversations` / `mesh_chat_messages` +```sql +-- Conversations for mesh chat +- id, owner_id, name, is_active, created_at, updated_at + +-- Messages +- id, conversation_id, role, content, context_type, context_task_id +- tool_calls: JSONB, pending_questions: JSONB, created_at +``` + +#### `contract_chat_conversations` / `contract_chat_messages` +```sql +-- Similar structure to mesh chat, scoped to contracts +``` + +### 1.2 Existing Repository Functions + +```rust +// Checkpoint functions +- list_task_checkpoints(pool, task_id) -> Vec<TaskCheckpoint> +- get_task_checkpoint(pool, id) -> Option<TaskCheckpoint> +- get_task_checkpoint_by_sha(pool, commit_sha) -> Option<TaskCheckpoint> + +// Supervisor state functions +- upsert_supervisor_state(pool, contract_id, task_id, conversation_history, pending_task_ids, phase) +- get_supervisor_state(pool, contract_id) -> Option<SupervisorState> +- get_supervisor_state_by_task(pool, task_id) -> Option<SupervisorState> +- update_supervisor_conversation(pool, contract_id, conversation_history) +- update_supervisor_pending_tasks(pool, contract_id, pending_task_ids) + +// Task output functions +- get_task_output(pool, task_id, limit) -> Vec<TaskEvent> +- list_task_events(pool, task_id, limit) -> Vec<TaskEvent> +``` + +### 1.3 Existing API Endpoints + +``` +GET /api/v1/mesh/tasks/{id}/output -- Task output history +GET /api/v1/mesh/tasks/{id}/events -- Task events +GET /api/v1/mesh/tasks/{id}/checkpoints -- List checkpoints +POST /api/v1/mesh/tasks/{id}/continue -- Continue interrupted task +POST /api/v1/mesh/tasks/{id}/reassign -- Reassign to new daemon +``` + +--- + +## 2. History Viewing Features + +### 2.1 Contract History Timeline + +View all activities across a contract's lifecycle, aggregating data from all phases and tasks. + +#### API Endpoint + +``` +GET /api/v1/contracts/{contract_id}/history +``` + +#### Query Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `phase` | string | Filter by phase (research, specify, plan, execute, review) | +| `event_types` | string[] | Filter by event types (task_created, task_completed, checkpoint, phase_change, chat) | +| `from` | datetime | Start of time range | +| `to` | datetime | End of time range | +| `limit` | int | Max results (default: 100) | +| `cursor` | string | Pagination cursor | + +#### Response Schema + +```typescript +interface ContractHistoryResponse { + contractId: string; + entries: HistoryEntry[]; + totalCount: number; + cursor?: string; +} + +interface HistoryEntry { + id: string; + timestamp: datetime; + entryType: 'task_event' | 'chat_message' | 'phase_change' | 'checkpoint' | 'file_change'; + phase: string; + + // Task event data (when entryType = 'task_event') + taskId?: string; + taskName?: string; + eventType?: string; + eventData?: any; + + // Chat message data (when entryType = 'chat_message') + conversationId?: string; + role?: string; + content?: string; + + // Checkpoint data (when entryType = 'checkpoint') + checkpointNumber?: number; + commitSha?: string; + message?: string; + filesChanged?: FileChange[]; + + // Phase change data (when entryType = 'phase_change') + previousPhase?: string; + newPhase?: string; +} +``` + +### 2.2 Task Conversation History + +View the complete conversation history for a specific task. + +#### API Endpoint + +``` +GET /api/v1/mesh/tasks/{task_id}/conversation +``` + +#### Query Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `include_tool_calls` | bool | Include tool call details (default: true) | +| `include_tool_results` | bool | Include tool results (default: true) | +| `limit` | int | Max messages (default: all) | + +#### Response Schema + +```typescript +interface TaskConversationResponse { + taskId: string; + taskName: string; + status: string; + messages: ConversationMessage[]; + totalTokens?: number; + totalCost?: number; +} + +interface ConversationMessage { + id: string; + role: 'user' | 'assistant' | 'system' | 'tool'; + content: string; + timestamp: datetime; + + // For assistant messages + toolCalls?: ToolCall[]; + + // For tool messages + toolName?: string; + toolInput?: any; + toolResult?: string; + isError?: boolean; + + // Cost tracking + tokenCount?: number; + costUsd?: number; +} +``` + +### 2.3 Supervisor Conversation History + +View the full conversation history for a contract's supervisor. + +#### API Endpoint + +``` +GET /api/v1/contracts/{contract_id}/supervisor/conversation +``` + +#### Response Schema + +```typescript +interface SupervisorConversationResponse { + contractId: string; + supervisorTaskId: string; + phase: string; + lastActivity: datetime; + pendingTaskIds: string[]; + + // Full conversation from supervisor_states.conversation_history + messages: ConversationMessage[]; + + // Tasks spawned during conversation + spawnedTasks: TaskReference[]; +} + +interface TaskReference { + taskId: string; + taskName: string; + status: string; + createdAt: datetime; + completedAt?: datetime; +} +``` + +### 2.4 Checkpoint History with Diffs + +View checkpoint history with Git diff information. + +#### API Endpoint + +``` +GET /api/v1/mesh/tasks/{task_id}/checkpoints/{checkpoint_id}/diff +``` + +#### Response Schema + +```typescript +interface CheckpointDiffResponse { + checkpoint: TaskCheckpoint; + diff: GitDiff; + previousCheckpoint?: TaskCheckpoint; +} + +interface GitDiff { + files: FileDiff[]; + stats: { + filesChanged: number; + insertions: number; + deletions: number; + }; +} + +interface FileDiff { + path: string; + action: 'added' | 'modified' | 'deleted'; + additions: number; + deletions: number; + hunks: DiffHunk[]; +} +``` + +### 2.5 Activity Timeline View + +Unified timeline showing all activities for a contract or task. + +#### API Endpoint + +``` +GET /api/v1/timeline +``` + +#### Query Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `contract_id` | UUID | Filter by contract | +| `task_id` | UUID | Filter by specific task | +| `include_subtasks` | bool | Include subtask activities | +| `from` | datetime | Start of time range | +| `to` | datetime | End of time range | + +--- + +## 3. Resume System + +### 3.1 Resume Interrupted Supervisor + +Resume a supervisor conversation that was interrupted (daemon disconnect, crash, etc.). + +#### API Endpoint + +``` +POST /api/v1/contracts/{contract_id}/supervisor/resume +``` + +#### Request Schema + +```typescript +interface ResumeSupervisorRequest { + // Optional: specify daemon to resume on + targetDaemonId?: string; + + // How to handle the resume + resumeMode: 'continue' | 'restart_phase' | 'from_checkpoint'; + + // For 'from_checkpoint' mode + checkpointId?: string; + + // Additional context to inject + additionalContext?: string; +} +``` + +#### Behavior + +1. **continue** (default): + - Load `supervisor_states.conversation_history` + - Prepend context summary to prompt + - Resume with full conversation context + +2. **restart_phase**: + - Start fresh for current phase + - Keep knowledge of completed work + - Don't replay old conversation + +3. **from_checkpoint**: + - Reset code to checkpoint state + - Clear conversation after checkpoint + - Resume from clean state + +#### Response Schema + +```typescript +interface ResumeSupervisorResponse { + supervisorTaskId: string; + daemonId: string; + resumedFrom: { + phase: string; + lastActivity: datetime; + messageCount: number; + }; + status: 'starting' | 'running'; +} +``` + +### 3.2 Resume Interrupted Task + +Resume a task that was interrupted mid-execution. + +#### API Endpoint (existing, enhanced) + +``` +POST /api/v1/mesh/tasks/{task_id}/continue +``` + +#### Enhanced Request Schema + +```typescript +interface ContinueTaskRequest { + targetDaemonId?: string; + + // NEW: Resume options + resumeMode?: 'with_context' | 'clean_restart' | 'from_checkpoint'; + checkpointSha?: string; + + // NEW: Context control + contextOptions?: { + maxMessages?: number; // Limit context size + includeToolCalls?: boolean; + includeToolResults?: boolean; + summaryMode?: boolean; // Use summarized context + }; +} +``` + +### 3.3 Resume from Specific Checkpoint + +Create a new task that starts from a specific checkpoint's code state. + +#### API Endpoint + +``` +POST /api/v1/mesh/tasks/{task_id}/checkpoints/{checkpoint_id}/resume +``` + +#### Request Schema + +```typescript +interface ResumeFromCheckpointRequest { + // Name for the new task + taskName?: string; + + // Plan/instructions for the resumed task + plan: string; + + // Whether to include conversation context up to checkpoint + includeConversation?: boolean; + + // Target daemon + targetDaemonId?: string; +} +``` + +#### Behavior + +1. Get checkpoint's commit SHA +2. Create new task with `checkpoint_sha` set +3. When daemon spawns task: + - Create worktree at checkpoint commit + - Optionally include conversation context +4. Task starts from checkpoint's code state + +#### Response Schema + +```typescript +interface ResumeFromCheckpointResponse { + newTaskId: string; + sourceTaskId: string; + checkpointUsed: { + number: number; + sha: string; + message: string; + }; + status: 'pending' | 'starting'; +} +``` + +### 3.4 Continue from Previous Task State + +Use existing `continue_from_task_id` field to inherit worktree state. + +#### API Endpoint (existing) + +``` +POST /api/v1/mesh/tasks +``` + +#### Relevant Fields + +```typescript +interface CreateTaskRequest { + // ... other fields ... + + // Copy worktree from this task + continueFromTaskId?: string; + + // Specific checkpoint SHA to branch from + checkpointSha?: string; + + // Files to copy from parent task's worktree + copyFiles?: string[]; +} +``` + +--- + +## 4. Rewind/Restore Features + +### 4.1 Rewind Code to Checkpoint + +Restore a task's worktree to a specific checkpoint's state. + +#### API Endpoint + +``` +POST /api/v1/mesh/tasks/{task_id}/rewind +``` + +#### Request Schema + +```typescript +interface RewindTaskRequest { + // Target checkpoint to rewind to + checkpointId?: string; + checkpointSha?: string; // Alternative: use SHA directly + + // What to do with current state + preserveMode: 'discard' | 'create_branch' | 'stash'; + + // For 'create_branch' mode + branchName?: string; +} +``` + +#### Behavior + +1. **discard**: Hard reset to checkpoint, lose current changes +2. **create_branch**: Create branch from current HEAD, then reset +3. **stash**: Git stash current changes, then reset + +#### Response Schema + +```typescript +interface RewindTaskResponse { + taskId: string; + rewindedTo: { + checkpointNumber: number; + sha: string; + message: string; + }; + preservedAs?: { + type: 'branch' | 'stash'; + reference: string; // Branch name or stash ref + }; +} +``` + +### 4.2 Rewind Conversation to Point + +Truncate conversation history to a specific point. + +#### API Endpoint + +``` +POST /api/v1/contracts/{contract_id}/supervisor/conversation/rewind +``` + +#### Request Schema + +```typescript +interface RewindConversationRequest { + // Rewind to message with this ID + toMessageId?: string; + + // OR rewind to messages before this timestamp + toTimestamp?: datetime; + + // OR rewind by N messages + byMessageCount?: number; + + // Whether to also rewind code to matching checkpoint + rewindCode?: boolean; +} +``` + +#### Behavior + +1. Identify the target point in `supervisor_states.conversation_history` +2. Truncate messages after that point +3. If `rewindCode` is true and there's a matching checkpoint, reset code +4. Update `supervisor_states` with truncated history + +### 4.3 Fork from Historical Point + +Create a new task/conversation branch from any point in history. + +#### API Endpoint + +``` +POST /api/v1/mesh/tasks/{task_id}/fork +``` + +#### Request Schema + +```typescript +interface ForkTaskRequest { + // Fork point specification + forkFrom: { + type: 'checkpoint' | 'timestamp' | 'message_id'; + value: string; + }; + + // New task configuration + newTask: { + name: string; + plan: string; + includeConversation?: boolean; // Include history up to fork point + }; + + // Git branching options + branchOptions?: { + createBranch?: boolean; + branchName?: string; + }; +} +``` + +#### Behavior + +1. Identify the fork point (checkpoint, timestamp, or message) +2. Create new task with: + - `checkpoint_sha` from fork point (or nearest checkpoint) + - Conversation context up to fork point (if requested) +3. Create git branch if requested +4. Start task in new direction + +#### Response Schema + +```typescript +interface ForkTaskResponse { + newTaskId: string; + sourceTaskId: string; + forkPoint: { + type: string; + checkpoint?: TaskCheckpoint; + timestamp: datetime; + }; + branchName?: string; + conversationIncluded: boolean; + messageCount?: number; +} +``` + +### 4.4 Create Branch from Checkpoint + +Create a new git branch from a specific checkpoint without starting a task. + +#### API Endpoint + +``` +POST /api/v1/mesh/tasks/{task_id}/checkpoints/{checkpoint_id}/branch +``` + +#### Request Schema + +```typescript +interface CreateBranchFromCheckpointRequest { + branchName: string; + checkout?: boolean; // Whether to checkout the new branch +} +``` + +--- + +## 5. Integration Points + +### 5.1 New CLI Commands + +#### History Commands + +```bash +# View contract history timeline +makima contract history [--phase <phase>] [--from <date>] [--to <date>] + +# View task conversation history +makima task history <task_id> [--limit <n>] [--no-tool-calls] + +# View task checkpoints +makima task checkpoints <task_id> [--with-diff] + +# View specific checkpoint diff +makima task checkpoint-diff <task_id> <checkpoint_number> +``` + +#### Resume Commands + +```bash +# Resume supervisor +makima supervisor resume [--mode <continue|restart|checkpoint>] [--checkpoint <id>] + +# Resume task +makima task resume <task_id> [--mode <context|clean|checkpoint>] [--checkpoint <sha>] + +# Resume from checkpoint +makima task resume-from <task_id> --checkpoint <number> --plan "continue with..." +``` + +#### Rewind Commands + +```bash +# Rewind code to checkpoint +makima task rewind <task_id> --checkpoint <number> [--preserve-branch <name>] + +# Rewind conversation +makima supervisor rewind-conversation --to-message <id> [--rewind-code] + +# Fork from checkpoint +makima task fork <task_id> --checkpoint <number> --name "alternate approach" --plan "..." +``` + +### 5.2 New API Endpoints Summary + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/api/v1/contracts/{id}/history` | Contract history timeline | +| GET | `/api/v1/contracts/{id}/supervisor/conversation` | Supervisor conversation | +| POST | `/api/v1/contracts/{id}/supervisor/resume` | Resume supervisor | +| POST | `/api/v1/contracts/{id}/supervisor/conversation/rewind` | Rewind conversation | +| GET | `/api/v1/mesh/tasks/{id}/conversation` | Task conversation | +| GET | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/diff` | Checkpoint diff | +| POST | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume` | Resume from checkpoint | +| POST | `/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch` | Branch from checkpoint | +| POST | `/api/v1/mesh/tasks/{id}/rewind` | Rewind task code | +| POST | `/api/v1/mesh/tasks/{id}/fork` | Fork from history | +| GET | `/api/v1/timeline` | Unified timeline | + +### 5.3 Database Schema Changes + +#### New Table: `conversation_snapshots` + +Store conversation state at specific points for rewind capability. + +```sql +CREATE TABLE conversation_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE SET NULL, + snapshot_type VARCHAR(50) NOT NULL, -- 'auto', 'manual', 'checkpoint' + message_count INT NOT NULL, + conversation_state JSONB NOT NULL, -- Full conversation at this point + metadata JSONB, -- Additional context + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX idx_conversation_snapshots_task ON conversation_snapshots(task_id); +CREATE INDEX idx_conversation_snapshots_checkpoint ON conversation_snapshots(checkpoint_id); +``` + +#### New Table: `history_events` + +Unified event table for timeline views. + +```sql +CREATE TABLE history_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + contract_id UUID REFERENCES contracts(id) ON DELETE CASCADE, + task_id UUID REFERENCES tasks(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, -- 'task', 'chat', 'checkpoint', 'phase', 'file' + event_subtype VARCHAR(50), -- Specific event type + phase VARCHAR(50), + event_data JSONB NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX idx_history_events_contract ON history_events(contract_id, created_at DESC); +CREATE INDEX idx_history_events_task ON history_events(task_id, created_at DESC); +CREATE INDEX idx_history_events_owner ON history_events(owner_id, created_at DESC); +``` + +#### Table Modifications + +```sql +-- Add to task_checkpoints +ALTER TABLE task_checkpoints ADD COLUMN conversation_snapshot_id UUID + REFERENCES conversation_snapshots(id) ON DELETE SET NULL; + +-- Add to tasks +ALTER TABLE tasks ADD COLUMN forked_from_task_id UUID + REFERENCES tasks(id) ON DELETE SET NULL; +ALTER TABLE tasks ADD COLUMN forked_at_checkpoint_id UUID + REFERENCES task_checkpoints(id) ON DELETE SET NULL; +``` + +### 5.4 Frontend Integration + +#### New Components + +1. **ContractTimeline** - Visual timeline of contract activities +2. **ConversationViewer** - Display conversation history with syntax highlighting +3. **CheckpointBrowser** - Browse checkpoints with diff view +4. **RewindControls** - UI for rewind operations +5. **ForkDialog** - Modal for forking from history + +#### New Routes + +``` +/contracts/:id/history - Contract history timeline +/contracts/:id/supervisor - Supervisor conversation view +/tasks/:id/conversation - Task conversation view +/tasks/:id/checkpoints - Checkpoint browser +/tasks/:id/checkpoints/:cid - Checkpoint detail with diff +``` + +--- + +## 6. Implementation Plan + +### 6.1 Phase 1: Database & Models (Migration 20250116_history_tables) + +**Migrations:** + +1. Create `conversation_snapshots` table +2. Create `history_events` table +3. Add columns to `task_checkpoints` and `tasks` +4. Create indexes for efficient queries + +**Models (Rust):** + +1. `ConversationSnapshot` struct +2. `HistoryEvent` struct +3. `ConversationMessage` struct (unified format) +4. Update `TaskCheckpoint` with new fields + +### 6.2 Phase 2: Repository Functions + +**New Functions:** + +```rust +// Conversation snapshots +pub async fn create_conversation_snapshot(pool, task_id, checkpoint_id, state) -> ConversationSnapshot +pub async fn get_conversation_snapshot(pool, id) -> Option<ConversationSnapshot> +pub async fn get_conversation_at_checkpoint(pool, checkpoint_id) -> Option<ConversationSnapshot> +pub async fn list_conversation_snapshots(pool, task_id) -> Vec<ConversationSnapshot> + +// History events +pub async fn record_history_event(pool, event) -> HistoryEvent +pub async fn get_contract_history(pool, contract_id, filters) -> Vec<HistoryEvent> +pub async fn get_task_history(pool, task_id, filters) -> Vec<HistoryEvent> +pub async fn get_timeline(pool, owner_id, filters) -> Vec<HistoryEvent> + +// Conversation retrieval +pub async fn get_task_conversation(pool, task_id, options) -> Vec<ConversationMessage> +pub async fn get_supervisor_conversation(pool, contract_id) -> SupervisorConversation + +// Checkpoint operations +pub async fn get_checkpoint_diff(pool, checkpoint_id) -> CheckpointDiff +pub async fn create_checkpoint_with_snapshot(pool, task_id, message, conversation_state) -> TaskCheckpoint +``` + +### 6.3 Phase 3: API Endpoints + +**History Endpoints:** + +1. `GET /api/v1/contracts/{id}/history` +2. `GET /api/v1/contracts/{id}/supervisor/conversation` +3. `GET /api/v1/mesh/tasks/{id}/conversation` +4. `GET /api/v1/mesh/tasks/{id}/checkpoints/{cid}/diff` +5. `GET /api/v1/timeline` + +**Resume Endpoints:** + +1. `POST /api/v1/contracts/{id}/supervisor/resume` +2. `POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume` +3. Enhance existing `POST /api/v1/mesh/tasks/{id}/continue` + +**Rewind Endpoints:** + +1. `POST /api/v1/mesh/tasks/{id}/rewind` +2. `POST /api/v1/contracts/{id}/supervisor/conversation/rewind` +3. `POST /api/v1/mesh/tasks/{id}/fork` +4. `POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch` + +### 6.4 Phase 4: CLI Commands + +**New Commands:** + +1. `makima contract history` - View contract history +2. `makima task history` - View task conversation +3. `makima task checkpoints` - List checkpoints +4. `makima task checkpoint-diff` - View checkpoint diff +5. `makima supervisor resume` - Resume supervisor +6. `makima task resume` - Resume task +7. `makima task resume-from` - Resume from checkpoint +8. `makima task rewind` - Rewind code +9. `makima supervisor rewind-conversation` - Rewind conversation +10. `makima task fork` - Fork from history + +### 6.5 Phase 5: Daemon Integration + +**New Daemon Commands:** + +```rust +enum DaemonCommand { + // Existing... + + // New + RewindToCheckpoint { + task_id: Uuid, + checkpoint_sha: String, + preserve_mode: PreserveMode, + branch_name: Option<String>, + }, + CreateConversationSnapshot { + task_id: Uuid, + }, + RestoreFromSnapshot { + task_id: Uuid, + snapshot_id: Uuid, + }, +} +``` + +**Daemon Handlers:** + +1. Implement `handle_rewind_to_checkpoint` +2. Implement `handle_create_conversation_snapshot` +3. Update checkpoint creation to include conversation state +4. Update task spawning to handle fork/resume scenarios + +### 6.6 Phase 6: Frontend Components + +**Components:** + +1. ContractTimeline +2. ConversationViewer +3. CheckpointBrowser +4. DiffViewer +5. RewindControls +6. ForkDialog +7. ResumeDialog + +**Routes & Pages:** + +1. Contract history page +2. Supervisor conversation page +3. Task conversation page +4. Checkpoint detail page + +--- + +## 7. Security Considerations + +### 7.1 Access Control + +- All history/resume/rewind operations must verify owner_id +- Supervisor operations require tool key authentication +- Frontend operations require user authentication +- Prevent access to other users' conversation history + +### 7.2 Data Retention + +- Conversation snapshots may contain sensitive data +- Implement configurable retention policies +- Consider encryption for stored conversation data +- Provide data export/deletion capabilities + +### 7.3 Audit Trail + +- Log all rewind/restore operations +- Track who initiated changes +- Maintain original data before modifications + +--- + +## 8. Performance Considerations + +### 8.1 Pagination + +- All list endpoints support cursor-based pagination +- Limit maximum response sizes +- Use efficient queries with proper indexes + +### 8.2 Caching + +- Cache frequently accessed conversation history +- Invalidate cache on updates +- Consider read replicas for history queries + +### 8.3 Storage + +- Compress large conversation histories +- Consider archiving old snapshots +- Monitor database size growth + +--- + +## 9. Future Enhancements + +1. **Search within history** - Full-text search across conversations +2. **History comparison** - Diff between two points in time +3. **Replay mode** - Step through conversation history +4. **Export history** - Download conversation/checkpoint history +5. **History annotations** - Add notes to historical points +6. **Automatic snapshots** - Periodic conversation backups +7. **History sharing** - Share specific history ranges +8. **AI-powered summaries** - Generate summaries of historical periods diff --git a/makima/docs/USER-STORIES-resume-history-system.md b/makima/docs/USER-STORIES-resume-history-system.md new file mode 100644 index 0000000..eba3158 --- /dev/null +++ b/makima/docs/USER-STORIES-resume-history-system.md @@ -0,0 +1,240 @@ +# Resume and History System - User Stories + +## Epic: History Viewing + +### US-HV-01: View Contract History Timeline +**As a** developer using Makima +**I want to** view a complete timeline of all activities in my contract +**So that** I can understand what happened across all phases and tasks + +**Acceptance Criteria:** +- Timeline shows all activities chronologically +- Activities include task creation, completions, checkpoints, phase changes, and chat messages +- Timeline can be filtered by phase +- Timeline can be filtered by time range +- Timeline supports pagination for large datasets + +### US-HV-02: View Task Conversation History +**As a** developer +**I want to** view the complete conversation history for any task +**So that** I can understand what the AI did and how it arrived at its solution + +**Acceptance Criteria:** +- All messages are displayed in chronological order +- Tool calls are shown with their inputs +- Tool results are shown with outputs +- Error messages are clearly highlighted +- Token usage and cost are displayed if available + +### US-HV-03: View Checkpoint with Diff +**As a** developer +**I want to** see what code changes were made in each checkpoint +**So that** I can review the work done at each step + +**Acceptance Criteria:** +- Checkpoint details show commit message and timestamp +- Diff shows files changed with additions/deletions +- Diff uses syntax highlighting for code +- Can navigate between checkpoints +- Shows summary statistics (files changed, lines added/removed) + +### US-HV-04: View Supervisor Conversation +**As a** developer managing a contract +**I want to** view the supervisor's conversation history +**So that** I can understand how tasks were orchestrated and decisions were made + +**Acceptance Criteria:** +- Full conversation history is displayed +- Spawned tasks are linked and visible +- Pending tasks are indicated +- Phase information is shown +- Tool calls and decisions are visible + +## Epic: Resume System + +### US-RS-01: Resume Interrupted Supervisor +**As a** developer +**I want to** resume my supervisor conversation after a daemon disconnect +**So that** I don't lose my progress and context + +**Acceptance Criteria:** +- Supervisor can be resumed with full conversation context +- Can choose resume mode (continue, restart phase, from checkpoint) +- Resume works on any available daemon +- Original context is preserved +- New activity continues from where it left off + +### US-RS-02: Resume Interrupted Task +**As a** developer +**I want to** resume a task that was interrupted +**So that** the AI can continue without redoing completed work + +**Acceptance Criteria:** +- Task resumes with conversation context +- Worktree state is preserved +- Can specify context size limit +- Resume works on different daemon if needed +- Progress is not lost + +### US-RS-03: Resume from Specific Checkpoint +**As a** developer +**I want to** resume work from a specific checkpoint +**So that** I can try a different approach from a known good state + +**Acceptance Criteria:** +- Can select any checkpoint to resume from +- Code state matches checkpoint +- Can optionally include conversation up to that point +- New task is created with appropriate context +- Original task remains unchanged + +### US-RS-04: Continue from Previous Task +**As a** developer +**I want to** start a new task that inherits another task's worktree +**So that** I can build on previous work without copying files manually + +**Acceptance Criteria:** +- New task inherits worktree state from source task +- Can specify which files to copy +- Git history is preserved +- Source task is not modified +- New task gets its own isolated worktree + +## Epic: Rewind/Restore Features + +### US-RW-01: Rewind Code to Checkpoint +**As a** developer +**I want to** rewind my code to a previous checkpoint +**So that** I can undo changes and try a different approach + +**Acceptance Criteria:** +- Can select any checkpoint to rewind to +- Can choose to discard current changes +- Can choose to preserve current state as branch +- Can choose to stash current changes +- Rewind is atomic (all or nothing) + +### US-RW-02: Rewind Conversation +**As a** developer +**I want to** rewind the conversation to an earlier point +**So that** I can remove problematic AI responses and try again + +**Acceptance Criteria:** +- Can specify rewind point by message ID, timestamp, or message count +- Messages after rewind point are removed +- Optionally rewind code to matching checkpoint +- Supervisor state is updated +- Can continue conversation from new point + +### US-RW-03: Fork Task from History +**As a** developer +**I want to** create a new task from a point in history +**So that** I can explore an alternative approach without losing original work + +**Acceptance Criteria:** +- Can fork from any checkpoint +- Can fork from any conversation point +- New task gets code state from fork point +- Can optionally include conversation history +- Original task is not modified +- New branch is created in git + +### US-RW-04: Create Branch from Checkpoint +**As a** developer +**I want to** create a git branch from any checkpoint +**So that** I can preserve or share a specific point in development + +**Acceptance Criteria:** +- Can specify branch name +- Branch is created at checkpoint's commit +- Can optionally checkout the new branch +- Works even if task is completed +- Branch is visible in git + +## Epic: CLI Integration + +### US-CLI-01: View History via CLI +**As a** developer using the terminal +**I want to** view contract and task history via CLI +**So that** I can understand project history without opening the web UI + +**Acceptance Criteria:** +- `makima contract history` shows timeline +- `makima task history` shows conversation +- `makima task checkpoints` lists checkpoints +- `makima task checkpoint-diff` shows code changes +- Output is formatted for terminal readability + +### US-CLI-02: Resume via CLI +**As a** developer using the terminal +**I want to** resume supervisors and tasks via CLI +**So that** I can recover from interruptions without the web UI + +**Acceptance Criteria:** +- `makima supervisor resume` resumes supervisor +- `makima task resume` resumes interrupted task +- `makima task resume-from` resumes from checkpoint +- Can specify resume options via flags +- Clear feedback on resume success/failure + +### US-CLI-03: Rewind via CLI +**As a** developer using the terminal +**I want to** perform rewind operations via CLI +**So that** I can quickly undo changes or fork without the web UI + +**Acceptance Criteria:** +- `makima task rewind` rewinds code +- `makima supervisor rewind-conversation` rewinds conversation +- `makima task fork` creates fork from history +- Can specify options via flags +- Confirmation required for destructive operations + +## Epic: Frontend Integration + +### US-FE-01: History Timeline UI +**As a** developer using the web interface +**I want to** see a visual timeline of contract activities +**So that** I can quickly understand project progression + +**Acceptance Criteria:** +- Timeline is visually clear and scrollable +- Events are color-coded by type +- Can click events for details +- Can filter and search +- Responsive on different screen sizes + +### US-FE-02: Conversation Viewer UI +**As a** developer using the web interface +**I want to** view conversations in a chat-like interface +**So that** I can easily read and understand AI interactions + +**Acceptance Criteria:** +- Messages are displayed in chat format +- Different message types are visually distinct +- Code blocks have syntax highlighting +- Tool calls are collapsible +- Can copy message content + +### US-FE-03: Checkpoint Browser UI +**As a** developer using the web interface +**I want to** browse and interact with checkpoints +**So that** I can review, restore, or branch from any point + +**Acceptance Criteria:** +- Checkpoints listed with details +- Can view diff for each checkpoint +- Can trigger rewind from UI +- Can create branch from UI +- Can resume from checkpoint via UI + +### US-FE-04: Rewind Controls UI +**As a** developer using the web interface +**I want to** easily rewind code or conversation +**So that** I can undo changes with clear visual feedback + +**Acceptance Criteria:** +- Clear controls for rewind operations +- Preview of what will be affected +- Confirmation dialog for destructive actions +- Progress indication during operation +- Clear success/failure feedback diff --git a/makima/migrations/20250117000000_history_tables.sql b/makima/migrations/20250117000000_history_tables.sql new file mode 100644 index 0000000..60e371c --- /dev/null +++ b/makima/migrations/20250117000000_history_tables.sql @@ -0,0 +1,55 @@ +-- History tables for Resume and History System +-- Enables conversation rewind, snapshots, and unified event timeline + +-- 1. Conversation Snapshots table +-- Stores conversation state at specific points for rewind capability +CREATE TABLE IF NOT EXISTS conversation_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE SET NULL, + snapshot_type VARCHAR(50) NOT NULL, -- 'auto', 'manual', 'checkpoint' + message_count INTEGER NOT NULL, + conversation_state JSONB NOT NULL, -- Full conversation at this point + metadata JSONB, -- Additional context (token count, cost, etc.) + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_conversation_snapshots_task ON conversation_snapshots(task_id); +CREATE INDEX idx_conversation_snapshots_checkpoint ON conversation_snapshots(checkpoint_id); +CREATE INDEX idx_conversation_snapshots_created ON conversation_snapshots(created_at DESC); + +-- 2. History Events table +-- Unified event stream for timeline views +CREATE TABLE IF NOT EXISTS history_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + contract_id UUID REFERENCES contracts(id) ON DELETE CASCADE, + task_id UUID REFERENCES tasks(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, -- 'task', 'chat', 'checkpoint', 'phase', 'file' + event_subtype VARCHAR(50), -- Specific event: 'created', 'completed', 'message', etc. + phase VARCHAR(50), -- Contract phase when event occurred + event_data JSONB NOT NULL, -- Event-specific data + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_history_events_contract ON history_events(contract_id, created_at DESC); +CREATE INDEX idx_history_events_task ON history_events(task_id, created_at DESC); +CREATE INDEX idx_history_events_owner ON history_events(owner_id, created_at DESC); +CREATE INDEX idx_history_events_type ON history_events(event_type, created_at DESC); + +-- 3. Alter task_checkpoints - add conversation snapshot reference +ALTER TABLE task_checkpoints + ADD COLUMN IF NOT EXISTS conversation_snapshot_id UUID REFERENCES conversation_snapshots(id) ON DELETE SET NULL; + +-- 4. Alter tasks - add forking fields +ALTER TABLE tasks + ADD COLUMN IF NOT EXISTS forked_from_task_id UUID REFERENCES tasks(id) ON DELETE SET NULL, + ADD COLUMN IF NOT EXISTS forked_at_checkpoint_id UUID REFERENCES task_checkpoints(id) ON DELETE SET NULL; + +CREATE INDEX IF NOT EXISTS idx_tasks_forked_from ON tasks(forked_from_task_id) WHERE forked_from_task_id IS NOT NULL; + +-- Comments for documentation +COMMENT ON TABLE conversation_snapshots IS 'Stores conversation state at specific points for rewind/resume capability'; +COMMENT ON TABLE history_events IS 'Unified event stream for timeline views across contracts and tasks'; +COMMENT ON COLUMN conversation_snapshots.snapshot_type IS 'Type: auto (periodic), manual (user-triggered), checkpoint (at git checkpoint)'; +COMMENT ON COLUMN history_events.event_type IS 'Category: task, chat, checkpoint, phase, file'; diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index da71b0d..cde6e16 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -88,6 +88,27 @@ pub enum SupervisorCommand { /// Get task output/claude log Output(supervisor::GetTaskOutputArgs), + + /// View task conversation history + TaskHistory(supervisor::TaskHistoryArgs), + + /// List task checkpoints (with optional diff) + TaskCheckpoints(supervisor::TaskCheckpointsArgs), + + /// Resume supervisor after interruption + Resume(supervisor::ResumeArgs), + + /// Resume task from checkpoint + TaskResumeFrom(supervisor::TaskResumeFromArgs), + + /// Rewind task code to checkpoint + TaskRewind(supervisor::TaskRewindArgs), + + /// Fork task from historical point + TaskFork(supervisor::TaskForkArgs), + + /// Rewind supervisor conversation + RewindConversation(supervisor::ConversationRewindArgs), } /// Contract subcommands for task-contract interaction. diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index 2bc4c89..ba4fb2b 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -221,3 +221,164 @@ pub struct GetTaskOutputArgs { #[arg(index = 1, id = "target_task_id")] pub target_task_id: Uuid, } + +// ============================================================================ +// History Command Args +// ============================================================================ + +/// Arguments for task-history command. +#[derive(Args, Debug)] +pub struct TaskHistoryArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to view history for + #[arg(index = 1)] + pub task_id: Uuid, + + /// Include tool calls in output + #[arg(long, default_value = "true")] + pub tool_calls: bool, + + /// Maximum messages to return + #[arg(long)] + pub limit: Option<i32>, + + /// Output format (table, json, chat) + #[arg(long, default_value = "chat")] + pub format: String, +} + +/// Arguments for task-checkpoints command (with optional diff). +#[derive(Args, Debug)] +pub struct TaskCheckpointsArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to list checkpoints for + #[arg(index = 1)] + pub task_id: Uuid, + + /// Include diff summary + #[arg(long)] + pub with_diff: bool, +} + +// ============================================================================ +// Resume Command Args +// ============================================================================ + +/// Arguments for resume command. +#[derive(Args, Debug)] +pub struct ResumeArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Resume mode: continue, restart_phase, from_checkpoint + #[arg(long, default_value = "continue")] + pub mode: String, + + /// Checkpoint ID (required for from_checkpoint mode) + #[arg(long)] + pub checkpoint: Option<Uuid>, + + /// Additional context to inject + #[arg(long)] + pub context: Option<String>, +} + +/// Arguments for task-resume-from command. +#[derive(Args, Debug)] +pub struct TaskResumeFromArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Source task ID + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to resume from + #[arg(long)] + pub checkpoint: i32, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Name for the new task + #[arg(long)] + pub name: Option<String>, +} + +// ============================================================================ +// Rewind Command Args +// ============================================================================ + +/// Arguments for task-rewind command. +#[derive(Args, Debug)] +pub struct TaskRewindArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to rewind + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to rewind to + #[arg(long)] + pub checkpoint: i32, + + /// Preserve mode: discard, create_branch, stash + #[arg(long, default_value = "create_branch")] + pub preserve: String, + + /// Branch name (for create_branch mode) + #[arg(long)] + pub branch_name: Option<String>, +} + +/// Arguments for task-fork command. +#[derive(Args, Debug)] +pub struct TaskForkArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Source task ID + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to fork from + #[arg(long)] + pub checkpoint: i32, + + /// Name for the new task + #[arg(long)] + pub name: String, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Include conversation history + #[arg(long, default_value = "true")] + pub include_conversation: bool, +} + +/// Arguments for rewind-conversation command. +#[derive(Args, Debug)] +pub struct ConversationRewindArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Number of messages to rewind + #[arg(long)] + pub by_messages: Option<i32>, + + /// Message ID to rewind to + #[arg(long)] + pub to_message: Option<String>, + + /// Also rewind code to matching checkpoint + #[arg(long)] + pub rewind_code: bool, +} diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 40d4109..4419580 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1559,3 +1559,236 @@ pub struct RepositorySuggestionsQuery { /// Limit results (default: 10) pub limit: Option<i32>, } + +// ============================================================================= +// Resume and History System Types +// ============================================================================= + +/// Conversation snapshot for task resumption +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationSnapshot { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_id: Option<Uuid>, + /// Snapshot type: 'auto', 'manual', 'checkpoint' + pub snapshot_type: String, + pub message_count: i32, + #[sqlx(json)] + pub conversation_state: serde_json::Value, + #[sqlx(json)] + pub metadata: Option<serde_json::Value>, + pub created_at: DateTime<Utc>, +} + +/// History event for contract/task history tracking +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HistoryEvent { + pub id: Uuid, + pub owner_id: Uuid, + pub contract_id: Option<Uuid>, + pub task_id: Option<Uuid>, + pub event_type: String, + pub event_subtype: Option<String>, + pub phase: Option<String>, + #[sqlx(json)] + pub event_data: serde_json::Value, + pub created_at: DateTime<Utc>, +} + +/// Unified conversation message for API responses +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationMessage { + pub id: String, + /// Message role: 'user', 'assistant', 'system', 'tool' + pub role: String, + pub content: String, + pub timestamp: DateTime<Utc>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_calls: Option<Vec<ToolCallInfo>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_name: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_input: Option<serde_json::Value>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_result: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub is_error: Option<bool>, + #[serde(skip_serializing_if = "Option::is_none")] + pub token_count: Option<i32>, + #[serde(skip_serializing_if = "Option::is_none")] + pub cost_usd: Option<f64>, +} + +/// Tool call information within a conversation message +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ToolCallInfo { + pub id: String, + pub name: String, + pub input: serde_json::Value, +} + +/// Query filters for history endpoints +#[derive(Debug, Deserialize, ToSchema, Default)] +#[serde(rename_all = "camelCase")] +pub struct HistoryQueryFilters { + pub phase: Option<String>, + pub event_types: Option<Vec<String>>, + pub from: Option<DateTime<Utc>>, + pub to: Option<DateTime<Utc>>, + pub limit: Option<i32>, + pub cursor: Option<String>, +} + +/// Request to resume a supervisor +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeSupervisorRequest { + pub target_daemon_id: Option<Uuid>, + /// Resume mode: 'continue', 'restart_phase', 'from_checkpoint' + pub resume_mode: String, + pub checkpoint_id: Option<Uuid>, + pub additional_context: Option<String>, +} + +/// Request to resume from a checkpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeFromCheckpointRequest { + pub task_name: Option<String>, + pub plan: String, + pub include_conversation: Option<bool>, + pub target_daemon_id: Option<Uuid>, +} + +/// Request to rewind a task to a checkpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindTaskRequest { + pub checkpoint_id: Option<Uuid>, + pub checkpoint_sha: Option<String>, + /// Preserve mode: 'discard', 'create_branch', 'stash' + pub preserve_mode: String, + pub branch_name: Option<String>, +} + +/// Request to rewind a conversation +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationRequest { + pub to_message_id: Option<String>, + pub to_timestamp: Option<DateTime<Utc>>, + pub by_message_count: Option<i32>, + pub rewind_code: Option<bool>, +} + +/// Request to fork a task +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkTaskRequest { + /// Fork from type: 'checkpoint', 'timestamp', 'message_id' + pub fork_from_type: String, + pub fork_from_value: String, + pub new_task_name: String, + pub new_task_plan: String, + pub include_conversation: Option<bool>, + pub create_branch: Option<bool>, + pub branch_name: Option<String>, +} + +/// Response for contract history endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractHistoryResponse { + pub contract_id: Uuid, + pub entries: Vec<HistoryEvent>, + pub total_count: i64, + pub cursor: Option<String>, +} + +/// Response for task conversation endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskConversationResponse { + pub task_id: Uuid, + pub task_name: String, + pub status: String, + pub messages: Vec<ConversationMessage>, + pub total_tokens: Option<i32>, + pub total_cost: Option<f64>, +} + +/// Response for supervisor conversation endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorConversationResponse { + pub contract_id: Uuid, + pub supervisor_task_id: Uuid, + pub phase: String, + pub last_activity: DateTime<Utc>, + pub pending_task_ids: Vec<Uuid>, + pub messages: Vec<ConversationMessage>, + pub spawned_tasks: Vec<TaskReference>, +} + +/// Reference to a task for history/conversation responses +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskReference { + pub task_id: Uuid, + pub task_name: String, + pub status: String, + pub created_at: DateTime<Utc>, + pub completed_at: Option<DateTime<Utc>>, +} + +/// Response for task rewind operation +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindTaskResponse { + pub task_id: Uuid, + pub rewinded_to: CheckpointInfo, + pub preserved_as: Option<PreservedState>, +} + +/// Checkpoint information in rewind response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointInfo { + pub checkpoint_number: i32, + pub sha: String, + pub message: String, +} + +/// Preserved state information in rewind response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PreservedState { + /// State type: 'branch' or 'stash' + pub state_type: String, + pub reference: String, +} + +/// Response for task fork operation +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkTaskResponse { + pub new_task_id: Uuid, + pub source_task_id: Uuid, + pub fork_point: ForkPoint, + pub branch_name: Option<String>, + pub conversation_included: bool, + pub message_count: Option<i32>, +} + +/// Fork point information in fork response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkPoint { + pub fork_type: String, + pub checkpoint: Option<TaskCheckpoint>, + pub timestamp: DateTime<Utc>, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 92b2048..cb9d52f 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -1,16 +1,17 @@ //! Repository pattern for file database operations. use chrono::Utc; +use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use super::models::{ Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, - ContractSummary, CreateContractRequest, CreateFileRequest, + ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, - FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, - UpdateTaskRequest, + FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, + SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, + UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. @@ -3203,3 +3204,354 @@ pub async fn delete_repository_history( Ok(result.rows_affected() > 0) } + +// ============================================================================ +// Conversation Snapshots +// ============================================================================ + +/// Create a new conversation snapshot +pub async fn create_conversation_snapshot( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option<Uuid>, + snapshot_type: &str, + message_count: i32, + conversation_state: serde_json::Value, + metadata: Option<serde_json::Value>, +) -> Result<ConversationSnapshot, sqlx::Error> { + sqlx::query_as::<_, ConversationSnapshot>( + r#" + INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING * + "# + ) + .bind(task_id) + .bind(checkpoint_id) + .bind(snapshot_type) + .bind(message_count) + .bind(conversation_state) + .bind(metadata) + .fetch_one(pool) + .await +} + +/// Get a conversation snapshot by ID +pub async fn get_conversation_snapshot( + pool: &PgPool, + id: Uuid, +) -> Result<Option<ConversationSnapshot>, sqlx::Error> { + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE id = $1" + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get conversation snapshot at a specific checkpoint +pub async fn get_conversation_at_checkpoint( + pool: &PgPool, + checkpoint_id: Uuid, +) -> Result<Option<ConversationSnapshot>, sqlx::Error> { + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1" + ) + .bind(checkpoint_id) + .fetch_optional(pool) + .await +} + +/// List conversation snapshots for a task +pub async fn list_conversation_snapshots( + pool: &PgPool, + task_id: Uuid, + limit: Option<i32>, +) -> Result<Vec<ConversationSnapshot>, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2" + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Delete conversation snapshots older than retention period +pub async fn cleanup_old_snapshots( + pool: &PgPool, + retention_days: i32, +) -> Result<u64, sqlx::Error> { + let result = sqlx::query( + "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1" + ) + .bind(retention_days) + .execute(pool) + .await?; + Ok(result.rows_affected()) +} + +// ============================================================================ +// History Events +// ============================================================================ + +/// Record a new history event +#[allow(clippy::too_many_arguments)] +pub async fn record_history_event( + pool: &PgPool, + owner_id: Uuid, + contract_id: Option<Uuid>, + task_id: Option<Uuid>, + event_type: &str, + event_subtype: Option<&str>, + phase: Option<&str>, + event_data: serde_json::Value, +) -> Result<HistoryEvent, sqlx::Error> { + sqlx::query_as::<_, HistoryEvent>( + r#" + INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "# + ) + .bind(owner_id) + .bind(contract_id) + .bind(task_id) + .bind(event_type) + .bind(event_subtype) + .bind(phase) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// Get contract history timeline +pub async fn get_contract_history( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let mut query = String::from( + "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" + ); + let mut count_query = String::from( + "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" + ); + + let mut param_count = 2; + + if filters.phase.is_some() { + param_count += 1; + query.push_str(&format!(" AND phase = ${}" , param_count)); + count_query.push_str(&format!(" AND phase = ${}", param_count)); + } + + if filters.from.is_some() { + param_count += 1; + query.push_str(&format!(" AND created_at >= ${}", param_count)); + count_query.push_str(&format!(" AND created_at >= ${}", param_count)); + } + + if filters.to.is_some() { + param_count += 1; + query.push_str(&format!(" AND created_at <= ${}", param_count)); + count_query.push_str(&format!(" AND created_at <= ${}", param_count)); + } + + query.push_str(" ORDER BY created_at DESC"); + query.push_str(&format!(" LIMIT {}", limit)); + + // Build and execute the query dynamically + let mut q = sqlx::query_as::<_, HistoryEvent>(&query) + .bind(contract_id) + .bind(owner_id); + + if let Some(ref phase) = filters.phase { + q = q.bind(phase); + } + if let Some(ref from) = filters.from { + q = q.bind(from); + } + if let Some(ref to) = filters.to { + q = q.bind(to); + } + + let events = q.fetch_all(pool).await?; + + // Get total count + let mut cq = sqlx::query_scalar::<_, i64>(&count_query) + .bind(contract_id) + .bind(owner_id); + + if let Some(ref phase) = filters.phase { + cq = cq.bind(phase); + } + if let Some(ref from) = filters.from { + cq = cq.bind(from); + } + if let Some(ref to) = filters.to { + cq = cq.bind(to); + } + + let count = cq.fetch_one(pool).await?; + + Ok((events, count)) +} + +/// Get task history +pub async fn get_task_history( + pool: &PgPool, + task_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let events = sqlx::query_as::<_, HistoryEvent>( + r#" + SELECT * FROM history_events + WHERE task_id = $1 AND owner_id = $2 + ORDER BY created_at DESC + LIMIT $3 + "# + ) + .bind(task_id) + .bind(owner_id) + .bind(limit) + .fetch_all(pool) + .await?; + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2" + ) + .bind(task_id) + .bind(owner_id) + .fetch_one(pool) + .await?; + + Ok((events, count)) +} + +/// Get unified timeline for an owner +pub async fn get_timeline( + pool: &PgPool, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let events = sqlx::query_as::<_, HistoryEvent>( + r#" + SELECT * FROM history_events + WHERE owner_id = $1 + ORDER BY created_at DESC + LIMIT $2 + "# + ) + .bind(owner_id) + .bind(limit) + .fetch_all(pool) + .await?; + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM history_events WHERE owner_id = $1" + ) + .bind(owner_id) + .fetch_one(pool) + .await?; + + Ok((events, count)) +} + +// ============================================================================ +// Task Conversation Retrieval +// ============================================================================ + +// Helper struct for parsing task output events +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskOutputEvent { + message_type: String, + content: Option<String>, + tool_name: Option<String>, + tool_input: Option<serde_json::Value>, + is_error: Option<bool>, + cost_usd: Option<f32>, +} + +/// Get task conversation messages (reconstructed from task_events) +pub async fn get_task_conversation( + pool: &PgPool, + task_id: Uuid, + include_tool_calls: bool, + include_tool_results: bool, + limit: Option<i32>, +) -> Result<Vec<ConversationMessage>, sqlx::Error> { + let limit = limit.unwrap_or(1000); + + // Get output events that represent conversation turns + let events = sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * FROM task_events + WHERE task_id = $1 AND event_type = 'output' + ORDER BY created_at ASC + LIMIT $2 + "# + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await?; + + // Convert task events to conversation messages + let mut messages = Vec::new(); + for event in events { + if let Some(data) = event.event_data { + // Parse the event data to extract message info + if let Ok(output) = serde_json::from_value::<TaskOutputEvent>(data.clone()) { + let should_include = match output.message_type.as_str() { + "tool_use" => include_tool_calls, + "tool_result" => include_tool_results, + _ => true, + }; + + if should_include { + messages.push(ConversationMessage { + id: event.id.to_string(), + role: match output.message_type.as_str() { + "assistant" => "assistant".to_string(), + "tool_use" => "assistant".to_string(), + "tool_result" => "tool".to_string(), + "system" => "system".to_string(), + "error" => "system".to_string(), + _ => "user".to_string(), + }, + content: output.content.unwrap_or_default(), + timestamp: event.created_at, + tool_calls: None, + tool_name: output.tool_name, + tool_input: output.tool_input, + tool_result: None, + is_error: output.is_error, + token_count: None, + cost_usd: output.cost_usd.map(|c| c as f64), + }); + } + } + } + } + + Ok(messages) +} + +/// Get supervisor conversation (from supervisor_states) +pub async fn get_supervisor_conversation_full( + pool: &PgPool, + contract_id: Uuid, +) -> Result<Option<SupervisorState>, sqlx::Error> { + get_supervisor_state(pool, contract_id).await +} diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs new file mode 100644 index 0000000..b3dec97 --- /dev/null +++ b/makima/src/server/handlers/history.rs @@ -0,0 +1,438 @@ +//! HTTP handlers for history and conversation APIs. + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::{ + db::{ + models::{ + ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, + SupervisorConversationResponse, TaskConversationResponse, TaskReference, + }, + repository, + }, + server::{auth::Authenticated, messages::ApiError, state::SharedState}, +}; + +/// Query parameters for task conversation +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskConversationParams { + pub include_tool_calls: Option<bool>, + pub include_tool_results: Option<bool>, + pub limit: Option<i32>, +} + +/// Query parameters for timeline +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TimelineQueryFilters { + pub contract_id: Option<Uuid>, + pub task_id: Option<Uuid>, + pub include_subtasks: Option<bool>, + pub from: Option<chrono::DateTime<chrono::Utc>>, + pub to: Option<chrono::DateTime<chrono::Utc>>, + pub limit: Option<i32>, +} + +/// GET /api/v1/contracts/{id}/history +/// Returns contract history timeline with filtering and pagination +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/history", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("phase" = Option<String>, Query, description = "Filter by phase"), + ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"), + ("from" = Option<String>, Query, description = "Start date filter"), + ("to" = Option<String>, Query, description = "End date filter"), + ("limit" = Option<i32>, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Contract history", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_contract_history( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + Query(filters): Query<HistoryQueryFilters>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and user has access + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get history events + match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id, + entries: events, + total_count, + cursor: None, // TODO: implement cursor pagination + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get contract history: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// GET /api/v1/contracts/{id}/supervisor/conversation +/// Returns full supervisor conversation with spawned task references +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/conversation", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_supervisor_conversation( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract for phase info and ownership check + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get the supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Parse conversation history from JSONB + let messages: Vec<ConversationMessage> = supervisor_state + .conversation_history + .as_array() + .map(|arr| { + arr.iter() + .enumerate() + .map(|(i, v)| ConversationMessage { + id: i.to_string(), + role: v + .get("role") + .and_then(|r| r.as_str()) + .unwrap_or("user") + .to_string(), + content: v + .get("content") + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(), + timestamp: supervisor_state.last_activity, + tool_calls: None, + tool_name: None, + tool_input: None, + tool_result: None, + is_error: None, + token_count: None, + cost_usd: None, + }) + .collect() + }) + .unwrap_or_default(); + + // Get spawned tasks + let tasks = match repository::list_contract_tasks(pool, contract_id).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e); + Vec::new() + } + }; + + let spawned_tasks: Vec<TaskReference> = tasks + .into_iter() + .filter(|t| !t.is_supervisor) + .map(|t| TaskReference { + task_id: t.id, + task_name: t.name, + status: t.status, + created_at: t.created_at, + completed_at: t.completed_at, + }) + .collect(); + + Json(SupervisorConversationResponse { + contract_id, + supervisor_task_id: supervisor_state.task_id, + phase: contract.phase, + last_activity: supervisor_state.last_activity, + pending_task_ids: supervisor_state.pending_task_ids, + messages, + spawned_tasks, + }) + .into_response() +} + +/// GET /api/v1/mesh/tasks/{id}/conversation +/// Returns task conversation history +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/conversation", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"), + ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"), + ("limit" = Option<i32>, Query, description = "Limit messages"), + ), + responses( + (status = 200, description = "Task conversation", body = TaskConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_task_conversation( + State(state): State<SharedState>, + Path(task_id): Path<Uuid>, + Query(params): Query<TaskConversationParams>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get conversation messages + let messages = match repository::get_task_conversation( + pool, + task_id, + params.include_tool_calls.unwrap_or(true), + params.include_tool_results.unwrap_or(true), + params.limit, + ) + .await + { + Ok(m) => m, + Err(e) => { + tracing::error!("Failed to get task conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Calculate totals + let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum(); + + Json(TaskConversationResponse { + task_id, + task_name: task.name, + status: task.status, + messages, + total_tokens: None, + total_cost: if total_cost > 0.0 { + Some(total_cost) + } else { + None + }, + }) + .into_response() +} + +/// GET /api/v1/timeline +/// Returns unified timeline for authenticated user +#[utoipa::path( + get, + path = "/api/v1/timeline", + params( + ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"), + ("task_id" = Option<Uuid>, Query, description = "Filter by task"), + ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"), + ("from" = Option<String>, Query, description = "Start date filter"), + ("to" = Option<String>, Query, description = "End date filter"), + ("limit" = Option<i32>, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Timeline events", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_timeline( + State(state): State<SharedState>, + Query(filters): Query<TimelineQueryFilters>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + let history_filters = HistoryQueryFilters { + phase: None, + event_types: None, + from: filters.from, + to: filters.to, + limit: filters.limit, + cursor: None, + }; + + let result = if let Some(contract_id) = filters.contract_id { + repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await + } else if let Some(task_id) = filters.task_id { + repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await + } else { + repository::get_timeline(pool, auth.owner_id, &history_filters).await + }; + + match result { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id: filters.contract_id.unwrap_or_default(), + entries: events, + total_count, + cursor: None, + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get timeline: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 3da6fd5..b5ade53 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -2459,3 +2459,667 @@ pub async fn continue_task( }) .into_response() } + +// ============================================================================= +// Task Rewind and Fork (Resume and History System) +// ============================================================================= + +/// Rewind task code to specified checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/rewind +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/rewind", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::RewindTaskRequest, + responses( + (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 409, description = "Cannot rewind a running task", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn rewind_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(task_id): Path<Uuid>, + Json(req): Json<crate::db::models::RewindTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Task cannot be running during rewind + if task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")), + ) + .into_response(); + } + + // Get checkpoint info + let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id { + match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else if let Some(ref sha) = req.checkpoint_sha { + match repository::get_task_checkpoint_by_sha(pool, sha).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint by SHA: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "MISSING_CHECKPOINT", + "Must provide checkpoint_id or checkpoint_sha", + )), + ) + .into_response(); + }; + + // Verify checkpoint belongs to this task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // TODO: Send rewind command to daemon when daemon integration is complete + // For now, return a success response with checkpoint info + + tracing::info!( + task_id = %task_id, + checkpoint_number = checkpoint.checkpoint_number, + commit_sha = %checkpoint.commit_sha, + "Task rewind requested" + ); + + Json(crate::db::models::RewindTaskResponse { + task_id, + rewinded_to: crate::db::models::CheckpointInfo { + checkpoint_number: checkpoint.checkpoint_number, + sha: checkpoint.commit_sha.clone(), + message: checkpoint.message, + }, + preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState { + state_type: "branch".to_string(), + reference: name, + }), + }) + .into_response() +} + +/// Fork task from historical point. +/// +/// POST /api/v1/mesh/tasks/{id}/fork +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/fork", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::ForkTaskRequest, + responses( + (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn fork_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(task_id): Path<Uuid>, + Json(req): Json<crate::db::models::ForkTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Find the checkpoint to fork from + let checkpoint = match req.fork_from_type.as_str() { + "checkpoint" => { + // fork_from_value is checkpoint number + let checkpoint_num: i32 = match req.fork_from_value.parse() { + Ok(n) => n, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")), + ) + .into_response(); + } + }; + + let checkpoints = match repository::list_task_checkpoints(pool, task_id).await { + Ok(c) => c, + Err(e) => { + tracing::error!("Failed to list checkpoints: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + match checkpoints + .into_iter() + .find(|c| c.checkpoint_number == checkpoint_num) + { + Some(c) => c, + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + } + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "UNSUPPORTED_FORK_TYPE", + "Only 'checkpoint' fork type is currently supported", + )), + ) + .into_response(); + } + }; + + // Create the new forked task + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: req.new_task_name.clone(), + description: task.description.clone(), + plan: req.new_task_plan.clone(), + parent_task_id: None, // Forked tasks are independent + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for forked work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create forked task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_number = checkpoint.checkpoint_number, + "Task forked from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(crate::db::models::ForkTaskResponse { + new_task_id: new_task.id, + source_task_id: task_id, + fork_point: crate::db::models::ForkPoint { + fork_type: "checkpoint".to_string(), + checkpoint: Some(checkpoint.clone()), + timestamp: checkpoint.created_at, + }, + branch_name: req.branch_name, + conversation_included: req.include_conversation.unwrap_or(false), + message_count: None, + }), + ) + .into_response() +} + +/// Create new task starting from specific checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = crate::db::models::ResumeFromCheckpointRequest, + responses( + (status = 201, description = "Task created from checkpoint", body = Task), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn resume_from_checkpoint( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Create the new task that will start from checkpoint + let task_name = req.task_name.unwrap_or_else(|| { + format!( + "{} (resumed from checkpoint {})", + task.name, checkpoint.checkpoint_number + ) + }); + + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: task_name, + description: task.description.clone(), + plan: req.plan, + parent_task_id: None, + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for resumed work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(task_id), // Copy worktree from original task + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create resumed task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_id = %checkpoint_id, + checkpoint_number = checkpoint.checkpoint_number, + "Task resumed from checkpoint" + ); + + (StatusCode::CREATED, Json(new_task)).into_response() +} + +/// Request to create branch from checkpoint. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchFromCheckpointRequest { + pub branch_name: String, + #[serde(default)] + pub checkout: bool, +} + +/// Response for branch creation from checkpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchCreatedResponse { + pub branch_name: String, + pub commit_sha: String, + pub task_id: Uuid, + pub checkpoint_number: i32, +} + +/// Create git branch from checkpoint without starting task. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = CreateBranchFromCheckpointRequest, + responses( + (status = 201, description = "Branch created", body = BranchCreatedResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn branch_from_checkpoint( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json<CreateBranchFromCheckpointRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Find a daemon to execute the branch creation + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + // Check if the original daemon is still connected + if state + .daemon_connections + .iter() + .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) + { + daemon_id + } else { + // Find any connected daemon for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + } + } else { + // No daemon assigned - use any available for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + }; + + // Send CreateBranch command to daemon + let cmd = DaemonCommand::CreateBranch { + task_id, + branch_name: req.branch_name.clone(), + from_ref: Some(checkpoint.commit_sha.clone()), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await { + tracing::error!("Failed to send CreateBranch command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + task_id = %task_id, + checkpoint_id = %checkpoint_id, + branch_name = %req.branch_name, + commit_sha = %checkpoint.commit_sha, + "Branch creation requested from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(BranchCreatedResponse { + branch_name: req.branch_name, + commit_sha: checkpoint.commit_sha, + task_id, + checkpoint_number: checkpoint.checkpoint_number, + }), + ) + .into_response() +} diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 278d0f5..b45dda5 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1504,3 +1504,384 @@ pub async fn answer_question( Json(AnswerQuestionResponse { success }).into_response() } + +// ============================================================================= +// Supervisor Resume and Conversation Rewind +// ============================================================================= + +/// Response for supervisor resume +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeSupervisorResponse { + pub supervisor_task_id: Uuid, + pub daemon_id: Option<Uuid>, + pub resumed_from: ResumedFromInfo, + pub status: String, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumedFromInfo { + pub phase: String, + pub last_activity: chrono::DateTime<chrono::Utc>, + pub message_count: i32, +} + +/// Resume interrupted supervisor with specified mode. +/// +/// POST /api/v1/contracts/{id}/supervisor/resume +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/resume", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::ResumeSupervisorRequest, + responses( + (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 409, description = "Supervisor is already running", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn resume_supervisor( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + auth: crate::server::auth::Authenticated, + Json(req): Json<crate::db::models::ResumeSupervisorRequest>, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get existing supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new( + "NO_SUPERVISOR_STATE", + "No supervisor state found - supervisor may not have been started", + )), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor task + let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if already running + if supervisor_task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), + ) + .into_response(); + } + + // Calculate message count from conversation history + let message_count = supervisor_state + .conversation_history + .as_array() + .map(|arr| arr.len() as i32) + .unwrap_or(0); + + // Based on resume mode, handle differently + match req.resume_mode.as_str() { + "continue" => { + // Mark task for reassignment with existing conversation context + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "restart_phase" => { + // Clear conversation but keep phase progress + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + serde_json::json!([]), + ) + .await + { + tracing::error!("Failed to clear conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "from_checkpoint" => { + // This would require more complex handling with checkpoint system + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "from_checkpoint mode not yet implemented", + )), + ) + .into_response(); + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_RESUME_MODE", + "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", + )), + ) + .into_response(); + } + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + resume_mode = %req.resume_mode, + message_count = message_count, + "Supervisor resume requested" + ); + + Json(ResumeSupervisorResponse { + supervisor_task_id: supervisor_state.task_id, + daemon_id: supervisor_task.daemon_id, + resumed_from: ResumedFromInfo { + phase: contract.phase, + last_activity: supervisor_state.last_activity, + message_count, + }, + status: "pending".to_string(), + }) + .into_response() +} + +/// Response for conversation rewind +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationResponse { + pub contract_id: Uuid, + pub messages_removed: i32, + pub new_message_count: i32, + pub code_rewound: bool, +} + +/// Rewind supervisor conversation to specified point. +/// +/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::RewindConversationRequest, + responses( + (status = 200, description = "Conversation rewound", body = RewindConversationResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn rewind_conversation( + State(state): State<SharedState>, + Path(contract_id): Path<Uuid>, + auth: crate::server::auth::Authenticated, + Json(req): Json<crate::db::models::RewindConversationRequest>, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + let conversation = supervisor_state + .conversation_history + .as_array() + .cloned() + .unwrap_or_default(); + + let original_count = conversation.len() as i32; + + // Determine how many messages to keep + let new_count = if let Some(by_count) = req.by_message_count { + (original_count - by_count).max(0) + } else if let Some(to_index) = req.to_message_index { + // Keep messages up to and including the specified index + (to_index + 1).min(original_count).max(0) + } else { + // Default to removing last message + (original_count - 1).max(0) + }; + + // Truncate conversation + let new_conversation: Vec<serde_json::Value> = conversation + .into_iter() + .take(new_count as usize) + .collect(); + + // Update the conversation + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + serde_json::Value::Array(new_conversation), + ) + .await + { + tracing::error!("Failed to update conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + original_count = original_count, + new_count = new_count, + messages_removed = original_count - new_count, + "Conversation rewound" + ); + + Json(RewindConversationResponse { + contract_id, + messages_removed: original_count - new_count, + new_message_count: new_count, + code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind + }) + .into_response() +} diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index b5650fd..609b63b 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -7,6 +7,7 @@ pub mod contract_daemon; pub mod contracts; pub mod file_ws; pub mod files; +pub mod history; pub mod listen; pub mod mesh; pub mod mesh_chat; diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 0eba009..cf63f71 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -105,6 +105,11 @@ pub fn make_router(state: SharedState) -> Router { // Checkpoint endpoints .route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint)) .route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints)) + // Resume and rewind endpoints + .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task)) + .route("/mesh/tasks/{id}/fork", post(mesh::fork_task)) + .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint)) + .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint)) // Supervisor endpoints (for supervisor.sh) .route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks)) .route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree)) @@ -156,6 +161,9 @@ pub fn make_router(state: SharedState) -> Router { "/contracts/{id}/chat/history", get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history), ) + // Contract supervisor resume endpoints + .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) + .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) // Contract daemon endpoints (for tasks to interact with contracts) .route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status)) .route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist)) |
