summaryrefslogtreecommitdiff
path: root/makima/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server')
-rw-r--r--makima/src/server/handlers/history.rs438
-rw-r--r--makima/src/server/handlers/mesh.rs664
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs381
-rw-r--r--makima/src/server/handlers/mod.rs1
-rw-r--r--makima/src/server/mod.rs8
5 files changed, 1492 insertions, 0 deletions
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs
new file mode 100644
index 0000000..b3dec97
--- /dev/null
+++ b/makima/src/server/handlers/history.rs
@@ -0,0 +1,438 @@
+//! HTTP handlers for history and conversation APIs.
+
+use axum::{
+ extract::{Path, Query, State},
+ http::StatusCode,
+ response::IntoResponse,
+ Json,
+};
+use uuid::Uuid;
+
+use crate::{
+ db::{
+ models::{
+ ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
+ SupervisorConversationResponse, TaskConversationResponse, TaskReference,
+ },
+ repository,
+ },
+ server::{auth::Authenticated, messages::ApiError, state::SharedState},
+};
+
+/// Query parameters for task conversation
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskConversationParams {
+ pub include_tool_calls: Option<bool>,
+ pub include_tool_results: Option<bool>,
+ pub limit: Option<i32>,
+}
+
+/// Query parameters for timeline
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimelineQueryFilters {
+ pub contract_id: Option<Uuid>,
+ pub task_id: Option<Uuid>,
+ pub include_subtasks: Option<bool>,
+ pub from: Option<chrono::DateTime<chrono::Utc>>,
+ pub to: Option<chrono::DateTime<chrono::Utc>>,
+ pub limit: Option<i32>,
+}
+
+/// GET /api/v1/contracts/{id}/history
+/// Returns contract history timeline with filtering and pagination
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/history",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID"),
+ ("phase" = Option<String>, Query, description = "Filter by phase"),
+ ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Contract history", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Contract not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_contract_history(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Query(filters): Query<HistoryQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and user has access
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get history events
+ match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id,
+ entries: events,
+ total_count,
+ cursor: None, // TODO: implement cursor pagination
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract history: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// GET /api/v1/contracts/{id}/supervisor/conversation
+/// Returns full supervisor conversation with spawned task references
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_supervisor_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract for phase info and ownership check
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get the supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Parse conversation history from JSONB
+ let messages: Vec<ConversationMessage> = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| {
+ arr.iter()
+ .enumerate()
+ .map(|(i, v)| ConversationMessage {
+ id: i.to_string(),
+ role: v
+ .get("role")
+ .and_then(|r| r.as_str())
+ .unwrap_or("user")
+ .to_string(),
+ content: v
+ .get("content")
+ .and_then(|c| c.as_str())
+ .unwrap_or("")
+ .to_string(),
+ timestamp: supervisor_state.last_activity,
+ tool_calls: None,
+ tool_name: None,
+ tool_input: None,
+ tool_result: None,
+ is_error: None,
+ token_count: None,
+ cost_usd: None,
+ })
+ .collect()
+ })
+ .unwrap_or_default();
+
+ // Get spawned tasks
+ let tasks = match repository::list_contract_tasks(pool, contract_id).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
+ Vec::new()
+ }
+ };
+
+ let spawned_tasks: Vec<TaskReference> = tasks
+ .into_iter()
+ .filter(|t| !t.is_supervisor)
+ .map(|t| TaskReference {
+ task_id: t.id,
+ task_name: t.name,
+ status: t.status,
+ created_at: t.created_at,
+ completed_at: t.completed_at,
+ })
+ .collect();
+
+ Json(SupervisorConversationResponse {
+ contract_id,
+ supervisor_task_id: supervisor_state.task_id,
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ pending_task_ids: supervisor_state.pending_task_ids,
+ messages,
+ spawned_tasks,
+ })
+ .into_response()
+}
+
+/// GET /api/v1/mesh/tasks/{id}/conversation
+/// Returns task conversation history
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"),
+ ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"),
+ ("limit" = Option<i32>, Query, description = "Limit messages"),
+ ),
+ responses(
+ (status = 200, description = "Task conversation", body = TaskConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_task_conversation(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ Query(params): Query<TaskConversationParams>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get conversation messages
+ let messages = match repository::get_task_conversation(
+ pool,
+ task_id,
+ params.include_tool_calls.unwrap_or(true),
+ params.include_tool_results.unwrap_or(true),
+ params.limit,
+ )
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => {
+ tracing::error!("Failed to get task conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Calculate totals
+ let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum();
+
+ Json(TaskConversationResponse {
+ task_id,
+ task_name: task.name,
+ status: task.status,
+ messages,
+ total_tokens: None,
+ total_cost: if total_cost > 0.0 {
+ Some(total_cost)
+ } else {
+ None
+ },
+ })
+ .into_response()
+}
+
+/// GET /api/v1/timeline
+/// Returns unified timeline for authenticated user
+#[utoipa::path(
+ get,
+ path = "/api/v1/timeline",
+ params(
+ ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
+ ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
+ ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Timeline events", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_timeline(
+ State(state): State<SharedState>,
+ Query(filters): Query<TimelineQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ let history_filters = HistoryQueryFilters {
+ phase: None,
+ event_types: None,
+ from: filters.from,
+ to: filters.to,
+ limit: filters.limit,
+ cursor: None,
+ };
+
+ let result = if let Some(contract_id) = filters.contract_id {
+ repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await
+ } else if let Some(task_id) = filters.task_id {
+ repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
+ } else {
+ repository::get_timeline(pool, auth.owner_id, &history_filters).await
+ };
+
+ match result {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id: filters.contract_id.unwrap_or_default(),
+ entries: events,
+ total_count,
+ cursor: None,
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get timeline: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 3da6fd5..b5ade53 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -2459,3 +2459,667 @@ pub async fn continue_task(
})
.into_response()
}
+
+// =============================================================================
+// Task Rewind and Fork (Resume and History System)
+// =============================================================================
+
+/// Rewind task code to specified checkpoint.
+///
+/// POST /api/v1/mesh/tasks/{id}/rewind
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/rewind",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = crate::db::models::RewindTaskRequest,
+ responses(
+ (status = 200, description = "Task rewound successfully", body = crate::db::models::RewindTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 409, description = "Cannot rewind a running task", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn rewind_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(task_id): Path<Uuid>,
+ Json(req): Json<crate::db::models::RewindTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Task cannot be running during rewind
+ if task.status == "running" {
+ return (
+ StatusCode::CONFLICT,
+ Json(ApiError::new("TASK_RUNNING", "Cannot rewind a running task")),
+ )
+ .into_response();
+ }
+
+ // Get checkpoint info
+ let checkpoint = if let Some(checkpoint_id) = req.checkpoint_id {
+ match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ } else if let Some(ref sha) = req.checkpoint_sha {
+ match repository::get_task_checkpoint_by_sha(pool, sha).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint by SHA: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ } else {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "MISSING_CHECKPOINT",
+ "Must provide checkpoint_id or checkpoint_sha",
+ )),
+ )
+ .into_response();
+ };
+
+ // Verify checkpoint belongs to this task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // TODO: Send rewind command to daemon when daemon integration is complete
+ // For now, return a success response with checkpoint info
+
+ tracing::info!(
+ task_id = %task_id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ commit_sha = %checkpoint.commit_sha,
+ "Task rewind requested"
+ );
+
+ Json(crate::db::models::RewindTaskResponse {
+ task_id,
+ rewinded_to: crate::db::models::CheckpointInfo {
+ checkpoint_number: checkpoint.checkpoint_number,
+ sha: checkpoint.commit_sha.clone(),
+ message: checkpoint.message,
+ },
+ preserved_as: req.branch_name.map(|name| crate::db::models::PreservedState {
+ state_type: "branch".to_string(),
+ reference: name,
+ }),
+ })
+ .into_response()
+}
+
+/// Fork task from historical point.
+///
+/// POST /api/v1/mesh/tasks/{id}/fork
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/fork",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = crate::db::models::ForkTaskRequest,
+ responses(
+ (status = 201, description = "Task forked successfully", body = crate::db::models::ForkTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn fork_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(task_id): Path<Uuid>,
+ Json(req): Json<crate::db::models::ForkTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get source task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Find the checkpoint to fork from
+ let checkpoint = match req.fork_from_type.as_str() {
+ "checkpoint" => {
+ // fork_from_value is checkpoint number
+ let checkpoint_num: i32 = match req.fork_from_value.parse() {
+ Ok(n) => n,
+ Err(_) => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_CHECKPOINT", "Invalid checkpoint number")),
+ )
+ .into_response();
+ }
+ };
+
+ let checkpoints = match repository::list_task_checkpoints(pool, task_id).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::error!("Failed to list checkpoints: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ match checkpoints
+ .into_iter()
+ .find(|c| c.checkpoint_number == checkpoint_num)
+ {
+ Some(c) => c,
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ }
+ }
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "UNSUPPORTED_FORK_TYPE",
+ "Only 'checkpoint' fork type is currently supported",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Create the new forked task
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: req.new_task_name.clone(),
+ description: task.description.clone(),
+ plan: req.new_task_plan.clone(),
+ parent_task_id: None, // Forked tasks are independent
+ is_supervisor: false,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: None, // New branch for forked work
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create forked task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ tracing::info!(
+ source_task_id = %task_id,
+ new_task_id = %new_task.id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ "Task forked from checkpoint"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(crate::db::models::ForkTaskResponse {
+ new_task_id: new_task.id,
+ source_task_id: task_id,
+ fork_point: crate::db::models::ForkPoint {
+ fork_type: "checkpoint".to_string(),
+ checkpoint: Some(checkpoint.clone()),
+ timestamp: checkpoint.created_at,
+ },
+ branch_name: req.branch_name,
+ conversation_included: req.include_conversation.unwrap_or(false),
+ message_count: None,
+ }),
+ )
+ .into_response()
+}
+
+/// Create new task starting from specific checkpoint.
+///
+/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/resume",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("cid" = Uuid, Path, description = "Checkpoint ID")
+ ),
+ request_body = crate::db::models::ResumeFromCheckpointRequest,
+ responses(
+ (status = 201, description = "Task created from checkpoint", body = Task),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn resume_from_checkpoint(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
+ Json(req): Json<crate::db::models::ResumeFromCheckpointRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get source task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get checkpoint
+ let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify checkpoint belongs to the task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // Create the new task that will start from checkpoint
+ let task_name = req.task_name.unwrap_or_else(|| {
+ format!(
+ "{} (resumed from checkpoint {})",
+ task.name, checkpoint.checkpoint_number
+ )
+ });
+
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: task_name,
+ description: task.description.clone(),
+ plan: req.plan,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: None, // New branch for resumed work
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(task_id), // Copy worktree from original task
+ copy_files: None,
+ checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create resumed task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ tracing::info!(
+ source_task_id = %task_id,
+ new_task_id = %new_task.id,
+ checkpoint_id = %checkpoint_id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ "Task resumed from checkpoint"
+ );
+
+ (StatusCode::CREATED, Json(new_task)).into_response()
+}
+
+/// Request to create branch from checkpoint.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBranchFromCheckpointRequest {
+ pub branch_name: String,
+ #[serde(default)]
+ pub checkout: bool,
+}
+
+/// Response for branch creation from checkpoint.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct BranchCreatedResponse {
+ pub branch_name: String,
+ pub commit_sha: String,
+ pub task_id: Uuid,
+ pub checkpoint_number: i32,
+}
+
+/// Create git branch from checkpoint without starting task.
+///
+/// POST /api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/checkpoints/{cid}/branch",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("cid" = Uuid, Path, description = "Checkpoint ID")
+ ),
+ request_body = CreateBranchFromCheckpointRequest,
+ responses(
+ (status = 201, description = "Branch created", body = BranchCreatedResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task or checkpoint not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn branch_from_checkpoint(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path((task_id, checkpoint_id)): Path<(Uuid, Uuid)>,
+ Json(req): Json<CreateBranchFromCheckpointRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get checkpoint
+ let checkpoint = match repository::get_task_checkpoint(pool, checkpoint_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Checkpoint not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get checkpoint: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify checkpoint belongs to the task
+ if checkpoint.task_id != task_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "CHECKPOINT_MISMATCH",
+ "Checkpoint does not belong to this task",
+ )),
+ )
+ .into_response();
+ }
+
+ // Find a daemon to execute the branch creation
+ let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
+ // Check if the original daemon is still connected
+ if state
+ .daemon_connections
+ .iter()
+ .any(|d| d.value().id == daemon_id && d.value().owner_id == auth.owner_id)
+ {
+ daemon_id
+ } else {
+ // Find any connected daemon for this owner
+ match state
+ .daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemon connected to create branch",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+ } else {
+ // No daemon assigned - use any available for this owner
+ match state
+ .daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(d) => d.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemon connected to create branch",
+ )),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Send CreateBranch command to daemon
+ let cmd = DaemonCommand::CreateBranch {
+ task_id,
+ branch_name: req.branch_name.clone(),
+ from_ref: Some(checkpoint.commit_sha.clone()),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, cmd).await {
+ tracing::error!("Failed to send CreateBranch command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ checkpoint_id = %checkpoint_id,
+ branch_name = %req.branch_name,
+ commit_sha = %checkpoint.commit_sha,
+ "Branch creation requested from checkpoint"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(BranchCreatedResponse {
+ branch_name: req.branch_name,
+ commit_sha: checkpoint.commit_sha,
+ task_id,
+ checkpoint_number: checkpoint.checkpoint_number,
+ }),
+ )
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 278d0f5..b45dda5 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1504,3 +1504,384 @@ pub async fn answer_question(
Json(AnswerQuestionResponse { success }).into_response()
}
+
+// =============================================================================
+// Supervisor Resume and Conversation Rewind
+// =============================================================================
+
+/// Response for supervisor resume
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumeSupervisorResponse {
+ pub supervisor_task_id: Uuid,
+ pub daemon_id: Option<Uuid>,
+ pub resumed_from: ResumedFromInfo,
+ pub status: String,
+}
+
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ResumedFromInfo {
+ pub phase: String,
+ pub last_activity: chrono::DateTime<chrono::Utc>,
+ pub message_count: i32,
+}
+
+/// Resume interrupted supervisor with specified mode.
+///
+/// POST /api/v1/contracts/{id}/supervisor/resume
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/resume",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::ResumeSupervisorRequest,
+ responses(
+ (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 409, description = "Supervisor is already running", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn resume_supervisor(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get existing supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new(
+ "NO_SUPERVISOR_STATE",
+ "No supervisor state found - supervisor may not have been started",
+ )),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor task
+ let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if already running
+ if supervisor_task.status == "running" {
+ return (
+ StatusCode::CONFLICT,
+ Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
+ )
+ .into_response();
+ }
+
+ // Calculate message count from conversation history
+ let message_count = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| arr.len() as i32)
+ .unwrap_or(0);
+
+ // Based on resume mode, handle differently
+ match req.resume_mode.as_str() {
+ "continue" => {
+ // Mark task for reassignment with existing conversation context
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "restart_phase" => {
+ // Clear conversation but keep phase progress
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::json!([]),
+ )
+ .await
+ {
+ tracing::error!("Failed to clear conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await
+ {
+ tracing::error!("Failed to update task status: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ }
+ "from_checkpoint" => {
+ // This would require more complex handling with checkpoint system
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "NOT_IMPLEMENTED",
+ "from_checkpoint mode not yet implemented",
+ )),
+ )
+ .into_response();
+ }
+ _ => {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "INVALID_RESUME_MODE",
+ "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
+ )),
+ )
+ .into_response();
+ }
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ supervisor_task_id = %supervisor_state.task_id,
+ resume_mode = %req.resume_mode,
+ message_count = message_count,
+ "Supervisor resume requested"
+ );
+
+ Json(ResumeSupervisorResponse {
+ supervisor_task_id: supervisor_state.task_id,
+ daemon_id: supervisor_task.daemon_id,
+ resumed_from: ResumedFromInfo {
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ message_count,
+ },
+ status: "pending".to_string(),
+ })
+ .into_response()
+}
+
+/// Response for conversation rewind
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct RewindConversationResponse {
+ pub contract_id: Uuid,
+ pub messages_removed: i32,
+ pub new_message_count: i32,
+ pub code_rewound: bool,
+}
+
+/// Rewind supervisor conversation to specified point.
+///
+/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
+#[utoipa::path(
+ post,
+ path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ request_body = crate::db::models::RewindConversationRequest,
+ responses(
+ (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Contract or supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn rewind_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ auth: crate::server::auth::Authenticated,
+ Json(req): Json<crate::db::models::RewindConversationRequest>,
+) -> impl IntoResponse {
+ let crate::server::auth::Authenticated(auth_info) = auth;
+
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract and verify ownership
+ let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ let conversation = supervisor_state
+ .conversation_history
+ .as_array()
+ .cloned()
+ .unwrap_or_default();
+
+ let original_count = conversation.len() as i32;
+
+ // Determine how many messages to keep
+ let new_count = if let Some(by_count) = req.by_message_count {
+ (original_count - by_count).max(0)
+ } else if let Some(to_index) = req.to_message_index {
+ // Keep messages up to and including the specified index
+ (to_index + 1).min(original_count).max(0)
+ } else {
+ // Default to removing last message
+ (original_count - 1).max(0)
+ };
+
+ // Truncate conversation
+ let new_conversation: Vec<serde_json::Value> = conversation
+ .into_iter()
+ .take(new_count as usize)
+ .collect();
+
+ // Update the conversation
+ if let Err(e) = repository::update_supervisor_conversation(
+ pool,
+ contract_id,
+ serde_json::Value::Array(new_conversation),
+ )
+ .await
+ {
+ tracing::error!("Failed to update conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ original_count = original_count,
+ new_count = new_count,
+ messages_removed = original_count - new_count,
+ "Conversation rewound"
+ );
+
+ Json(RewindConversationResponse {
+ contract_id,
+ messages_removed: original_count - new_count,
+ new_message_count: new_count,
+ code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind
+ })
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs
index b5650fd..609b63b 100644
--- a/makima/src/server/handlers/mod.rs
+++ b/makima/src/server/handlers/mod.rs
@@ -7,6 +7,7 @@ pub mod contract_daemon;
pub mod contracts;
pub mod file_ws;
pub mod files;
+pub mod history;
pub mod listen;
pub mod mesh;
pub mod mesh_chat;
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 0eba009..cf63f71 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -105,6 +105,11 @@ pub fn make_router(state: SharedState) -> Router {
// Checkpoint endpoints
.route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint))
.route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints))
+ // Resume and rewind endpoints
+ .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task))
+ .route("/mesh/tasks/{id}/fork", post(mesh::fork_task))
+ .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint))
+ .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint))
// Supervisor endpoints (for supervisor.sh)
.route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks))
.route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree))
@@ -156,6 +161,9 @@ pub fn make_router(state: SharedState) -> Router {
"/contracts/{id}/chat/history",
get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history),
)
+ // Contract supervisor resume endpoints
+ .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor))
+ .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation))
// Contract daemon endpoints (for tasks to interact with contracts)
.route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status))
.route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist))