summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/daemon/cli/mod.rs21
-rw-r--r--makima/src/daemon/cli/supervisor.rs161
-rw-r--r--makima/src/db/models.rs233
-rw-r--r--makima/src/db/repository.rs360
-rw-r--r--makima/src/server/handlers/history.rs438
-rw-r--r--makima/src/server/handlers/mesh.rs664
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs381
-rw-r--r--makima/src/server/handlers/mod.rs1
-rw-r--r--makima/src/server/mod.rs8
9 files changed, 2263 insertions, 4 deletions
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
index da71b0d..cde6e16 100644
--- a/makima/src/daemon/cli/mod.rs
+++ b/makima/src/daemon/cli/mod.rs
@@ -88,6 +88,27 @@ pub enum SupervisorCommand {
/// Get task output/claude log
Output(supervisor::GetTaskOutputArgs),
+
+ /// View task conversation history
+ TaskHistory(supervisor::TaskHistoryArgs),
+
+ /// List task checkpoints (with optional diff)
+ TaskCheckpoints(supervisor::TaskCheckpointsArgs),
+
+ /// Resume supervisor after interruption
+ Resume(supervisor::ResumeArgs),
+
+ /// Resume task from checkpoint
+ TaskResumeFrom(supervisor::TaskResumeFromArgs),
+
+ /// Rewind task code to checkpoint
+ TaskRewind(supervisor::TaskRewindArgs),
+
+ /// Fork task from historical point
+ TaskFork(supervisor::TaskForkArgs),
+
+ /// Rewind supervisor conversation
+ RewindConversation(supervisor::ConversationRewindArgs),
}
/// Contract subcommands for task-contract interaction.
diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs
index 2bc4c89..ba4fb2b 100644
--- a/makima/src/daemon/cli/supervisor.rs
+++ b/makima/src/daemon/cli/supervisor.rs
@@ -221,3 +221,164 @@ pub struct GetTaskOutputArgs {
#[arg(index = 1, id = "target_task_id")]
pub target_task_id: Uuid,
}
+
+// ============================================================================
+// History Command Args
+// ============================================================================
+
+/// Arguments for task-history command.
+#[derive(Args, Debug)]
+pub struct TaskHistoryArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to view history for
+ #[arg(index = 1)]
+ pub task_id: Uuid,
+
+ /// Include tool calls in output
+ #[arg(long, default_value = "true")]
+ pub tool_calls: bool,
+
+ /// Maximum messages to return
+ #[arg(long)]
+ pub limit: Option<i32>,
+
+ /// Output format (table, json, chat)
+ #[arg(long, default_value = "chat")]
+ pub format: String,
+}
+
+/// Arguments for task-checkpoints command (with optional diff).
+#[derive(Args, Debug)]
+pub struct TaskCheckpointsArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to list checkpoints for
+ #[arg(index = 1)]
+ pub task_id: Uuid,
+
+ /// Include diff summary
+ #[arg(long)]
+ pub with_diff: bool,
+}
+
+// ============================================================================
+// Resume Command Args
+// ============================================================================
+
+/// Arguments for resume command.
+#[derive(Args, Debug)]
+pub struct ResumeArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Resume mode: continue, restart_phase, from_checkpoint
+ #[arg(long, default_value = "continue")]
+ pub mode: String,
+
+ /// Checkpoint ID (required for from_checkpoint mode)
+ #[arg(long)]
+ pub checkpoint: Option<Uuid>,
+
+ /// Additional context to inject
+ #[arg(long)]
+ pub context: Option<String>,
+}
+
+/// Arguments for task-resume-from command.
+#[derive(Args, Debug)]
+pub struct TaskResumeFromArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Source task ID
+ #[arg(index = 1)]
+ pub task_id: Uuid,
+
+ /// Checkpoint number to resume from
+ #[arg(long)]
+ pub checkpoint: i32,
+
+ /// Plan for the new task
+ #[arg(long)]
+ pub plan: String,
+
+ /// Name for the new task
+ #[arg(long)]
+ pub name: Option<String>,
+}
+
+// ============================================================================
+// Rewind Command Args
+// ============================================================================
+
+/// Arguments for task-rewind command.
+#[derive(Args, Debug)]
+pub struct TaskRewindArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to rewind
+ #[arg(index = 1)]
+ pub task_id: Uuid,
+
+ /// Checkpoint number to rewind to
+ #[arg(long)]
+ pub checkpoint: i32,
+
+ /// Preserve mode: discard, create_branch, stash
+ #[arg(long, default_value = "create_branch")]
+ pub preserve: String,
+
+ /// Branch name (for create_branch mode)
+ #[arg(long)]
+ pub branch_name: Option<String>,
+}
+
+/// Arguments for task-fork command.
+#[derive(Args, Debug)]
+pub struct TaskForkArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Source task ID
+ #[arg(index = 1)]
+ pub task_id: Uuid,
+
+ /// Checkpoint number to fork from
+ #[arg(long)]
+ pub checkpoint: i32,
+
+ /// Name for the new task
+ #[arg(long)]
+ pub name: String,
+
+ /// Plan for the new task
+ #[arg(long)]
+ pub plan: String,
+
+ /// Include conversation history
+ #[arg(long, default_value = "true")]
+ pub include_conversation: bool,
+}
+
+/// Arguments for rewind-conversation command.
+#[derive(Args, Debug)]
+pub struct ConversationRewindArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Number of messages to rewind
+ #[arg(long)]
+ pub by_messages: Option<i32>,
+
+ /// Message ID to rewind to
+ #[arg(long)]
+ pub to_message: Option<String>,
+
+ /// Also rewind code to matching checkpoint
+ #[arg(long)]
+ pub rewind_code: bool,
+}
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 40d4109..4419580 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -1559,3 +1559,236 @@ pub struct RepositorySuggestionsQuery {
/// Limit results (default: 10)
pub limit: Option<i32>,
}
+
+// =============================================================================
+// Resume and History System Types
+// =============================================================================
+
+/// Conversation snapshot for task resumption
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ConversationSnapshot {
+ pub id: Uuid,
+ pub task_id: Uuid,
+ pub checkpoint_id: Option<Uuid>,
+ /// Snapshot type: 'auto', 'manual', 'checkpoint'
+ pub snapshot_type: String,
+ pub message_count: i32,
+ #[sqlx(json)]
+ pub conversation_state: serde_json::Value,
+ #[sqlx(json)]
+ pub metadata: Option<serde_json::Value>,
+ pub created_at: DateTime<Utc>,
+}
+
+/// History event for contract/task history tracking
+#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct HistoryEvent {
+ pub id: Uuid,
+ pub owner_id: Uuid,
+ pub contract_id: Option<Uuid>,
+ pub task_id: Option<Uuid>,
+ pub event_type: String,
+ pub event_subtype: Option<String>,
+ pub phase: Option<String>,
+ #[sqlx(json)]
+ pub event_data: serde_json::Value,
+ pub created_at: DateTime<Utc>,
+}
+
+/// Unified conversation message for API responses
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ConversationMessage {
+ pub id: String,
+ /// Message role: 'user', 'assistant', 'system', 'tool'
+ pub role: String,
+ pub content: String,
+ pub timestamp: DateTime<Utc>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_calls: Option<Vec<ToolCallInfo>>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_name: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_input: Option<serde_json::Value>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub tool_result: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub is_error: Option<bool>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub token_count: Option<i32>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub cost_usd: Option<f64>,
+}
+
+/// Tool call information within a conversation message
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ToolCallInfo {
+ pub id: String,
+ pub name: String,
+ pub input: serde_json::Value,
+}
+
+/// Query filters for history endpoints
+#[derive(Debug, Deserialize, ToSchema, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct HistoryQueryFilters {
+ pub phase: Option<String>,
+ pub event_types: Option<Vec<String>>,
+ pub from: Option<DateTime<Utc>>,
+ pub to: Option<DateTime<Utc>>,
+ pub limit: Option<i32>,
+ pub cursor: Option<String>,
+}
+
+/// Request to resume a supervisor
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumeSupervisorRequest {
+ pub target_daemon_id: Option<Uuid>,
+ /// Resume mode: 'continue', 'restart_phase', 'from_checkpoint'
+ pub resume_mode: String,
+ pub checkpoint_id: Option<Uuid>,
+ pub additional_context: Option<String>,
+}
+
+/// Request to resume from a checkpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumeFromCheckpointRequest {
+ pub task_name: Option<String>,
+ pub plan: String,
+ pub include_conversation: Option<bool>,
+ pub target_daemon_id: Option<Uuid>,
+}
+
+/// Request to rewind a task to a checkpoint
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindTaskRequest {
+ pub checkpoint_id: Option<Uuid>,
+ pub checkpoint_sha: Option<String>,
+ /// Preserve mode: 'discard', 'create_branch', 'stash'
+ pub preserve_mode: String,
+ pub branch_name: Option<String>,
+}
+
+/// Request to rewind a conversation
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindConversationRequest {
+ pub to_message_id: Option<String>,
+ pub to_timestamp: Option<DateTime<Utc>>,
+ pub by_message_count: Option<i32>,
+ pub rewind_code: Option<bool>,
+}
+
+/// Request to fork a task
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ForkTaskRequest {
+ /// Fork from type: 'checkpoint', 'timestamp', 'message_id'
+ pub fork_from_type: String,
+ pub fork_from_value: String,
+ pub new_task_name: String,
+ pub new_task_plan: String,
+ pub include_conversation: Option<bool>,
+ pub create_branch: Option<bool>,
+ pub branch_name: Option<String>,
+}
+
+/// Response for contract history endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContractHistoryResponse {
+ pub contract_id: Uuid,
+ pub entries: Vec<HistoryEvent>,
+ pub total_count: i64,
+ pub cursor: Option<String>,
+}
+
+/// Response for task conversation endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskConversationResponse {
+ pub task_id: Uuid,
+ pub task_name: String,
+ pub status: String,
+ pub messages: Vec<ConversationMessage>,
+ pub total_tokens: Option<i32>,
+ pub total_cost: Option<f64>,
+}
+
+/// Response for supervisor conversation endpoint
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorConversationResponse {
+ pub contract_id: Uuid,
+ pub supervisor_task_id: Uuid,
+ pub phase: String,
+ pub last_activity: DateTime<Utc>,
+ pub pending_task_ids: Vec<Uuid>,
+ pub messages: Vec<ConversationMessage>,
+ pub spawned_tasks: Vec<TaskReference>,
+}
+
+/// Reference to a task for history/conversation responses
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskReference {
+ pub task_id: Uuid,
+ pub task_name: String,
+ pub status: String,
+ pub created_at: DateTime<Utc>,
+ pub completed_at: Option<DateTime<Utc>>,
+}
+
+/// Response for task rewind operation
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindTaskResponse {
+ pub task_id: Uuid,
+ pub rewinded_to: CheckpointInfo,
+ pub preserved_as: Option<PreservedState>,
+}
+
+/// Checkpoint information in rewind response
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckpointInfo {
+ pub checkpoint_number: i32,
+ pub sha: String,
+ pub message: String,
+}
+
+/// Preserved state information in rewind response
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PreservedState {
+ /// State type: 'branch' or 'stash'
+ pub state_type: String,
+ pub reference: String,
+}
+
+/// Response for task fork operation
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ForkTaskResponse {
+ pub new_task_id: Uuid,
+ pub source_task_id: Uuid,
+ pub fork_point: ForkPoint,
+ pub branch_name: Option<String>,
+ pub conversation_included: bool,
+ pub message_count: Option<i32>,
+}
+
+/// Fork point information in fork response
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ForkPoint {
+ pub fork_type: String,
+ pub checkpoint: Option<TaskCheckpoint>,
+ pub timestamp: DateTime<Utc>,
+}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 92b2048..cb9d52f 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -1,16 +1,17 @@
//! Repository pattern for file database operations.
use chrono::Utc;
+use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;
use super::models::{
Contract, ContractChatConversation, ContractChatMessageRecord, ContractEvent, ContractRepository,
- ContractSummary, CreateContractRequest, CreateFileRequest,
+ ContractSummary, ConversationMessage, ConversationSnapshot, CreateContractRequest, CreateFileRequest,
CreateTaskRequest, Daemon, DaemonTaskAssignment, DaemonWithCapacity, File, FileSummary,
- FileVersion, MeshChatConversation, MeshChatMessageRecord, SupervisorState, Task, TaskCheckpoint,
- TaskEvent, TaskSummary, UpdateContractRequest, UpdateFileRequest,
- UpdateTaskRequest,
+ FileVersion, HistoryEvent, HistoryQueryFilters, MeshChatConversation, MeshChatMessageRecord,
+ SupervisorState, Task, TaskCheckpoint, TaskEvent, TaskSummary, UpdateContractRequest,
+ UpdateFileRequest, UpdateTaskRequest,
};
/// Repository error types.
@@ -3203,3 +3204,354 @@ pub async fn delete_repository_history(
Ok(result.rows_affected() > 0)
}
+
+// ============================================================================
+// Conversation Snapshots
+// ============================================================================
+
+/// Create a new conversation snapshot
+pub async fn create_conversation_snapshot(
+ pool: &PgPool,
+ task_id: Uuid,
+ checkpoint_id: Option<Uuid>,
+ snapshot_type: &str,
+ message_count: i32,
+ conversation_state: serde_json::Value,
+ metadata: Option<serde_json::Value>,
+) -> Result<ConversationSnapshot, sqlx::Error> {
+ sqlx::query_as::<_, ConversationSnapshot>(
+ r#"
+ INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ RETURNING *
+ "#
+ )
+ .bind(task_id)
+ .bind(checkpoint_id)
+ .bind(snapshot_type)
+ .bind(message_count)
+ .bind(conversation_state)
+ .bind(metadata)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get a conversation snapshot by ID
+pub async fn get_conversation_snapshot(
+ pool: &PgPool,
+ id: Uuid,
+) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
+ sqlx::query_as::<_, ConversationSnapshot>(
+ "SELECT * FROM conversation_snapshots WHERE id = $1"
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Get conversation snapshot at a specific checkpoint
+pub async fn get_conversation_at_checkpoint(
+ pool: &PgPool,
+ checkpoint_id: Uuid,
+) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
+ sqlx::query_as::<_, ConversationSnapshot>(
+ "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1"
+ )
+ .bind(checkpoint_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// List conversation snapshots for a task
+pub async fn list_conversation_snapshots(
+ pool: &PgPool,
+ task_id: Uuid,
+ limit: Option<i32>,
+) -> Result<Vec<ConversationSnapshot>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, ConversationSnapshot>(
+ "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2"
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await
+}
+
+/// Delete conversation snapshots older than retention period
+pub async fn cleanup_old_snapshots(
+ pool: &PgPool,
+ retention_days: i32,
+) -> Result<u64, sqlx::Error> {
+ let result = sqlx::query(
+ "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1"
+ )
+ .bind(retention_days)
+ .execute(pool)
+ .await?;
+ Ok(result.rows_affected())
+}
+
+// ============================================================================
+// History Events
+// ============================================================================
+
+/// Record a new history event
+#[allow(clippy::too_many_arguments)]
+pub async fn record_history_event(
+ pool: &PgPool,
+ owner_id: Uuid,
+ contract_id: Option<Uuid>,
+ task_id: Option<Uuid>,
+ event_type: &str,
+ event_subtype: Option<&str>,
+ phase: Option<&str>,
+ event_data: serde_json::Value,
+) -> Result<HistoryEvent, sqlx::Error> {
+ sqlx::query_as::<_, HistoryEvent>(
+ r#"
+ INSERT INTO history_events (owner_id, contract_id, task_id, event_type, event_subtype, phase, event_data)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ RETURNING *
+ "#
+ )
+ .bind(owner_id)
+ .bind(contract_id)
+ .bind(task_id)
+ .bind(event_type)
+ .bind(event_subtype)
+ .bind(phase)
+ .bind(event_data)
+ .fetch_one(pool)
+ .await
+}
+
+/// Get contract history timeline
+pub async fn get_contract_history(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+ filters: &HistoryQueryFilters,
+) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
+ let limit = filters.limit.unwrap_or(100);
+
+ let mut query = String::from(
+ "SELECT * FROM history_events WHERE contract_id = $1 AND owner_id = $2"
+ );
+ let mut count_query = String::from(
+ "SELECT COUNT(*) FROM history_events WHERE contract_id = $1 AND owner_id = $2"
+ );
+
+ let mut param_count = 2;
+
+ if filters.phase.is_some() {
+ param_count += 1;
+ query.push_str(&format!(" AND phase = ${}" , param_count));
+ count_query.push_str(&format!(" AND phase = ${}", param_count));
+ }
+
+ if filters.from.is_some() {
+ param_count += 1;
+ query.push_str(&format!(" AND created_at >= ${}", param_count));
+ count_query.push_str(&format!(" AND created_at >= ${}", param_count));
+ }
+
+ if filters.to.is_some() {
+ param_count += 1;
+ query.push_str(&format!(" AND created_at <= ${}", param_count));
+ count_query.push_str(&format!(" AND created_at <= ${}", param_count));
+ }
+
+ query.push_str(" ORDER BY created_at DESC");
+ query.push_str(&format!(" LIMIT {}", limit));
+
+ // Build and execute the query dynamically
+ let mut q = sqlx::query_as::<_, HistoryEvent>(&query)
+ .bind(contract_id)
+ .bind(owner_id);
+
+ if let Some(ref phase) = filters.phase {
+ q = q.bind(phase);
+ }
+ if let Some(ref from) = filters.from {
+ q = q.bind(from);
+ }
+ if let Some(ref to) = filters.to {
+ q = q.bind(to);
+ }
+
+ let events = q.fetch_all(pool).await?;
+
+ // Get total count
+ let mut cq = sqlx::query_scalar::<_, i64>(&count_query)
+ .bind(contract_id)
+ .bind(owner_id);
+
+ if let Some(ref phase) = filters.phase {
+ cq = cq.bind(phase);
+ }
+ if let Some(ref from) = filters.from {
+ cq = cq.bind(from);
+ }
+ if let Some(ref to) = filters.to {
+ cq = cq.bind(to);
+ }
+
+ let count = cq.fetch_one(pool).await?;
+
+ Ok((events, count))
+}
+
+/// Get task history
+pub async fn get_task_history(
+ pool: &PgPool,
+ task_id: Uuid,
+ owner_id: Uuid,
+ filters: &HistoryQueryFilters,
+) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
+ let limit = filters.limit.unwrap_or(100);
+
+ let events = sqlx::query_as::<_, HistoryEvent>(
+ r#"
+ SELECT * FROM history_events
+ WHERE task_id = $1 AND owner_id = $2
+ ORDER BY created_at DESC
+ LIMIT $3
+ "#
+ )
+ .bind(task_id)
+ .bind(owner_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await?;
+
+ let count: i64 = sqlx::query_scalar(
+ "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2"
+ )
+ .bind(task_id)
+ .bind(owner_id)
+ .fetch_one(pool)
+ .await?;
+
+ Ok((events, count))
+}
+
+/// Get unified timeline for an owner
+pub async fn get_timeline(
+ pool: &PgPool,
+ owner_id: Uuid,
+ filters: &HistoryQueryFilters,
+) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
+ let limit = filters.limit.unwrap_or(100);
+
+ let events = sqlx::query_as::<_, HistoryEvent>(
+ r#"
+ SELECT * FROM history_events
+ WHERE owner_id = $1
+ ORDER BY created_at DESC
+ LIMIT $2
+ "#
+ )
+ .bind(owner_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await?;
+
+ let count: i64 = sqlx::query_scalar(
+ "SELECT COUNT(*) FROM history_events WHERE owner_id = $1"
+ )
+ .bind(owner_id)
+ .fetch_one(pool)
+ .await?;
+
+ Ok((events, count))
+}
+
+// ============================================================================
+// Task Conversation Retrieval
+// ============================================================================
+
+// Helper struct for parsing task output events
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TaskOutputEvent {
+ message_type: String,
+ content: Option<String>,
+ tool_name: Option<String>,
+ tool_input: Option<serde_json::Value>,
+ is_error: Option<bool>,
+ cost_usd: Option<f32>,
+}
+
+/// Get task conversation messages (reconstructed from task_events)
+pub async fn get_task_conversation(
+ pool: &PgPool,
+ task_id: Uuid,
+ include_tool_calls: bool,
+ include_tool_results: bool,
+ limit: Option<i32>,
+) -> Result<Vec<ConversationMessage>, sqlx::Error> {
+ let limit = limit.unwrap_or(1000);
+
+ // Get output events that represent conversation turns
+ let events = sqlx::query_as::<_, TaskEvent>(
+ r#"
+ SELECT * FROM task_events
+ WHERE task_id = $1 AND event_type = 'output'
+ ORDER BY created_at ASC
+ LIMIT $2
+ "#
+ )
+ .bind(task_id)
+ .bind(limit)
+ .fetch_all(pool)
+ .await?;
+
+ // Convert task events to conversation messages
+ let mut messages = Vec::new();
+ for event in events {
+ if let Some(data) = event.event_data {
+ // Parse the event data to extract message info
+ if let Ok(output) = serde_json::from_value::<TaskOutputEvent>(data.clone()) {
+ let should_include = match output.message_type.as_str() {
+ "tool_use" => include_tool_calls,
+ "tool_result" => include_tool_results,
+ _ => true,
+ };
+
+ if should_include {
+ messages.push(ConversationMessage {
+ id: event.id.to_string(),
+ role: match output.message_type.as_str() {
+ "assistant" => "assistant".to_string(),
+ "tool_use" => "assistant".to_string(),
+ "tool_result" => "tool".to_string(),
+ "system" => "system".to_string(),
+ "error" => "system".to_string(),
+ _ => "user".to_string(),
+ },
+ content: output.content.unwrap_or_default(),
+ timestamp: event.created_at,
+ tool_calls: None,
+ tool_name: output.tool_name,
+ tool_input: output.tool_input,
+ tool_result: None,
+ is_error: output.is_error,
+ token_count: None,
+ cost_usd: output.cost_usd.map(|c| c as f64),
+ });
+ }
+ }
+ }
+ }
+
+ Ok(messages)
+}
+
+/// Get supervisor conversation (from supervisor_states)
+pub async fn get_supervisor_conversation_full(
+ pool: &PgPool,
+ contract_id: Uuid,
+) -> Result<Option<SupervisorState>, sqlx::Error> {
+ get_supervisor_state(pool, contract_id).await
+}
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs
new file mode 100644
index 0000000..b3dec97
--- /dev/null
+++ b/makima/src/server/handlers/history.rs
@@ -0,0 +1,438 @@
+//! HTTP handlers for history and conversation APIs.
+
+use axum::{
+ extract::{Path, Query, State},
+ http::StatusCode,
+ response::IntoResponse,
+ Json,
+};
+use uuid::Uuid;
+
+use crate::{
+ db::{
+ models::{
+ ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
+ SupervisorConversationResponse, TaskConversationResponse, TaskReference,
+ },
+ repository,
+ },
+ server::{auth::Authenticated, messages::ApiError, state::SharedState},
+};
+
+/// Query parameters for task conversation
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskConversationParams {
+ pub include_tool_calls: Option<bool>,
+ pub include_tool_results: Option<bool>,
+ pub limit: Option<i32>,
+}
+
+/// Query parameters for timeline
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimelineQueryFilters {
+ pub contract_id: Option<Uuid>,
+ pub task_id: Option<Uuid>,
+ pub include_subtasks: Option<bool>,
+ pub from: Option<chrono::DateTime<chrono::Utc>>,
+ pub to: Option<chrono::DateTime<chrono::Utc>>,
+ pub limit: Option<i32>,
+}
+
+/// GET /api/v1/contracts/{id}/history
+/// Returns contract history timeline with filtering and pagination
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/history",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID"),
+ ("phase" = Option<String>, Query, description = "Filter by phase"),
+ ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Contract history", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Contract not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_contract_history(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Query(filters): Query<HistoryQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and user has access
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get history events
+ match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id,
+ entries: events,
+ total_count,
+ cursor: None, // TODO: implement cursor pagination
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract history: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// GET /api/v1/contracts/{id}/supervisor/conversation
+/// Returns full supervisor conversation with spawned task references
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_supervisor_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract for phase info and ownership check
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get the supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Parse conversation history from JSONB
+ let messages: Vec<ConversationMessage> = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| {
+ arr.iter()
+ .enumerate()
+ .map(|(i, v)| ConversationMessage {
+ id: i.to_string(),
+ role: v
+ .get("role")
+ .and_then(|r| r.as_str())
+ .unwrap_or("user")
+ .to_string(),
+ content: v
+ .get("content")
+ .and_then(|c| c.as_str())
+ .unwrap_or("")
+ .to_string(),
+ timestamp: supervisor_state.last_activity,
+ tool_calls: None,
+ tool_name: None,
+ tool_input: None,
+ tool_result: None,
+ is_error: None,
+ token_count: None,
+ cost_usd: None,
+ })
+ .collect()
+ })
+ .unwrap_or_default();
+
+ // Get spawned tasks
+ let tasks = match repository::list_contract_tasks(pool, contract_id).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
+ Vec::new()
+ }
+ };
+
+ let spawned_tasks: Vec<TaskReference> = tasks
+ .into_iter()
+ .filter(|t| !t.is_supervisor)
+ .map(|t| TaskReference {
+ task_id: t.id,
+ task_name: t.name,
+ status: t.status,
+ created_at: t.created_at,
+ completed_at: t.completed_at,
+ })
+ .collect();
+
+ Json(SupervisorConversationResponse {
+ contract_id,
+ supervisor_task_id: supervisor_state.task_id,
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ pending_task_ids: supervisor_state.pending_task_ids,
+ messages,
+ spawned_tasks,
+ })
+ .into_response()
+}
+
+/// GET /api/v1/mesh/tasks/{id}/conversation
+/// Returns task conversation history
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"),
+ ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"),
+ ("limit" = Option<i32>, Query, description = "Limit messages"),
+ ),
+ responses(
+ (status = 200, description = "Task conversation", body = TaskConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_task_conversation(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ Query(params): Query<TaskConversationParams>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get conversation messages
+ let messages = match repository::get_task_conversation(
+ pool,
+ task_id,
+ params.include_tool_calls.unwrap_or(true),
+ params.include_tool_results.unwrap_or(true),
+ params.limit,
+ )
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => {
+ tracing::error!("Failed to get task conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Calculate totals
+ let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum();
+
+ Json(TaskConversationResponse {
+ task_id,
+ task_name: task.name,
+ status: task.status,
+ messages,
+ total_tokens: None,
+ total_cost: if total_cost > 0.0 {
+ Some(total_cost)
+ } else {
+ None
+ },
+ })
+ .into_response()
+}
+
+/// GET /api/v1/timeline
+/// Returns unified timeline for authenticated user
+#[utoipa::path(
+ get,
+ path = "/api/v1/timeline",
+ params(
+ ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
+ ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
+ ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Timeline events", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_timeline(
+ State(state): State<SharedState>,
+ Query(filters): Query<TimelineQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ let history_filters = HistoryQueryFilters {
+ phase: None,
+ event_types: None,
+ from: filters.from,
+ to: filters.to,
+ limit: filters.limit,
+ cursor: None,
+ };
+
+ let result = if let Some(contract_id) = filters.contract_id {
+ repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await
+ } else if let Some(task_id) = filters.task_id {
+ repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
+ } else {
+ repository::get_timeline(pool, auth.owner_id, &history_filters).await
+ };
+
+ match result {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id: filters.contract_id.unwrap_or_default(),
+ entries: events,
+ total_count,
+ cursor: None,
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get timeline: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 3da6fd5..b5ade53 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -2459,3 +2459,667 @@ pub async fn continue_task(
})
.into_response()
}
+
+// =============================================================================
+// Task Rewind and Fork (Resume and History System)
+// =============================================================================
+
+/// Rewind task code to specified checkpoint.
+///
+/// POST /api/v1/mesh/tasks/{id}/rewind
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/rewind",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = crate::db::models::RewindTaskRequest,
+ responses(
+ (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 409, description = "Cannot rewind a running task", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn rewind_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(task_id): Path<Uuid>,
+ Json(req): Json<crate::db::models::RewindTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Task cannot be running during rewind
+ if task.status == "running" {
+ return (
+ StatusCode::CONFLICT,
+ Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")),
+ )
+ .into_response();
+ }
+
+ // Get checkpoint info
+ let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id {
+ match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ } else if let Some(ref sha) = req.checkpoint_sha {
+ match repository::get_task_checkpoint_by_sha(pool, sha).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint by SHA: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ } else {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "MISSING_CHECKPOINT",
+ "Must provide checkpoint_id or checkpoint_sha",
+ )),
+ )
+ .into_response();
+ };
+
+ // Verify checkpoint belongs to this task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // TODO: Send rewind command to daemon when daemon integration is complete
+ // For now, return a success response with checkpoint info
+
+ tracing::info!(
+ task_id = %task_id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ commit_sha = %checkpoint.commit_sha,
+ "Task rewind requested"
+ );
+
+ Json(crate::db::models::RewindTaskResponse {
+ task_id,
+ rewinded_to: crate::db::models::CheckpointInfo {
+ checkpoint_number: checkpoint.checkpoint_number,
+ sha: checkpoint.commit_sha.clone(),
+ message: checkpoint.message,
+ },
+ preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState {
+ state_type: "branch".to_string(),
+ reference: name,
+ }),
+ })
+ .into_response()
+}
+
+/// Fork task from historical point.
+///
+/// POST /api/v1/mesh/tasks/{id}/fork
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/fork",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = crate::db::models::ForkTaskRequest,
+ responses(
+ (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn fork_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(task_id): Path<Uuid>,
+ Json(req): Json<crate::db::models::ForkTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get source task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Find the checkpoint to fork from
+ let checkpoint = match req.fork_from_type.as_str() {
+ "checkpoint" => {
+ // fork_from_value is checkpoint number
+ let checkpoint_num: i32 = match req.fork_from_value.parse() {
+ Ok(n) => n,
+ Err(_) => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")),
+ )
+ .into_response();
+ }
+ };
+
+ let checkpoints = match repository::list_task_checkpoints(pool, task_id).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::error!("Failed to list checkpoints: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ match checkpoints
+ .into_iter()
+ .find(|c| c.checkpoint_number == checkpoint_num)
+ {
+ Some(c) => c,
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ }
+ }
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "UNSUPPORTED_FORK_TYPE",
+ "Only 'checkpoint' fork type is currently supported",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Create the new forked task
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: req.new_task_name.clone(),
+ description: task.description.clone(),
+ plan: req.new_task_plan.clone(),
+ parent_task_id: None, // Forked tasks are independent
+ is_supervisor: false,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: None, // New branch for forked work
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create forked task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ tracing::info!(
+ source_task_id = %task_id,
+ new_task_id = %new_task.id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ "Task forked from checkpoint"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(crate::db::models::ForkTaskResponse {
+ new_task_id: new_task.id,
+ source_task_id: task_id,
+ fork_point: crate::db::models::ForkPoint {
+ fork_type: "checkpoint".to_string(),
+ checkpoint: Some(checkpoint.clone()),
+ timestamp: checkpoint.created_at,
+ },
+ branch_name: req.branch_name,
+ conversation_included: req.include_conversation.unwrap_or(false),
+ message_count: None,
+ }),
+ )
+ .into_response()
+}
+
+/// Create new task starting from specific checkpoint.
+///
+/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("cid" = Uuid, Path, description = "Checkpoint ID")
+ ),
+ request_body = crate::db::models::ResumeFromCheckpointRequest,
+ responses(
+ (status = 201, description = "Task created from checkpoint", body = Task),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn resume_from_checkpoint(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
+ Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get source task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get checkpoint
+ let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify checkpoint belongs to the task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // Create the new task that will start from checkpoint
+ let task_name = req.task_name.unwrap_or_else(|| {
+ format!(
+ "{} (resumed from checkpoint {})",
+ task.name, checkpoint.checkpoint_number
+ )
+ });
+
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: task_name,
+ description: task.description.clone(),
+ plan: req.plan,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: None, // New branch for resumed work
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(task_id), // Copy worktree from original task
+ copy_files: None,
+ checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create resumed task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ tracing::info!(
+ source_task_id = %task_id,
+ new_task_id = %new_task.id,
+ checkpoint_id = %checkpoint_id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ "Task resumed from checkpoint"
+ );
+
+ (StatusCode::CREATED, Json(new_task)).into_response()
+}
+
+/// Request to create branch from checkpoint.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBranchFromCheckpointRequest {
+ pub branch_name: String,
+ #[serde(default)]
+ pub checkout: bool,
+}
+
+/// Response for branch creation from checkpoint.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct BranchCreatedResponse {
+ pub branch_name: String,
+ pub commit_sha: String,
+ pub task_id: Uuid,
+ pub checkpoint_number: i32,
+}
+
+/// Create git branch from checkpoint without starting task.
+///
+/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("cid" = Uuid, Path, description = "Checkpoint ID")
+ ),
+ request_body = CreateBranchFromCheckpointRequest,
+ responses(
+ (status = 201, description = "Branch created", body = BranchCreatedResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn branch_from_checkpoint(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
+ Json(req): Json<CreateBranchFromCheckpointRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get checkpoint
+ let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify checkpoint belongs to the task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // Find a daemon to execute the branch creation
+ let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
+ // Check if the original daemon is still connected
+ if state
+ .daemon_connections
+ .iter()
+ .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id)
+ {
+ daemon_id
+ } else {
+ // Find any connected daemon for this owner
+ match state
+ .daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemon connected to create branch",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+ } else {
+ // No daemon assigned - use any available for this owner
+ match state
+ .daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemon connected to create branch",
+ )),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Send CreateBranch command to daemon
+ let cmd = DaemonCommand::CreateBranch {
+ task_id,
+ branch_name: req.branch_name.clone(),
+ from_ref: Some(checkpoint.commit_sha.clone()),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await {
+ tracing::error!("Failed to send CreateBranch command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ checkpoint_id = %checkpoint_id,
+ branch_name = %req.branch_name,
+ commit_sha = %checkpoint.commit_sha,
+ "Branch creation requested from checkpoint"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(BranchCreatedResponse {
+ branch_name: req.branch_name,
+ commit_sha: checkpoint.commit_sha,
+ task_id,
+ checkpoint_number: checkpoint.checkpoint_number,
+ }),
+ )
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 278d0f5..b45dda5 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1504,3 +1504,384 @@ pub async fn answer_question(
Json(AnswerQuestionResponse { success }).into_response()
}
+
+// =============================================================================
+// Supervisor Resume and Conversation Rewind
+// =============================================================================
+
+/// Response for supervisor resume
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumeSupervisorResponse {
+ pub supervisor_task_id: Uuid,
+ pub daemon_id: Option<Uuid>,
+ pub resumed_from: ResumedFromInfo,
+ pub status: String,
+}
+
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumedFromInfo {
+ pub phase: String,
+ pub last_activity: chrono::DateTime<chrono::Utc>,
+ pub message_count: i32,
+}
+
+/// Resume interrupted supervisor with specified mode.
+///
+/// POST /api/v1/contracts/{id}/supervisor/resume
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/resume",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::ResumeSupervisorRequest,
+ responses(
+ (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 409, description = "Supervisor is already running", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn resume_supervisor(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get existing supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new(
+ "NO_SUPERVISOR_STATE",
+ "No supervisor state found - supervisor may not have been started",
+ )),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor task
+ let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if already running
+ if supervisor_task.status == "running" {
+ return (
+ StatusCode::CONFLICT,
+ Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
+ )
+ .into_response();
+ }
+
+ // Calculate message count from conversation history
+ let message_count = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| arr.len() as i32)
+ .unwrap_or(0);
+
+ // Based on resume mode, handle differently
+ match req.resume_mode.as_str() {
+ "continue" => {
+ // Mark task for reassignment with existing conversation context
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "restart_phase" => {
+ // Clear conversation but keep phase progress
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::json!([]),
+ )
+ .await
+ {
+ tracing::error!("Failed to clear conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "from_checkpoint" => {
+ // This would require more complex handling with checkpoint system
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NOT_IMPLEMENTED",
+ "from_checkpoint mode not yet implemented",
+ )),
+ )
+ .into_response();
+ }
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_RESUME_MODE",
+ "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
+ )),
+ )
+ .into_response();
+ }
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ supervisor_task_id = %supervisor_state.task_id,
+ resume_mode = %req.resume_mode,
+ message_count = message_count,
+ "Supervisor resume requested"
+ );
+
+ Json(ResumeSupervisorResponse {
+ supervisor_task_id: supervisor_state.task_id,
+ daemon_id: supervisor_task.daemon_id,
+ resumed_from: ResumedFromInfo {
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ message_count,
+ },
+ status: "pending".to_string(),
+ })
+ .into_response()
+}
+
+/// Response for conversation rewind
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindConversationResponse {
+ pub contract_id: Uuid,
+ pub messages_removed: i32,
+ pub new_message_count: i32,
+ pub code_rewound: bool,
+}
+
+/// Rewind supervisor conversation to specified point.
+///
+/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::RewindConversationRequest,
+ responses(
+ (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn rewind_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::RewindConversationRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ let conversation = supervisor_state
+ .conversation_history
+ .as_array()
+ .cloned()
+ .unwrap_or_default();
+
+ let original_count = conversation.len() as i32;
+
+ // Determine how many messages to keep
+ let new_count = if let Some(by_count) = req.by_message_count {
+ (original_count - by_count).max(0)
+ } else if let Some(to_index) = req.to_message_index {
+ // Keep messages up to and including the specified index
+ (to_index + 1).min(original_count).max(0)
+ } else {
+ // Default to removing last message
+ (original_count - 1).max(0)
+ };
+
+ // Truncate conversation
+ let new_conversation: Vec<serde_json::Value> = conversation
+ .into_iter()
+ .take(new_count as usize)
+ .collect();
+
+ // Update the conversation
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::Value::Array(new_conversation),
+ )
+ .await
+ {
+ tracing::error!("Failed to update conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ original_count = original_count,
+ new_count = new_count,
+ messages_removed = original_count - new_count,
+ "Conversation rewound"
+ );
+
+ Json(RewindConversationResponse {
+ contract_id,
+ messages_removed: original_count - new_count,
+ new_message_count: new_count,
+ code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind
+ })
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs
index b5650fd..609b63b 100644
--- a/makima/src/server/handlers/mod.rs
+++ b/makima/src/server/handlers/mod.rs
@@ -7,6 +7,7 @@ pub mod contract_daemon;
pub mod contracts;
pub mod file_ws;
pub mod files;
+pub mod history;
pub mod listen;
pub mod mesh;
pub mod mesh_chat;
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 0eba009..cf63f71 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -105,6 +105,11 @@ pub fn make_router(state: SharedState) -> Router {
// Checkpoint endpoints
.route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint))
.route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints))
+ // Resume and rewind endpoints
+ .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task))
+ .route("/mesh/tasks/{id}/fork", post(mesh::fork_task))
+ .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint))
+ .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint))
// Supervisor endpoints (for supervisor.sh)
.route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks))
.route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree))
@@ -156,6 +161,9 @@ pub fn make_router(state: SharedState) -> Router {
"/contracts/{id}/chat/history",
get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history),
)
+ // Contract supervisor resume endpoints
+ .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor))
+ .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation))
// Contract daemon endpoints (for tasks to interact with contracts)
.route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status))
.route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist))