From 6ee2e75834bff187b8c262e0798ef365bc21cd59 Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 15 Jan 2026 22:33:47 +0000 Subject: Add resume and history system for makima (#1) This PR implements a comprehensive resume and history system that enables: 1. **History Viewing** - View complete conversation history for contracts across all phases - View conversation history for individual tasks - View task output/tool call history with timestamps - View checkpoint history - Timeline view showing all activities 2. **Resume System** - Resume interrupted supervisor conversations with full context - Resume interrupted task conversations - Resume from specific checkpoints - Continue tasks from previous task state (worktree inheritance) 3. **Rewind/Restore Features** - Rewind code to any checkpoint (git restore) - Rewind conversation to any point - Create new branches from historical points - Fork tasks from any point in history - New migration: 20250117000000_history_tables.sql - conversation_snapshots table for storing conversation state - history_events table for unified timeline - Added forking fields to tasks table - Added conversation_snapshot_id to task_checkpoints - ConversationSnapshot, HistoryEvent, ConversationMessage - Request/response types for resume and rewind operations - Query filter types for history endpoints - CRUD functions for conversation_snapshots - CRUD functions for history_events - Task conversation retrieval from task_events - GET /api/v1/contracts/{id}/history - GET /api/v1/contracts/{id}/supervisor/conversation - GET /api/v1/mesh/tasks/{id}/conversation - GET /api/v1/timeline - POST /api/v1/contracts/{id}/supervisor/resume - POST /api/v1/mesh/tasks/{id}/rewind - POST /api/v1/mesh/tasks/{id}/fork - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume - POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch - POST /api/v1/contracts/{id}/supervisor/conversation/rewind - task-history: View task conversation history - task-checkpoints: List task checkpoints - resume: Resume supervisor after interruption - task-resume-from: Resume task from checkpoint - task-rewind: Rewind task code to checkpoint - task-fork: Fork task from historical point - rewind-conversation: Rewind supervisor conversation --- makima/src/daemon/cli/mod.rs | 21 + makima/src/daemon/cli/supervisor.rs | 161 +++++++ makima/src/db/models.rs | 233 +++++++++ makima/src/db/repository.rs | 360 +++++++++++++- makima/src/server/handlers/history.rs | 438 +++++++++++++++++ makima/src/server/handlers/mesh.rs | 664 ++++++++++++++++++++++++++ makima/src/server/handlers/mesh_supervisor.rs | 381 +++++++++++++++ makima/src/server/handlers/mod.rs | 1 + makima/src/server/mod.rs | 8 + 9 files changed, 2263 insertions(+), 4 deletions(-) create mode 100644 makima/src/server/handlers/history.rs (limited to 'makima/src') diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index da71b0d..cde6e16 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -88,6 +88,27 @@ pub enum SupervisorCommand { /// Get task output/claude log Output(supervisor::GetTaskOutputArgs), + + /// View task conversation history + TaskHistory(supervisor::TaskHistoryArgs), + + /// List task checkpoints (with optional diff) + TaskCheckpoints(supervisor::TaskCheckpointsArgs), + + /// Resume supervisor after interruption + Resume(supervisor::ResumeArgs), + + /// Resume task from checkpoint + TaskResumeFrom(supervisor::TaskResumeFromArgs), + + /// Rewind task code to checkpoint + TaskRewind(supervisor::TaskRewindArgs), + + /// Fork task from historical point + TaskFork(supervisor::TaskForkArgs), + + /// Rewind supervisor conversation + RewindConversation(supervisor::ConversationRewindArgs), } /// Contract subcommands for task-contract interaction. diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index 2bc4c89..ba4fb2b 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -221,3 +221,164 @@ pub struct GetTaskOutputArgs { #[arg(index = 1, id = "target_task_id")] pub target_task_id: Uuid, } + +// ============================================================================ +// History Command Args +// ============================================================================ + +/// Arguments for task-history command. +#[derive(Args, Debug)] +pub struct TaskHistoryArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to view history for + #[arg(index = 1)] + pub task_id: Uuid, + + /// Include tool calls in output + #[arg(long, default_value = "true")] + pub tool_calls: bool, + + /// Maximum messages to return + #[arg(long)] + pub limit: Option, + + /// Output format (table, json, chat) + #[arg(long, default_value = "chat")] + pub format: String, +} + +/// Arguments for task-checkpoints command (with optional diff). +#[derive(Args, Debug)] +pub struct TaskCheckpointsArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to list checkpoints for + #[arg(index = 1)] + pub task_id: Uuid, + + /// Include diff summary + #[arg(long)] + pub with_diff: bool, +} + +// ============================================================================ +// Resume Command Args +// ============================================================================ + +/// Arguments for resume command. +#[derive(Args, Debug)] +pub struct ResumeArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Resume mode: continue, restart_phase, from_checkpoint + #[arg(long, default_value = "continue")] + pub mode: String, + + /// Checkpoint ID (required for from_checkpoint mode) + #[arg(long)] + pub checkpoint: Option, + + /// Additional context to inject + #[arg(long)] + pub context: Option, +} + +/// Arguments for task-resume-from command. +#[derive(Args, Debug)] +pub struct TaskResumeFromArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Source task ID + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to resume from + #[arg(long)] + pub checkpoint: i32, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Name for the new task + #[arg(long)] + pub name: Option, +} + +// ============================================================================ +// Rewind Command Args +// ============================================================================ + +/// Arguments for task-rewind command. +#[derive(Args, Debug)] +pub struct TaskRewindArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to rewind + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to rewind to + #[arg(long)] + pub checkpoint: i32, + + /// Preserve mode: discard, create_branch, stash + #[arg(long, default_value = "create_branch")] + pub preserve: String, + + /// Branch name (for create_branch mode) + #[arg(long)] + pub branch_name: Option, +} + +/// Arguments for task-fork command. +#[derive(Args, Debug)] +pub struct TaskForkArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Source task ID + #[arg(index = 1)] + pub task_id: Uuid, + + /// Checkpoint number to fork from + #[arg(long)] + pub checkpoint: i32, + + /// Name for the new task + #[arg(long)] + pub name: String, + + /// Plan for the new task + #[arg(long)] + pub plan: String, + + /// Include conversation history + #[arg(long, default_value = "true")] + pub include_conversation: bool, +} + +/// Arguments for rewind-conversation command. +#[derive(Args, Debug)] +pub struct ConversationRewindArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Number of messages to rewind + #[arg(long)] + pub by_messages: Option, + + /// Message ID to rewind to + #[arg(long)] + pub to_message: Option, + + /// Also rewind code to matching checkpoint + #[arg(long)] + pub rewind_code: bool, +} diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 40d4109..4419580 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -1559,3 +1559,236 @@ pub struct RepositorySuggestionsQuery { /// Limit results (default: 10) pub limit: Option, } + +// ============================================================================= +// Resume and History System Types +// ============================================================================= + +/// Conversation snapshot for task resumption +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationSnapshot { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_id: Option, + /// Snapshot type: 'auto', 'manual', 'checkpoint' + pub snapshot_type: String, + pub message_count: i32, + #[sqlx(json)] + pub conversation_state: serde_json::Value, + #[sqlx(json)] + pub metadata: Option, + pub created_at: DateTime, +} + +/// History event for contract/task history tracking +#[derive(Debug, Clone, FromRow, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct HistoryEvent { + pub id: Uuid, + pub owner_id: Uuid, + pub contract_id: Option, + pub task_id: Option, + pub event_type: String, + pub event_subtype: Option, + pub phase: Option, + #[sqlx(json)] + pub event_data: serde_json::Value, + pub created_at: DateTime, +} + +/// Unified conversation message for API responses +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConversationMessage { + pub id: String, + /// Message role: 'user', 'assistant', 'system', 'tool' + pub role: String, + pub content: String, + pub timestamp: DateTime, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_calls: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_input: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub is_error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub token_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cost_usd: Option, +} + +/// Tool call information within a conversation message +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ToolCallInfo { + pub id: String, + pub name: String, + pub input: serde_json::Value, +} + +/// Query filters for history endpoints +#[derive(Debug, Deserialize, ToSchema, Default)] +#[serde(rename_all = "camelCase")] +pub struct HistoryQueryFilters { + pub phase: Option, + pub event_types: Option>, + pub from: Option>, + pub to: Option>, + pub limit: Option, + pub cursor: Option, +} + +/// Request to resume a supervisor +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeSupervisorRequest { + pub target_daemon_id: Option, + /// Resume mode: 'continue', 'restart_phase', 'from_checkpoint' + pub resume_mode: String, + pub checkpoint_id: Option, + pub additional_context: Option, +} + +/// Request to resume from a checkpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeFromCheckpointRequest { + pub task_name: Option, + pub plan: String, + pub include_conversation: Option, + pub target_daemon_id: Option, +} + +/// Request to rewind a task to a checkpoint +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindTaskRequest { + pub checkpoint_id: Option, + pub checkpoint_sha: Option, + /// Preserve mode: 'discard', 'create_branch', 'stash' + pub preserve_mode: String, + pub branch_name: Option, +} + +/// Request to rewind a conversation +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationRequest { + pub to_message_id: Option, + pub to_timestamp: Option>, + pub by_message_count: Option, + pub rewind_code: Option, +} + +/// Request to fork a task +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkTaskRequest { + /// Fork from type: 'checkpoint', 'timestamp', 'message_id' + pub fork_from_type: String, + pub fork_from_value: String, + pub new_task_name: String, + pub new_task_plan: String, + pub include_conversation: Option, + pub create_branch: Option, + pub branch_name: Option, +} + +/// Response for contract history endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractHistoryResponse { + pub contract_id: Uuid, + pub entries: Vec, + pub total_count: i64, + pub cursor: Option, +} + +/// Response for task conversation endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskConversationResponse { + pub task_id: Uuid, + pub task_name: String, + pub status: String, + pub messages: Vec, + pub total_tokens: Option, + pub total_cost: Option, +} + +/// Response for supervisor conversation endpoint +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorConversationResponse { + pub contract_id: Uuid, + pub supervisor_task_id: Uuid, + pub phase: String, + pub last_activity: DateTime, + pub pending_task_ids: Vec, + pub messages: Vec, + pub spawned_tasks: Vec, +} + +/// Reference to a task for history/conversation responses +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskReference { + pub task_id: Uuid, + pub task_name: String, + pub status: String, + pub created_at: DateTime, + pub completed_at: Option>, +} + +/// Response for task rewind operation +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindTaskResponse { + pub task_id: Uuid, + pub rewinded_to: CheckpointInfo, + pub preserved_as: Option, +} + +/// Checkpoint information in rewind response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointInfo { + pub checkpoint_number: i32, + pub sha: String, + pub message: String, +} + +/// Preserved state information in rewind response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PreservedState { + /// State type: 'branch' or 'stash' + pub state_type: String, + pub reference: String, +} + +/// Response for task fork operation +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkTaskResponse { + pub new_task_id: Uuid, + pub source_task_id: Uuid, + pub fork_point: ForkPoint, + pub branch_name: Option, + pub conversation_included: bool, + pub message_count: Option, +} + +/// Fork point information in fork response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ForkPoint { + pub fork_type: String, + pub checkpoint: Option, + pub timestamp: DateTime, +} diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 92b2048..cb9d52f 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -1,16 +1,17 @@ //! Repository pattern for file database operations. use chrono::Utc; +use serde::Deserialize; use sqlx::PgPool; use uuid::Uuid; use super::models::{ Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository, - ContractSummary, CreateContractRequest, CreateFileRequest, + ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest, CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary, - FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint, - TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest, - UpdateTaskRequest, + FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord, + SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest, + UpdateFileRequest, UpdateTaskRequest, }; /// Repository error types. @@ -3203,3 +3204,354 @@ pub async fn delete_repository_history( Ok(result.rows_affected() > 0) } + +// ============================================================================ +// Conversation Snapshots +// ============================================================================ + +/// Create a new conversation snapshot +pub async fn create_conversation_snapshot( + pool: &PgPool, + task_id: Uuid, + checkpoint_id: Option, + snapshot_type: &str, + message_count: i32, + conversation_state: serde_json::Value, + metadata: Option, +) -> Result { + sqlx::query_as::<_, ConversationSnapshot>( + r#" + INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING * + "# + ) + .bind(task_id) + .bind(checkpoint_id) + .bind(snapshot_type) + .bind(message_count) + .bind(conversation_state) + .bind(metadata) + .fetch_one(pool) + .await +} + +/// Get a conversation snapshot by ID +pub async fn get_conversation_snapshot( + pool: &PgPool, + id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE id = $1" + ) + .bind(id) + .fetch_optional(pool) + .await +} + +/// Get conversation snapshot at a specific checkpoint +pub async fn get_conversation_at_checkpoint( + pool: &PgPool, + checkpoint_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1" + ) + .bind(checkpoint_id) + .fetch_optional(pool) + .await +} + +/// List conversation snapshots for a task +pub async fn list_conversation_snapshots( + pool: &PgPool, + task_id: Uuid, + limit: Option, +) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, ConversationSnapshot>( + "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2" + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Delete conversation snapshots older than retention period +pub async fn cleanup_old_snapshots( + pool: &PgPool, + retention_days: i32, +) -> Result { + let result = sqlx::query( + "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1" + ) + .bind(retention_days) + .execute(pool) + .await?; + Ok(result.rows_affected()) +} + +// ============================================================================ +// History Events +// ============================================================================ + +/// Record a new history event +#[allow(clippy::too_many_arguments)] +pub async fn record_history_event( + pool: &PgPool, + owner_id: Uuid, + contract_id: Option, + task_id: Option, + event_type: &str, + event_subtype: Option<&str>, + phase: Option<&str>, + event_data: serde_json::Value, +) -> Result { + sqlx::query_as::<_, HistoryEvent>( + r#" + INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "# + ) + .bind(owner_id) + .bind(contract_id) + .bind(task_id) + .bind(event_type) + .bind(event_subtype) + .bind(phase) + .bind(event_data) + .fetch_one(pool) + .await +} + +/// Get contract history timeline +pub async fn get_contract_history( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let mut query = String::from( + "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2" + ); + let mut count_query = String::from( + "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2" + ); + + let mut param_count = 2; + + if filters.phase.is_some() { + param_count += 1; + query.push_str(&format!(" AND phase = ${}" , param_count)); + count_query.push_str(&format!(" AND phase = ${}", param_count)); + } + + if filters.from.is_some() { + param_count += 1; + query.push_str(&format!(" AND created_at >= ${}", param_count)); + count_query.push_str(&format!(" AND created_at >= ${}", param_count)); + } + + if filters.to.is_some() { + param_count += 1; + query.push_str(&format!(" AND created_at <= ${}", param_count)); + count_query.push_str(&format!(" AND created_at <= ${}", param_count)); + } + + query.push_str(" ORDER BY created_at DESC"); + query.push_str(&format!(" LIMIT {}", limit)); + + // Build and execute the query dynamically + let mut q = sqlx::query_as::<_, HistoryEvent>(&query) + .bind(contract_id) + .bind(owner_id); + + if let Some(ref phase) = filters.phase { + q = q.bind(phase); + } + if let Some(ref from) = filters.from { + q = q.bind(from); + } + if let Some(ref to) = filters.to { + q = q.bind(to); + } + + let events = q.fetch_all(pool).await?; + + // Get total count + let mut cq = sqlx::query_scalar::<_, i64>(&count_query) + .bind(contract_id) + .bind(owner_id); + + if let Some(ref phase) = filters.phase { + cq = cq.bind(phase); + } + if let Some(ref from) = filters.from { + cq = cq.bind(from); + } + if let Some(ref to) = filters.to { + cq = cq.bind(to); + } + + let count = cq.fetch_one(pool).await?; + + Ok((events, count)) +} + +/// Get task history +pub async fn get_task_history( + pool: &PgPool, + task_id: Uuid, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let events = sqlx::query_as::<_, HistoryEvent>( + r#" + SELECT * FROM history_events + WHERE task_id = $1 AND owner_id = $2 + ORDER BY created_at DESC + LIMIT $3 + "# + ) + .bind(task_id) + .bind(owner_id) + .bind(limit) + .fetch_all(pool) + .await?; + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2" + ) + .bind(task_id) + .bind(owner_id) + .fetch_one(pool) + .await?; + + Ok((events, count)) +} + +/// Get unified timeline for an owner +pub async fn get_timeline( + pool: &PgPool, + owner_id: Uuid, + filters: &HistoryQueryFilters, +) -> Result<(Vec, i64), sqlx::Error> { + let limit = filters.limit.unwrap_or(100); + + let events = sqlx::query_as::<_, HistoryEvent>( + r#" + SELECT * FROM history_events + WHERE owner_id = $1 + ORDER BY created_at DESC + LIMIT $2 + "# + ) + .bind(owner_id) + .bind(limit) + .fetch_all(pool) + .await?; + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM history_events WHERE owner_id = $1" + ) + .bind(owner_id) + .fetch_one(pool) + .await?; + + Ok((events, count)) +} + +// ============================================================================ +// Task Conversation Retrieval +// ============================================================================ + +// Helper struct for parsing task output events +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskOutputEvent { + message_type: String, + content: Option, + tool_name: Option, + tool_input: Option, + is_error: Option, + cost_usd: Option, +} + +/// Get task conversation messages (reconstructed from task_events) +pub async fn get_task_conversation( + pool: &PgPool, + task_id: Uuid, + include_tool_calls: bool, + include_tool_results: bool, + limit: Option, +) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(1000); + + // Get output events that represent conversation turns + let events = sqlx::query_as::<_, TaskEvent>( + r#" + SELECT * FROM task_events + WHERE task_id = $1 AND event_type = 'output' + ORDER BY created_at ASC + LIMIT $2 + "# + ) + .bind(task_id) + .bind(limit) + .fetch_all(pool) + .await?; + + // Convert task events to conversation messages + let mut messages = Vec::new(); + for event in events { + if let Some(data) = event.event_data { + // Parse the event data to extract message info + if let Ok(output) = serde_json::from_value::(data.clone()) { + let should_include = match output.message_type.as_str() { + "tool_use" => include_tool_calls, + "tool_result" => include_tool_results, + _ => true, + }; + + if should_include { + messages.push(ConversationMessage { + id: event.id.to_string(), + role: match output.message_type.as_str() { + "assistant" => "assistant".to_string(), + "tool_use" => "assistant".to_string(), + "tool_result" => "tool".to_string(), + "system" => "system".to_string(), + "error" => "system".to_string(), + _ => "user".to_string(), + }, + content: output.content.unwrap_or_default(), + timestamp: event.created_at, + tool_calls: None, + tool_name: output.tool_name, + tool_input: output.tool_input, + tool_result: None, + is_error: output.is_error, + token_count: None, + cost_usd: output.cost_usd.map(|c| c as f64), + }); + } + } + } + } + + Ok(messages) +} + +/// Get supervisor conversation (from supervisor_states) +pub async fn get_supervisor_conversation_full( + pool: &PgPool, + contract_id: Uuid, +) -> Result, sqlx::Error> { + get_supervisor_state(pool, contract_id).await +} diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs new file mode 100644 index 0000000..b3dec97 --- /dev/null +++ b/makima/src/server/handlers/history.rs @@ -0,0 +1,438 @@ +//! HTTP handlers for history and conversation APIs. + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::{ + db::{ + models::{ + ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, + SupervisorConversationResponse, TaskConversationResponse, TaskReference, + }, + repository, + }, + server::{auth::Authenticated, messages::ApiError, state::SharedState}, +}; + +/// Query parameters for task conversation +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskConversationParams { + pub include_tool_calls: Option, + pub include_tool_results: Option, + pub limit: Option, +} + +/// Query parameters for timeline +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TimelineQueryFilters { + pub contract_id: Option, + pub task_id: Option, + pub include_subtasks: Option, + pub from: Option>, + pub to: Option>, + pub limit: Option, +} + +/// GET /api/v1/contracts/{id}/history +/// Returns contract history timeline with filtering and pagination +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/history", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("phase" = Option, Query, description = "Filter by phase"), + ("event_types" = Option, Query, description = "Filter by event types (comma-separated)"), + ("from" = Option, Query, description = "Start date filter"), + ("to" = Option, Query, description = "End date filter"), + ("limit" = Option, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Contract history", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_contract_history( + State(state): State, + Path(contract_id): Path, + Query(filters): Query, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and user has access + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get history events + match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id, + entries: events, + total_count, + cursor: None, // TODO: implement cursor pagination + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get contract history: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// GET /api/v1/contracts/{id}/supervisor/conversation +/// Returns full supervisor conversation with spawned task references +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/supervisor/conversation", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_supervisor_conversation( + State(state): State, + Path(contract_id): Path, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract for phase info and ownership check + let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get the supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Parse conversation history from JSONB + let messages: Vec = supervisor_state + .conversation_history + .as_array() + .map(|arr| { + arr.iter() + .enumerate() + .map(|(i, v)| ConversationMessage { + id: i.to_string(), + role: v + .get("role") + .and_then(|r| r.as_str()) + .unwrap_or("user") + .to_string(), + content: v + .get("content") + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(), + timestamp: supervisor_state.last_activity, + tool_calls: None, + tool_name: None, + tool_input: None, + tool_result: None, + is_error: None, + token_count: None, + cost_usd: None, + }) + .collect() + }) + .unwrap_or_default(); + + // Get spawned tasks + let tasks = match repository::list_contract_tasks(pool, contract_id).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e); + Vec::new() + } + }; + + let spawned_tasks: Vec = tasks + .into_iter() + .filter(|t| !t.is_supervisor) + .map(|t| TaskReference { + task_id: t.id, + task_name: t.name, + status: t.status, + created_at: t.created_at, + completed_at: t.completed_at, + }) + .collect(); + + Json(SupervisorConversationResponse { + contract_id, + supervisor_task_id: supervisor_state.task_id, + phase: contract.phase, + last_activity: supervisor_state.last_activity, + pending_task_ids: supervisor_state.pending_task_ids, + messages, + spawned_tasks, + }) + .into_response() +} + +/// GET /api/v1/mesh/tasks/{id}/conversation +/// Returns task conversation history +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/conversation", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("include_tool_calls" = Option, Query, description = "Include tool call messages"), + ("include_tool_results" = Option, Query, description = "Include tool result messages"), + ("limit" = Option, Query, description = "Limit messages"), + ), + responses( + (status = 200, description = "Task conversation", body = TaskConversationResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 403, description = "Forbidden", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_task_conversation( + State(state): State, + Path(task_id): Path, + Query(params): Query, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get conversation messages + let messages = match repository::get_task_conversation( + pool, + task_id, + params.include_tool_calls.unwrap_or(true), + params.include_tool_results.unwrap_or(true), + params.limit, + ) + .await + { + Ok(m) => m, + Err(e) => { + tracing::error!("Failed to get task conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Calculate totals + let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum(); + + Json(TaskConversationResponse { + task_id, + task_name: task.name, + status: task.status, + messages, + total_tokens: None, + total_cost: if total_cost > 0.0 { + Some(total_cost) + } else { + None + }, + }) + .into_response() +} + +/// GET /api/v1/timeline +/// Returns unified timeline for authenticated user +#[utoipa::path( + get, + path = "/api/v1/timeline", + params( + ("contract_id" = Option, Query, description = "Filter by contract"), + ("task_id" = Option, Query, description = "Filter by task"), + ("include_subtasks" = Option, Query, description = "Include subtask events"), + ("from" = Option, Query, description = "Start date filter"), + ("to" = Option, Query, description = "End date filter"), + ("limit" = Option, Query, description = "Limit results"), + ), + responses( + (status = 200, description = "Timeline events", body = ContractHistoryResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "History" +)] +pub async fn get_timeline( + State(state): State, + Query(filters): Query, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + let history_filters = HistoryQueryFilters { + phase: None, + event_types: None, + from: filters.from, + to: filters.to, + limit: filters.limit, + cursor: None, + }; + + let result = if let Some(contract_id) = filters.contract_id { + repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await + } else if let Some(task_id) = filters.task_id { + repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await + } else { + repository::get_timeline(pool, auth.owner_id, &history_filters).await + }; + + match result { + Ok((events, total_count)) => { + Json(ContractHistoryResponse { + contract_id: filters.contract_id.unwrap_or_default(), + entries: events, + total_count, + cursor: None, + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to get timeline: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 3da6fd5..b5ade53 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -2459,3 +2459,667 @@ pub async fn continue_task( }) .into_response() } + +// ============================================================================= +// Task Rewind and Fork (Resume and History System) +// ============================================================================= + +/// Rewind task code to specified checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/rewind +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/rewind", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::RewindTaskRequest, + responses( + (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 409, description = "Cannot rewind a running task", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn rewind_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(task_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Task cannot be running during rewind + if task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")), + ) + .into_response(); + } + + // Get checkpoint info + let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id { + match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else if let Some(ref sha) = req.checkpoint_sha { + match repository::get_task_checkpoint_by_sha(pool, sha).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint by SHA: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + } else { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "MISSING_CHECKPOINT", + "Must provide checkpoint_id or checkpoint_sha", + )), + ) + .into_response(); + }; + + // Verify checkpoint belongs to this task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // TODO: Send rewind command to daemon when daemon integration is complete + // For now, return a success response with checkpoint info + + tracing::info!( + task_id = %task_id, + checkpoint_number = checkpoint.checkpoint_number, + commit_sha = %checkpoint.commit_sha, + "Task rewind requested" + ); + + Json(crate::db::models::RewindTaskResponse { + task_id, + rewinded_to: crate::db::models::CheckpointInfo { + checkpoint_number: checkpoint.checkpoint_number, + sha: checkpoint.commit_sha.clone(), + message: checkpoint.message, + }, + preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState { + state_type: "branch".to_string(), + reference: name, + }), + }) + .into_response() +} + +/// Fork task from historical point. +/// +/// POST /api/v1/mesh/tasks/{id}/fork +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/fork", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = crate::db::models::ForkTaskRequest, + responses( + (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn fork_task( + State(state): State, + Authenticated(auth): Authenticated, + Path(task_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Find the checkpoint to fork from + let checkpoint = match req.fork_from_type.as_str() { + "checkpoint" => { + // fork_from_value is checkpoint number + let checkpoint_num: i32 = match req.fork_from_value.parse() { + Ok(n) => n, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")), + ) + .into_response(); + } + }; + + let checkpoints = match repository::list_task_checkpoints(pool, task_id).await { + Ok(c) => c, + Err(e) => { + tracing::error!("Failed to list checkpoints: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + match checkpoints + .into_iter() + .find(|c| c.checkpoint_number == checkpoint_num) + { + Some(c) => c, + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + } + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "UNSUPPORTED_FORK_TYPE", + "Only 'checkpoint' fork type is currently supported", + )), + ) + .into_response(); + } + }; + + // Create the new forked task + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: req.new_task_name.clone(), + description: task.description.clone(), + plan: req.new_task_plan.clone(), + parent_task_id: None, // Forked tasks are independent + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for forked work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create forked task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_number = checkpoint.checkpoint_number, + "Task forked from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(crate::db::models::ForkTaskResponse { + new_task_id: new_task.id, + source_task_id: task_id, + fork_point: crate::db::models::ForkPoint { + fork_type: "checkpoint".to_string(), + checkpoint: Some(checkpoint.clone()), + timestamp: checkpoint.created_at, + }, + branch_name: req.branch_name, + conversation_included: req.include_conversation.unwrap_or(false), + message_count: None, + }), + ) + .into_response() +} + +/// Create new task starting from specific checkpoint. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = crate::db::models::ResumeFromCheckpointRequest, + responses( + (status = 201, description = "Task created from checkpoint", body = Task), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn resume_from_checkpoint( + State(state): State, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get source task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Create the new task that will start from checkpoint + let task_name = req.task_name.unwrap_or_else(|| { + format!( + "{} (resumed from checkpoint {})", + task.name, checkpoint.checkpoint_number + ) + }); + + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: task_name, + description: task.description.clone(), + plan: req.plan, + parent_task_id: None, + is_supervisor: false, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: None, // New branch for resumed work + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(task_id), // Copy worktree from original task + copy_files: None, + checkpoint_sha: Some(checkpoint.commit_sha.clone()), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create resumed task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + tracing::info!( + source_task_id = %task_id, + new_task_id = %new_task.id, + checkpoint_id = %checkpoint_id, + checkpoint_number = checkpoint.checkpoint_number, + "Task resumed from checkpoint" + ); + + (StatusCode::CREATED, Json(new_task)).into_response() +} + +/// Request to create branch from checkpoint. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchFromCheckpointRequest { + pub branch_name: String, + #[serde(default)] + pub checkout: bool, +} + +/// Response for branch creation from checkpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchCreatedResponse { + pub branch_name: String, + pub commit_sha: String, + pub task_id: Uuid, + pub checkpoint_number: i32, +} + +/// Create git branch from checkpoint without starting task. +/// +/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("cid" = Uuid, Path, description = "Checkpoint ID") + ), + request_body = CreateBranchFromCheckpointRequest, + responses( + (status = 201, description = "Branch created", body = BranchCreatedResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task or checkpoint not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn branch_from_checkpoint( + State(state): State, + Authenticated(auth): Authenticated, + Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get task and verify ownership + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoint + let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Checkpoint not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get checkpoint: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify checkpoint belongs to the task + if checkpoint.task_id != task_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "CHECKPOINT_MISMATCH", + "Checkpoint does not belong to this task", + )), + ) + .into_response(); + } + + // Find a daemon to execute the branch creation + let target_daemon_id = if let Some(daemon_id) = task.daemon_id { + // Check if the original daemon is still connected + if state + .daemon_connections + .iter() + .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id) + { + daemon_id + } else { + // Find any connected daemon for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + } + } else { + // No daemon assigned - use any available for this owner + match state + .daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(d) => d.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected to create branch", + )), + ) + .into_response(); + } + } + }; + + // Send CreateBranch command to daemon + let cmd = DaemonCommand::CreateBranch { + task_id, + branch_name: req.branch_name.clone(), + from_ref: Some(checkpoint.commit_sha.clone()), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await { + tracing::error!("Failed to send CreateBranch command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + task_id = %task_id, + checkpoint_id = %checkpoint_id, + branch_name = %req.branch_name, + commit_sha = %checkpoint.commit_sha, + "Branch creation requested from checkpoint" + ); + + ( + StatusCode::CREATED, + Json(BranchCreatedResponse { + branch_name: req.branch_name, + commit_sha: checkpoint.commit_sha, + task_id, + checkpoint_number: checkpoint.checkpoint_number, + }), + ) + .into_response() +} diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 278d0f5..b45dda5 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1504,3 +1504,384 @@ pub async fn answer_question( Json(AnswerQuestionResponse { success }).into_response() } + +// ============================================================================= +// Supervisor Resume and Conversation Rewind +// ============================================================================= + +/// Response for supervisor resume +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumeSupervisorResponse { + pub supervisor_task_id: Uuid, + pub daemon_id: Option, + pub resumed_from: ResumedFromInfo, + pub status: String, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResumedFromInfo { + pub phase: String, + pub last_activity: chrono::DateTime, + pub message_count: i32, +} + +/// Resume interrupted supervisor with specified mode. +/// +/// POST /api/v1/contracts/{id}/supervisor/resume +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/resume", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::ResumeSupervisorRequest, + responses( + (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 409, description = "Supervisor is already running", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn resume_supervisor( + State(state): State, + Path(contract_id): Path, + auth: crate::server::auth::Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get existing supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new( + "NO_SUPERVISOR_STATE", + "No supervisor state found - supervisor may not have been started", + )), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor task + let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if already running + if supervisor_task.status == "running" { + return ( + StatusCode::CONFLICT, + Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), + ) + .into_response(); + } + + // Calculate message count from conversation history + let message_count = supervisor_state + .conversation_history + .as_array() + .map(|arr| arr.len() as i32) + .unwrap_or(0); + + // Based on resume mode, handle differently + match req.resume_mode.as_str() { + "continue" => { + // Mark task for reassignment with existing conversation context + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "restart_phase" => { + // Clear conversation but keep phase progress + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + serde_json::json!([]), + ) + .await + { + tracing::error!("Failed to clear conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") + .bind(supervisor_state.task_id) + .execute(pool) + .await + { + tracing::error!("Failed to update task status: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + "from_checkpoint" => { + // This would require more complex handling with checkpoint system + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "from_checkpoint mode not yet implemented", + )), + ) + .into_response(); + } + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "INVALID_RESUME_MODE", + "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", + )), + ) + .into_response(); + } + } + + tracing::info!( + contract_id = %contract_id, + supervisor_task_id = %supervisor_state.task_id, + resume_mode = %req.resume_mode, + message_count = message_count, + "Supervisor resume requested" + ); + + Json(ResumeSupervisorResponse { + supervisor_task_id: supervisor_state.task_id, + daemon_id: supervisor_task.daemon_id, + resumed_from: ResumedFromInfo { + phase: contract.phase, + last_activity: supervisor_state.last_activity, + message_count, + }, + status: "pending".to_string(), + }) + .into_response() +} + +/// Response for conversation rewind +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RewindConversationResponse { + pub contract_id: Uuid, + pub messages_removed: i32, + pub new_message_count: i32, + pub code_rewound: bool, +} + +/// Rewind supervisor conversation to specified point. +/// +/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = crate::db::models::RewindConversationRequest, + responses( + (status = 200, description = "Conversation rewound", body = RewindConversationResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or supervisor not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn rewind_conversation( + State(state): State, + Path(contract_id): Path, + auth: crate::server::auth::Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let crate::server::auth::Authenticated(auth_info) = auth; + + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract and verify ownership + let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", contract_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get supervisor state + let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get supervisor state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + let conversation = supervisor_state + .conversation_history + .as_array() + .cloned() + .unwrap_or_default(); + + let original_count = conversation.len() as i32; + + // Determine how many messages to keep + let new_count = if let Some(by_count) = req.by_message_count { + (original_count - by_count).max(0) + } else if let Some(to_index) = req.to_message_index { + // Keep messages up to and including the specified index + (to_index + 1).min(original_count).max(0) + } else { + // Default to removing last message + (original_count - 1).max(0) + }; + + // Truncate conversation + let new_conversation: Vec = conversation + .into_iter() + .take(new_count as usize) + .collect(); + + // Update the conversation + if let Err(e) = repository::update_supervisor_conversation( + pool, + contract_id, + serde_json::Value::Array(new_conversation), + ) + .await + { + tracing::error!("Failed to update conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + + tracing::info!( + contract_id = %contract_id, + original_count = original_count, + new_count = new_count, + messages_removed = original_count - new_count, + "Conversation rewound" + ); + + Json(RewindConversationResponse { + contract_id, + messages_removed: original_count - new_count, + new_message_count: new_count, + code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind + }) + .into_response() +} diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index b5650fd..609b63b 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -7,6 +7,7 @@ pub mod contract_daemon; pub mod contracts; pub mod file_ws; pub mod files; +pub mod history; pub mod listen; pub mod mesh; pub mod mesh_chat; diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 0eba009..cf63f71 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -105,6 +105,11 @@ pub fn make_router(state: SharedState) -> Router { // Checkpoint endpoints .route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint)) .route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints)) + // Resume and rewind endpoints + .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task)) + .route("/mesh/tasks/{id}/fork", post(mesh::fork_task)) + .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint)) + .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint)) // Supervisor endpoints (for supervisor.sh) .route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks)) .route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree)) @@ -156,6 +161,9 @@ pub fn make_router(state: SharedState) -> Router { "/contracts/{id}/chat/history", get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history), ) + // Contract supervisor resume endpoints + .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) + .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) // Contract daemon endpoints (for tasks to interact with contracts) .route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status)) .route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist)) -- cgit v1.2.3