//! HTTP handlers for supervisor-specific mesh operations.
//!
//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate
//! contract work: spawning tasks, waiting for completion, reading worktree files, etc.
use axum::{
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use base64::Engine;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest};
use crate::db::repository;
use sqlx::PgPool;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification};
// =============================================================================
// Request/Response Types
// =============================================================================
/// Request to spawn a new task from supervisor.
///
/// Body of `POST /api/v1/mesh/supervisor/tasks`. The calling supervisor (or directive) task
/// must belong to `contract_id`, otherwise the request is rejected with 403.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SpawnTaskRequest {
/// Name for the new task.
pub name: String,
/// Plan text stored on the task and sent to the daemon when it is spawned.
pub plan: String,
/// Contract the task is created under; must match the caller's contract.
pub contract_id: Uuid,
/// Optional parent task, stored on the created task.
pub parent_task_id: Option<Uuid>,
/// Optional checkpoint commit SHA, forwarded to the created task unchanged.
pub checkpoint_sha: Option<String>,
/// Repository URL for the task (optional - if not provided, will be looked up from contract).
/// A blank (whitespace-only) value is treated the same as not provided.
pub repository_url: Option<String>,
}
/// Request to wait for task completion.
///
/// Body of `POST /api/v1/mesh/supervisor/tasks/{task_id}/wait`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitForTaskRequest {
/// Maximum time to wait, in seconds. Defaults to 300 (`default_timeout`) when omitted.
/// NOTE(review): only non-negative values are meaningful; send a value >= 0.
#[serde(default = "default_timeout")]
pub timeout_seconds: i32,
}
/// Serde default for `WaitForTaskRequest::timeout_seconds`: five minutes, in seconds.
fn default_timeout() -> i32 {
    5 * 60
}
/// Request to read a file from task worktree.
///
/// Body of `POST /api/v1/mesh/supervisor/tasks/{task_id}/read-file`. The endpoint currently
/// responds `501 Not Implemented` after verifying access.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadWorktreeFileRequest {
/// Path of the file to read; presumably relative to the task's worktree root — TODO confirm.
pub file_path: String,
}
/// Request to ask a question and wait for user feedback.
///
/// Omitted fields fall back to serde defaults:
/// - no choices and no context;
/// - a 3600-second timeout (`default_question_timeout`);
/// - every boolean flag `false`;
/// - question type `"general"` (`default_question_type`).
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionRequest {
/// The question to ask the user
pub question: String,
/// Optional choices (if empty, free-form text response)
#[serde(default)]
pub choices: Vec<String>,
/// Optional context about what this relates to
pub context: Option<String>,
/// How long to wait for a response (seconds)
#[serde(default = "default_question_timeout")]
pub timeout_seconds: i32,
/// When true, the request will block indefinitely until user responds (no timeout)
#[serde(default)]
pub phaseguard: bool,
/// When true, allow selecting multiple choices (response will be comma-separated)
#[serde(default)]
pub multi_select: bool,
/// When true, return immediately without waiting for response
#[serde(default)]
pub non_blocking: bool,
/// Question type: general, phase_confirmation, or contract_complete
#[serde(default = "default_question_type")]
pub question_type: String,
}
/// Serde default for `AskQuestionRequest::question_type`: the plain `"general"` kind.
fn default_question_type() -> String {
    String::from("general")
}
/// Serde default for `AskQuestionRequest::timeout_seconds`: one hour, in seconds.
fn default_question_timeout() -> i32 {
    60 * 60
}
/// Response from asking a question.
///
/// Derives both `Serialize` and `Deserialize`, so the same shape can be produced by the
/// server and parsed by a client.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AskQuestionResponse {
/// The question ID for tracking
pub question_id: Uuid,
/// The user's response (None if timed out)
pub response: Option<String>,
/// Whether the question timed out
pub timed_out: bool,
/// Whether the question is still pending (server-side timeout reached but question not removed).
/// The client should poll the poll endpoint to continue waiting.
/// Deserializes as `false` when the field is absent.
#[serde(default)]
pub still_pending: bool,
}
/// Request to answer a supervisor question.
///
/// The question being answered is presumably identified by the request path — confirm with the
/// handler that consumes this type.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionRequest {
/// The user's response
pub response: String,
}
/// Response to answering a question.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AnswerQuestionResponse {
/// Whether the answer was accepted
pub success: bool,
}
/// Pending question summary.
///
/// Serialize-only view of a question that is still waiting for a user answer.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct PendingQuestionSummary {
/// Identifier of the pending question.
pub question_id: Uuid,
/// Task associated with the question (presumably the task that asked it — confirm).
pub task_id: Uuid,
/// Contract associated with the question.
pub contract_id: Uuid,
/// Directive this question relates to (if from a directive task)
#[serde(skip_serializing_if = "Option::is_none")]
pub directive_id: Option<Uuid>,
/// The question text.
pub question: String,
/// Offered choices; empty for a free-form question.
pub choices: Vec<String>,
/// Optional context supplied with the question.
pub context: Option<String>,
/// When the question was created (UTC).
pub created_at: chrono::DateTime<chrono::Utc>,
/// Whether multiple choices can be selected
// NOTE(review): `#[serde(default)]` only affects deserialization; this struct derives only
// `Serialize`, so the attribute below has no effect.
#[serde(default)]
pub multi_select: bool,
/// Question type: general, phase_confirmation, or contract_complete
#[serde(default)]
pub question_type: String,
}
/// Request to create a checkpoint.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateCheckpointRequest {
/// Checkpoint message; forwarded to the daemon's `CreateCheckpoint` command and echoed in
/// the `202 Accepted` response.
pub message: String,
}
/// Response for task tree.
///
/// Currently a flat list of a contract's tasks; no parent/child nesting is built.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskTreeResponse {
/// Summaries of every task in the contract visible to the owner.
pub tasks: Vec<TaskSummary>,
/// ID of the first task flagged `is_supervisor`, if any.
pub supervisor_task_id: Option<Uuid>,
/// Number of entries in `tasks`.
pub total_count: usize,
}
/// Response for wait operation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WaitResponse {
/// The task that was waited on.
pub task_id: Uuid,
/// Task status at the time the wait ended (`"unknown"` if it could not be re-read).
pub status: String,
/// True when `status` is `done`, `failed` or `merged`.
pub completed: bool,
/// Output summary carried by a completion notification, when one was received.
pub output_summary: Option<String>,
}
/// Response for read file operation.
///
/// NOTE(review): no handler visible here returns this yet; `read_worktree_file` currently
/// responds `501 Not Implemented`.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ReadFileResponse {
/// Task whose worktree was read.
pub task_id: Uuid,
/// Path that was requested.
pub file_path: String,
/// File contents (meaning when `exists` is false is not yet defined — confirm).
pub content: String,
/// Whether the file was found.
pub exists: bool,
}
/// Response for checkpoint operations.
///
/// When returned by `create_checkpoint` (202 Accepted), `checkpoint_number` is `0` and
/// `commit_sha` is `"pending"`. Both are placeholders, because the real checkpoint is created
/// asynchronously by the daemon.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointResponse {
/// Task the checkpoint belongs to.
pub task_id: Uuid,
/// Sequence number of the checkpoint (placeholder `0` while pending).
pub checkpoint_number: i32,
/// Commit SHA of the checkpoint (placeholder `"pending"` while pending).
pub commit_sha: String,
/// Checkpoint message as supplied by the caller.
pub message: String,
}
/// Task checkpoint info.
///
/// API view of a stored checkpoint row, as returned by `list_checkpoints`.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskCheckpoint {
/// Checkpoint identifier.
pub id: Uuid,
/// Task the checkpoint belongs to.
pub task_id: Uuid,
/// Sequence number of the checkpoint within the task.
pub checkpoint_number: i32,
/// Commit SHA recorded for the checkpoint.
pub commit_sha: String,
/// Branch the checkpoint was recorded on.
pub branch_name: String,
/// Checkpoint message.
pub message: String,
/// Changed-files data as stored (arbitrary JSON; schema not defined here).
pub files_changed: Option<serde_json::Value>,
/// Lines added; reported as 0 when the stored value is missing.
pub lines_added: i32,
/// Lines removed; reported as 0 when the stored value is missing.
pub lines_removed: i32,
/// Creation time (UTC).
pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Response for list checkpoints.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointListResponse {
/// Task whose checkpoints are listed.
pub task_id: Uuid,
/// Checkpoints in the order returned by the repository.
pub checkpoints: Vec<TaskCheckpoint>,
}
// =============================================================================
// Helper Functions
// =============================================================================
/// Authenticate a supervisor-endpoint request and resolve who owns the caller.
///
/// The caller must present a tool key. The task bound to that key must either be a supervisor
/// task or carry a `directive_id`. When `contract_id` is given, that task must also belong to
/// the contract.
///
/// # Returns
/// `(calling_task_id, owner_id)` on success.
///
/// # Errors
/// A ready-to-send `(StatusCode, Json<ApiError>)` in these cases:
/// - 401: no tool key;
/// - 503: no database configured;
/// - 500: the task lookup failed;
/// - 404: the task is unknown;
/// - 403: the task has the wrong role or belongs to a different contract.
async fn verify_supervisor_auth(
    state: &SharedState,
    headers: &HeaderMap,
    contract_id: Option<Uuid>,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
    let AuthSource::ToolKey(task_id) = extract_auth(state, headers) else {
        return Err((
            StatusCode::UNAUTHORIZED,
            Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")),
        ));
    };

    let Some(pool) = state.db_pool.as_ref() else {
        return Err((
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        ));
    };

    // Load the calling task: its role decides access, and its owner scopes later queries.
    let caller = match repository::get_task(pool, task_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            return Err((
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ));
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get supervisor task");
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")),
            ));
        }
    };

    let may_orchestrate = caller.is_supervisor || caller.directive_id.is_some();
    if !may_orchestrate {
        return Err((
            StatusCode::FORBIDDEN,
            Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor or directive tasks can use these endpoints")),
        ));
    }

    if matches!(contract_id, Some(cid) if caller.contract_id != Some(cid)) {
        return Err((
            StatusCode::FORBIDDEN,
            Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")),
        ));
    }

    Ok((task_id, caller.owner_id))
}
/// Try to start a pending task on an available daemon.
/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
/// For retried tasks, excludes daemons that previously failed the task and includes
/// checkpoint patch data for worktree recovery.
pub async fn try_start_pending_task(
state: &SharedState,
contract_id: Uuid,
owner_id: Uuid,
) -> Result<Option<Task>, String> {
let pool = state.db_pool.as_ref().ok_or("Database not configured")?;
// Get pending tasks for this contract (includes interrupted tasks awaiting retry)
let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to get pending tasks: {}", e))?;
if pending_tasks.is_empty() {
return Ok(None);
}
// Get contract to check local_only flag
let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to get contract: {}", e))?
.ok_or_else(|| "Contract not found".to_string())?;
// Try each pending task until we find one we can start
for task in &pending_tasks {
// Get excluded daemon IDs for this task (daemons that have already failed it)
let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
// Get available daemons excluding failed ones for this task
let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids)
.await
.map_err(|e| format!("Failed to get available daemons: {}", e))?;
// Find a daemon with capacity
let available_daemon = daemons.iter().find(|d| {
d.current_task_count < d.max_concurrent_tasks
&& state.daemon_connections.contains_key(&d.connection_id)
});
let daemon = match available_daemon {
Some(d) => d,
None => continue, // Try next task
};
// Get repo URL from task or contract
let repo_url = if let Some(url) = &task.repository_url {
Some(url.clone())
} else {
match repository::list_contract_repositories(pool, contract_id).await {
Ok(repos) => repos
.iter()
.find(|r| r.is_primary)
.or(repos.first())
.and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
Err(_) => None,
}
};
// Update task with daemon assignment
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(daemon.id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => continue, // Task was modified concurrently, try next
Err(e) => {
tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
continue; // Try next task
}
};
// For retried tasks, fetch checkpoint patch for worktree recovery
let (patch_data, patch_base_sha) = if task.retry_count > 0 {
// This is a retry - try to restore from checkpoint
match repository::get_latest_checkpoint_patch(pool, task.id).await {
Ok(Some(patch)) => {
tracing::info!(
task_id = %task.id,
retry_count = task.retry_count,
patch_size = patch.patch_size_bytes,
base_sha = %patch.base_commit_sha,
"Including checkpoint patch for task retry recovery"
);
let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
(Some(encoded), Some(patch.base_commit_sha))
}
Ok(None) => {
tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry");
(None, None)
}
Err(e) => {
tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry");
(None, None)
}
}
} else {
(None, None)
};
// Send spawn command
let cmd = DaemonCommand::SpawnTask {
task_id: updated_task.id,
task_name: updated_task.name.clone(),
plan: updated_task.plan.clone(),
repo_url,
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: updated_task.parent_task_id,
depth: updated_task.depth,
is_orchestrator: false,
target_repo_path: updated_task.target_repo_path.clone(),
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
autonomous_loop: updated_task.is_supervisor,
resume_session: task.retry_count > 0, // Use --continue for retried tasks
conversation_history: None,
patch_data,
patch_base_sha,
local_only: contract.local_only,
auto_merge_local: contract.auto_merge_local,
// For retried tasks, use their own worktree (they already have state from previous attempt)
supervisor_worktree_task_id: None,
directive_id: updated_task.directive_id,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
// Rollback
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
continue; // Try next task
}
tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
return Ok(Some(updated_task));
}
// No tasks could be started
Ok(None)
}
// =============================================================================
// Contract Task Handlers
// =============================================================================
/// List all tasks in a contract's tree.
///
/// Requires supervisor tool-key auth for a task belonging to `contract_id`. Responds with a
/// flat [`TaskTreeResponse`] of the owner's tasks in that contract, together with the id of the
/// first supervisor task found and the task count.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks",
    params(
        ("contract_id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "List of tasks in contract", body = TaskTreeResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn list_contract_tasks(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let owner_id = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await {
        Ok((_, owner)) => owner,
        Err(rejection) => return rejection.into_response(),
    };
    // A successful verify_supervisor_auth guarantees a configured pool.
    let pool = state.db_pool.as_ref().unwrap();

    let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
        Ok(all) => all,
        Err(e) => {
            tracing::error!(error = %e, "Failed to list contract tasks");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to list tasks")),
            )
                .into_response();
        }
    };

    let supervisor_task_id = tasks.iter().find_map(|t| t.is_supervisor.then_some(t.id));
    let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect();
    let body = TaskTreeResponse {
        total_count: summaries.len(),
        supervisor_task_id,
        tasks: summaries,
    };
    (StatusCode::OK, Json(body)).into_response()
}
/// Get full task tree structure for a contract.
///
/// Delegates to [`list_contract_tasks`]; auth and response are identical to that endpoint.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree",
    params(
        ("contract_id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "Task tree structure", body = TaskTreeResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn get_contract_tree(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    // The "tree" is currently the flat listing; real nesting can be layered on later.
    let flat_listing = list_contract_tasks(State(state), Path(contract_id), headers);
    flat_listing.await
}
// =============================================================================
// Task Spawn Handler
// =============================================================================
/// Spawn a new task (supervisor only).
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks",
request_body = SpawnTaskRequest,
responses(
(status = 201, description = "Task created", body = Task),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn spawn_task(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<SpawnTaskRequest>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Verify contract exists and get local_only flag
let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Contract not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get contract");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get contract")),
).into_response();
}
};
// Get repository URL - either from request or from contract's repositories
let repo_url = if let Some(url) = request.repository_url.clone() {
if !url.trim().is_empty() {
Some(url)
} else {
None
}
} else {
None
};
// If no repo URL provided, look it up from the contract
let repo_url = match repo_url {
Some(url) => Some(url),
None => {
match repository::list_contract_repositories(pool, request.contract_id).await {
Ok(repos) => {
// Prefer primary repo, fallback to first repo
let repo = repos.iter()
.find(|r| r.is_primary)
.or(repos.first());
// Use repository_url if set, otherwise use local_path
repo.and_then(|r| {
r.repository_url.clone()
.or_else(|| r.local_path.clone())
})
}
Err(e) => {
tracing::warn!(error = %e, "Failed to get contract repositories");
None
}
}
}
};
// Validate that we have a repo URL
if repo_url.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")),
).into_response();
}
// Create task request
// All tasks share the supervisor's worktree
let supervisor_worktree_task_id = Some(supervisor_id);
let create_req = CreateTaskRequest {
name: request.name.clone(),
description: None,
plan: request.plan.clone(),
repository_url: repo_url.clone(),
contract_id: Some(request.contract_id),
parent_task_id: request.parent_task_id,
is_supervisor: false,
checkpoint_sha: request.checkpoint_sha.clone(),
merge_mode: Some("manual".to_string()),
priority: 0,
base_branch: None,
target_branch: None,
target_repo_path: None,
completion_action: None,
continue_from_task_id: None,
copy_files: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id,
directive_id: None,
directive_step_id: None,
};
// Create task in DB
let task = match repository::create_task_for_owner(pool, owner_id, create_req).await {
Ok(t) => t,
Err(e) => {
tracing::error!(error = %e, "Failed to create task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to create task")),
).into_response();
}
};
tracing::info!(
supervisor_id = %supervisor_id,
task_id = %task.id,
task_name = %task.name,
"Supervisor spawned new task"
);
// Record history event for task spawned by supervisor
let _ = repository::record_history_event(
pool,
owner_id,
task.contract_id,
Some(task.id),
"task",
Some("spawned"),
None,
serde_json::json!({
"name": &task.name,
"spawnedBy": supervisor_id.to_string(),
}),
).await;
// Broadcast task creation notification to WebSocket subscribers
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
owner_id: Some(owner_id),
version: task.version,
status: task.status.clone(),
updated_fields: vec!["created".to_string()],
updated_by: "supervisor".to_string(),
});
// Start task on a daemon
// Find a daemon that belongs to this owner
let mut updated_task = task;
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
if daemon.owner_id == owner_id {
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(daemon.id),
version: Some(updated_task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await {
Ok(Some(t)) => {
updated_task = t;
}
Ok(None) => {
tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id");
break;
}
Err(e) => {
tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id");
break;
}
}
// Send spawn command to daemon
let cmd = DaemonCommand::SpawnTask {
task_id: updated_task.id,
task_name: updated_task.name.clone(),
plan: updated_task.plan.clone(),
repo_url: repo_url.clone(),
base_branch: updated_task.base_branch.clone(),
target_branch: updated_task.target_branch.clone(),
parent_task_id: updated_task.parent_task_id,
depth: updated_task.depth,
is_orchestrator: false,
target_repo_path: updated_task.target_repo_path.clone(),
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only: contract.local_only,
auto_merge_local: contract.auto_merge_local,
// All tasks share the supervisor's worktree
supervisor_worktree_task_id: Some(supervisor_id),
directive_id: updated_task.directive_id,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
// Rollback: clear daemon_id and reset status since command failed
let rollback_req = UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await;
} else {
tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
// Save state: task spawn is a key save point (Task 3.3)
save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await;
// Broadcast task status update notification to WebSocket subscribers
state.broadcast_task_update(TaskUpdateNotification {
task_id: updated_task.id,
owner_id: Some(owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "supervisor".to_string(),
});
}
break;
}
}
(StatusCode::CREATED, Json(updated_task)).into_response()
}
// =============================================================================
// Wait for Task Handler
// =============================================================================
/// Wait for a task to complete.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait",
params(
("task_id" = Uuid, Path, description = "Task ID to wait for")
),
request_body = WaitForTaskRequest,
responses(
(status = 200, description = "Task completed or timed out", body = WaitResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn wait_for_task(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<WaitForTaskRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Verify task belongs to same owner
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Check if already done
if task.status == "done" || task.status == "failed" || task.status == "merged" {
return (
StatusCode::OK,
Json(WaitResponse {
task_id,
status: task.status,
completed: true,
output_summary: None,
}),
).into_response();
}
// Get contract_id for pending task scheduling
let contract_id = task.contract_id;
// Subscribe to task completions
let mut rx = state.task_completions.subscribe();
let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);
// Wait for completion or timeout, periodically trying to start pending tasks
let result = tokio::time::timeout(timeout, async {
let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
// Check for task completion notifications
recv_result = rx.recv() => {
match recv_result {
Ok(notification) => {
if notification.task_id == task_id {
return Some(notification);
}
}
Err(_) => {
// Channel closed or lagged - check DB directly
if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
if t.status == "done" || t.status == "failed" || t.status == "merged" {
return Some(crate::server::state::TaskCompletionNotification {
task_id: t.id,
owner_id: Some(t.owner_id),
contract_id: t.contract_id,
parent_task_id: t.parent_task_id,
status: t.status,
output_summary: None,
worktree_path: None,
error_message: t.error_message,
});
}
}
}
}
}
// Periodically try to start pending tasks
_ = pending_check_interval.tick() => {
if let Some(cid) = contract_id {
match try_start_pending_task(&state, cid, owner_id).await {
Ok(Some(started_task)) => {
tracing::debug!(
task_id = %started_task.id,
task_name = %started_task.name,
"Started pending task while waiting"
);
}
Ok(None) => {
// No pending tasks or no capacity - that's fine
}
Err(e) => {
tracing::warn!(error = %e, "Error trying to start pending task");
}
}
}
}
}
}
}).await;
match result {
Ok(Some(notification)) => {
(
StatusCode::OK,
Json(WaitResponse {
task_id,
status: notification.status,
completed: true,
output_summary: notification.output_summary,
}),
).into_response()
}
Ok(None) | Err(_) => {
// Timeout - check final status
let final_status = repository::get_task(pool, task_id)
.await
.ok()
.flatten()
.map(|t| t.status)
.unwrap_or_else(|| "unknown".to_string());
(
StatusCode::OK,
Json(WaitResponse {
task_id,
status: final_status.clone(),
completed: final_status == "done" || final_status == "failed" || final_status == "merged",
output_summary: None,
}),
).into_response()
}
}
}
// =============================================================================
// Read Worktree File Handler
// =============================================================================
/// Read a file from a task's worktree.
///
/// Not implemented yet. After verifying supervisor auth and that the task belongs to the same
/// owner, this always responds `501 Not Implemented`. Supervisors should read the file through
/// local filesystem access to the worktree instead.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    request_body = ReadWorktreeFileRequest,
    responses(
        (status = 200, description = "File content (reserved; not returned until implemented)", body = ReadFileResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 501, description = "Worktree file reading via API not yet implemented"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn read_worktree_file(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<ReadWorktreeFileRequest>,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };
    // verify_supervisor_auth has already answered 503 if no pool is configured.
    let pool = state.db_pool.as_ref().unwrap();
    // Get task to verify ownership
    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };
    // TODO: Implement file reading via worktree path
    // For now, return not implemented - supervisor should use local file access via worktree
    let _ = (task, request);
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(ApiError::new(
            "NOT_IMPLEMENTED",
            "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.",
        )),
    ).into_response()
}
// =============================================================================
// Checkpoint Handlers
// =============================================================================
/// Create a git checkpoint for a task.
///
/// Only the task that owns the tool key may checkpoint itself. The request is forwarded to the
/// task's assigned daemon as a `CreateCheckpoint` command and answered immediately with
/// `202 Accepted`. The returned `checkpoint_number` (`0`) and `commit_sha` (`"pending"`) are
/// placeholders, not the final checkpoint values.
///
/// Responds 503 when no database is configured or the task has no assigned daemon.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/tasks/{task_id}/checkpoint",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    request_body = CreateCheckpointRequest,
    responses(
        (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - can only create checkpoint for own task"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 503, description = "Task has no assigned daemon, or database not configured"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn create_checkpoint(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<CreateCheckpointRequest>,
) -> impl IntoResponse {
    let auth = extract_auth(&state, &headers);
    let task_id_from_auth = match auth {
        AuthSource::ToolKey(tid) => tid,
        _ => {
            return (
                StatusCode::UNAUTHORIZED,
                Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
            ).into_response();
        }
    };
    // Can only create checkpoint for own task
    if task_id_from_auth != task_id {
        return (
            StatusCode::FORBIDDEN,
            Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")),
        ).into_response();
    }
    // Nothing above has checked for a pool (no verify_supervisor_auth here), so don't unwrap.
    let Some(pool) = state.db_pool.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        ).into_response();
    };
    // Get task and daemon_id
    let task = match repository::get_task(pool, task_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };
    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
        ).into_response();
    };
    // Send CreateCheckpoint command to daemon
    let cmd = DaemonCommand::CreateCheckpoint {
        task_id,
        message: request.message.clone(),
    };
    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send CreateCheckpoint command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }
    // Return accepted - the checkpoint result will be delivered via WebSocket
    // and stored in the database by the daemon message handler
    (
        StatusCode::ACCEPTED,
        Json(CheckpointResponse {
            task_id,
            checkpoint_number: 0, // Will be assigned by DB on actual creation
            commit_sha: "pending".to_string(),
            message: request.message,
        }),
    ).into_response()
}
/// List checkpoints for a task.
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{task_id}/checkpoints",
params(
("task_id" = Uuid, Path, description = "Task ID")
),
responses(
(status = 200, description = "List of checkpoints", body = CheckpointListResponse),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn list_checkpoints(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
) -> impl IntoResponse {
let auth = extract_auth(&state, &headers);
let _task_id_from_auth = match auth {
AuthSource::ToolKey(tid) => tid,
_ => {
return (
StatusCode::UNAUTHORIZED,
Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
).into_response();
}
};
let pool = state.db_pool.as_ref().unwrap();
// Get checkpoints from DB
match repository::list_task_checkpoints(pool, task_id).await {
Ok(checkpoints) => {
let checkpoint_list: Vec<TaskCheckpoint> = checkpoints
.into_iter()
.map(|c| TaskCheckpoint {
id: c.id,
task_id: c.task_id,
checkpoint_number: c.checkpoint_number,
commit_sha: c.commit_sha,
branch_name: c.branch_name,
message: c.message,
files_changed: c.files_changed,
lines_added: c.lines_added.unwrap_or(0),
lines_removed: c.lines_removed.unwrap_or(0),
created_at: c.created_at,
})
.collect();
(
StatusCode::OK,
Json(CheckpointListResponse {
task_id,
checkpoints: checkpoint_list,
}),
).into_response()
}
Err(e) => {
tracing::error!(error = %e, "Failed to list checkpoints");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")),
).into_response()
}
}
}
// =============================================================================
// Git Operations - Request/Response Types
// =============================================================================
/// Request to create a new branch.
///
/// Body of `POST /api/v1/mesh/supervisor/branches`; both fields are forwarded
/// unchanged to the daemon in a `DaemonCommand::CreateBranch`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchRequest {
/// Name of the branch to create (JSON key `branchName`).
pub branch_name: String,
/// Optional ref to start the branch from (JSON key `fromRef`). When omitted,
/// the daemon chooses the starting point (presumably the current HEAD — TODO confirm).
pub from_ref: Option<String>,
}
/// Response for branch creation.
///
/// `create_branch` returns this as soon as the command has been handed to the
/// daemon, so `success` reflects dispatch, not the git operation itself.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateBranchResponse {
/// Whether the branch creation command was dispatched.
pub success: bool,
/// The branch name from the request.
pub branch_name: String,
/// Human-readable status text.
pub message: String,
}
/// Request to merge task changes.
///
/// Both fields are forwarded to the daemon in a `DaemonCommand::MergeTaskToTarget`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskRequest {
/// Branch to merge into (JSON key `targetBranch`). When omitted, the daemon
/// picks the target (exact default not visible here — TODO confirm).
pub target_branch: Option<String>,
/// Whether to squash the task's commits; defaults to `false` when absent.
#[serde(default)]
pub squash: bool,
}
/// Response for merge operation.
///
/// Built from the daemon's merge-result notification, or synthesized with
/// `success: false` when no result arrives in time.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MergeTaskResponse {
/// The task whose changes were merged.
pub task_id: Uuid,
/// Whether the daemon reported a successful merge.
pub success: bool,
/// Human-readable outcome or error description.
pub message: String,
/// Resulting commit SHA, when the daemon reported one.
pub commit_sha: Option<String>,
/// Conflicting paths, when the daemon reported any (presumably file paths — TODO confirm).
pub conflicts: Option<Vec<String>>,
}
/// Request to create a pull request.
///
/// The branch is expected to exist in the calling supervisor's worktree; all
/// fields are forwarded to the daemon in a `DaemonCommand::CreatePR`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRRequest {
/// Source branch for the pull request.
pub branch: String,
/// Pull request title.
pub title: String,
/// Optional pull request description.
pub body: Option<String>,
}
/// Response for PR creation.
///
/// Built from the daemon's PR-result notification, or synthesized with
/// `success: false` when no result arrives in time.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreatePRResponse {
/// The supervisor task whose worktree holds the branch.
pub task_id: Uuid,
/// Whether the daemon reported that the PR was created.
pub success: bool,
/// Human-readable outcome or error description.
pub message: String,
/// URL of the created pull request, if any.
pub pr_url: Option<String>,
/// Number of the created pull request, if any.
pub pr_number: Option<i32>,
}
/// Response for task diff.
///
/// NOTE(review): `get_task_diff` currently returns `success: true`,
/// `diff: None`, and an informational text in `error`, because the diff is
/// produced asynchronously by the daemon.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TaskDiffResponse {
/// The task whose diff was requested.
pub task_id: Uuid,
/// Whether the request succeeded.
pub success: bool,
/// The diff text, when available inline.
pub diff: Option<String>,
/// Error or status text.
pub error: Option<String>,
}
// =============================================================================
// Git Operations - Handlers
// =============================================================================
/// Create a new branch from supervisor's worktree.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/branches",
request_body = CreateBranchRequest,
responses(
(status = 201, description = "Branch created", body = CreateBranchResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_branch(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreateBranchRequest>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
// Find daemon running supervisor
let daemon_id = {
let pool = state.db_pool.as_ref().unwrap();
match repository::get_task(pool, supervisor_id).await {
Ok(Some(task)) => task.daemon_id,
_ => None,
}
};
let Some(daemon_id) = daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
).into_response();
};
// Send CreateBranch command to daemon
let cmd = DaemonCommand::CreateBranch {
task_id: supervisor_id,
branch_name: request.branch_name.clone(),
from_ref: request.from_ref,
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send CreateBranch command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
// Note: Real implementation would wait for daemon response
// For now, return success immediately - daemon will send response via WebSocket
(
StatusCode::CREATED,
Json(CreateBranchResponse {
success: true,
branch_name: request.branch_name,
message: "Branch creation command sent".to_string(),
}),
).into_response()
}
/// Merge a task's changes to a target branch.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge",
params(
("task_id" = Uuid, Path, description = "Task ID to merge")
),
request_body = MergeTaskRequest,
responses(
(status = 200, description = "Merge initiated", body = MergeTaskResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn merge_task(
State(state): State<SharedState>,
Path(task_id): Path<Uuid>,
headers: HeaderMap,
Json(request): Json<MergeTaskRequest>,
) -> impl IntoResponse {
let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the target task
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
).into_response();
}
};
// Get daemon running the task
let Some(daemon_id) = task.daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
).into_response();
};
// Subscribe to merge results BEFORE sending the command
let mut rx = state.merge_results.subscribe();
// Send MergeTaskToTarget command to daemon
let cmd = DaemonCommand::MergeTaskToTarget {
task_id,
target_branch: request.target_branch,
squash: request.squash,
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send MergeTaskToTarget command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
// Wait for the merge result with a timeout (60 seconds should be plenty for a merge)
let timeout = tokio::time::Duration::from_secs(60);
let result = tokio::time::timeout(timeout, async {
loop {
match rx.recv().await {
Ok(notification) => {
if notification.task_id == task_id {
return Some(notification);
}
// Not our task, keep waiting
}
Err(_) => {
// Channel closed or lagged
return None;
}
}
}
}).await;
match result {
Ok(Some(notification)) => {
(
StatusCode::OK,
Json(MergeTaskResponse {
task_id,
success: notification.success,
message: notification.message,
commit_sha: notification.commit_sha,
conflicts: notification.conflicts,
}),
).into_response()
}
Ok(None) | Err(_) => {
// Timeout or channel error - return error status
(
StatusCode::GATEWAY_TIMEOUT,
Json(MergeTaskResponse {
task_id,
success: false,
message: "Merge operation timed out waiting for daemon response".to_string(),
commit_sha: None,
conflicts: None,
}),
).into_response()
}
}
}
/// Create a pull request for a task's changes.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/pr",
request_body = CreatePRRequest,
responses(
(status = 201, description = "PR created", body = CreatePRResponse),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_pr(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreatePRRequest>,
) -> impl IntoResponse {
let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the supervisor's own task to find daemon and base_branch
let task = match repository::get_task(pool, supervisor_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get supervisor task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
).into_response();
}
};
// Get daemon running the supervisor
let Some(daemon_id) = task.daemon_id else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
).into_response();
};
// Subscribe to PR results BEFORE sending the command
let mut rx = state.pr_results.subscribe();
// Send CreatePR command to daemon using the supervisor's task ID
// (the branch is in the supervisor's worktree)
// Pass base_branch from task if available, otherwise daemon will auto-detect
let cmd = DaemonCommand::CreatePR {
task_id: supervisor_id,
title: request.title.clone(),
body: request.body.clone(),
base_branch: task.base_branch.clone(),
branch: request.branch.clone(),
};
if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
tracing::error!(error = %e, "Failed to send CreatePR command");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
).into_response();
}
// Wait for the PR result with a timeout (60 seconds should be plenty for PR creation)
let timeout = tokio::time::Duration::from_secs(60);
let result = tokio::time::timeout(timeout, async {
loop {
match rx.recv().await {
Ok(notification) => {
if notification.task_id == supervisor_id {
return Some(notification);
}
// Not our task, keep waiting
}
Err(_) => {
// Channel closed or lagged
return None;
}
}
}
}).await;
match result {
Ok(Some(notification)) => {
let status = if notification.success {
StatusCode::CREATED
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(
status,
Json(CreatePRResponse {
task_id: supervisor_id,
success: notification.success,
message: notification.message,
pr_url: notification.pr_url,
pr_number: notification.pr_number,
}),
).into_response()
}
Ok(None) | Err(_) => {
// Timeout or channel error - return error status
(
StatusCode::GATEWAY_TIMEOUT,
Json(CreatePRResponse {
task_id: supervisor_id,
success: false,
message: "PR creation timed out waiting for daemon response".to_string(),
pr_url: None,
pr_number: None,
}),
).into_response()
}
}
}
/// Get the diff for a task's changes.
///
/// Sends `GetTaskDiff` to the daemon running the task, which must belong to
/// the supervisor's owner. The diff is not included in this response; the
/// command only asks the daemon to produce it, so `diff` is always `None` here.
///
/// NOTE(review): the acknowledgement text is carried in `error` while
/// `success` is `true`. This is kept as-is because callers may already parse it.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff",
    params(
        ("task_id" = Uuid, Path, description = "Task ID")
    ),
    responses(
        (status = 200, description = "Task diff", body = TaskDiffResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - not a supervisor"),
        (status = 404, description = "Task not found"),
        (status = 500, description = "Internal server error"),
        (status = 503, description = "Database not configured or task has no assigned daemon"),
    ),
    tag = "Mesh Supervisor"
)]
pub async fn get_task_diff(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
        Ok(ids) => ids,
        Err(e) => return e.into_response(),
    };

    let Some(pool) = state.db_pool.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        ).into_response();
    };

    let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            ).into_response();
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to get task");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", "Failed to get task")),
            ).into_response();
        }
    };

    let Some(daemon_id) = task.daemon_id else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
        ).into_response();
    };

    let cmd = DaemonCommand::GetTaskDiff { task_id };
    if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
        tracing::error!(error = %e, "Failed to send GetTaskDiff command");
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
        ).into_response();
    }

    (
        StatusCode::OK,
        Json(TaskDiffResponse {
            task_id,
            success: true,
            diff: None,
            error: Some("Diff command sent - response will be streamed".to_string()),
        }),
    ).into_response()
}
// =============================================================================
// Supervisor Question Handlers
// =============================================================================
/// Ask a question and wait for user feedback.
///
/// The supervisor calls this to ask a question. The endpoint will poll until
/// either the user responds or the timeout is reached.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/questions",
request_body = AskQuestionRequest,
responses(
(status = 200, description = "Question answered", body = AskQuestionResponse),
(status = 408, description = "Question timed out", body = AskQuestionResponse),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
(status = 500, description = "Internal server error"),
),
security(
("tool_key" = [])
),
tag = "Mesh Supervisor"
)]
pub async fn ask_question(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<AskQuestionRequest>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Get the supervisor task to find its contract
let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
).into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get supervisor task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
).into_response();
}
};
// Determine context: contract or directive
let contract_id = supervisor.contract_id;
let directive_id = supervisor.directive_id;
if contract_id.is_none() && directive_id.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")),
).into_response();
}
let is_directive_context = directive_id.is_some() && contract_id.is_none();
// For directive context, check reconcile_mode to determine behavior
let directive_reconcile_mode: String = if let Some(did) = directive_id {
if is_directive_context {
match repository::get_directive_for_owner(pool, owner_id, did).await {
Ok(Some(d)) => d.reconcile_mode.clone(),
Ok(None) => "auto".to_string(),
Err(e) => {
tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check");
"auto".to_string()
}
}
} else {
"auto".to_string()
}
} else {
"auto".to_string()
};
// Add the question (use Uuid::nil() for contract_id in directive-only context)
let effective_contract_id = contract_id.unwrap_or(Uuid::nil());
let question_id = state.add_supervisor_question_with_directive(
supervisor_id,
effective_contract_id,
directive_id,
owner_id,
request.question.clone(),
request.choices.clone(),
request.context.clone(),
request.multi_select,
request.question_type.clone(),
);
// Save state: question asked is a key save point (Task 3.3)
// Only for contract context — directive tasks don't use supervisor_states table
if let Some(cid) = contract_id {
let pending_question = PendingQuestion {
id: question_id,
question: request.question.clone(),
choices: request.choices.clone(),
context: request.context.clone(),
question_type: request.question_type.clone(),
asked_at: chrono::Utc::now(),
};
save_state_on_question_asked(pool, cid, pending_question).await;
}
// Broadcast question as task output entry for the task's chat
let question_data = serde_json::json!({
"question_id": question_id.to_string(),
"choices": request.choices,
"context": request.context,
"multi_select": request.multi_select,
"question_type": request.question_type,
});
state.broadcast_task_output(TaskOutputNotification {
task_id: supervisor_id,
owner_id: Some(owner_id),
message_type: "supervisor_question".to_string(),
content: request.question.clone(),
tool_name: None,
tool_input: Some(question_data.clone()),
is_error: None,
cost_usd: None,
duration_ms: None,
is_partial: false,
});
// Persist to database so it appears when reloading the page
// Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format
if let Some(pool) = state.db_pool.as_ref() {
let event_data = serde_json::json!({
"messageType": "supervisor_question",
"content": request.question,
"toolInput": question_data,
});
let _ = repository::create_task_event(
pool,
supervisor_id,
"output",
None,
None,
Some(event_data),
).await;
}
// If non_blocking mode, return immediately
if request.non_blocking {
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: false,
}),
).into_response();
}
// Determine if we should block indefinitely (phaseguard or directive reconcile mode)
let use_phaseguard = request.phaseguard || (is_directive_context && (directive_reconcile_mode == "semi-auto" || directive_reconcile_mode == "manual"));
// Poll for response with timeout
// - Phaseguard: block indefinitely until user responds
// - Directive tasks without reconcile mode: 30s default timeout
// - Contract tasks: use requested timeout_seconds
let timeout_secs = if use_phaseguard {
// Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit).
// The CLI will automatically reconnect via the poll endpoint.
300
} else if is_directive_context && directive_reconcile_mode == "auto" {
30
} else {
request.timeout_seconds.max(1) as u64
};
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
// Check if response has been submitted
if let Some(response) = state.get_question_response(question_id) {
// Clean up the response
state.cleanup_question_response(question_id);
// Clear pending question from supervisor state (Task 3.3)
// Skip for directive context — no supervisor_states for directives
if let Some(cid) = contract_id {
clear_pending_question(pool, cid, question_id).await;
}
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
).into_response();
}
// Check timeout
if start.elapsed() >= timeout_duration {
if use_phaseguard {
// Phaseguard/reconcile: DON'T remove the pending question.
// Return still_pending so the CLI can reconnect via the poll endpoint.
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: true,
}),
).into_response();
}
// Non-phaseguard: remove the pending question on timeout
state.remove_pending_question(question_id);
// Clear pending question from supervisor state on timeout (Task 3.3)
// Skip for directive context — no supervisor_states for directives
if let Some(cid) = contract_id {
clear_pending_question(pool, cid, question_id).await;
}
return (
StatusCode::REQUEST_TIMEOUT,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: true,
still_pending: false,
}),
).into_response();
}
// Wait before polling again
tokio::time::sleep(poll_interval).await;
}
}
/// Poll for a question response by question_id.
///
/// Used by the CLI to reconnect after a still_pending response from ask_question.
/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout
/// is reached without a response, allowing the CLI to reconnect again.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
params(
("question_id" = Uuid, Path, description = "The question ID to poll for"),
),
responses(
(status = 200, description = "Question answered or still pending", body = AskQuestionResponse),
(status = 404, description = "Question not found"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor"),
),
security(
("tool_key" = [])
),
tag = "Mesh Supervisor"
)]
pub async fn poll_question(
State(state): State<SharedState>,
headers: HeaderMap,
Path(question_id): Path<Uuid>,
) -> impl IntoResponse {
let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
// Check if a response is already available
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
// Clear pending question from supervisor state for contract context
let pool = state.db_pool.as_ref().unwrap();
if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
if let Some(cid) = task.contract_id {
clear_pending_question(pool, cid, question_id).await;
}
}
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
).into_response();
}
// Check if the question exists at all (pending or response)
if !state.has_pending_question(question_id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
).into_response();
}
// Block for up to 5 minutes polling every 500ms
let timeout_duration = std::time::Duration::from_secs(300);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
// Check if response has been submitted
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
// Clear pending question from supervisor state for contract context
let pool = state.db_pool.as_ref().unwrap();
if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
if let Some(cid) = task.contract_id {
clear_pending_question(pool, cid, question_id).await;
}
}
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: Some(response.response),
timed_out: false,
still_pending: false,
}),
).into_response();
}
// Check if the question was removed (e.g., task deleted)
if !state.has_pending_question(question_id) {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Question no longer exists")),
).into_response();
}
// Check timeout
if start.elapsed() >= timeout_duration {
return (
StatusCode::OK,
Json(AskQuestionResponse {
question_id,
response: None,
timed_out: false,
still_pending: true,
}),
).into_response();
}
// Wait before polling again
tokio::time::sleep(poll_interval).await;
}
}
/// List the supervisor questions still awaiting an answer from the authenticated user.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/questions",
    responses(
        (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
        (status = 401, description = "Unauthorized"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn list_pending_questions(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Project each in-memory pending question onto its public summary shape.
    let mut summaries: Vec<PendingQuestionSummary> = Vec::new();
    for pending in state.get_pending_questions_for_owner(auth.owner_id) {
        let summary = PendingQuestionSummary {
            question_id: pending.question_id,
            task_id: pending.task_id,
            contract_id: pending.contract_id,
            directive_id: pending.directive_id,
            question: pending.question,
            choices: pending.choices,
            context: pending.context,
            created_at: pending.created_at,
            multi_select: pending.multi_select,
            question_type: pending.question_type,
        };
        summaries.push(summary);
    }
    Json(summaries).into_response()
}
/// Answer a pending supervisor question.
///
/// Records the user's answer for the waiting `ask_question`/`poll_question`
/// call. It also forwards the answer to the task as a chat message, which
/// auto-resumes a task paused on phaseguard. Forwarding is best-effort: the
/// answer is already recorded, so forwarding failures are logged, not returned.
#[utoipa::path(
    post,
    path = "/api/v1/mesh/questions/{question_id}/answer",
    params(
        ("question_id" = Uuid, Path, description = "Question ID")
    ),
    request_body = AnswerQuestionRequest,
    responses(
        (status = 200, description = "Question answered", body = AnswerQuestionResponse),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Question belongs to another user"),
        (status = 404, description = "Question not found"),
        (status = 500, description = "Internal server error"),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh"
)]
pub async fn answer_question(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(question_id): Path<Uuid>,
    Json(request): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
    // The question must exist and belong to the caller.
    let question = match state.get_pending_question(question_id) {
        Some(q) if q.owner_id == auth.owner_id => q,
        Some(_) => {
            return (
                StatusCode::FORBIDDEN,
                Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
            ).into_response();
        }
        None => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
            ).into_response();
        }
    };

    let success = state.submit_question_response(question_id, request.response.clone());

    if success {
        tracing::info!(
            question_id = %question_id,
            task_id = %question.task_id,
            "User answered supervisor question"
        );

        match state.db_pool.as_ref() {
            None => {
                tracing::warn!(
                    task_id = %question.task_id,
                    "Database not configured; answer not forwarded to task"
                );
            }
            Some(pool) => match repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await {
                Ok(Some(task)) => {
                    // Tasks without a daemon have nothing to resume.
                    if let Some(daemon_id) = task.daemon_id {
                        let response_msg = format!(
                            "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n",
                            question.question,
                            request.response
                        );
                        let cmd = DaemonCommand::SendMessage {
                            task_id: question.task_id,
                            message: response_msg,
                        };
                        if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
                            tracing::warn!(
                                task_id = %question.task_id,
                                error = %e,
                                "Failed to send response message to task"
                            );
                        } else {
                            tracing::info!(
                                task_id = %question.task_id,
                                "Sent response message to task (will auto-resume if paused)"
                            );
                        }
                    }
                }
                Ok(None) => {
                    tracing::warn!(
                        task_id = %question.task_id,
                        "Task not found; answer not forwarded to task"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        task_id = %question.task_id,
                        error = %e,
                        "Failed to load task; answer not forwarded to task"
                    );
                }
            },
        }
    }

    Json(AnswerQuestionResponse { success }).into_response()
}
// =============================================================================
// Supervisor Resume and Conversation Rewind
// =============================================================================
/// Response for supervisor resume
///
/// Returned by `POST /api/v1/contracts/{id}/supervisor/resume`.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumeSupervisorResponse {
/// Task ID of the contract's supervisor.
pub supervisor_task_id: Uuid,
/// Daemon reported for the supervisor after the resume (presumably the
/// newly assigned daemon when one was chosen — TODO confirm against resume_supervisor).
pub daemon_id: Option<Uuid>,
/// Summary of the persisted state the supervisor resumed from.
pub resumed_from: ResumedFromInfo,
/// Resulting status string (resume_supervisor starts from "pending"; the
/// final value set per resume mode is not visible here — TODO confirm).
pub status: String,
/// Restoration context (Task 3.4)
pub restoration: Option<RestorationInfo>,
}
/// Snapshot of the persisted supervisor state a resume started from.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResumedFromInfo {
/// Phase recorded in the supervisor state (presumably the contract phase — TODO confirm).
pub phase: String,
/// Timestamp of the supervisor's last recorded activity.
pub last_activity: chrono::DateTime<chrono::Utc>,
/// Number of entries in the stored conversation history (resume_supervisor
/// derives this from the history array length, 0 if it is not an array).
pub message_count: i32,
}
/// Information about supervisor restoration (Task 3.4)
///
/// Optional part of `ResumeSupervisorResponse` that describes what was
/// recovered from the persisted supervisor state.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RestorationInfo {
/// Previous state before restoration
pub previous_state: String,
/// How many times this supervisor has been restored
pub restoration_count: i32,
/// Number of pending questions to re-deliver
pub pending_questions_count: usize,
/// Number of tasks being waited on
pub waiting_tasks_count: usize,
/// Number of tasks spawned before crash
pub spawned_tasks_count: usize,
/// Any warnings from state validation
pub warnings: Vec<String>,
}
/// Resume interrupted supervisor with specified mode.
///
/// POST /api/v1/contracts/{id}/supervisor/resume
#[utoipa::path(
post,
path = "/api/v1/contracts/{id}/supervisor/resume",
params(
("id" = Uuid, Path, description = "Contract ID")
),
request_body = crate::db::models::ResumeSupervisorRequest,
responses(
(status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
(status = 400, description = "Invalid request", body = ApiError),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 404, description = "Contract or supervisor not found", body = ApiError),
(status = 409, description = "Supervisor is already running", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh Supervisor"
)]
pub async fn resume_supervisor(
State(state): State<SharedState>,
Path(contract_id): Path<Uuid>,
auth: crate::server::auth::Authenticated,
Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
) -> impl IntoResponse {
let crate::server::auth::Authenticated(auth_info) = auth;
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
)
.into_response();
};
// Get contract and verify ownership
let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
Ok(Some(c)) => c,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Contract not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get contract {}: {}", contract_id, e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get existing supervisor state
let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
Ok(Some(s)) => s,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new(
"NO_SUPERVISOR_STATE",
"No supervisor state found - supervisor may not have been started",
)),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get supervisor state: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Get supervisor task
let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!("Failed to get supervisor task: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
};
// Check if already running - but only if daemon is actually connected
// (daemon disconnect handler may not have updated status yet)
if supervisor_task.status == "running" {
let daemon_connected = supervisor_task
.daemon_id
.map(|d| state.is_daemon_connected(d))
.unwrap_or(false);
if daemon_connected {
return (
StatusCode::CONFLICT,
Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
)
.into_response();
}
// Daemon not connected - allow resume (treat as interrupted)
tracing::info!(
supervisor_task_id = %supervisor_task.id,
daemon_id = ?supervisor_task.daemon_id,
"Supervisor status is 'running' but daemon is not connected, allowing resume"
);
}
// Calculate message count from conversation history
let message_count = supervisor_state
.conversation_history
.as_array()
.map(|arr| arr.len() as i32)
.unwrap_or(0);
// Find a connected daemon for this owner
let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) {
Some(id) => id,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new(
"NO_DAEMON",
"No daemons connected for your account. Cannot resume supervisor.",
)),
)
.into_response();
}
};
// Track response values (may be updated by resume modes)
let mut response_daemon_id = supervisor_task.daemon_id;
let mut response_status = "pending".to_string();
// Based on resume mode, handle differently
match req.resume_mode.as_str() {
"continue" => {
// Update task status to starting and assign daemon
if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2")
.bind(target_daemon_id)
.bind(supervisor_state.task_id)
.execute(pool)
.await
{
tracing::error!("Failed to update task for resume: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
// Fetch latest checkpoint patch for worktree recovery
let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await {
Ok(Some(patch)) => {
tracing::info!(
task_id = %supervisor_state.task_id,
patch_size = patch.patch_size_bytes,
base_sha = %patch.base_commit_sha,
"Including checkpoint patch for worktree recovery"
);
// Encode patch as base64 for JSON transport
let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
(Some(encoded), Some(patch.base_commit_sha))
}
Ok(None) => {
tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found");
(None, None)
}
Err(e) => {
tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch");
(None, None)
}
};
// Send SpawnTask with resume_session=true to use Claude's --continue
// Include conversation_history as fallback if worktree doesn't exist on target daemon
let command = DaemonCommand::SpawnTask {
task_id: supervisor_state.task_id,
task_name: supervisor_task.name.clone(),
plan: supervisor_task.plan.clone(),
repo_url: supervisor_task.repository_url.clone(),
base_branch: supervisor_task.base_branch.clone(),
target_branch: supervisor_task.target_branch.clone(),
parent_task_id: supervisor_task.parent_task_id,
depth: supervisor_task.depth,
is_orchestrator: false,
target_repo_path: supervisor_task.target_repo_path.clone(),
completion_action: supervisor_task.completion_action.clone(),
continue_from_task_id: supervisor_task.continue_from_task_id,
copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: supervisor_task.contract_id,
is_supervisor: true,
autonomous_loop: false,
resume_session: true, // Use --continue to preserve conversation
conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
patch_data,
patch_base_sha,
local_only: contract.local_only,
auto_merge_local: contract.auto_merge_local,
supervisor_worktree_task_id: None, // Supervisor uses its own worktree
directive_id: supervisor_task.directive_id,
};
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
// Rollback status on failure
let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1")
.bind(supervisor_state.task_id)
.execute(pool)
.await;
tracing::error!("Failed to send SpawnTask to daemon: {}", e);
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))),
)
.into_response();
}
tracing::info!(
contract_id = %contract_id,
supervisor_task_id = %supervisor_state.task_id,
daemon_id = %target_daemon_id,
message_count = message_count,
"Supervisor resumed with --continue (resume_session=true)"
);
// Update response values for successful resume
response_daemon_id = Some(target_daemon_id);
response_status = "starting".to_string();
}
"restart_phase" => {
// Clear conversation but keep phase progress
if let Err(e) = repository::update_supervisor_conversation(
pool,
contract_id,
serde_json::json!([]),
)
.await
{
tracing::error!("Failed to clear conversation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
.bind(supervisor_state.task_id)
.execute(pool)
.await
{
tracing::error!("Failed to update task status: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
}
"from_checkpoint" => {
// This would require more complex handling with checkpoint system
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NOT_IMPLEMENTED",
"from_checkpoint mode not yet implemented",
)),
)
.into_response();
}
_ => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_RESUME_MODE",
"Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
)),
)
.into_response();
}
}
tracing::info!(
contract_id = %contract_id,
supervisor_task_id = %supervisor_state.task_id,
resume_mode = %req.resume_mode,
message_count = message_count,
"Supervisor resume requested"
);
// Build restoration info (Task 3.4)
let pending_questions: Vec<PendingQuestion> = serde_json::from_value(
supervisor_state.pending_questions.clone()
).unwrap_or_default();
let restoration_info = RestorationInfo {
previous_state: supervisor_state.state.clone(),
restoration_count: supervisor_state.restoration_count,
pending_questions_count: pending_questions.len(),
waiting_tasks_count: supervisor_state.pending_task_ids.len(),
spawned_tasks_count: supervisor_state.spawned_task_ids.len(),
warnings: vec![], // Could add validation warnings here
};
// Re-deliver pending questions if any (Task 3.4)
if !pending_questions.is_empty() {
redeliver_pending_questions(
&state,
supervisor_state.task_id,
contract_id,
auth_info.owner_id,
&pending_questions,
).await;
}
Json(ResumeSupervisorResponse {
supervisor_task_id: supervisor_state.task_id,
daemon_id: response_daemon_id,
resumed_from: ResumedFromInfo {
phase: contract.phase,
last_activity: supervisor_state.last_activity,
message_count,
},
status: response_status,
restoration: Some(restoration_info),
})
.into_response()
}
/// Response for conversation rewind.
///
/// Serialized with camelCase keys (`contractId`, `messagesRemoved`,
/// `newMessageCount`, `codeRewound`).
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RewindConversationResponse {
    /// Contract whose supervisor conversation was rewound.
    pub contract_id: Uuid,
    /// Number of messages dropped from the end of the stored conversation history.
    pub messages_removed: i32,
    /// Number of messages remaining in the stored conversation history.
    pub new_message_count: i32,
    /// Whether a code rewind is reported alongside the conversation rewind.
    pub code_rewound: bool,
}
/// Rewind supervisor conversation to specified point.
///
/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
///
/// Truncates the stored supervisor conversation history. The first applicable
/// strategy wins:
/// - `by_message_count`: drop that many messages from the end (must be non-negative;
///   dropping more than exist empties the history);
/// - `to_message_id`: keep messages up to and including the one whose `id` matches
///   (nothing is removed if no message matches);
/// - neither: drop the last message.
///
/// Code rewind is not implemented yet, so `code_rewound` is always `false`.
#[utoipa::path(
    post,
    path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    request_body = crate::db::models::RewindConversationRequest,
    responses(
        (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
        (status = 400, description = "Invalid request", body = ApiError),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 404, description = "Contract or supervisor not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "Mesh Supervisor"
)]
pub async fn rewind_conversation(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    auth: crate::server::auth::Authenticated,
    Json(req): Json<crate::db::models::RewindConversationRequest>,
) -> impl IntoResponse {
    let crate::server::auth::Authenticated(auth_info) = auth;
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get contract and verify ownership
    let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get supervisor state
    let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(s)) => s,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // A non-array history is treated as empty.
    let conversation = supervisor_state
        .conversation_history
        .as_array()
        .cloned()
        .unwrap_or_default();
    // Saturate instead of wrapping if the history ever exceeds i32::MAX entries.
    let original_count = conversation.len().min(i32::MAX as usize) as i32;

    // Determine how many messages to keep (always within 0..=original_count)
    let new_count = if let Some(by_count) = req.by_message_count {
        // A negative count would "keep" more messages than exist and report a negative
        // number of removed messages; reject it instead.
        if by_count < 0 {
            return (
                StatusCode::BAD_REQUEST,
                Json(ApiError::new(
                    "INVALID_REQUEST",
                    "Message count to remove must be non-negative",
                )),
            )
                .into_response();
        }
        original_count.saturating_sub(by_count).max(0)
    } else if let Some(ref to_id) = req.to_message_id {
        // Find message by ID and keep up to and including it
        match conversation
            .iter()
            .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str()))
        {
            Some(index) => (index.min(i32::MAX as usize - 1) as i32 + 1).min(original_count),
            None => {
                tracing::warn!(
                    contract_id = %contract_id,
                    to_message_id = %to_id,
                    "Rewind target message not found; conversation left unchanged"
                );
                original_count
            }
        }
    } else {
        // Default to removing last message
        (original_count - 1).max(0)
    };

    // Truncate conversation
    let new_conversation: Vec<serde_json::Value> = conversation
        .into_iter()
        .take(new_count as usize)
        .collect();

    // Update the conversation
    if let Err(e) = repository::update_supervisor_conversation(
        pool,
        contract_id,
        serde_json::Value::Array(new_conversation),
    )
    .await
    {
        tracing::error!("Failed to update conversation: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("DB_ERROR", e.to_string())),
        )
            .into_response();
    }

    // Code rewind is not implemented: never claim it happened.
    if req.rewind_code.unwrap_or(false) {
        tracing::warn!(
            contract_id = %contract_id,
            "Code rewind requested but not implemented; only the conversation was rewound"
        );
    }

    tracing::info!(
        contract_id = %contract_id,
        original_count = original_count,
        new_count = new_count,
        messages_removed = original_count - new_count,
        "Conversation rewound"
    );

    Json(RewindConversationResponse {
        contract_id,
        messages_removed: original_count - new_count,
        new_message_count: new_count,
        code_rewound: false, // TODO: implement code rewind
    })
    .into_response()
}
// =============================================================================
// Supervisor State Persistence Helpers (Task 3.3)
// =============================================================================
use crate::db::models::{
SupervisorRestorationContext, SupervisorStateEnum,
StateValidationResult, StateRecoveryAction,
};
/// Record a newly spawned task in the supervisor's persisted state.
///
/// Called when a supervisor spawns a task. It appends `spawned_task_id` to the
/// supervisor's spawned-task list. It then sets the supervisor's detailed state to
/// `"working"` with progress reset to 0.
///
/// Both writes are best-effort: a failure is logged at `warn` and never returned to
/// the caller.
pub async fn save_state_on_task_spawn(
    pool: &PgPool,
    contract_id: Uuid,
    spawned_task_id: Uuid,
) {
    match repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await {
        Ok(_) => {
            tracing::debug!(
                contract_id = %contract_id,
                spawned_task_id = %spawned_task_id,
                "Saved spawned task to supervisor state"
            );
        }
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                spawned_task_id = %spawned_task_id,
                error = %e,
                "Failed to save spawned task to supervisor state"
            );
        }
    }

    // Fresh work was just spawned, so progress starts over.
    let activity = format!("Spawned task {}", spawned_task_id);
    let state_update = repository::update_supervisor_detailed_state(
        pool,
        contract_id,
        "working",
        Some(&activity),
        0,
        None,
    )
    .await;
    if let Err(e) = state_update {
        tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn");
    }
}
/// Persist a question the supervisor is waiting on the user to answer.
///
/// The question is wrapped in a one-element JSON array and handed to
/// `repository::add_supervisor_pending_question`. If serialization or the write fails,
/// the failure is logged at `warn` and the function returns without propagating it.
pub async fn save_state_on_question_asked(
    pool: &PgPool,
    contract_id: Uuid,
    question: PendingQuestion,
) {
    let serialized = serde_json::to_value(&[&question]);
    let question_json = match serialized {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question");
            return;
        }
    };

    let saved = repository::add_supervisor_pending_question(pool, contract_id, question_json).await;
    match saved {
        Ok(_) => {
            tracing::debug!(
                contract_id = %contract_id,
                question_id = %question.id,
                "Saved pending question to supervisor state"
            );
        }
        Err(e) => {
            tracing::warn!(
                contract_id = %contract_id,
                question_id = %question.id,
                error = %e,
                "Failed to save pending question to supervisor state"
            );
        }
    }
}
/// Clear pending question after it's answered.
///
/// Removes `question_id` from the supervisor's persisted pending questions. It then
/// re-reads the state. If no pending questions remain (or the stored list cannot be
/// parsed), the detailed state is set back to `"working"` and the existing progress
/// is kept.
///
/// Best-effort: every failure is logged at `warn` and none is returned to the caller.
pub async fn clear_pending_question(
    pool: &PgPool,
    contract_id: Uuid,
    question_id: Uuid,
) {
    if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await {
        tracing::warn!(
            contract_id = %contract_id,
            question_id = %question_id,
            error = %e,
            "Failed to remove pending question from supervisor state"
        );
    }

    // Update state back to working (if no more pending questions)
    match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(state)) => {
            let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
                .unwrap_or_default();
            if questions.is_empty() {
                // Previously this result was discarded with `let _`; log it like the
                // other state-persistence helpers do.
                if let Err(e) = repository::update_supervisor_detailed_state(
                    pool,
                    contract_id,
                    "working",
                    Some("Resumed after user response"),
                    state.progress,
                    None,
                ).await {
                    tracing::warn!(
                        contract_id = %contract_id,
                        error = %e,
                        "Failed to update supervisor state after clearing question"
                    );
                }
            }
        }
        Ok(None) => {}
        Err(e) => {
            tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question");
        }
    }
}
/// Record a contract phase change in the supervisor's persisted state.
///
/// Delegates to `repository::update_supervisor_phase`. Success is logged at `info`.
/// Failure is logged at `warn` and not propagated.
pub async fn save_state_on_phase_change(
    pool: &PgPool,
    contract_id: Uuid,
    new_phase: &str,
) {
    let outcome = repository::update_supervisor_phase(pool, contract_id, new_phase).await;
    match outcome {
        Ok(_) => tracing::info!(
            contract_id = %contract_id,
            new_phase = %new_phase,
            "Updated supervisor state on phase change"
        ),
        Err(e) => tracing::warn!(
            contract_id = %contract_id,
            new_phase = %new_phase,
            error = %e,
            "Failed to update supervisor state on phase change"
        ),
    }
}
// =============================================================================
// Supervisor Restoration Protocol (Task 3.4)
// =============================================================================
/// Validate supervisor state consistency before restoration.
///
/// Runs the following checks, each failing one adding a human-readable issue:
/// - every spawned task id still exists in the database;
/// - no pending question is older than 24 hours;
/// - the conversation history is not empty after a previous restoration.
///
/// The recovery action is chosen as follows:
/// - no issues → `Proceed`;
/// - any spawned task missing → `UseCheckpoint`;
/// - more than 3 issues → `ManualIntervention`;
/// - otherwise → `Proceed` (the issues are only warnings).
pub async fn validate_supervisor_state(
    pool: &PgPool,
    state: &crate::db::models::SupervisorState,
) -> StateValidationResult {
    let mut issues = Vec::new();
    // Tracked explicitly rather than by searching issue text, so a database error
    // message that happens to contain "not found" cannot trigger checkpoint recovery.
    let mut missing_spawned_tasks = false;

    // Validate spawned tasks
    if !state.spawned_task_ids.is_empty() {
        match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await {
            Ok(task_statuses) => {
                for task_id in &state.spawned_task_ids {
                    if !task_statuses.contains_key(task_id) {
                        missing_spawned_tasks = true;
                        issues.push(format!("Spawned task {} not found in database", task_id));
                    }
                }
            }
            Err(e) => {
                issues.push(format!("Failed to validate spawned tasks: {}", e));
            }
        }
    }

    // Validate pending questions; an unparseable list is treated as empty.
    let pending_questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
        .unwrap_or_default();

    // Flag questions older than 24 hours as possibly stale. One reference time is used
    // for the whole batch.
    let now = chrono::Utc::now();
    for question in &pending_questions {
        let age_hours = (now - question.asked_at).num_hours();
        if age_hours > 24 {
            issues.push(format!(
                "Pending question {} is {} hours old, may be stale",
                question.id, age_hours
            ));
        }
    }

    // Validate conversation history
    if let Some(history) = state.conversation_history.as_array() {
        if history.is_empty() && state.restoration_count > 0 {
            issues.push("Conversation history is empty after previous restoration".to_string());
        }
    }

    // Determine recovery action
    let recovery_action = if issues.is_empty() {
        StateRecoveryAction::Proceed
    } else if missing_spawned_tasks {
        // Missing tasks suggest corruption - use checkpoint
        StateRecoveryAction::UseCheckpoint
    } else if issues.len() > 3 {
        // Many issues suggest manual intervention needed
        StateRecoveryAction::ManualIntervention
    } else {
        // Minor issues - proceed with warnings
        StateRecoveryAction::Proceed
    };

    StateValidationResult {
        is_valid: issues.is_empty(),
        issues,
        recovery_action,
    }
}
/// Restore supervisor from saved state after daemon crash or task reassignment.
/// Returns restoration context to send to the supervisor.
pub async fn restore_supervisor(
pool: &PgPool,
contract_id: Uuid,
restoration_source: &str,
) -> Result<SupervisorRestorationContext, String> {
// Step 1: Load supervisor state
let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await {
Ok(Some(s)) => s,
Ok(None) => {
tracing::warn!(
contract_id = %contract_id,
"No supervisor state found for restoration - starting fresh"
);
return Ok(SupervisorRestorationContext {
success: true,
previous_state: SupervisorStateEnum::Initializing,
conversation_history: serde_json::json!([]),
pending_questions: vec![],
waiting_task_ids: vec![],
spawned_task_ids: vec![],
restoration_count: 0,
restoration_context_message: "No previous state found. Starting fresh.".to_string(),
warnings: vec!["No previous supervisor state found".to_string()],
});
}
Err(e) => {
return Err(format!("Failed to load supervisor state: {}", e));
}
};
// Step 2: Parse previous state
let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted);
// Step 3: Validate state consistency
let validation = validate_supervisor_state(pool, &state).await;
let mut warnings = validation.issues.clone();
// Step 4: Handle based on validation result
let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action {
StateRecoveryAction::Proceed => {
// State is valid, use it
let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
.unwrap_or_default();
let message = format!(
"Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.",
state.state,
questions.len(),
state.spawned_task_ids.len(),
state.pending_task_ids.len()
);
(state.conversation_history.clone(), questions, message)
}
StateRecoveryAction::UseCheckpoint => {
// State is corrupted, try to use checkpoint
warnings.push("State validation failed, attempting checkpoint recovery".to_string());
// TODO: Implement checkpoint-based recovery
// For now, start with empty questions but preserve conversation
let message = "Restored from last checkpoint due to state inconsistency.".to_string();
(state.conversation_history.clone(), vec![], message)
}
StateRecoveryAction::StartFresh => {
warnings.push("Starting fresh due to unrecoverable state".to_string());
let message = "Starting fresh due to unrecoverable state corruption.".to_string();
(serde_json::json!([]), vec![], message)
}
StateRecoveryAction::ManualIntervention => {
warnings.push("Manual intervention may be required".to_string());
// Still try to restore but with warning
let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
.unwrap_or_default();
let message = "Restored with warnings - manual intervention may be required.".to_string();
(state.conversation_history.clone(), questions, message)
}
};
// Step 5: Mark supervisor as restored
let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await {
Ok(s) => s,
Err(e) => {
return Err(format!("Failed to mark supervisor as restored: {}", e));
}
};
// Step 6: Build restoration context
let context = SupervisorRestorationContext {
success: true,
previous_state,
conversation_history,
pending_questions,
waiting_task_ids: state.pending_task_ids.clone(),
spawned_task_ids: state.spawned_task_ids.clone(),
restoration_count: new_state.restoration_count,
restoration_context_message: restoration_message,
warnings,
};
tracing::info!(
contract_id = %contract_id,
restoration_source = %restoration_source,
restoration_count = new_state.restoration_count,
pending_questions_count = context.pending_questions.len(),
waiting_tasks_count = context.waiting_task_ids.len(),
spawned_tasks_count = context.spawned_task_ids.len(),
"Supervisor restoration completed"
);
Ok(context)
}
/// Show questions that were pending before a crash to the user again.
///
/// For each question, in order:
/// 1. register it in the in-memory supervisor question state as single-select;
/// 2. broadcast a `"supervisor_question"` task-output notification for the supervisor
///    task, marked `is_restored: true` and carrying the original `asked_at` timestamp;
/// 3. log the re-delivery at `info`.
pub async fn redeliver_pending_questions(
    state: &SharedState,
    supervisor_id: Uuid,
    contract_id: Uuid,
    owner_id: Uuid,
    questions: &[PendingQuestion],
) {
    for pending in questions.iter() {
        // NOTE(review): `question.id` is not passed here, yet the broadcast below
        // advertises it as `question_id` — confirm an answer to that id resolves to
        // this in-memory entry.
        state.add_supervisor_question(
            supervisor_id,
            contract_id,
            owner_id,
            pending.question.clone(),
            pending.choices.clone(),
            pending.context.clone(),
            false, // restored questions are always single-select
            pending.question_type.clone(),
        );

        let payload = serde_json::json!({
            "question_id": pending.id.to_string(),
            "choices": pending.choices,
            "context": pending.context,
            "question_type": pending.question_type,
            "is_restored": true,
            "originally_asked_at": pending.asked_at.to_rfc3339(),
        });

        let notification = TaskOutputNotification {
            task_id: supervisor_id,
            owner_id: Some(owner_id),
            message_type: "supervisor_question".to_string(),
            content: pending.question.clone(),
            tool_name: None,
            tool_input: Some(payload),
            is_error: None,
            cost_usd: None,
            duration_ms: None,
            is_partial: false,
        };
        state.broadcast_task_output(notification);

        tracing::info!(
            supervisor_id = %supervisor_id,
            question_id = %pending.id,
            "Re-delivered pending question after restoration"
        );
    }
}
/// Render the restoration notice that is injected into the supervisor's conversation.
///
/// The notice is framed by `=== SUPERVISOR RESTORATION NOTICE ===` and
/// `=== END RESTORATION NOTICE ===`. It always includes the context message and the
/// restoration count. It adds sections only when non-empty: pending questions (one
/// ` - id: text` line each), waiting-task and spawned-task counts, and warnings.
pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String {
    let mut notice = String::from("=== SUPERVISOR RESTORATION NOTICE ===\n\n");
    notice += &format!(
        "This supervisor has been restored after interruption. {}\n\n",
        context.restoration_context_message
    );
    notice += &format!("Restoration count: {}\n", context.restoration_count);

    let questions = &context.pending_questions;
    if !questions.is_empty() {
        notice += &format!(
            "\nPending questions ({}): These have been re-delivered to the user.\n",
            questions.len()
        );
        notice.extend(questions.iter().map(|q| format!(" - {}: {}\n", q.id, q.question)));
    }

    let waiting = context.waiting_task_ids.len();
    if waiting > 0 {
        notice += &format!(
            "\nWaiting on {} task(s) to complete. Check their status before continuing.\n",
            waiting
        );
    }

    let spawned = context.spawned_task_ids.len();
    if spawned > 0 {
        notice += &format!(
            "\n{} task(s) were spawned before interruption. Their status may need verification.\n",
            spawned
        );
    }

    if !context.warnings.is_empty() {
        notice.push_str("\nWarnings:\n");
        notice.extend(context.warnings.iter().map(|w| format!(" - {}\n", w)));
    }

    notice.push_str("\n=== END RESTORATION NOTICE ===\n");
    notice
}
// =============================================================================
// Order Creation from Directive Tasks
// =============================================================================
/// Request to create an order from a directive task.
///
/// Deserialized from camelCase JSON. Omitted fields fall back to these defaults:
/// - `priority`: `default_order_priority` ("medium");
/// - `order_type`: `default_order_type` ("spike");
/// - `labels`: `default_order_labels` (an empty JSON array);
/// - `description` and `repository_url`: `None`.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrderForTaskRequest {
    /// Order title.
    pub title: String,
    /// Optional longer description of the work.
    #[serde(default)]
    pub description: Option<String>,
    /// Order priority; defaults to "medium".
    #[serde(default = "default_order_priority")]
    pub priority: String,
    /// Order type; defaults to "spike". The create handler only accepts "spike" or "chore".
    #[serde(default = "default_order_type")]
    pub order_type: String,
    /// Labels as arbitrary JSON; defaults to `[]`.
    #[serde(default = "default_order_labels")]
    pub labels: serde_json::Value,
    /// Repository URL; when omitted, the create handler falls back to the directive's URL.
    #[serde(default)]
    pub repository_url: Option<String>,
}
/// Serde default for `CreateOrderForTaskRequest::priority`.
fn default_order_priority() -> String {
    String::from("medium")
}
/// Serde default for `CreateOrderForTaskRequest::order_type`.
fn default_order_type() -> String {
    "spike".to_owned()
}
/// Serde default for `CreateOrderForTaskRequest::labels`: an empty JSON array.
fn default_order_labels() -> serde_json::Value {
    serde_json::Value::Array(Vec::new())
}
/// Create an order for future work from a directive task.
///
/// Only spike and chore order types are allowed. The order is automatically
/// linked to the directive associated with the calling task.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/orders",
request_body = CreateOrderForTaskRequest,
responses(
(status = 201, description = "Order created"),
(status = 400, description = "Invalid order type"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden - not a supervisor/directive task"),
(status = 500, description = "Internal server error"),
),
tag = "Mesh Supervisor"
)]
pub async fn create_order_for_task(
State(state): State<SharedState>,
headers: HeaderMap,
Json(request): Json<CreateOrderForTaskRequest>,
) -> impl IntoResponse {
let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
// Validate order_type is spike or chore
if request.order_type != "spike" && request.order_type != "chore" {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"INVALID_ORDER_TYPE",
"Only spike and chore order types are allowed from directive tasks",
)),
)
.into_response();
}
// Get the task to find its directive_id
let task = match repository::get_task(pool, task_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "Task not found")),
)
.into_response();
}
Err(e) => {
tracing::error!(error = %e, "Failed to get task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to get task")),
)
.into_response();
}
};
let directive_id = match task.directive_id {
Some(id) => id,
None => {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
"NO_DIRECTIVE",
"Task is not associated with a directive",
)),
)
.into_response();
}
};
// Determine repository_url: use request value, or fall back to directive's repository_url
let repository_url = if request.repository_url.is_some() {
request.repository_url
} else {
// Look up directive for its repository_url
match repository::get_directive_for_owner(pool, owner_id, directive_id).await {
Ok(Some(d)) => d.repository_url,
_ => None,
}
};
// Create the order
let order_req = CreateOrderRequest {
title: request.title,
description: request.description,
priority: Some(request.priority),
status: Some("open".to_string()),
order_type: Some(request.order_type),
labels: request.labels,
directive_id,
repository_url,
dog_id: None,
};
match repository::create_order(pool, owner_id, order_req).await {
Ok(order) => (
StatusCode::CREATED,
Json(serde_json::json!({
"id": order.id,
"title": order.title,
"description": order.description,
"priority": order.priority,
"status": order.status,
"orderType": order.order_type,
"directiveId": order.directive_id,
"labels": order.labels,
"repositoryUrl": order.repository_url,
"createdAt": order.created_at,
})),
)
.into_response(),
Err(e) => {
tracing::error!(error = %e, "Failed to create order");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", "Failed to create order")),
)
.into_response()
}
}
}