summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers')
-rw-r--r--makima/src/server/handlers/directives.rs12
-rw-r--r--makima/src/server/handlers/files.rs207
-rw-r--r--makima/src/server/handlers/history.rs268
-rw-r--r--makima/src/server/handlers/listen.rs783
-rw-r--r--makima/src/server/handlers/mesh.rs247
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs419
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs2980
-rw-r--r--makima/src/server/handlers/mod.rs1
8 files changed, 214 insertions, 4703 deletions
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs
index 35a46a0..410b2b3 100644
--- a/makima/src/server/handlers/directives.rs
+++ b/makima/src/server/handlers/directives.rs
@@ -1019,12 +1019,10 @@ pub async fn cleanup_directive(
// Create the cleanup task (following pick_up_orders pattern)
let req = CreateTaskRequest {
- contract_id: None,
name: format!("Cleanup: {}", directive.title),
description: Some("Directive cleanup — verify merged branches and remove merged steps".to_string()),
plan: prompt,
parent_task_id: None,
- is_supervisor: false,
priority: 0,
repository_url: directive.repository_url.clone(),
base_branch: directive.base_branch.clone(),
@@ -1037,7 +1035,6 @@ pub async fn cleanup_directive(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None,
directive_id: Some(directive.id),
directive_step_id: None,
};
@@ -1330,12 +1327,10 @@ pub async fn pick_up_orders(
// Create the planning task
let req = CreateTaskRequest {
- contract_id: None,
name: format!("Pick up orders: {}", directive.title),
description: Some("Directive order pickup planning task".to_string()),
plan,
parent_task_id: None,
- is_supervisor: false,
priority: 0,
repository_url: directive.repository_url.clone(),
base_branch: directive.base_branch.clone(),
@@ -1348,7 +1343,6 @@ pub async fn pick_up_orders(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None,
directive_id: Some(directive.id),
directive_step_id: None,
};
@@ -1907,12 +1901,10 @@ pub async fn pick_up_dog_orders(
// Create the planning task
let req = CreateTaskRequest {
- contract_id: None,
name: format!("Pick up DOG orders: {}", directive.title),
description: Some("Directive order group pickup planning task".to_string()),
plan,
parent_task_id: None,
- is_supervisor: false,
priority: 0,
repository_url: directive.repository_url.clone(),
base_branch: directive.base_branch.clone(),
@@ -1925,7 +1917,6 @@ pub async fn pick_up_dog_orders(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None,
directive_id: Some(directive.id),
directive_step_id: None,
};
@@ -2209,12 +2200,10 @@ pub async fn create_directive_task(
let base_branch = req.base_branch.or_else(|| directive.base_branch.clone());
let create_req = CreateTaskRequest {
- contract_id: None,
name: req.name,
description: None,
plan: req.plan,
parent_task_id: None,
- is_supervisor: false,
priority: 0,
repository_url: repo_url,
base_branch,
@@ -2227,7 +2216,6 @@ pub async fn create_directive_task(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None,
directive_id: Some(directive.id),
// No directive_step_id — this is what makes the task "ephemeral":
// it lives under the directive folder but isn't part of the DAG.
diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs
index 711be41..023b9ff 100644
--- a/makima/src/server/handlers/files.rs
+++ b/makima/src/server/handlers/files.rs
@@ -145,26 +145,7 @@ pub async fn create_file(
.into_response();
};
- // Verify the contract exists and belongs to the owner
- match repository::get_contract_for_owner(pool, req.contract_id, auth.owner_id).await {
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to verify contract: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- Ok(Some(_)) => {} // Contract exists, proceed
- }
-
+ // Legacy contract scope removed; files are owner-scoped only now.
match repository::create_file_for_owner(pool, auth.owner_id, req).await {
Ok(file) => (StatusCode::CREATED, Json(file)).into_response(),
Err(e) => {
@@ -336,189 +317,3 @@ pub async fn delete_file(
}
}
-/// Sync a file from its linked repository file.
-///
-/// This endpoint triggers an async sync operation. The file must have a
-/// repo_file_path set, and its contract must have a linked repository.
-/// A connected daemon will read the file and update the file content.
-#[utoipa::path(
- post,
- path = "/api/v1/files/{id}/sync-from-repo",
- params(
- ("id" = Uuid, Path, description = "File ID")
- ),
- responses(
- (status = 202, description = "Sync operation started"),
- (status = 400, description = "File not linked to repository", body = ApiError),
- (status = 401, description = "Unauthorized", body = ApiError),
- (status = 404, description = "File not found", body = ApiError),
- (status = 503, description = "No daemon available", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Files"
-)]
-pub async fn sync_file_from_repo(
- State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
- Path(id): Path<Uuid>,
-) -> impl IntoResponse {
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
- )
- .into_response();
- };
-
- // Get the file and verify it has a repo_file_path
- let file = match repository::get_file_for_owner(pool, id, auth.owner_id).await {
- Ok(Some(f)) => f,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "File not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get file {}: {}", id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Check if file has a repo path and contract_id
- let contract_id = match file.contract_id {
- Some(id) => id,
- None => {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NO_CONTRACT",
- "File is not associated with a contract",
- )),
- )
- .into_response();
- }
- };
-
- let repo_file_path = match file.repo_file_path {
- Some(ref path) if !path.is_empty() => path.clone(),
- _ => {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NOT_LINKED",
- "File is not linked to a repository file",
- )),
- )
- .into_response();
- }
- };
-
- // Get contract repositories
- let repositories = match repository::list_contract_repositories(pool, contract_id).await {
- Ok(repos) => repos,
- Err(e) => {
- tracing::error!("Failed to get contract repositories: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Check if contract has repositories
- if repositories.is_empty() {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NO_REPOSITORY",
- "Contract has no linked repositories",
- )),
- )
- .into_response();
- }
-
- // Use the first repository's local path
- let repo = &repositories[0];
- let repo_local_path = match &repo.local_path {
- Some(path) if !path.is_empty() => path.clone(),
- _ => {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NO_LOCAL_PATH",
- "Repository has no local path configured",
- )),
- )
- .into_response();
- }
- };
-
- // Find a connected daemon for this owner
- let daemon_id = state
- .daemon_connections
- .iter()
- .find(|entry| entry.value().owner_id == auth.owner_id)
- .map(|entry| entry.value().id);
-
- let daemon_id = match daemon_id {
- Some(id) => id,
- None => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new(
- "NO_DAEMON",
- "No daemon connected. Start a daemon to sync files from repository.",
- )),
- )
- .into_response();
- }
- };
-
- // Send ReadRepoFile command to daemon
- // Use the file ID as the request_id so we can match the response
- let command = DaemonCommand::ReadRepoFile {
- request_id: id,
- contract_id,
- file_path: repo_file_path,
- repo_path: repo_local_path,
- };
-
- if let Err(e) = state.send_daemon_command(daemon_id, command).await {
- tracing::error!("Failed to send ReadRepoFile command: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DAEMON_ERROR", e)),
- )
- .into_response();
- }
-
- // Update status to indicate sync in progress
- if let Err(e) = sqlx::query("UPDATE files SET repo_sync_status = 'syncing' WHERE id = $1")
- .bind(id)
- .execute(pool)
- .await
- {
- tracing::warn!("Failed to update repo_sync_status: {}", e);
- }
-
- // Return 202 Accepted - the sync happens asynchronously
- (
- StatusCode::ACCEPTED,
- Json(serde_json::json!({
- "message": "Sync operation started",
- "fileId": id,
- })),
- )
- .into_response()
-}
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs
index bee6b02..46be7ac 100644
--- a/makima/src/server/handlers/history.rs
+++ b/makima/src/server/handlers/history.rs
@@ -10,10 +10,7 @@ use uuid::Uuid;
use crate::{
db::{
- models::{
- flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
- SupervisorConversationResponse, TaskConversationResponse, TaskReference,
- },
+ models::{flexible_datetime, ConversationMessage, HistoryQueryFilters, TaskConversationResponse},
repository,
},
server::{auth::Authenticated, messages::ApiError, state::SharedState},
@@ -32,7 +29,6 @@ pub struct TaskConversationParams {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimelineQueryFilters {
- pub contract_id: Option<Uuid>,
pub task_id: Option<Uuid>,
pub include_subtasks: Option<bool>,
#[serde(default, deserialize_with = "flexible_datetime::deserialize")]
@@ -42,231 +38,6 @@ pub struct TimelineQueryFilters {
pub limit: Option<i32>,
}
-/// GET /api/v1/contracts/{id}/history
-/// Returns contract history timeline with filtering and pagination
-#[utoipa::path(
- get,
- path = "/api/v1/contracts/{id}/history",
- params(
- ("id" = Uuid, Path, description = "Contract ID"),
- ("phase" = Option<String>, Query, description = "Filter by phase"),
- ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
- ("from" = Option<String>, Query, description = "Start date filter"),
- ("to" = Option<String>, Query, description = "End date filter"),
- ("limit" = Option<i32>, Query, description = "Limit results"),
- ),
- responses(
- (status = 200, description = "Contract history", body = ContractHistoryResponse),
- (status = 401, description = "Unauthorized", body = ApiError),
- (status = 403, description = "Forbidden", body = ApiError),
- (status = 404, description = "Contract not found", body = ApiError),
- (status = 503, description = "Database not configured", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "History"
-)]
-pub async fn get_contract_history(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- Query(filters): Query<HistoryQueryFilters>,
- Authenticated(auth): Authenticated,
-) -> impl IntoResponse {
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
- )
- .into_response();
- };
-
- // Verify contract exists and user has access
- let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(c)) => c,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Contract not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get contract {}: {}", contract_id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Get history events
- match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
- Ok((events, total_count)) => {
- Json(ContractHistoryResponse {
- contract_id,
- entries: events,
- total_count,
- cursor: None, // TODO: implement cursor pagination
- })
- .into_response()
- }
- Err(e) => {
- tracing::error!("Failed to get contract history: {}", e);
- (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response()
- }
- }
-}
-
-/// GET /api/v1/contracts/{id}/supervisor/conversation
-/// Returns full supervisor conversation with spawned task references
-#[utoipa::path(
- get,
- path = "/api/v1/contracts/{id}/supervisor/conversation",
- params(
- ("id" = Uuid, Path, description = "Contract ID")
- ),
- responses(
- (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
- (status = 401, description = "Unauthorized", body = ApiError),
- (status = 403, description = "Forbidden", body = ApiError),
- (status = 404, description = "Supervisor not found", body = ApiError),
- (status = 503, description = "Database not configured", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "History"
-)]
-pub async fn get_supervisor_conversation(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- Authenticated(auth): Authenticated,
-) -> impl IntoResponse {
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
- )
- .into_response();
- };
-
- // Get contract for phase info and ownership check
- let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(c)) => c,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Contract not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get contract {}: {}", contract_id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Get the supervisor state
- let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
- Ok(Some(s)) => s,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Parse conversation history from JSONB
- let messages: Vec<ConversationMessage> = supervisor_state
- .conversation_history
- .as_array()
- .map(|arr| {
- arr.iter()
- .enumerate()
- .map(|(i, v)| ConversationMessage {
- id: i.to_string(),
- role: v
- .get("role")
- .and_then(|r| r.as_str())
- .unwrap_or("user")
- .to_string(),
- content: v
- .get("content")
- .and_then(|c| c.as_str())
- .unwrap_or("")
- .to_string(),
- timestamp: supervisor_state.last_activity,
- tool_calls: None,
- tool_name: None,
- tool_input: None,
- tool_result: None,
- is_error: None,
- token_count: None,
- cost_usd: None,
- })
- .collect()
- })
- .unwrap_or_default();
-
- // Get spawned tasks
- let tasks = match repository::list_tasks_by_contract(pool, contract_id, auth.owner_id).await {
- Ok(t) => t,
- Err(e) => {
- tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
- Vec::new()
- }
- };
-
- let spawned_tasks: Vec<TaskReference> = tasks
- .into_iter()
- .filter(|t| !t.is_supervisor)
- .map(|t| TaskReference {
- task_id: t.id,
- task_name: t.name,
- status: t.status,
- created_at: t.created_at,
- completed_at: t.completed_at,
- })
- .collect();
-
- Json(SupervisorConversationResponse {
- contract_id,
- supervisor_task_id: supervisor_state.task_id,
- phase: contract.phase,
- last_activity: supervisor_state.last_activity,
- pending_task_ids: supervisor_state.pending_task_ids,
- messages,
- spawned_tasks,
- })
- .into_response()
-}
-
-/// GET /api/v1/mesh/tasks/{id}/conversation
-/// Returns task conversation history
#[utoipa::path(
get,
path = "/api/v1/mesh/tasks/{id}/conversation",
@@ -364,28 +135,16 @@ pub async fn get_task_conversation(
}
/// GET /api/v1/timeline
-/// Returns unified timeline for authenticated user
+/// Returns unified task-history timeline for the authenticated user.
#[utoipa::path(
get,
path = "/api/v1/timeline",
- params(
- ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
- ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
- ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
- ("from" = Option<String>, Query, description = "Start date filter"),
- ("to" = Option<String>, Query, description = "End date filter"),
- ("limit" = Option<i32>, Query, description = "Limit results"),
- ),
responses(
- (status = 200, description = "Timeline events", body = ContractHistoryResponse),
+ (status = 200, description = "Timeline events"),
(status = 401, description = "Unauthorized", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
),
+ security(("bearer_auth" = []), ("api_key" = [])),
tag = "History"
)]
pub async fn get_timeline(
@@ -402,7 +161,6 @@ pub async fn get_timeline(
};
let history_filters = HistoryQueryFilters {
- phase: None,
event_types: None,
from: filters.from,
to: filters.to,
@@ -410,24 +168,18 @@ pub async fn get_timeline(
cursor: None,
};
- let result = if let Some(contract_id) = filters.contract_id {
- repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await
- } else if let Some(task_id) = filters.task_id {
+ let result = if let Some(task_id) = filters.task_id {
repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
} else {
repository::get_timeline(pool, auth.owner_id, &history_filters).await
};
match result {
- Ok((events, total_count)) => {
- Json(ContractHistoryResponse {
- contract_id: filters.contract_id.unwrap_or_default(),
- entries: events,
- total_count,
- cursor: None,
- })
- .into_response()
- }
+ Ok((events, total_count)) => Json(serde_json::json!({
+ "entries": events,
+ "totalCount": total_count,
+ }))
+ .into_response(),
Err(e) => {
tracing::error!("Failed to get timeline: {}", e);
(
diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs
deleted file mode 100644
index e1bc30e..0000000
--- a/makima/src/server/handlers/listen.rs
+++ /dev/null
@@ -1,783 +0,0 @@
-//! WebSocket handler for streaming speech-to-text with sliding window optimization.
-
-use axum::{
- extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade},
- response::Response,
-};
-use futures::{SinkExt, StreamExt};
-use tokio::sync::mpsc;
-use uuid::Uuid;
-
-use crate::audio::{resample_and_mixdown, TARGET_CHANNELS, TARGET_SAMPLE_RATE};
-use crate::db::models::{TranscriptEntry, UpdateFileRequest};
-use crate::db::repository;
-use crate::listen::{align_speakers, samples_per_chunk, DialogueSegment, TimestampMode};
-use crate::server::messages::{
- AudioEncoding, ClientMessage, ServerMessage, StartMessage, TranscriptMessage,
-};
-use crate::server::state::{MlModels, SharedState};
-
-/// Chunk size in milliseconds for triggering transcription processing.
-const STREAM_CHUNK_MS: u32 = 5_000;
-
-/// Maximum window size in seconds for sliding window processing.
-const MAX_WINDOW_SECONDS: f32 = 30.0;
-
-/// Maximum window size in samples at 16kHz.
-const MAX_WINDOW_SAMPLES: usize = (MAX_WINDOW_SECONDS as usize) * (TARGET_SAMPLE_RATE as usize);
-
-/// EOU chunk size in samples (160ms at 16kHz).
-const EOU_CHUNK_SIZE: usize = 2560;
-
-/// Context overlap in seconds to keep when trimming finalized audio.
-const CONTEXT_OVERLAP_SECONDS: f32 = 2.0;
-
-/// WebSocket upgrade handler for STT streaming.
-///
-/// This endpoint accepts WebSocket connections for real-time speech-to-text
-/// transcription with speaker diarization.
-#[utoipa::path(
- get,
- path = "/api/v1/listen",
- responses(
- (status = 101, description = "WebSocket connection established"),
- ),
- tag = "Listen"
-)]
-pub async fn websocket_handler(
- ws: WebSocketUpgrade,
- State(state): State<SharedState>,
-) -> Response {
- ws.on_upgrade(|socket| handle_socket(socket, state))
-}
-
-async fn handle_socket(socket: WebSocket, state: SharedState) {
- let session_id = Uuid::new_v4().to_string();
- tracing::info!(session_id = %session_id, "New WebSocket connection");
-
- // Split socket for concurrent read/write
- let (mut sender, mut receiver) = socket.split();
-
- // Channel for sending responses back to client
- let (response_tx, mut response_rx) = mpsc::channel::<ServerMessage>(32);
-
- // Spawn task to forward responses to WebSocket
- let sender_task = tokio::spawn(async move {
- while let Some(msg) = response_rx.recv().await {
- let json = match serde_json::to_string(&msg) {
- Ok(j) => j,
- Err(e) => {
- tracing::error!("Failed to serialize message: {}", e);
- continue;
- }
- };
- if sender.send(Message::Text(json.into())).await.is_err() {
- break;
- }
- }
- });
-
- // Lazy-load ML models on first Listen connection
- let ml_models = match state.get_ml_models().await {
- Ok(models) => models,
- Err(e) => {
- tracing::error!(session_id = %session_id, error = %e, "Failed to load ML models");
- let _ = response_tx
- .send(ServerMessage::Error {
- code: "MODEL_LOAD_ERROR".into(),
- message: format!("Failed to load ML models: {}", e),
- })
- .await;
- drop(response_tx);
- let _ = sender_task.await;
- return;
- }
- };
-
- // Send ready message
- let _ = response_tx
- .send(ServerMessage::Ready {
- session_id: session_id.clone(),
- })
- .await;
-
- // Audio format state
- let mut audio_format: Option<StartMessage> = None;
-
- // Main audio buffer for transcription (accumulates resampled 16kHz mono audio)
- let mut audio_buffer: Vec<f32> = Vec::new();
-
- // EOU detection buffer (resampled audio for utterance detection)
- let mut eou_buffer: Vec<f32> = Vec::new();
- let mut last_eou_text: String = String::new();
- let mut utterance_ended: bool = false;
-
- // Tracking state
- let mut last_sent_end_time: f32 = 0.0;
- let mut last_processed_len: usize = 0;
- let mut audio_offset: f32 = 0.0; // Time offset from trimmed audio
- let mut finalized_segments: Vec<DialogueSegment> = Vec::new();
-
- // File persistence state
- let mut file_id: Option<Uuid> = None;
- let mut transcript_entries: Vec<TranscriptEntry> = Vec::new();
- let mut transcript_counter: u32 = 0;
-
- // Auth state (set when Start message includes valid auth_token and contract_id)
- let mut authenticated_owner_id: Option<Uuid> = None;
- let mut target_contract_id: Option<Uuid> = None;
-
- // Reset Sortformer state for new session
- {
- let mut sortformer = ml_models.sortformer.lock().await;
- sortformer.reset_state();
- }
-
- // Process incoming messages
- while let Some(msg_result) = receiver.next().await {
- let msg = match msg_result {
- Ok(m) => m,
- Err(e) => {
- tracing::error!("WebSocket error: {}", e);
- break;
- }
- };
-
- match msg {
- Message::Text(text) => {
- // Parse JSON control messages
- match serde_json::from_str::<ClientMessage>(&text) {
- Ok(ClientMessage::Start(start)) => {
- tracing::info!(
- session_id = %session_id,
- sample_rate = start.sample_rate,
- channels = start.channels,
- encoding = ?start.encoding,
- contract_id = ?start.contract_id,
- has_auth = start.auth_token.is_some(),
- "Session started"
- );
-
- // Validate auth and contract if provided
- if let (Some(token), Some(contract_id_str)) = (&start.auth_token, &start.contract_id) {
- // Parse contract ID
- if let Ok(contract_id) = Uuid::parse_str(contract_id_str) {
- // Validate JWT token
- if let Some(ref verifier) = state.jwt_verifier {
- match verifier.verify(token) {
- Ok(claims) => {
- authenticated_owner_id = Some(claims.sub);
- target_contract_id = Some(contract_id);
- tracing::info!(
- session_id = %session_id,
- owner_id = %claims.sub,
- contract_id = %contract_id,
- "Authenticated session - transcripts will be saved to contract"
- );
- }
- Err(e) => {
- tracing::warn!(
- session_id = %session_id,
- error = %e,
- "Invalid auth token - transcripts will not be saved"
- );
- }
- }
- } else {
- tracing::debug!(
- session_id = %session_id,
- "No JWT verifier configured - transcripts will not be saved"
- );
- }
- } else {
- tracing::warn!(
- session_id = %session_id,
- contract_id = contract_id_str,
- "Invalid contract ID format"
- );
- }
- }
-
- audio_format = Some(start);
- audio_buffer.clear();
- eou_buffer.clear();
- last_eou_text.clear();
- utterance_ended = false;
- last_sent_end_time = 0.0;
- last_processed_len = 0;
- audio_offset = 0.0;
- finalized_segments.clear();
- file_id = None;
- authenticated_owner_id = authenticated_owner_id; // Keep from above
- target_contract_id = target_contract_id; // Keep from above
-
- // Reset models for new session
- let mut sortformer = ml_models.sortformer.lock().await;
- sortformer.reset_state();
- }
- Ok(ClientMessage::Stop(stop)) => {
- tracing::info!(
- session_id = %session_id,
- reason = ?stop.reason,
- audio_buffer_len = audio_buffer.len(),
- "Session stopped by client"
- );
-
- if audio_format.is_some() {
- if !audio_buffer.is_empty() {
- tracing::debug!(
- session_id = %session_id,
- samples = audio_buffer.len(),
- "Processing final audio buffer"
- );
-
- // Process remaining audio with sliding window
- match process_audio_window(&audio_buffer, audio_offset, ml_models).await {
- Ok(segments) => {
- tracing::debug!(
- session_id = %session_id,
- total_segments = segments.len(),
- finalized_count = finalized_segments.len(),
- last_sent_end = last_sent_end_time,
- "Final transcription complete"
- );
-
- // Combine finalized segments with new segments
- let mut all_segments = finalized_segments.clone();
-
- // Add segments from current window that weren't finalized
- for seg in &segments {
- // Adjust timestamps with offset
- let adjusted_seg = DialogueSegment {
- speaker: seg.speaker.clone(),
- start: seg.start + audio_offset,
- end: seg.end + audio_offset,
- text: seg.text.clone(),
- };
-
- // Only add if not already finalized
- if !finalized_segments.iter().any(|f|
- (f.start - adjusted_seg.start).abs() < 0.1 &&
- f.text == adjusted_seg.text
- ) {
- all_segments.push(adjusted_seg);
- }
- }
-
- // Sort by start time
- all_segments.sort_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
-
- // Send any NEW segments as interim first
- for seg in &all_segments {
- if seg.end > last_sent_end_time {
- let _ = response_tx
- .send(ServerMessage::Transcript(TranscriptMessage {
- speaker: seg.speaker.clone(),
- start: seg.start,
- end: seg.end,
- text: seg.text.clone(),
- is_final: false,
- }))
- .await;
- }
- }
-
- // Send ALL segments as final
- for seg in &all_segments {
- let _ = response_tx
- .send(ServerMessage::Transcript(TranscriptMessage {
- speaker: seg.speaker.clone(),
- start: seg.start,
- end: seg.end,
- text: seg.text.clone(),
- is_final: true,
- }))
- .await;
- }
- }
- Err(e) => {
- tracing::error!(
- session_id = %session_id,
- error = %e,
- "Final transcription failed"
- );
- let _ = response_tx
- .send(ServerMessage::Error {
- code: "TRANSCRIPTION_ERROR".into(),
- message: e.to_string(),
- })
- .await;
- }
- }
- }
- }
-
- let _ = response_tx
- .send(ServerMessage::Stopped {
- reason: stop.reason.unwrap_or_else(|| "client_requested".into()),
- })
- .await;
- break;
- }
- Err(e) => {
- tracing::warn!(session_id = %session_id, error = %e, "Failed to parse message");
- let _ = response_tx
- .send(ServerMessage::Error {
- code: "PARSE_ERROR".into(),
- message: format!("Failed to parse message: {}", e),
- })
- .await;
- }
- }
- }
- Message::Binary(data) => {
- let Some(ref format) = audio_format else {
- let _ = response_tx
- .send(ServerMessage::Error {
- code: "NO_FORMAT".into(),
- message: "Received audio before start message".into(),
- })
- .await;
- continue;
- };
-
- // Decode binary audio data to f32 samples
- let samples = decode_audio_chunk(&data, format);
-
- // Resample to 16kHz mono for all processing
- let resampled = if format.sample_rate != TARGET_SAMPLE_RATE || format.channels != TARGET_CHANNELS {
- resample_and_mixdown(&samples, format.sample_rate, format.channels)
- } else {
- samples
- };
-
- audio_buffer.extend(&resampled);
- eou_buffer.extend(&resampled);
-
- // Process EOU detection in 160ms chunks
- while eou_buffer.len() >= EOU_CHUNK_SIZE {
- let chunk: Vec<f32> = eou_buffer.drain(..EOU_CHUNK_SIZE).collect();
-
- let mut eou = ml_models.parakeet_eou.lock().await;
- if let Ok(text) = eou.transcribe(&chunk, false) {
- // Detect utterance boundary (sentence-ending punctuation)
- if !text.is_empty() && text != last_eou_text {
- if last_eou_text.ends_with('.')
- || last_eou_text.ends_with('?')
- || last_eou_text.ends_with('!')
- {
- utterance_ended = true;
- tracing::debug!(
- session_id = %session_id,
- "Utterance boundary detected via EOU"
- );
- }
- last_eou_text = text;
- }
- }
- }
-
- // Calculate if we should process (utterance ended OR enough new audio)
- let chunk_samples = samples_per_chunk(TARGET_SAMPLE_RATE, STREAM_CHUNK_MS);
- let new_audio_len = audio_buffer.len() - last_processed_len;
- let should_process = utterance_ended || new_audio_len >= chunk_samples;
-
- if should_process {
- tracing::debug!(
- session_id = %session_id,
- total_samples = audio_buffer.len(),
- new_samples = new_audio_len,
- utterance_ended = utterance_ended,
- audio_offset = audio_offset,
- "Processing audio with sliding window"
- );
-
- match process_audio_window(&audio_buffer, audio_offset, ml_models).await {
- Ok(segments) => {
- tracing::debug!(
- session_id = %session_id,
- total_segments = segments.len(),
- last_sent_end = last_sent_end_time,
- "Transcription produced segments"
- );
-
- // Send segments with adjusted timestamps
- for seg in &segments {
- let adjusted_start = seg.start + audio_offset;
- let adjusted_end = seg.end + audio_offset;
- if adjusted_end > last_sent_end_time {
- // Create file on first transcript if authenticated with contract
- if file_id.is_none() {
- if let (Some(owner_id), Some(contract_id), Some(pool)) =
- (authenticated_owner_id, target_contract_id, &state.db_pool)
- {
- let create_req = crate::db::models::CreateFileRequest {
- contract_id,
- name: None, // Auto-generated
- description: Some("Live transcription".to_string()),
- transcript: vec![],
- location: None,
- body: vec![],
- repo_file_path: None,
- contract_phase: None, // Will be looked up from contract
- };
- match repository::create_file_for_owner(pool, owner_id, create_req).await {
- Ok(file) => {
- file_id = Some(file.id);
- tracing::info!(
- session_id = %session_id,
- file_id = %file.id,
- contract_id = %contract_id,
- "Created file for session in contract"
- );
- }
- Err(e) => {
- tracing::warn!(
- session_id = %session_id,
- error = %e,
- "Failed to create file for session"
- );
- }
- }
- }
- }
-
- // Track transcript entry
- transcript_counter += 1;
- transcript_entries.push(TranscriptEntry {
- id: format!("{}-{}", session_id, transcript_counter),
- speaker: seg.speaker.clone(),
- start: adjusted_start,
- end: adjusted_end,
- text: seg.text.clone(),
- is_final: false,
- });
-
- let _ = response_tx
- .send(ServerMessage::Transcript(TranscriptMessage {
- speaker: seg.speaker.clone(),
- start: adjusted_start,
- end: adjusted_end,
- text: seg.text.clone(),
- is_final: false,
- }))
- .await;
- last_sent_end_time = adjusted_end;
- }
- }
-
- // If utterance ended, finalize and trim
- if utterance_ended && segments.len() > 1 {
- // Finalize all but the last segment
- let to_finalize = &segments[..segments.len() - 1];
- for seg in to_finalize {
- finalized_segments.push(DialogueSegment {
- speaker: seg.speaker.clone(),
- start: seg.start + audio_offset,
- end: seg.end + audio_offset,
- text: seg.text.clone(),
- });
- }
-
- // Trim audio buffer
- if let Some(last_finalized) = to_finalize.last() {
- let trim_to_time = (last_finalized.end - CONTEXT_OVERLAP_SECONDS).max(0.0);
- let trim_samples = (trim_to_time * TARGET_SAMPLE_RATE as f32) as usize;
-
- if trim_samples > 0 && trim_samples < audio_buffer.len() {
- audio_buffer.drain(..trim_samples);
- audio_offset += trim_to_time;
- tracing::debug!(
- session_id = %session_id,
- trimmed_samples = trim_samples,
- new_offset = audio_offset,
- remaining_samples = audio_buffer.len(),
- "Trimmed audio buffer after finalization"
- );
- }
- }
- }
-
- last_processed_len = audio_buffer.len();
- utterance_ended = false;
- }
- Err(e) => {
- tracing::error!(session_id = %session_id, error = %e, "Transcription error");
- let _ = response_tx
- .send(ServerMessage::Error {
- code: "TRANSCRIPTION_ERROR".into(),
- message: e.to_string(),
- })
- .await;
- }
- }
- }
- }
- Message::Close(_) => {
- tracing::info!(session_id = %session_id, "WebSocket closed by client");
- break;
- }
- _ => {}
- }
- }
-
- // Save final transcript to file if we have one
- if let Some(fid) = file_id {
- if let Some(ref pool) = state.db_pool {
- // Deduplicate transcript entries before saving
- let deduplicated = deduplicate_transcripts(&transcript_entries);
-
- // Mark all entries as final
- let final_entries: Vec<TranscriptEntry> = deduplicated
- .into_iter()
- .map(|mut entry| {
- entry.is_final = true;
- entry
- })
- .collect();
-
- match repository::update_file(pool, fid, UpdateFileRequest {
- name: None,
- description: None,
- transcript: Some(final_entries.clone()),
- summary: None,
- body: None,
- version: None, // Internal update, skip version check
- repo_file_path: None,
- }).await {
- Ok(_) => {
- tracing::info!(
- session_id = %session_id,
- file_id = %fid,
- original_count = transcript_entries.len(),
- deduplicated_count = final_entries.len(),
- "Saved final transcript to file"
- );
-
- // Send TranscriptSaved message to client
- if let Some(contract_id) = target_contract_id {
- let _ = response_tx
- .send(ServerMessage::TranscriptSaved {
- file_id: fid.to_string(),
- contract_id: contract_id.to_string(),
- })
- .await;
- }
- }
- Err(e) => {
- tracing::error!(
- session_id = %session_id,
- file_id = %fid,
- error = %e,
- "Failed to save final transcript to file"
- );
- }
- }
- }
- }
-
- // Cleanup
- drop(response_tx);
- let _ = sender_task.await;
- tracing::info!(session_id = %session_id, "WebSocket connection closed");
-}
-
-/// Decode binary audio chunk to f32 samples based on encoding format.
-fn decode_audio_chunk(data: &[u8], format: &StartMessage) -> Vec<f32> {
- match format.encoding {
- AudioEncoding::Pcm32f => data
- .chunks_exact(4)
- .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
- .collect(),
- AudioEncoding::Pcm16 | AudioEncoding::Raw => data
- .chunks_exact(2)
- .map(|chunk| {
- let sample = i16::from_le_bytes([chunk[0], chunk[1]]);
- sample as f32 / 32768.0
- })
- .collect(),
- }
-}
-
-/// Deduplicate transcript entries by removing entries with similar times and text.
-///
-/// Entries are considered duplicates if any of these are true:
-/// - Start times are within 1.5 seconds AND text is similar (same, substring, or high overlap)
-/// - Time ranges overlap significantly AND text is similar
-/// - Text is identical regardless of timing
-fn deduplicate_transcripts(entries: &[TranscriptEntry]) -> Vec<TranscriptEntry> {
- if entries.is_empty() {
- return vec![];
- }
-
- // Sort by start time
- let mut sorted: Vec<TranscriptEntry> = entries.to_vec();
- sorted.sort_by(|a, b| a.start.partial_cmp(&b.start).unwrap_or(std::cmp::Ordering::Equal));
-
- let mut result: Vec<TranscriptEntry> = Vec::new();
-
- for entry in sorted {
- // Normalize text for comparison
- let entry_text_normalized = normalize_text(&entry.text);
-
- // Check if this entry is a duplicate of any existing entry
- let duplicate_idx = result.iter().position(|existing| {
- let existing_text_normalized = normalize_text(&existing.text);
-
- // Check if same speaker
- let same_speaker = existing.speaker == entry.speaker;
-
- // Check if start times are identical or very close
- let start_identical = (existing.start - entry.start).abs() < 0.1;
- let start_close = (existing.start - entry.start).abs() < 1.5;
-
- // Check if time ranges overlap
- let time_overlap = existing.start < entry.end && entry.start < existing.end;
-
- // Check various text similarity conditions
- let text_identical = existing_text_normalized == entry_text_normalized;
- let text_contained = existing_text_normalized.contains(&entry_text_normalized)
- || entry_text_normalized.contains(&existing_text_normalized);
- let text_similar = text_similarity(&existing_text_normalized, &entry_text_normalized) > 0.7;
-
- // Duplicate conditions:
- // 1. Same speaker + identical start time (different end times = same segment refined)
- // 2. Same speaker + close start + similar text
- // 3. Same speaker + overlapping time + similar text
- // 4. Identical text (likely a re-transcription)
- (same_speaker && start_identical)
- || (same_speaker && start_close && (text_identical || text_contained || text_similar))
- || (same_speaker && time_overlap && (text_identical || text_contained))
- || text_identical
- });
-
- match duplicate_idx {
- Some(idx) => {
- // If the new entry has longer text, update the existing one
- if entry.text.len() > result[idx].text.len() {
- result[idx].text = entry.text.clone();
- result[idx].end = result[idx].end.max(entry.end);
- } else {
- // Extend end time if needed
- result[idx].end = result[idx].end.max(entry.end);
- }
- }
- None => {
- result.push(entry);
- }
- }
- }
-
- // Second pass: merge adjacent segments with same speaker and similar text
- let mut merged: Vec<TranscriptEntry> = Vec::new();
- for entry in result {
- if let Some(last) = merged.last_mut() {
- // Check if this should be merged with the previous entry
- let same_speaker = last.speaker == entry.speaker;
- let adjacent = (entry.start - last.end).abs() < 0.5;
- let text_overlap = normalize_text(&last.text).contains(&normalize_text(&entry.text))
- || normalize_text(&entry.text).contains(&normalize_text(&last.text));
-
- if same_speaker && adjacent && text_overlap {
- // Merge: keep longer text, extend time range
- if entry.text.len() > last.text.len() {
- last.text = entry.text;
- }
- last.end = last.end.max(entry.end);
- continue;
- }
- }
- merged.push(entry);
- }
-
- // Reassign IDs to be sequential
- for (i, entry) in merged.iter_mut().enumerate() {
- let parts: Vec<&str> = entry.id.split('-').collect();
- if let Some(session_prefix) = parts.first() {
- entry.id = format!("{}-{}", session_prefix, i + 1);
- }
- }
-
- merged
-}
-
-/// Normalize text for comparison by lowercasing and collapsing whitespace.
-fn normalize_text(text: &str) -> String {
- text.to_lowercase()
- .split_whitespace()
- .collect::<Vec<_>>()
- .join(" ")
-}
-
-/// Calculate text similarity as a ratio of shared words.
-fn text_similarity(a: &str, b: &str) -> f32 {
- if a.is_empty() || b.is_empty() {
- return 0.0;
- }
-
- let words_a: std::collections::HashSet<&str> = a.split_whitespace().collect();
- let words_b: std::collections::HashSet<&str> = b.split_whitespace().collect();
-
- let intersection = words_a.intersection(&words_b).count();
- let union = words_a.union(&words_b).count();
-
- if union == 0 {
- 0.0
- } else {
- intersection as f32 / union as f32
- }
-}
-
-/// Process audio using sliding window through STT and streaming diarization models.
-///
-/// Only processes the last MAX_WINDOW_SECONDS of audio to maintain constant
-/// processing time regardless of total audio length.
-async fn process_audio_window(
- samples: &[f32],
- _audio_offset: f32,
- ml_models: &MlModels,
-) -> Result<Vec<DialogueSegment>, Box<dyn std::error::Error + Send + Sync>> {
- // Apply sliding window - only process the last 30 seconds
- let window_start = samples.len().saturating_sub(MAX_WINDOW_SAMPLES);
- let window = &samples[window_start..];
-
- tracing::trace!(
- total_samples = samples.len(),
- window_samples = window.len(),
- window_start = window_start,
- "Using sliding window for processing"
- );
-
- // Acquire model locks and run inference
- let mut parakeet = ml_models.parakeet.lock().await;
- let mut sortformer = ml_models.sortformer.lock().await;
-
- // Run streaming diarization (maintains speaker cache across calls)
- let diarization_segments =
- sortformer.diarize_streaming(window.to_vec(), TARGET_SAMPLE_RATE, TARGET_CHANNELS)?;
-
- // Run transcription
- let transcription = parakeet.transcribe_samples(
- window.to_vec(),
- TARGET_SAMPLE_RATE,
- TARGET_CHANNELS,
- Some(TimestampMode::Sentences),
- )?;
-
- // Align speakers with transcription
- let aligned = align_speakers(&transcription.tokens, &diarization_segments);
-
- // Adjust timestamps for window offset within the buffer
- let window_offset = window_start as f32 / TARGET_SAMPLE_RATE as f32;
- let adjusted: Vec<DialogueSegment> = aligned
- .into_iter()
- .map(|seg| DialogueSegment {
- speaker: seg.speaker,
- start: seg.start + window_offset,
- end: seg.end + window_offset,
- text: seg.text,
- })
- .collect();
-
- Ok(adjusted)
-}
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index be5387e..6ba4c8b 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -274,35 +274,16 @@ pub async fn create_task(
let _ = repository::record_history_event(
pool,
auth.owner_id,
- task.contract_id,
Some(task.id),
"task",
Some("created"),
- None,
serde_json::json!({
"name": &task.name,
- "isSupervisor": task.is_supervisor,
}),
).await;
- // Notify supervisor of new task creation if task belongs to a contract
- if let Some(contract_id) = task.contract_id {
- if !task.is_supervisor {
- let pool = pool.clone();
- let state_clone = state.clone();
- let task_clone = task.clone();
- tokio::spawn(async move {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- state_clone.notify_supervisor_of_task_created(
- supervisor.id,
- supervisor.daemon_id,
- task_clone.id,
- &task_clone.name,
- ).await;
- }
- });
- }
- }
+ // Supervisor notification on task creation removed alongside
+ // legacy contracts.
(StatusCode::CREATED, Json(task)).into_response()
}
Err(e) => {
@@ -352,26 +333,6 @@ pub async fn update_task(
.into_response();
};
- // Check if trying to set a supervisor task to a terminal status
- if let Some(ref new_status) = req.status {
- let terminal_statuses = ["done", "failed", "merged"];
- if terminal_statuses.contains(&new_status.as_str()) {
- // Get the task to check if it's a supervisor
- if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
- if task.is_supervisor {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "SUPERVISOR_CANNOT_COMPLETE",
- "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
- )),
- )
- .into_response();
- }
- }
- }
- }
-
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
@@ -410,26 +371,9 @@ pub async fn update_task(
updated_by: "user".to_string(),
});
- // Notify supervisor of status change if task belongs to a contract
- if let Some(contract_id) = task.contract_id {
- if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
- let pool = pool.clone();
- let state_clone = state.clone();
- let task_clone = task.clone();
- tokio::spawn(async move {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- state_clone.notify_supervisor_of_task_update(
- supervisor.id,
- supervisor.daemon_id,
- task_clone.id,
- &task_clone.name,
- &task_clone.status,
- &updated_fields_clone,
- ).await;
- }
- });
- }
- }
+ // Supervisor notification on task update removed alongside
+ // legacy contracts.
+ let _ = updated_fields_clone;
Json(task).into_response()
}
@@ -657,15 +601,10 @@ pub async fn start_task(
.into_response();
}
- // Get local_only and auto_merge_local flags from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // local_only / auto_merge_local used to come from the parent contract.
+ // With legacy contracts removed they default to false; the directive
+ // lifecycle handles its own completion now.
+ let (local_only, auto_merge_local) = (false, false);
// Get list of daemons that have previously failed this task
let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
@@ -708,8 +647,7 @@ pub async fn start_task(
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
- is_supervisor = task.is_supervisor,
- "Starting task with orchestrator/supervisor determination"
+ "Starting task"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
@@ -755,8 +693,6 @@ pub async fn start_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -764,13 +700,11 @@ pub async fn start_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
tracing::info!(
task_id = %id,
- is_supervisor = task.is_supervisor,
is_orchestrator = is_orchestrator,
daemon_id = %target_daemon_id,
"Sending SpawnTask command to daemon"
@@ -811,8 +745,6 @@ pub async fn start_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -820,7 +752,6 @@ pub async fn start_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -1128,11 +1059,10 @@ pub async fn send_message(
// Check if task is in a state that can receive messages
// Allow "running" and "starting" (to handle race between status update and message send)
- // Also allow AUTH_CODE messages and supervisor tasks regardless of status
+ // Also allow AUTH_CODE messages regardless of status
let is_auth_code = req.message.starts_with("AUTH_CODE:");
- let is_supervisor = task.is_supervisor;
let can_receive_message = task.status == "running" || task.status == "starting";
- if !can_receive_message && !is_auth_code && !is_supervisor {
+ if !can_receive_message && !is_auth_code {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
@@ -1147,27 +1077,8 @@ pub async fn send_message(
}
// Find the daemon running this task
- // For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
- } else if is_supervisor {
- // Supervisor without daemon - find one
- match state.daemon_connections
- .iter()
- .find(|d| d.value().owner_id == auth.owner_id)
- {
- Some(entry) => entry.value().id,
- None => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new(
- "NO_DAEMON",
- "No daemon available. Please start a daemon.",
- )),
- )
- .into_response();
- }
- }
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
@@ -1206,15 +1117,7 @@ pub async fn send_message(
};
if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ let (local_only, auto_merge_local) = (false, false);
// Send spawn command to new daemon
let spawn_cmd = DaemonCommand::SpawnTask {
@@ -1231,8 +1134,6 @@ pub async fn send_message(
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: updated_task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -1240,7 +1141,6 @@ pub async fn send_message(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: updated_task.directive_id,
};
@@ -2433,13 +2333,11 @@ pub async fn commit_worktree(
// Task Patches
// =============================================================================
-/// Query parameters for listing task patches
+/// Query parameters for listing task patches (legacy contract scope
+/// removed; query is currently empty).
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
-pub struct ListPatchesQuery {
- /// Contract ID to scope the patches
- pub contract_id: Uuid,
-}
+pub struct ListPatchesQuery {}
/// Patch summary for API response
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
@@ -2453,8 +2351,6 @@ pub struct PatchSummary {
pub description: Option<String>,
/// Task ID
pub task_id: Uuid,
- /// Contract ID
- pub contract_id: Uuid,
/// Number of files in the patch
pub files_count: i32,
/// Total lines added (estimated from patch size)
@@ -2523,14 +2419,8 @@ pub async fn list_task_patches(
}
};
- // Verify task belongs to the specified contract
- if task.contract_id != Some(query.contract_id) {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")),
- )
- .into_response();
- }
+ // Legacy contract verification removed; checkpoint patches are
+ // accessible to any owner of the task.
// Get checkpoint patches for this task
let patches = match repository::list_checkpoint_patches(pool, id).await {
@@ -2586,7 +2476,6 @@ pub async fn list_task_patches(
name,
description,
task_id: p.task_id,
- contract_id: query.contract_id,
files_count: p.files_count,
lines_added,
lines_removed,
@@ -3040,12 +2929,10 @@ pub async fn reassign_task(
// Create a NEW task with the conversation context
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: format!("{} (resumed)", task.name),
description: task.description.clone(),
plan: updated_plan.clone(),
parent_task_id: task.parent_task_id,
- is_supervisor: task.is_supervisor,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3058,7 +2945,6 @@ pub async fn reassign_task(
checkpoint_sha: task.last_checkpoint_sha.clone(),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -3126,15 +3012,8 @@ pub async fn reassign_task(
}
};
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // Legacy contract scope removed; defaults to false.
+ let (local_only, auto_merge_local) = (false, false);
// Send SpawnTask command to daemon for the new task
let command = DaemonCommand::SpawnTask {
@@ -3151,8 +3030,6 @@ pub async fn reassign_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(id), // Continue from old task's worktree
copy_files: None,
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -3160,7 +3037,6 @@ pub async fn reassign_task(
patch_base_sha,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -3190,56 +3066,10 @@ pub async fn reassign_task(
// Don't fail the request, the new task is already running
}
- // Notify the contract's supervisor about the reassignment (if applicable)
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- if let Some(supervisor_task_id) = contract.supervisor_task_id {
- // Don't notify if we're reassigning the supervisor itself
- if supervisor_task_id != old_task_id {
- // Find the supervisor's daemon and send a message
- if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
- if supervisor_task.status == "running" {
- if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
- // Find the daemon by its UUID
- if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
- let notification_msg = format!(
- "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
- A new task '{}' (ID: {}) has been created to continue the work. \
- The new task has {} context entries from the previous conversation.\n\n",
- task.name,
- old_task_id,
- final_task.name,
- new_task.id,
- context_entries
- );
-
- let notify_cmd = DaemonCommand::SendMessage {
- task_id: supervisor_task_id,
- message: notification_msg,
- };
-
- if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
- tracing::warn!(
- supervisor_id = %supervisor_task_id,
- error = %e,
- "Failed to notify supervisor about task reassignment"
- );
- } else {
- tracing::info!(
- supervisor_id = %supervisor_task_id,
- old_task_id = %old_task_id,
- new_task_id = %new_task.id,
- "Notified supervisor about task reassignment"
- );
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ // Supervisor reassignment notification removed alongside legacy
+ // contracts. The directive reconciler picks up reassigned tasks on
+ // its next tick.
+ let _ = context_entries;
// Broadcast task update for the new task
state.broadcast_task_update(TaskUpdateNotification {
@@ -3467,15 +3297,8 @@ pub async fn continue_task(
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // Legacy contract scope removed; defaults to false.
+ let (local_only, auto_merge_local) = (false, false);
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
@@ -3492,8 +3315,6 @@ pub async fn continue_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -3501,7 +3322,6 @@ pub async fn continue_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -3509,7 +3329,6 @@ pub async fn continue_task(
task_id = %id,
daemon_id = %target_daemon_id,
context_entries = context_entries,
- is_supervisor = task.is_supervisor,
"Continuing task with conversation context"
);
@@ -3820,12 +3639,10 @@ pub async fn fork_task(
// Create the new forked task
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: req.new_task_name.clone(),
description: task.description.clone(),
plan: req.new_task_plan.clone(),
parent_task_id: None, // Forked tasks are independent
- is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3838,7 +3655,6 @@ pub async fn fork_task(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -3980,12 +3796,10 @@ pub async fn resume_from_checkpoint(
});
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: task_name,
description: task.description.clone(),
plan: req.plan,
parent_task_id: None,
- is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3998,7 +3812,6 @@ pub async fn resume_from_checkpoint(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -4318,12 +4131,10 @@ pub async fn branch_task(
// Create the branched task (anonymous - no contract_id)
let create_req = CreateTaskRequest {
- contract_id: None, // Anonymous task
name: task_name,
description: Some(format!("Branched from task: {}", source_task.name)),
plan: req.message,
parent_task_id: None,
- is_supervisor: false,
priority: source_task.priority,
repository_url: source_task.repository_url.clone(),
base_branch: source_task.base_branch.clone(),
@@ -4336,7 +4147,6 @@ pub async fn branch_task(
checkpoint_sha: None,
branched_from_task_id: Some(source_task_id),
conversation_history,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -4357,11 +4167,9 @@ pub async fn branch_task(
let _ = repository::record_history_event(
pool,
auth.owner_id,
- None, // No contract for anonymous tasks
Some(task.id),
"task",
Some("branched"),
- None,
serde_json::json!({
"name": &task.name,
"sourceTaskId": source_task_id,
@@ -4425,8 +4233,6 @@ pub async fn branch_task(
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: None,
- contract_id: None,
- is_supervisor: false,
autonomous_loop: false,
resume_session: message_count > 0, // Resume if we have conversation history
conversation_history: updated_task.conversation_state.clone(),
@@ -4434,7 +4240,6 @@ pub async fn branch_task(
patch_base_sha,
local_only: false, // No contract, so not local_only
auto_merge_local: false, // No contract, so no auto_merge_local
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
};
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 19d2166..9900385 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -262,27 +262,6 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
- /// Enhanced supervisor heartbeat with detailed state
- SupervisorHeartbeat {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "contractId")]
- contract_id: Uuid,
- /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
- state: String,
- /// Current contract phase
- phase: String,
- /// Description of current activity
- #[serde(rename = "currentActivity")]
- current_activity: Option<String>,
- /// Progress percentage (0-100)
- progress: u8,
- /// Task IDs the supervisor is waiting on
- #[serde(rename = "pendingTaskIds")]
- pending_task_ids: Vec<Uuid>,
- /// Timestamp of this heartbeat
- timestamp: DateTime<Utc>,
- },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -618,96 +597,6 @@ struct DaemonAuthResult {
/// Automatically create a PR when all non-supervisor tasks for a contract are done.
/// Only applies to remote-repo contracts in the "execute" phase.
/// Fires as a best-effort operation — errors are logged but not propagated.
-async fn auto_create_pr_if_ready(
- pool: &sqlx::PgPool,
- state: &SharedState,
- contract_id: Uuid,
- owner_id: Uuid,
-) {
- // 1. Load contract — must be remote (not local_only) and in execute phase
- let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
- Ok(Some(c)) => c,
- _ => return,
- };
- if contract.local_only || contract.phase != "execute" {
- return;
- }
-
- // 2. Load non-supervisor tasks — all must be done
- let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
- Ok(t) => t,
- _ => return,
- };
- let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect();
- if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") {
- return;
- }
-
- // 3. Check pull-request deliverable not already complete
- let completed_deliverables = contract.get_completed_deliverables(&contract.phase);
- if completed_deliverables.contains(&"pull-request".to_string()) {
- return;
- }
-
- // 4. Check at least one repository has a remote URL
- let repos = match repository::list_contract_repositories(pool, contract_id).await {
- Ok(r) => r,
- _ => return,
- };
- if !repos.iter().any(|r| r.repository_url.is_some()) {
- return;
- }
-
- // 5. Load supervisor task
- let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await {
- Ok(Some(s)) => s,
- _ => return,
- };
-
- // Need supervisor's daemon_id to send command
- let daemon_id = match supervisor.daemon_id {
- Some(id) => id,
- None => return,
- };
-
- // 6. Construct branch name
- let sanitized_name: String = supervisor
- .name
- .chars()
- .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' })
- .collect::<String>()
- .to_lowercase();
- let short_id = &supervisor.id.to_string()[..8];
- let branch = format!("makima/{}-{}", sanitized_name, short_id);
-
- // 7. Send CreatePR command to supervisor's daemon
- let command = DaemonCommand::CreatePR {
- task_id: supervisor.id,
- title: contract.name.clone(),
- body: contract.description.clone(),
- base_branch: supervisor.base_branch.clone(),
- branch,
- };
-
- match state.send_daemon_command(daemon_id, command).await {
- Ok(()) => {
- tracing::info!(
- contract_id = %contract_id,
- supervisor_id = %supervisor.id,
- "Auto-PR: sent CreatePR command to supervisor daemon"
- );
- }
- Err(e) => {
- tracing::warn!(
- contract_id = %contract_id,
- error = %e,
- "Auto-PR: failed to send CreatePR command"
- );
- }
- }
-}
-
-/// Validate an API key and return (user_id, owner_id).
async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> {
let key_hash = hash_api_key(key);
@@ -983,83 +872,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
- Ok(DaemonMessage::SupervisorHeartbeat {
- task_id,
- contract_id,
- state: supervisor_state,
- phase,
- current_activity,
- progress,
- pending_task_ids,
- timestamp: _,
- }) => {
- tracing::debug!(
- task_id = %task_id,
- contract_id = %contract_id,
- state = %supervisor_state,
- phase = %phase,
- progress = progress,
- "Supervisor heartbeat received"
- );
-
- // Store heartbeat in database and update supervisor state (Task 3.3)
- if let Some(ref pool) = state.db_pool {
- let pool = pool.clone();
- let pending_ids = pending_task_ids.clone();
- let activity = current_activity.clone();
- let state_str = supervisor_state.clone();
- let phase_str = phase.clone();
- tokio::spawn(async move {
- // Store the heartbeat record
- if let Err(e) = repository::create_supervisor_heartbeat(
- &pool,
- task_id,
- contract_id,
- &state_str,
- &phase_str,
- activity.as_deref(),
- progress as i32,
- &pending_ids,
- ).await {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to store supervisor heartbeat"
- );
- }
-
- // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
- if let Err(e) = repository::update_supervisor_heartbeat_state(
- &pool,
- contract_id,
- &state_str,
- activity.as_deref(),
- progress as i32,
- &pending_ids,
- ).await {
- tracing::debug!(
- contract_id = %contract_id,
- error = %e,
- "Failed to update supervisor state from heartbeat (may not exist yet)"
- );
- }
-
- // Also update the daemon heartbeat
- if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
- if let Some(daemon_id) = task.daemon_id {
- if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
- tracing::warn!(
- daemon_id = %daemon_id,
- error = %e,
- "Failed to update daemon heartbeat from supervisor"
- );
- }
- }
- }
- });
- }
- }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
@@ -1120,136 +932,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
- if updated_task.is_supervisor && new_status_owned == "running" {
- if let Some(contract_id) = updated_task.contract_id {
- // Check if supervisor state already exists (restoration scenario)
- match repository::get_supervisor_state(&pool, contract_id).await {
- Ok(Some(existing_state)) => {
- // State exists - this is a restoration
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- existing_state = %existing_state.state,
- restoration_count = existing_state.restoration_count,
- "Supervisor starting with existing state - restoration in progress"
- );
-
- // Mark as restored (increments restoration_count)
- match repository::mark_supervisor_restored(
- &pool,
- contract_id,
- "daemon_restart",
- ).await {
- Ok(restored_state) => {
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- restoration_count = restored_state.restoration_count,
- "Supervisor restoration marked"
- );
-
- // Check for pending questions to re-deliver
- if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
- restored_state.pending_questions.clone()
- ) {
- if !questions.is_empty() {
- tracing::info!(
- contract_id = %contract_id,
- question_count = questions.len(),
- "Pending questions found for re-delivery"
- );
- // Questions will be re-delivered by the supervisor when it restores
- }
- }
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to mark supervisor as restored"
- );
- }
- }
- }
- Ok(None) => {
- // No existing state - fresh start
- // Get contract to get its phase
- match repository::get_contract_for_owner(
- &pool,
- contract_id,
- updated_task.owner_id,
- ).await {
- Ok(Some(contract)) => {
- match repository::upsert_supervisor_state(
- &pool,
- contract_id,
- task_id,
- serde_json::json!([]), // Empty conversation
- &[], // No pending tasks
- &contract.phase,
- ).await {
- Ok(_) => {
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- phase = %contract.phase,
- "Initialized fresh supervisor state"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to initialize supervisor state"
- );
- }
- }
- }
- Ok(None) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- "Contract not found when initializing supervisor state"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to get contract for supervisor state"
- );
- }
- }
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to check existing supervisor state"
- );
- }
- }
- }
- }
-
// Record history event when task starts running
if new_status_owned == "running" {
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
- updated_task.contract_id,
Some(task_id),
"task",
Some("started"),
- None,
serde_json::json!({
"name": &updated_task.name,
- "isSupervisor": updated_task.is_supervisor,
}),
).await;
}
@@ -1329,51 +1021,19 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Notify supervisor if this task belongs to a contract
- if let Some(contract_id) = updated_task.contract_id {
- // Don't notify for supervisor tasks (they don't report to themselves)
- if !updated_task.is_supervisor {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- // action_directive used to come from
- // compute_action_directive (now removed alongside the
- // LLM module). Passing None preserves the existing
- // supervisor protocol; the auto-PR path below still
- // fires when every task is done.
- state.notify_supervisor_of_task_completion(
- supervisor.id,
- supervisor.daemon_id,
- updated_task.id,
- &updated_task.name,
- &updated_task.status,
- updated_task.progress_summary.as_deref(),
- updated_task.error_message.as_deref(),
- None,
- ).await;
- }
- }
- }
-
- // Auto-create PR if all tasks are done and repo is remote
- if updated_task.status == "done" {
- if let Some(contract_id) = updated_task.contract_id {
- let pool_c = pool.clone();
- let state_c = state.clone();
- tokio::spawn(async move {
- auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await;
- });
- }
- }
+ // Supervisor notification + auto-PR removed alongside
+ // legacy contracts. Directive completion is handled
+ // by the directive reconciler.
+ let _ = owner_id;
// Record history event for task completion
let subtype = if updated_task.status == "done" { "completed" } else { "failed" };
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
- updated_task.contract_id,
Some(task_id),
"task",
Some(subtype),
- None,
serde_json::json!({
"name": &updated_task.name,
"status": &updated_task.status,
@@ -1962,16 +1622,13 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
// Record history event for checkpoint
- // Get task to get contract_id
if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
let _ = repository::record_history_event(
pool,
task.owner_id,
- task.contract_id,
Some(task_id),
"checkpoint",
Some("created"),
- None,
serde_json::json!({
"checkpointNumber": checkpoint.checkpoint_number,
"commitSha": &sha,
@@ -2103,28 +1760,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
conflicts: conflicts.clone(),
});
- // On successful merge, notify supervisor to check if all merges complete
- if success {
- if let Some(pool) = state.db_pool.as_ref() {
- if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
- let prompt = format!(
- "[INFO] Merge completed: {}\n\
- Check if all tasks are merged with `makima supervisor tasks`.\n\
- If ready, create PR with `makima supervisor pr`.",
- message
- );
- let _ = state.notify_supervisor(
- supervisor.id,
- supervisor.daemon_id,
- &prompt,
- ).await;
- }
- }
- }
- }
- }
}
Ok(DaemonMessage::PRCreated {
task_id,
@@ -2150,52 +1785,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
pr_number,
});
- // Notify supervisor of PR result (both success and failure)
- if let Some(pool) = state.db_pool.as_ref() {
- if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
- let prompt = if success {
- // Get contract to determine next action
- let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await {
- match (contract.contract_type.as_str(), contract.phase.as_str()) {
- ("simple", "execute") => {
- "Mark contract complete with `makima supervisor complete`".to_string()
- }
- ("specification", "execute") => {
- "Advance to review phase with `makima supervisor advance-phase review`".to_string()
- }
- _ => "Check contract status with `makima supervisor status`".to_string()
- }
- } else {
- "Check contract status with `makima supervisor status`".to_string()
- };
-
- format!(
- "[ACTION REQUIRED] PR created successfully!\n\
- PR: {}\n\n\
- Next step: {}",
- pr_url.as_deref().unwrap_or(&message),
- next_action
- )
- } else {
- format!(
- "[ERROR] PR creation failed for task {}:\n\
- {}\n\n\
- Please fix the issue and retry with `makima supervisor pr`.",
- task_id,
- message
- )
- };
- let _ = state.notify_supervisor(
- supervisor.id,
- supervisor.daemon_id,
- &prompt,
- ).await;
- }
- }
- }
- }
}
Ok(DaemonMessage::GitConfigInherited {
success,
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index ebde52b..4a9a00b 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1,7 +1,13 @@
-//! HTTP handlers for supervisor-specific mesh operations.
+//! Question + order backchannel for directive-spawned tasks.
//!
-//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate
-//! contract work: spawning tasks, waiting for completion, reading worktree files, etc.
+//! Originally a much larger handler that orchestrated contract-supervisor
+//! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and
+//! supervisor tasks have been removed; what remains is the in-memory
+//! question machinery (`makima directive ask`) and order creation
+//! (`makima directive create-order`).
+//!
+//! Module name is kept as `mesh_supervisor` for route-path stability —
+//! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints.
use axum::{
extract::{Path, State},
@@ -9,238 +15,38 @@ use axum::{
response::IntoResponse,
Json,
};
-use base64::Engine;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
-use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest};
+use crate::db::models::CreateOrderRequest;
use crate::db::repository;
-use sqlx::PgPool;
use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
-use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification};
-
-// =============================================================================
-// Request/Response Types
-// =============================================================================
-
-/// Request to spawn a new task from supervisor.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct SpawnTaskRequest {
- pub name: String,
- pub plan: String,
- pub contract_id: Uuid,
- pub parent_task_id: Option<Uuid>,
- pub checkpoint_sha: Option<String>,
- /// Repository URL for the task (optional - if not provided, will be looked up from contract).
- pub repository_url: Option<String>,
-}
-
-/// Request to wait for task completion.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct WaitForTaskRequest {
- #[serde(default = "default_timeout")]
- pub timeout_seconds: i32,
-}
-
-fn default_timeout() -> i32 {
- 300
-}
-
-/// Request to read a file from task worktree.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ReadWorktreeFileRequest {
- pub file_path: String,
-}
-
-/// Request to ask a question and wait for user feedback.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AskQuestionRequest {
- /// The question to ask the user
- pub question: String,
- /// Optional choices (if empty, free-form text response)
- #[serde(default)]
- pub choices: Vec<String>,
- /// Optional context about what this relates to
- pub context: Option<String>,
- /// How long to wait for a response (seconds)
- #[serde(default = "default_question_timeout")]
- pub timeout_seconds: i32,
- /// When true, the request will block indefinitely until user responds (no timeout)
- #[serde(default)]
- pub phaseguard: bool,
- /// When true, allow selecting multiple choices (response will be comma-separated)
- #[serde(default)]
- pub multi_select: bool,
- /// When true, return immediately without waiting for response
- #[serde(default)]
- pub non_blocking: bool,
- /// Question type: general, phase_confirmation, or contract_complete
- #[serde(default = "default_question_type")]
- pub question_type: String,
-}
-
-fn default_question_type() -> String {
- "general".to_string()
-}
-
-fn default_question_timeout() -> i32 {
- 3600 // 1 hour default
-}
-
-/// Response from asking a question.
-#[derive(Debug, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AskQuestionResponse {
- /// The question ID for tracking
- pub question_id: Uuid,
- /// The user's response (None if timed out)
- pub response: Option<String>,
- /// Whether the question timed out
- pub timed_out: bool,
- /// Whether the question is still pending (server-side timeout reached but question not removed).
- /// The client should poll the poll endpoint to continue waiting.
- #[serde(default)]
- pub still_pending: bool,
-}
-
-/// Request to answer a supervisor question.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AnswerQuestionRequest {
- /// The user's response
- pub response: String,
-}
-
-/// Response to answering a question.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct AnswerQuestionResponse {
- /// Whether the answer was accepted
- pub success: bool,
-}
-
-/// Pending question summary.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct PendingQuestionSummary {
- pub question_id: Uuid,
- pub task_id: Uuid,
- pub contract_id: Uuid,
- /// Directive this question relates to (if from a directive task)
- #[serde(skip_serializing_if = "Option::is_none")]
- pub directive_id: Option<Uuid>,
- pub question: String,
- pub choices: Vec<String>,
- pub context: Option<String>,
- pub created_at: chrono::DateTime<chrono::Utc>,
- /// Whether multiple choices can be selected
- #[serde(default)]
- pub multi_select: bool,
- /// Question type: general, phase_confirmation, or contract_complete
- #[serde(default)]
- pub question_type: String,
-}
-
-/// Request to create a checkpoint.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CreateCheckpointRequest {
- pub message: String,
-}
-
-/// Response for task tree.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct TaskTreeResponse {
- pub tasks: Vec<TaskSummary>,
- pub supervisor_task_id: Option<Uuid>,
- pub total_count: usize,
-}
-
-/// Response for wait operation.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct WaitResponse {
- pub task_id: Uuid,
- pub status: String,
- pub completed: bool,
- pub output_summary: Option<String>,
-}
-
-/// Response for read file operation.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ReadFileResponse {
- pub task_id: Uuid,
- pub file_path: String,
- pub content: String,
- pub exists: bool,
-}
-
-/// Response for checkpoint operations.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CheckpointResponse {
- pub task_id: Uuid,
- pub checkpoint_number: i32,
- pub commit_sha: String,
- pub message: String,
-}
-
-/// Task checkpoint info.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct TaskCheckpoint {
- pub id: Uuid,
- pub task_id: Uuid,
- pub checkpoint_number: i32,
- pub commit_sha: String,
- pub branch_name: String,
- pub message: String,
- pub files_changed: Option<serde_json::Value>,
- pub lines_added: i32,
- pub lines_removed: i32,
- pub created_at: chrono::DateTime<chrono::Utc>,
-}
-
-/// Response for list checkpoints.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CheckpointListResponse {
- pub task_id: Uuid,
- pub checkpoints: Vec<TaskCheckpoint>,
-}
+use crate::server::state::SharedState;
// =============================================================================
-// Helper Functions
+// Auth helper
// =============================================================================
-/// Verify the request comes from a supervisor task and extract ownership info.
-async fn verify_supervisor_auth(
+/// Verify the request comes from a directive task (tool-key auth) and
+/// return the calling task id + owner id.
+async fn verify_task_auth(
state: &SharedState,
headers: &HeaderMap,
- contract_id: Option<Uuid>,
) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> {
let auth = extract_auth(state, headers);
-
let task_id = match auth {
AuthSource::ToolKey(task_id) => task_id,
_ => {
return Err((
StatusCode::UNAUTHORIZED,
- Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")),
+ Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")),
));
}
};
- // Get the task to verify it's a supervisor and get owner_id
let pool = state.db_pool.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
@@ -251,10 +57,10 @@ async fn verify_supervisor_auth(
let task = repository::get_task(pool, task_id)
.await
.map_err(|e| {
- tracing::error!(error = %e, "Failed to get supervisor task");
+ tracing::error!(error = %e, "Failed to load task");
(
StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")),
+ Json(ApiError::new("DB_ERROR", "Failed to load task")),
)
})?
.ok_or_else(|| {
@@ -264,1411 +70,113 @@ async fn verify_supervisor_auth(
)
})?;
- // Verify task is a supervisor or a directive task
- if !task.is_supervisor && task.directive_id.is_none() {
+ // Only directive-attached tasks may use this backchannel.
+ if task.directive_id.is_none() {
return Err((
StatusCode::FORBIDDEN,
- Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor or directive tasks can use these endpoints")),
+ Json(ApiError::new(
+ "NOT_DIRECTIVE_TASK",
+ "Only directive-attached tasks can use these endpoints",
+ )),
));
}
- // If contract_id provided, verify the supervisor belongs to that contract
- if let Some(cid) = contract_id {
- if task.contract_id != Some(cid) {
- return Err((
- StatusCode::FORBIDDEN,
- Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")),
- ));
- }
- }
-
Ok((task_id, task.owner_id))
}
-/// Try to start a pending task on an available daemon.
-/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
-/// For retried tasks, excludes daemons that previously failed the task and includes
-/// checkpoint patch data for worktree recovery.
-pub async fn try_start_pending_task(
- state: &SharedState,
- contract_id: Uuid,
- owner_id: Uuid,
-) -> Result<Option<Task>, String> {
- let pool = state.db_pool.as_ref().ok_or("Database not configured")?;
-
- // Get pending tasks for this contract (includes interrupted tasks awaiting retry)
- let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get pending tasks: {}", e))?;
-
- if pending_tasks.is_empty() {
- return Ok(None);
- }
-
- // Get contract to check local_only flag
- let contract = repository::get_contract_for_owner(pool, contract_id, owner_id)
- .await
- .map_err(|e| format!("Failed to get contract: {}", e))?
- .ok_or_else(|| "Contract not found".to_string())?;
-
- // Try each pending task until we find one we can start
- for task in &pending_tasks {
- // Get excluded daemon IDs for this task (daemons that have already failed it)
- let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
-
- // Get available daemons excluding failed ones for this task
- let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids)
- .await
- .map_err(|e| format!("Failed to get available daemons: {}", e))?;
-
- // Find a daemon with capacity
- let available_daemon = daemons.iter().find(|d| {
- d.current_task_count < d.max_concurrent_tasks
- && state.daemon_connections.contains_key(&d.connection_id)
- });
-
- let daemon = match available_daemon {
- Some(d) => d,
- None => continue, // Try next task
- };
-
- // Get repo URL from task or contract
- let repo_url = if let Some(url) = &task.repository_url {
- Some(url.clone())
- } else {
- match repository::list_contract_repositories(pool, contract_id).await {
- Ok(repos) => repos
- .iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
- Err(_) => None,
- }
- };
-
- // Update task with daemon assignment
- let update_req = UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(daemon.id),
- version: Some(task.version),
- ..Default::default()
- };
-
- let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
- Ok(Some(t)) => t,
- Ok(None) => continue, // Task was modified concurrently, try next
- Err(e) => {
- tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
- continue; // Try next task
- }
- };
-
- // For retried tasks, fetch checkpoint patch for worktree recovery
- let (patch_data, patch_base_sha) = if task.retry_count > 0 {
- // This is a retry - try to restore from checkpoint
- match repository::get_latest_checkpoint_patch(pool, task.id).await {
- Ok(Some(patch)) => {
- tracing::info!(
- task_id = %task.id,
- retry_count = task.retry_count,
- patch_size = patch.patch_size_bytes,
- base_sha = %patch.base_commit_sha,
- "Including checkpoint patch for task retry recovery"
- );
- let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
- (Some(encoded), Some(patch.base_commit_sha))
- }
- Ok(None) => {
- tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry");
- (None, None)
- }
- Err(e) => {
- tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry");
- (None, None)
- }
- }
- } else {
- (None, None)
- };
-
- // Send spawn command
- let cmd = DaemonCommand::SpawnTask {
- task_id: updated_task.id,
- task_name: updated_task.name.clone(),
- plan: updated_task.plan.clone(),
- repo_url,
- base_branch: updated_task.base_branch.clone(),
- target_branch: updated_task.target_branch.clone(),
- parent_task_id: updated_task.parent_task_id,
- depth: updated_task.depth,
- is_orchestrator: false,
- target_repo_path: updated_task.target_repo_path.clone(),
- completion_action: updated_task.completion_action.clone(),
- continue_from_task_id: updated_task.continue_from_task_id,
- copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: updated_task.is_supervisor,
- autonomous_loop: updated_task.is_supervisor,
- resume_session: task.retry_count > 0, // Use --continue for retried tasks
- conversation_history: None,
- patch_data,
- patch_base_sha,
- local_only: contract.local_only,
- auto_merge_local: contract.auto_merge_local,
- // For retried tasks, use their own worktree (they already have state from previous attempt)
- supervisor_worktree_task_id: None,
- directive_id: updated_task.directive_id,
- };
-
- if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
- tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
- // Rollback
- let rollback_req = UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
- };
- let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
- continue; // Try next task
- }
-
- tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
- return Ok(Some(updated_task));
- }
-
- // No tasks could be started
- Ok(None)
-}
-
-// =============================================================================
-// Contract Task Handlers
-// =============================================================================
-
-/// List all tasks in a contract's tree.
-#[utoipa::path(
- get,
- path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks",
- params(
- ("contract_id" = Uuid, Path, description = "Contract ID")
- ),
- responses(
- (status = 200, description = "List of tasks in contract", body = TaskTreeResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn list_contract_tasks(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- headers: HeaderMap,
-) -> impl IntoResponse {
- let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get all tasks for this contract
- match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
- Ok(tasks) => {
- let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id);
- let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect();
- let total_count = summaries.len();
-
- (
- StatusCode::OK,
- Json(TaskTreeResponse {
- tasks: summaries,
- supervisor_task_id,
- total_count,
- }),
- ).into_response()
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to list contract tasks");
- (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to list tasks")),
- ).into_response()
- }
- }
-}
-
-/// Get full task tree structure for a contract.
-#[utoipa::path(
- get,
- path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree",
- params(
- ("contract_id" = Uuid, Path, description = "Contract ID")
- ),
- responses(
- (status = 200, description = "Task tree structure", body = TaskTreeResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn get_contract_tree(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- headers: HeaderMap,
-) -> impl IntoResponse {
- // Same as list_contract_tasks for now - can add tree structure later
- list_contract_tasks(State(state), Path(contract_id), headers).await
-}
-
-// =============================================================================
-// Task Spawn Handler
-// =============================================================================
-
-/// Spawn a new task (supervisor only).
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/tasks",
- request_body = SpawnTaskRequest,
- responses(
- (status = 201, description = "Task created", body = Task),
- (status = 400, description = "Invalid request"),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn spawn_task(
- State(state): State<SharedState>,
- headers: HeaderMap,
- Json(request): Json<SpawnTaskRequest>,
-) -> impl IntoResponse {
- let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Verify contract exists and get local_only flag
- let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await {
- Ok(Some(c)) => c,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Contract not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get contract");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get contract")),
- ).into_response();
- }
- };
-
- // Get repository URL - either from request or from contract's repositories
- let repo_url = if let Some(url) = request.repository_url.clone() {
- if !url.trim().is_empty() {
- Some(url)
- } else {
- None
- }
- } else {
- None
- };
-
- // If no repo URL provided, look it up from the contract
- let repo_url = match repo_url {
- Some(url) => Some(url),
- None => {
- match repository::list_contract_repositories(pool, request.contract_id).await {
- Ok(repos) => {
- // Prefer primary repo, fallback to first repo
- let repo = repos.iter()
- .find(|r| r.is_primary)
- .or(repos.first());
-
- // Use repository_url if set, otherwise use local_path
- repo.and_then(|r| {
- r.repository_url.clone()
- .or_else(|| r.local_path.clone())
- })
- }
- Err(e) => {
- tracing::warn!(error = %e, "Failed to get contract repositories");
- None
- }
- }
- }
- };
-
- // Validate that we have a repo URL
- if repo_url.is_none() {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")),
- ).into_response();
- }
-
- // Create task request
- // All tasks share the supervisor's worktree
- let supervisor_worktree_task_id = Some(supervisor_id);
-
- let create_req = CreateTaskRequest {
- name: request.name.clone(),
- description: None,
- plan: request.plan.clone(),
- repository_url: repo_url.clone(),
- contract_id: Some(request.contract_id),
- parent_task_id: request.parent_task_id,
- is_supervisor: false,
- checkpoint_sha: request.checkpoint_sha.clone(),
- merge_mode: Some("manual".to_string()),
- priority: 0,
- base_branch: None,
- target_branch: None,
- target_repo_path: None,
- completion_action: None,
- continue_from_task_id: None,
- copy_files: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id,
- directive_id: None,
- directive_step_id: None,
- };
-
- // Create task in DB
- let task = match repository::create_task_for_owner(pool, owner_id, create_req).await {
- Ok(t) => t,
- Err(e) => {
- tracing::error!(error = %e, "Failed to create task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to create task")),
- ).into_response();
- }
- };
-
- tracing::info!(
- supervisor_id = %supervisor_id,
- task_id = %task.id,
- task_name = %task.name,
- "Supervisor spawned new task"
- );
-
- // Record history event for task spawned by supervisor
- let _ = repository::record_history_event(
- pool,
- owner_id,
- task.contract_id,
- Some(task.id),
- "task",
- Some("spawned"),
- None,
- serde_json::json!({
- "name": &task.name,
- "spawnedBy": supervisor_id.to_string(),
- }),
- ).await;
-
- // Broadcast task creation notification to WebSocket subscribers
- state.broadcast_task_update(TaskUpdateNotification {
- task_id: task.id,
- owner_id: Some(owner_id),
- version: task.version,
- status: task.status.clone(),
- updated_fields: vec!["created".to_string()],
- updated_by: "supervisor".to_string(),
- });
-
- // Start task on a daemon
- // Find a daemon that belongs to this owner
- let mut updated_task = task;
- for entry in state.daemon_connections.iter() {
- let daemon = entry.value();
- if daemon.owner_id == owner_id {
- // IMPORTANT: Update database FIRST to assign daemon_id before sending command
- // This prevents race conditions where the task starts but daemon_id is not set
- let update_req = UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(daemon.id),
- version: Some(updated_task.version),
- ..Default::default()
- };
-
- match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await {
- Ok(Some(t)) => {
- updated_task = t;
- }
- Ok(None) => {
- tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id");
- break;
- }
- Err(e) => {
- tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id");
- break;
- }
- }
-
- // Send spawn command to daemon
- let cmd = DaemonCommand::SpawnTask {
- task_id: updated_task.id,
- task_name: updated_task.name.clone(),
- plan: updated_task.plan.clone(),
- repo_url: repo_url.clone(),
- base_branch: updated_task.base_branch.clone(),
- target_branch: updated_task.target_branch.clone(),
- parent_task_id: updated_task.parent_task_id,
- depth: updated_task.depth,
- is_orchestrator: false,
- target_repo_path: updated_task.target_repo_path.clone(),
- completion_action: updated_task.completion_action.clone(),
- continue_from_task_id: updated_task.continue_from_task_id,
- copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: false,
- autonomous_loop: false,
- resume_session: false,
- conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
- local_only: contract.local_only,
- auto_merge_local: contract.auto_merge_local,
- // All tasks share the supervisor's worktree
- supervisor_worktree_task_id: Some(supervisor_id),
- directive_id: updated_task.directive_id,
- };
-
- if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
- tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command");
- // Rollback: clear daemon_id and reset status since command failed
- let rollback_req = UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
- };
- let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await;
- } else {
- tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent");
-
- // Save state: task spawn is a key save point (Task 3.3)
- save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await;
-
- // Broadcast task status update notification to WebSocket subscribers
- state.broadcast_task_update(TaskUpdateNotification {
- task_id: updated_task.id,
- owner_id: Some(owner_id),
- version: updated_task.version,
- status: "starting".to_string(),
- updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
- updated_by: "supervisor".to_string(),
- });
-
- }
- break;
- }
- }
-
- (StatusCode::CREATED, Json(updated_task)).into_response()
-}
-
// =============================================================================
-// Wait for Task Handler
+// Question types
// =============================================================================
-/// Wait for a task to complete.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait",
- params(
- ("task_id" = Uuid, Path, description = "Task ID to wait for")
- ),
- request_body = WaitForTaskRequest,
- responses(
- (status = 200, description = "Task completed or timed out", body = WaitResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn wait_for_task(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
- Json(request): Json<WaitForTaskRequest>,
-) -> impl IntoResponse {
- let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Verify task belongs to same owner
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
- ).into_response();
- }
- };
-
- // Check if already done
- if task.status == "done" || task.status == "failed" || task.status == "merged" {
- return (
- StatusCode::OK,
- Json(WaitResponse {
- task_id,
- status: task.status,
- completed: true,
- output_summary: None,
- }),
- ).into_response();
- }
-
- // Get contract_id for pending task scheduling
- let contract_id = task.contract_id;
-
- // Subscribe to task completions
- let mut rx = state.task_completions.subscribe();
- let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);
-
- // Wait for completion or timeout, periodically trying to start pending tasks
- let result = tokio::time::timeout(timeout, async {
- let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
- pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
-
- loop {
- tokio::select! {
- // Check for task completion notifications
- recv_result = rx.recv() => {
- match recv_result {
- Ok(notification) => {
- if notification.task_id == task_id {
- return Some(notification);
- }
- }
- Err(_) => {
- // Channel closed or lagged - check DB directly
- if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
- if t.status == "done" || t.status == "failed" || t.status == "merged" {
- return Some(crate::server::state::TaskCompletionNotification {
- task_id: t.id,
- owner_id: Some(t.owner_id),
- contract_id: t.contract_id,
- parent_task_id: t.parent_task_id,
- status: t.status,
- output_summary: None,
- worktree_path: None,
- error_message: t.error_message,
- });
- }
- }
- }
- }
- }
- // Periodically try to start pending tasks
- _ = pending_check_interval.tick() => {
- if let Some(cid) = contract_id {
- match try_start_pending_task(&state, cid, owner_id).await {
- Ok(Some(started_task)) => {
- tracing::debug!(
- task_id = %started_task.id,
- task_name = %started_task.name,
- "Started pending task while waiting"
- );
- }
- Ok(None) => {
- // No pending tasks or no capacity - that's fine
- }
- Err(e) => {
- tracing::warn!(error = %e, "Error trying to start pending task");
- }
- }
- }
- }
- }
- }
- }).await;
-
- match result {
- Ok(Some(notification)) => {
- (
- StatusCode::OK,
- Json(WaitResponse {
- task_id,
- status: notification.status,
- completed: true,
- output_summary: notification.output_summary,
- }),
- ).into_response()
- }
- Ok(None) | Err(_) => {
- // Timeout - check final status
- let final_status = repository::get_task(pool, task_id)
- .await
- .ok()
- .flatten()
- .map(|t| t.status)
- .unwrap_or_else(|| "unknown".to_string());
-
- (
- StatusCode::OK,
- Json(WaitResponse {
- task_id,
- status: final_status.clone(),
- completed: final_status == "done" || final_status == "failed" || final_status == "merged",
- output_summary: None,
- }),
- ).into_response()
- }
- }
-}
-
-// =============================================================================
-// Read Worktree File Handler
-// =============================================================================
-
-/// Read a file from a task's worktree.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file",
- params(
- ("task_id" = Uuid, Path, description = "Task ID")
- ),
- request_body = ReadWorktreeFileRequest,
- responses(
- (status = 200, description = "File content", body = ReadFileResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn read_worktree_file(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
- Json(request): Json<ReadWorktreeFileRequest>,
-) -> impl IntoResponse {
- let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get task to verify ownership
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
- ).into_response();
- }
- };
-
- // TODO: Implement file reading via worktree path
- // For now, return not implemented - supervisor should use local file access via worktree
- let _ = (task, request);
-
- (
- StatusCode::NOT_IMPLEMENTED,
- Json(ApiError::new(
- "NOT_IMPLEMENTED",
- "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.",
- )),
- ).into_response()
-}
-
-// =============================================================================
-// Checkpoint Handlers
-// =============================================================================
-
-/// Create a git checkpoint for a task.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/tasks/{task_id}/checkpoint",
- params(
- ("task_id" = Uuid, Path, description = "Task ID")
- ),
- request_body = CreateCheckpointRequest,
- responses(
- (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - can only create checkpoint for own task"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- (status = 503, description = "Task has no assigned daemon"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn create_checkpoint(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
- Json(request): Json<CreateCheckpointRequest>,
-) -> impl IntoResponse {
- let auth = extract_auth(&state, &headers);
-
- let task_id_from_auth = match auth {
- AuthSource::ToolKey(tid) => tid,
- _ => {
- return (
- StatusCode::UNAUTHORIZED,
- Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
- ).into_response();
- }
- };
-
- // Can only create checkpoint for own task
- if task_id_from_auth != task_id {
- return (
- StatusCode::FORBIDDEN,
- Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")),
- ).into_response();
- }
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get task and daemon_id
- let task = match repository::get_task(pool, task_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
- ).into_response();
- }
- };
-
- let Some(daemon_id) = task.daemon_id else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
- ).into_response();
- };
-
- // Send CreateCheckpoint command to daemon
- let cmd = DaemonCommand::CreateCheckpoint {
- task_id,
- message: request.message.clone(),
- };
-
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::error!(error = %e, "Failed to send CreateCheckpoint command");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
- ).into_response();
- }
-
- // Return accepted - the checkpoint result will be delivered via WebSocket
- // and stored in the database by the daemon message handler
- (
- StatusCode::ACCEPTED,
- Json(CheckpointResponse {
- task_id,
- checkpoint_number: 0, // Will be assigned by DB on actual creation
- commit_sha: "pending".to_string(),
- message: request.message,
- }),
- ).into_response()
-}
-
-/// List checkpoints for a task.
-#[utoipa::path(
- get,
- path = "/api/v1/mesh/tasks/{task_id}/checkpoints",
- params(
- ("task_id" = Uuid, Path, description = "Task ID")
- ),
- responses(
- (status = 200, description = "List of checkpoints", body = CheckpointListResponse),
- (status = 401, description = "Unauthorized"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn list_checkpoints(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
-) -> impl IntoResponse {
- let auth = extract_auth(&state, &headers);
-
- let _task_id_from_auth = match auth {
- AuthSource::ToolKey(tid) => tid,
- _ => {
- return (
- StatusCode::UNAUTHORIZED,
- Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
- ).into_response();
- }
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get checkpoints from DB
- match repository::list_task_checkpoints(pool, task_id).await {
- Ok(checkpoints) => {
- let checkpoint_list: Vec<TaskCheckpoint> = checkpoints
- .into_iter()
- .map(|c| TaskCheckpoint {
- id: c.id,
- task_id: c.task_id,
- checkpoint_number: c.checkpoint_number,
- commit_sha: c.commit_sha,
- branch_name: c.branch_name,
- message: c.message,
- files_changed: c.files_changed,
- lines_added: c.lines_added.unwrap_or(0),
- lines_removed: c.lines_removed.unwrap_or(0),
- created_at: c.created_at,
- })
- .collect();
-
- (
- StatusCode::OK,
- Json(CheckpointListResponse {
- task_id,
- checkpoints: checkpoint_list,
- }),
- ).into_response()
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to list checkpoints");
- (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")),
- ).into_response()
- }
- }
-}
-
-// =============================================================================
-// Git Operations - Request/Response Types
-// =============================================================================
-
-/// Request to create a new branch.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
-pub struct CreateBranchRequest {
- pub branch_name: String,
- pub from_ref: Option<String>,
+pub struct AskQuestionRequest {
+ pub question: String,
+ #[serde(default)]
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ #[serde(default = "default_question_timeout")]
+ pub timeout_seconds: i32,
+ /// When true the request blocks until the user responds (no
+ /// timeout) — the CLI reconnects via the poll endpoint if the
+ /// server-side timeout is reached.
+ #[serde(default)]
+ pub phaseguard: bool,
+ #[serde(default)]
+ pub multi_select: bool,
+ /// Return immediately without waiting for a response.
+ #[serde(default)]
+ pub non_blocking: bool,
+ /// Question type: general, phase_confirmation, contract_complete.
+ #[serde(default = "default_question_type")]
+ pub question_type: String,
}
-/// Response for branch creation.
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct CreateBranchResponse {
- pub success: bool,
- pub branch_name: String,
- pub message: String,
+fn default_question_type() -> String {
+ "general".to_string()
}
-/// Request to merge task changes.
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MergeTaskRequest {
- pub target_branch: Option<String>,
- #[serde(default)]
- pub squash: bool,
+fn default_question_timeout() -> i32 {
+ 3600
}
-/// Response for merge operation.
-#[derive(Debug, Serialize, ToSchema)]
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
-pub struct MergeTaskResponse {
- pub task_id: Uuid,
- pub success: bool,
- pub message: String,
- pub commit_sha: Option<String>,
- pub conflicts: Option<Vec<String>>,
+pub struct AskQuestionResponse {
+ pub question_id: Uuid,
+ pub response: Option<String>,
+ pub timed_out: bool,
+ /// Server-side timeout was reached but the question is still
+ /// pending. CLI should re-poll via `/poll`.
+ #[serde(default)]
+ pub still_pending: bool,
}
-/// Request to create a pull request.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
-pub struct CreatePRRequest {
- pub branch: String,
- pub title: String,
- pub body: Option<String>,
+pub struct AnswerQuestionRequest {
+ pub response: String,
}
-/// Response for PR creation.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
-pub struct CreatePRResponse {
- pub task_id: Uuid,
+pub struct AnswerQuestionResponse {
pub success: bool,
- pub message: String,
- pub pr_url: Option<String>,
- pub pr_number: Option<i32>,
}
-/// Response for task diff.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
-pub struct TaskDiffResponse {
+pub struct PendingQuestionSummary {
+ pub question_id: Uuid,
pub task_id: Uuid,
- pub success: bool,
- pub diff: Option<String>,
- pub error: Option<String>,
-}
-
-// =============================================================================
-// Git Operations - Handlers
-// =============================================================================
-
-/// Create a new branch from supervisor's worktree.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/branches",
- request_body = CreateBranchRequest,
- responses(
- (status = 201, description = "Branch created", body = CreateBranchResponse),
- (status = 400, description = "Invalid request"),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn create_branch(
- State(state): State<SharedState>,
- headers: HeaderMap,
- Json(request): Json<CreateBranchRequest>,
-) -> impl IntoResponse {
- let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- // Find daemon running supervisor
- let daemon_id = {
- let pool = state.db_pool.as_ref().unwrap();
- match repository::get_task(pool, supervisor_id).await {
- Ok(Some(task)) => task.daemon_id,
- _ => None,
- }
- };
-
- let Some(daemon_id) = daemon_id else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
- ).into_response();
- };
-
- // Send CreateBranch command to daemon
- let cmd = DaemonCommand::CreateBranch {
- task_id: supervisor_id,
- branch_name: request.branch_name.clone(),
- from_ref: request.from_ref,
- };
-
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::error!(error = %e, "Failed to send CreateBranch command");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
- ).into_response();
- }
-
- // Note: Real implementation would wait for daemon response
- // For now, return success immediately - daemon will send response via WebSocket
- (
- StatusCode::CREATED,
- Json(CreateBranchResponse {
- success: true,
- branch_name: request.branch_name,
- message: "Branch creation command sent".to_string(),
- }),
- ).into_response()
-}
-
-/// Merge a task's changes to a target branch.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge",
- params(
- ("task_id" = Uuid, Path, description = "Task ID to merge")
- ),
- request_body = MergeTaskRequest,
- responses(
- (status = 200, description = "Merge initiated", body = MergeTaskResponse),
- (status = 400, description = "Invalid request"),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn merge_task(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
- Json(request): Json<MergeTaskRequest>,
-) -> impl IntoResponse {
- let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get the target task
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
- ).into_response();
- }
- };
-
- // Get daemon running the task
- let Some(daemon_id) = task.daemon_id else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
- ).into_response();
- };
-
- // Subscribe to merge results BEFORE sending the command
- let mut rx = state.merge_results.subscribe();
-
- // Send MergeTaskToTarget command to daemon
- let cmd = DaemonCommand::MergeTaskToTarget {
- task_id,
- target_branch: request.target_branch,
- squash: request.squash,
- };
-
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::error!(error = %e, "Failed to send MergeTaskToTarget command");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
- ).into_response();
- }
-
- // Wait for the merge result with a timeout (60 seconds should be plenty for a merge)
- let timeout = tokio::time::Duration::from_secs(60);
- let result = tokio::time::timeout(timeout, async {
- loop {
- match rx.recv().await {
- Ok(notification) => {
- if notification.task_id == task_id {
- return Some(notification);
- }
- // Not our task, keep waiting
- }
- Err(_) => {
- // Channel closed or lagged
- return None;
- }
- }
- }
- }).await;
-
- match result {
- Ok(Some(notification)) => {
- (
- StatusCode::OK,
- Json(MergeTaskResponse {
- task_id,
- success: notification.success,
- message: notification.message,
- commit_sha: notification.commit_sha,
- conflicts: notification.conflicts,
- }),
- ).into_response()
- }
- Ok(None) | Err(_) => {
- // Timeout or channel error - return error status
- (
- StatusCode::GATEWAY_TIMEOUT,
- Json(MergeTaskResponse {
- task_id,
- success: false,
- message: "Merge operation timed out waiting for daemon response".to_string(),
- commit_sha: None,
- conflicts: None,
- }),
- ).into_response()
- }
- }
-}
-
-/// Create a pull request for a task's changes.
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/supervisor/pr",
- request_body = CreatePRRequest,
- responses(
- (status = 201, description = "PR created", body = CreatePRResponse),
- (status = 400, description = "Invalid request"),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn create_pr(
- State(state): State<SharedState>,
- headers: HeaderMap,
- Json(request): Json<CreatePRRequest>,
-) -> impl IntoResponse {
- let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get the supervisor's own task to find daemon and base_branch
- let task = match repository::get_task(pool, supervisor_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get supervisor task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
- ).into_response();
- }
- };
-
- // Get daemon running the supervisor
- let Some(daemon_id) = task.daemon_id else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")),
- ).into_response();
- };
-
- // Subscribe to PR results BEFORE sending the command
- let mut rx = state.pr_results.subscribe();
-
- // Send CreatePR command to daemon using the supervisor's task ID
- // (the branch is in the supervisor's worktree)
- // Pass base_branch from task if available, otherwise daemon will auto-detect
- let cmd = DaemonCommand::CreatePR {
- task_id: supervisor_id,
- title: request.title.clone(),
- body: request.body.clone(),
- base_branch: task.base_branch.clone(),
- branch: request.branch.clone(),
- };
-
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::error!(error = %e, "Failed to send CreatePR command");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
- ).into_response();
- }
-
- // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation)
- let timeout = tokio::time::Duration::from_secs(60);
- let result = tokio::time::timeout(timeout, async {
- loop {
- match rx.recv().await {
- Ok(notification) => {
- if notification.task_id == supervisor_id {
- return Some(notification);
- }
- // Not our task, keep waiting
- }
- Err(_) => {
- // Channel closed or lagged
- return None;
- }
- }
- }
- }).await;
-
- match result {
- Ok(Some(notification)) => {
- let status = if notification.success {
- StatusCode::CREATED
- } else {
- StatusCode::INTERNAL_SERVER_ERROR
- };
- (
- status,
- Json(CreatePRResponse {
- task_id: supervisor_id,
- success: notification.success,
- message: notification.message,
- pr_url: notification.pr_url,
- pr_number: notification.pr_number,
- }),
- ).into_response()
- }
- Ok(None) | Err(_) => {
- // Timeout or channel error - return error status
- (
- StatusCode::GATEWAY_TIMEOUT,
- Json(CreatePRResponse {
- task_id: supervisor_id,
- success: false,
- message: "PR creation timed out waiting for daemon response".to_string(),
- pr_url: None,
- pr_number: None,
- }),
- ).into_response()
- }
- }
-}
-
-/// Get the diff for a task's changes.
-#[utoipa::path(
- get,
- path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff",
- params(
- ("task_id" = Uuid, Path, description = "Task ID")
- ),
- responses(
- (status = 200, description = "Task diff", body = TaskDiffResponse),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error"),
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn get_task_diff(
- State(state): State<SharedState>,
- Path(task_id): Path<Uuid>,
- headers: HeaderMap,
-) -> impl IntoResponse {
- let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
-
- let pool = state.db_pool.as_ref().unwrap();
-
- // Get the target task
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Task not found")),
- ).into_response();
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
- ).into_response();
- }
- };
-
- // Get daemon running the task
- let Some(daemon_id) = task.daemon_id else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
- ).into_response();
- };
-
- // Send GetTaskDiff command to daemon
- let cmd = DaemonCommand::GetTaskDiff { task_id };
-
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::error!(error = %e, "Failed to send GetTaskDiff command");
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
- ).into_response();
- }
-
- (
- StatusCode::OK,
- Json(TaskDiffResponse {
- task_id,
- success: true,
- diff: None,
- error: Some("Diff command sent - response will be streamed".to_string()),
- }),
- ).into_response()
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub directive_id: Option<Uuid>,
+ pub question: String,
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+ #[serde(default)]
+ pub multi_select: bool,
+ #[serde(default)]
+ pub question_type: String,
}
// =============================================================================
-// Supervisor Question Handlers
+// Question handlers
// =============================================================================
-/// Ask a question and wait for user feedback.
-///
-/// The supervisor calls this to ask a question. The endpoint will poll until
-/// either the user responds or the timeout is reached.
+/// Ask the user a question from a directive task. Blocks until the user
+/// answers, the timeout fires, or `non_blocking` returns immediately.
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/questions",
request_body = AskQuestionRequest,
responses(
- (status = 200, description = "Question answered", body = AskQuestionResponse),
- (status = 408, description = "Question timed out", body = AskQuestionResponse),
+ (status = 200, description = "Question asked", body = AskQuestionResponse),
(status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- (status = 500, description = "Internal server error"),
- ),
- security(
- ("tool_key" = [])
+ (status = 403, description = "Not a directive task"),
),
+ security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn ask_question(
@@ -1676,67 +184,49 @@ pub async fn ask_question(
headers: HeaderMap,
Json(request): Json<AskQuestionRequest>,
) -> impl IntoResponse {
- let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
let pool = state.db_pool.as_ref().unwrap();
- // Get the supervisor task to find its contract
- let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
+ // Pull the directive_id off the calling task so subscribers can
+ // route the question to the right directive view.
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
- ).into_response();
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
}
Err(e) => {
- tracing::error!(error = %e, "Failed to get supervisor task");
+ tracing::error!(error = %e, "Failed to fetch task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
- ).into_response();
+ Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
+ )
+ .into_response();
}
};
- // Determine context: contract or directive
- let contract_id = supervisor.contract_id;
- let directive_id = supervisor.directive_id;
+ let directive_id = task.directive_id;
- if contract_id.is_none() && directive_id.is_none() {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")),
- ).into_response();
- }
-
- let is_directive_context = directive_id.is_some() && contract_id.is_none();
-
- // For directive context, check reconcile_mode to determine behavior
- let directive_reconcile_mode: String = if let Some(did) = directive_id {
- if is_directive_context {
- match repository::get_directive_for_owner(pool, owner_id, did).await {
- Ok(Some(d)) => d.reconcile_mode.clone(),
- Ok(None) => "auto".to_string(),
- Err(e) => {
- tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check");
- "auto".to_string()
- }
- }
- } else {
- "auto".to_string()
- }
- } else {
- "auto".to_string()
+ // Reconcile mode controls block-vs-timeout behaviour on directive
+ // tasks: semi-auto / manual block indefinitely (effectively
+ // phaseguard); auto times out after 30s.
+ let reconcile_mode: String = match directive_id {
+ Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await {
+ Ok(Some(d)) => d.reconcile_mode.clone(),
+ _ => "auto".to_string(),
+ },
+ None => "auto".to_string(),
};
- // Add the question (use Uuid::nil() for contract_id in directive-only context)
- let effective_contract_id = contract_id.unwrap_or(Uuid::nil());
- let question_id = state.add_supervisor_question_with_directive(
- supervisor_id,
- effective_contract_id,
+ let question_id = state.add_supervisor_question(
+ task_id,
directive_id,
owner_id,
request.question.clone(),
@@ -1746,60 +236,6 @@ pub async fn ask_question(
request.question_type.clone(),
);
- // Save state: question asked is a key save point (Task 3.3)
- // Only for contract context — directive tasks don't use supervisor_states table
- if let Some(cid) = contract_id {
- let pending_question = PendingQuestion {
- id: question_id,
- question: request.question.clone(),
- choices: request.choices.clone(),
- context: request.context.clone(),
- question_type: request.question_type.clone(),
- asked_at: chrono::Utc::now(),
- };
- save_state_on_question_asked(pool, cid, pending_question).await;
- }
-
- // Broadcast question as task output entry for the task's chat
- let question_data = serde_json::json!({
- "question_id": question_id.to_string(),
- "choices": request.choices,
- "context": request.context,
- "multi_select": request.multi_select,
- "question_type": request.question_type,
- });
- state.broadcast_task_output(TaskOutputNotification {
- task_id: supervisor_id,
- owner_id: Some(owner_id),
- message_type: "supervisor_question".to_string(),
- content: request.question.clone(),
- tool_name: None,
- tool_input: Some(question_data.clone()),
- is_error: None,
- cost_usd: None,
- duration_ms: None,
- is_partial: false,
- });
-
- // Persist to database so it appears when reloading the page
- // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format
- if let Some(pool) = state.db_pool.as_ref() {
- let event_data = serde_json::json!({
- "messageType": "supervisor_question",
- "content": request.question,
- "toolInput": question_data,
- });
- let _ = repository::create_task_event(
- pool,
- supervisor_id,
- "output",
- None,
- None,
- Some(event_data),
- ).await;
- }
-
- // If non_blocking mode, return immediately
if request.non_blocking {
return (
StatusCode::OK,
@@ -1809,41 +245,28 @@ pub async fn ask_question(
timed_out: false,
still_pending: false,
}),
- ).into_response();
+ )
+ .into_response();
}
- // Determine if we should block indefinitely (phaseguard or directive reconcile mode)
- let use_phaseguard = request.phaseguard || (is_directive_context && (directive_reconcile_mode == "semi-auto" || directive_reconcile_mode == "manual"));
-
- // Poll for response with timeout
- // - Phaseguard: block indefinitely until user responds
- // - Directive tasks without reconcile mode: 30s default timeout
- // - Contract tasks: use requested timeout_seconds
+ // Determine block behaviour.
+ let use_phaseguard =
+ request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual";
let timeout_secs = if use_phaseguard {
- // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit).
- // The CLI will automatically reconnect via the poll endpoint.
300
- } else if is_directive_context && directive_reconcile_mode == "auto" {
+ } else if reconcile_mode == "auto" {
30
} else {
request.timeout_seconds.max(1) as u64
};
+
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
- // Check if response has been submitted
if let Some(response) = state.get_question_response(question_id) {
- // Clean up the response
state.cleanup_question_response(question_id);
-
- // Clear pending question from supervisor state (Task 3.3)
- // Skip for directive context — no supervisor_states for directives
- if let Some(cid) = contract_id {
- clear_pending_question(pool, cid, question_id).await;
- }
-
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1852,14 +275,12 @@ pub async fn ask_question(
timed_out: false,
still_pending: false,
}),
- ).into_response();
+ )
+ .into_response();
}
- // Check timeout
if start.elapsed() >= timeout_duration {
if use_phaseguard {
- // Phaseguard/reconcile: DON'T remove the pending question.
- // Return still_pending so the CLI can reconnect via the poll endpoint.
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1868,18 +289,10 @@ pub async fn ask_question(
timed_out: false,
still_pending: true,
}),
- ).into_response();
+ )
+ .into_response();
}
-
- // Non-phaseguard: remove the pending question on timeout
state.remove_pending_question(question_id);
-
- // Clear pending question from supervisor state on timeout (Task 3.3)
- // Skip for directive context — no supervisor_states for directives
- if let Some(cid) = contract_id {
- clear_pending_question(pool, cid, question_id).await;
- }
-
return (
StatusCode::REQUEST_TIMEOUT,
Json(AskQuestionResponse {
@@ -1888,34 +301,25 @@ pub async fn ask_question(
timed_out: true,
still_pending: false,
}),
- ).into_response();
+ )
+ .into_response();
}
- // Wait before polling again
tokio::time::sleep(poll_interval).await;
}
}
-/// Poll for a question response by question_id.
-///
-/// Used by the CLI to reconnect after a still_pending response from ask_question.
-/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout
-/// is reached without a response, allowing the CLI to reconnect again.
+/// Re-poll a question by id. Used by the CLI to reconnect after
+/// `still_pending` from `ask_question`. Blocks up to 5 minutes.
#[utoipa::path(
get,
path = "/api/v1/mesh/supervisor/questions/{question_id}/poll",
- params(
- ("question_id" = Uuid, Path, description = "The question ID to poll for"),
- ),
+ params(("question_id" = Uuid, Path, description = "Question id")),
responses(
- (status = 200, description = "Question answered or still pending", body = AskQuestionResponse),
- (status = 404, description = "Question not found"),
- (status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor"),
- ),
- security(
- ("tool_key" = [])
+ (status = 200, description = "Answered or still pending", body = AskQuestionResponse),
+ (status = 404, description = "Not found"),
),
+ security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn poll_question(
@@ -1923,23 +327,16 @@ pub async fn poll_question(
headers: HeaderMap,
Path(question_id): Path<Uuid>,
) -> impl IntoResponse {
- let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
- Ok(ids) => ids,
- Err(e) => return e.into_response(),
- };
+ if verify_task_auth(&state, &headers).await.is_err() {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("UNAUTHORIZED", "Tool key required")),
+ )
+ .into_response();
+ }
- // Check if a response is already available
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
-
- // Clear pending question from supervisor state for contract context
- let pool = state.db_pool.as_ref().unwrap();
- if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
- if let Some(cid) = task.contract_id {
- clear_pending_question(pool, cid, question_id).await;
- }
- }
-
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1948,35 +345,25 @@ pub async fn poll_question(
timed_out: false,
still_pending: false,
}),
- ).into_response();
+ )
+ .into_response();
}
- // Check if the question exists at all (pending or response)
- if !state.has_pending_question(question_id) {
+ if state.get_pending_question(question_id).is_none() {
return (
StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
- ).into_response();
+ Json(ApiError::new("NOT_FOUND", "Question not found")),
+ )
+ .into_response();
}
- // Block for up to 5 minutes polling every 500ms
- let timeout_duration = std::time::Duration::from_secs(300);
+ let timeout = std::time::Duration::from_secs(300);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
loop {
- // Check if response has been submitted
if let Some(response) = state.get_question_response(question_id) {
state.cleanup_question_response(question_id);
-
- // Clear pending question from supervisor state for contract context
- let pool = state.db_pool.as_ref().unwrap();
- if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
- if let Some(cid) = task.contract_id {
- clear_pending_question(pool, cid, question_id).await;
- }
- }
-
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1985,19 +372,10 @@ pub async fn poll_question(
timed_out: false,
still_pending: false,
}),
- ).into_response();
- }
-
- // Check if the question was removed (e.g., task deleted)
- if !state.has_pending_question(question_id) {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Question no longer exists")),
- ).into_response();
+ )
+ .into_response();
}
-
- // Check timeout
- if start.elapsed() >= timeout_duration {
+ if start.elapsed() >= timeout {
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -2006,27 +384,21 @@ pub async fn poll_question(
timed_out: false,
still_pending: true,
}),
- ).into_response();
+ )
+ .into_response();
}
-
- // Wait before polling again
tokio::time::sleep(poll_interval).await;
}
}
-/// Get all pending questions for the current user.
+/// List currently-pending questions for the caller.
#[utoipa::path(
get,
path = "/api/v1/mesh/questions",
responses(
- (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
- (status = 401, description = "Unauthorized"),
- (status = 500, description = "Internal server error"),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
+ (status = 200, description = "Pending questions", body = Vec<PendingQuestionSummary>),
),
+ security(("bearer_auth" = []), ("api_key" = [])),
tag = "Mesh"
)]
pub async fn list_pending_questions(
@@ -2039,7 +411,6 @@ pub async fn list_pending_questions(
.map(|q| PendingQuestionSummary {
question_id: q.question_id,
task_id: q.task_id,
- contract_id: q.contract_id,
directive_id: q.directive_id,
question: q.question,
choices: q.choices,
@@ -2053,1051 +424,59 @@ pub async fn list_pending_questions(
Json(questions).into_response()
}
-/// Answer a pending supervisor question.
+/// Answer a pending question.
#[utoipa::path(
post,
path = "/api/v1/mesh/questions/{question_id}/answer",
- params(
- ("question_id" = Uuid, Path, description = "Question ID")
- ),
+ params(("question_id" = Uuid, Path, description = "Question id")),
request_body = AnswerQuestionRequest,
responses(
- (status = 200, description = "Question answered", body = AnswerQuestionResponse),
- (status = 401, description = "Unauthorized"),
- (status = 404, description = "Question not found"),
- (status = 500, description = "Internal server error"),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
+ (status = 200, description = "Answered", body = AnswerQuestionResponse),
+ (status = 404, description = "Not found"),
),
+ security(("bearer_auth" = []), ("api_key" = [])),
tag = "Mesh"
)]
pub async fn answer_question(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(question_id): Path<Uuid>,
- Json(request): Json<AnswerQuestionRequest>,
+ Json(req): Json<AnswerQuestionRequest>,
) -> impl IntoResponse {
- // Verify the question exists and belongs to this owner
+ // Ownership check: only the owner of the question can answer it.
let question = match state.get_pending_question(question_id) {
- Some(q) if q.owner_id == auth.owner_id => q,
- Some(_) => {
- return (
- StatusCode::FORBIDDEN,
- Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
- ).into_response();
- }
- None => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
- ).into_response();
- }
- };
-
- // Submit the response
- let success = state.submit_question_response(question_id, request.response.clone());
-
- if success {
- tracing::info!(
- question_id = %question_id,
- task_id = %question.task_id,
- "User answered supervisor question"
- );
-
- // Send the response to the task as a message
- // This will auto-resume the task if it was paused (phaseguard)
- let pool = state.db_pool.as_ref().unwrap();
- if let Ok(Some(task)) = repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await {
- if let Some(daemon_id) = task.daemon_id {
- // Format the response message
- let response_msg = format!(
- "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n",
- question.question,
- request.response
- );
- let cmd = DaemonCommand::SendMessage {
- task_id: question.task_id,
- message: response_msg,
- };
- if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
- tracing::warn!(
- task_id = %question.task_id,
- error = %e,
- "Failed to send response message to task"
- );
- } else {
- tracing::info!(
- task_id = %question.task_id,
- "Sent response message to task (will auto-resume if paused)"
- );
- }
- }
- }
- }
-
- Json(AnswerQuestionResponse { success }).into_response()
-}
-
-// =============================================================================
-// Supervisor Resume and Conversation Rewind
-// =============================================================================
-
-/// Response for supervisor resume
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ResumeSupervisorResponse {
- pub supervisor_task_id: Uuid,
- pub daemon_id: Option<Uuid>,
- pub resumed_from: ResumedFromInfo,
- pub status: String,
- /// Restoration context (Task 3.4)
- pub restoration: Option<RestorationInfo>,
-}
-
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct ResumedFromInfo {
- pub phase: String,
- pub last_activity: chrono::DateTime<chrono::Utc>,
- pub message_count: i32,
-}
-
-/// Information about supervisor restoration (Task 3.4)
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct RestorationInfo {
- /// Previous state before restoration
- pub previous_state: String,
- /// How many times this supervisor has been restored
- pub restoration_count: i32,
- /// Number of pending questions to re-deliver
- pub pending_questions_count: usize,
- /// Number of tasks being waited on
- pub waiting_tasks_count: usize,
- /// Number of tasks spawned before crash
- pub spawned_tasks_count: usize,
- /// Any warnings from state validation
- pub warnings: Vec<String>,
-}
-
-/// Resume interrupted supervisor with specified mode.
-///
-/// POST /api/v1/contracts/{id}/supervisor/resume
-#[utoipa::path(
- post,
- path = "/api/v1/contracts/{id}/supervisor/resume",
- params(
- ("id" = Uuid, Path, description = "Contract ID")
- ),
- request_body = crate::db::models::ResumeSupervisorRequest,
- responses(
- (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse),
- (status = 400, description = "Invalid request", body = ApiError),
- (status = 401, description = "Unauthorized", body = ApiError),
- (status = 404, description = "Contract or supervisor not found", body = ApiError),
- (status = 409, description = "Supervisor is already running", body = ApiError),
- (status = 503, description = "Database not configured", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn resume_supervisor(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- auth: crate::server::auth::Authenticated,
- Json(req): Json<crate::db::models::ResumeSupervisorRequest>,
-) -> impl IntoResponse {
- let crate::server::auth::Authenticated(auth_info) = auth;
-
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
- )
- .into_response();
- };
-
- // Get contract and verify ownership
- let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
- Ok(Some(c)) => c,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Contract not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get contract {}: {}", contract_id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Get existing supervisor state
- let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
- Ok(Some(s)) => s,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new(
- "NO_SUPERVISOR_STATE",
- "No supervisor state found - supervisor may not have been started",
- )),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get supervisor state: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Get supervisor task
- let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get supervisor task: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- // Check if already running - but only if daemon is actually connected
- // (daemon disconnect handler may not have updated status yet)
- if supervisor_task.status == "running" {
- let daemon_connected = supervisor_task
- .daemon_id
- .map(|d| state.is_daemon_connected(d))
- .unwrap_or(false);
-
- if daemon_connected {
- return (
- StatusCode::CONFLICT,
- Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")),
- )
- .into_response();
- }
- // Daemon not connected - allow resume (treat as interrupted)
- tracing::info!(
- supervisor_task_id = %supervisor_task.id,
- daemon_id = ?supervisor_task.daemon_id,
- "Supervisor status is 'running' but daemon is not connected, allowing resume"
- );
- }
-
- // Calculate message count from conversation history
- let message_count = supervisor_state
- .conversation_history
- .as_array()
- .map(|arr| arr.len() as i32)
- .unwrap_or(0);
-
- // Find a connected daemon for this owner
- let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) {
- Some(id) => id,
+ Some(q) => q,
None => {
return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new(
- "NO_DAEMON",
- "No daemons connected for your account. Cannot resume supervisor.",
- )),
- )
- .into_response();
- }
- };
-
- // Track response values (may be updated by resume modes)
- let mut response_daemon_id = supervisor_task.daemon_id;
- let mut response_status = "pending".to_string();
-
- // Based on resume mode, handle differently
- match req.resume_mode.as_str() {
- "continue" => {
- // Update task status to starting and assign daemon
- if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2")
- .bind(target_daemon_id)
- .bind(supervisor_state.task_id)
- .execute(pool)
- .await
- {
- tracing::error!("Failed to update task for resume: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
-
- // Fetch latest checkpoint patch for worktree recovery
- let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await {
- Ok(Some(patch)) => {
- tracing::info!(
- task_id = %supervisor_state.task_id,
- patch_size = patch.patch_size_bytes,
- base_sha = %patch.base_commit_sha,
- "Including checkpoint patch for worktree recovery"
- );
- // Encode patch as base64 for JSON transport
- let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
- (Some(encoded), Some(patch.base_commit_sha))
- }
- Ok(None) => {
- tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found");
- (None, None)
- }
- Err(e) => {
- tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch");
- (None, None)
- }
- };
-
- // Send SpawnTask with resume_session=true to use Claude's --continue
- // Include conversation_history as fallback if worktree doesn't exist on target daemon
- let command = DaemonCommand::SpawnTask {
- task_id: supervisor_state.task_id,
- task_name: supervisor_task.name.clone(),
- plan: supervisor_task.plan.clone(),
- repo_url: supervisor_task.repository_url.clone(),
- base_branch: supervisor_task.base_branch.clone(),
- target_branch: supervisor_task.target_branch.clone(),
- parent_task_id: supervisor_task.parent_task_id,
- depth: supervisor_task.depth,
- is_orchestrator: false,
- target_repo_path: supervisor_task.target_repo_path.clone(),
- completion_action: supervisor_task.completion_action.clone(),
- continue_from_task_id: supervisor_task.continue_from_task_id,
- copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: supervisor_task.contract_id,
- is_supervisor: true,
- autonomous_loop: false,
- resume_session: true, // Use --continue to preserve conversation
- conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
- patch_data,
- patch_base_sha,
- local_only: contract.local_only,
- auto_merge_local: contract.auto_merge_local,
- supervisor_worktree_task_id: None, // Supervisor uses its own worktree
- directive_id: supervisor_task.directive_id,
- };
-
- if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
- // Rollback status on failure
- let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1")
- .bind(supervisor_state.task_id)
- .execute(pool)
- .await;
- tracing::error!("Failed to send SpawnTask to daemon: {}", e);
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))),
- )
- .into_response();
- }
-
- tracing::info!(
- contract_id = %contract_id,
- supervisor_task_id = %supervisor_state.task_id,
- daemon_id = %target_daemon_id,
- message_count = message_count,
- "Supervisor resumed with --continue (resume_session=true)"
- );
-
- // Update response values for successful resume
- response_daemon_id = Some(target_daemon_id);
- response_status = "starting".to_string();
- }
- "restart_phase" => {
- // Clear conversation but keep phase progress
- if let Err(e) = repository::update_supervisor_conversation(
- pool,
- contract_id,
- serde_json::json!([]),
- )
- .await
- {
- tracing::error!("Failed to clear conversation: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
-
- if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
- .bind(supervisor_state.task_id)
- .execute(pool)
- .await
- {
- tracing::error!("Failed to update task status: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- }
- "from_checkpoint" => {
- // This would require more complex handling with checkpoint system
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NOT_IMPLEMENTED",
- "from_checkpoint mode not yet implemented",
- )),
- )
- .into_response();
- }
- _ => {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "INVALID_RESUME_MODE",
- "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint",
- )),
- )
- .into_response();
- }
- }
-
- tracing::info!(
- contract_id = %contract_id,
- supervisor_task_id = %supervisor_state.task_id,
- resume_mode = %req.resume_mode,
- message_count = message_count,
- "Supervisor resume requested"
- );
-
- // Build restoration info (Task 3.4)
- let pending_questions: Vec<PendingQuestion> = serde_json::from_value(
- supervisor_state.pending_questions.clone()
- ).unwrap_or_default();
-
- let restoration_info = RestorationInfo {
- previous_state: supervisor_state.state.clone(),
- restoration_count: supervisor_state.restoration_count,
- pending_questions_count: pending_questions.len(),
- waiting_tasks_count: supervisor_state.pending_task_ids.len(),
- spawned_tasks_count: supervisor_state.spawned_task_ids.len(),
- warnings: vec![], // Could add validation warnings here
- };
-
- // Re-deliver pending questions if any (Task 3.4)
- if !pending_questions.is_empty() {
- redeliver_pending_questions(
- &state,
- supervisor_state.task_id,
- contract_id,
- auth_info.owner_id,
- &pending_questions,
- ).await;
- }
-
- Json(ResumeSupervisorResponse {
- supervisor_task_id: supervisor_state.task_id,
- daemon_id: response_daemon_id,
- resumed_from: ResumedFromInfo {
- phase: contract.phase,
- last_activity: supervisor_state.last_activity,
- message_count,
- },
- status: response_status,
- restoration: Some(restoration_info),
- })
- .into_response()
-}
-
-/// Response for conversation rewind
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct RewindConversationResponse {
- pub contract_id: Uuid,
- pub messages_removed: i32,
- pub new_message_count: i32,
- pub code_rewound: bool,
-}
-
-/// Rewind supervisor conversation to specified point.
-///
-/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind
-#[utoipa::path(
- post,
- path = "/api/v1/contracts/{id}/supervisor/conversation/rewind",
- params(
- ("id" = Uuid, Path, description = "Contract ID")
- ),
- request_body = crate::db::models::RewindConversationRequest,
- responses(
- (status = 200, description = "Conversation rewound", body = RewindConversationResponse),
- (status = 400, description = "Invalid request", body = ApiError),
- (status = 401, description = "Unauthorized", body = ApiError),
- (status = 404, description = "Contract or supervisor not found", body = ApiError),
- (status = 503, description = "Database not configured", body = ApiError),
- (status = 500, description = "Internal server error", body = ApiError),
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh Supervisor"
-)]
-pub async fn rewind_conversation(
- State(state): State<SharedState>,
- Path(contract_id): Path<Uuid>,
- auth: crate::server::auth::Authenticated,
- Json(req): Json<crate::db::models::RewindConversationRequest>,
-) -> impl IntoResponse {
- let crate::server::auth::Authenticated(auth_info) = auth;
-
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
- )
- .into_response();
- };
-
- // Get contract and verify ownership
- let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await {
- Ok(Some(c)) => c,
- Ok(None) => {
- return (
StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Contract not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get contract {}: {}", contract_id, e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
+ Json(ApiError::new("NOT_FOUND", "Question not found")),
)
.into_response();
}
};
-
- // Get supervisor state
- let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
- Ok(Some(s)) => s,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(ApiError::new("NOT_FOUND", "Supervisor state not found")),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Failed to get supervisor state: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
- )
- .into_response();
- }
- };
-
- let conversation = supervisor_state
- .conversation_history
- .as_array()
- .cloned()
- .unwrap_or_default();
-
- let original_count = conversation.len() as i32;
-
- // Determine how many messages to keep
- let new_count = if let Some(by_count) = req.by_message_count {
- (original_count - by_count).max(0)
- } else if let Some(ref to_id) = req.to_message_id {
- // Find message by ID and keep up to and including it
- let index = conversation
- .iter()
- .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str()))
- .map(|i| i as i32)
- .unwrap_or(original_count - 1);
- (index + 1).min(original_count).max(0)
- } else {
- // Default to removing last message
- (original_count - 1).max(0)
- };
-
- // Truncate conversation
- let new_conversation: Vec<serde_json::Value> = conversation
- .into_iter()
- .take(new_count as usize)
- .collect();
-
- // Update the conversation
- if let Err(e) = repository::update_supervisor_conversation(
- pool,
- contract_id,
- serde_json::Value::Array(new_conversation),
- )
- .await
- {
- tracing::error!("Failed to update conversation: {}", e);
+ if question.owner_id != auth.owner_id {
return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", e.to_string())),
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("FORBIDDEN", "Not your question")),
)
.into_response();
}
- tracing::info!(
- contract_id = %contract_id,
- original_count = original_count,
- new_count = new_count,
- messages_removed = original_count - new_count,
- "Conversation rewound"
- );
-
- Json(RewindConversationResponse {
- contract_id,
- messages_removed: original_count - new_count,
- new_message_count: new_count,
- code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind
- })
- .into_response()
-}
-
-// =============================================================================
-// Supervisor State Persistence Helpers (Task 3.3)
-// =============================================================================
-
-use crate::db::models::{
- SupervisorRestorationContext, SupervisorStateEnum,
- StateValidationResult, StateRecoveryAction,
-};
-
-/// Save supervisor state on task spawn.
-/// This is called when a supervisor spawns a new task.
-pub async fn save_state_on_task_spawn(
- pool: &PgPool,
- contract_id: Uuid,
- spawned_task_id: Uuid,
-) {
- if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await {
- tracing::warn!(
- contract_id = %contract_id,
- spawned_task_id = %spawned_task_id,
- error = %e,
- "Failed to save spawned task to supervisor state"
- );
+ if state.submit_question_response(question_id, req.response) {
+ Json(AnswerQuestionResponse { success: true }).into_response()
} else {
- tracing::debug!(
- contract_id = %contract_id,
- spawned_task_id = %spawned_task_id,
- "Saved spawned task to supervisor state"
- );
- }
-
- // Also update state to working
- if let Err(e) = repository::update_supervisor_detailed_state(
- pool,
- contract_id,
- "working",
- Some(&format!("Spawned task {}", spawned_task_id)),
- 0, // Progress resets when spawning new work
- None,
- ).await {
- tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn");
- }
-}
-
-/// Save supervisor state on question asked.
-/// This is called when a supervisor asks a question and is waiting for user input.
-pub async fn save_state_on_question_asked(
- pool: &PgPool,
- contract_id: Uuid,
- question: PendingQuestion,
-) {
- let question_json = match serde_json::to_value(&[&question]) {
- Ok(v) => v,
- Err(e) => {
- tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question");
- return;
- }
- };
-
- if let Err(e) = repository::add_supervisor_pending_question(pool, contract_id, question_json).await {
- tracing::warn!(
- contract_id = %contract_id,
- question_id = %question.id,
- error = %e,
- "Failed to save pending question to supervisor state"
- );
- } else {
- tracing::debug!(
- contract_id = %contract_id,
- question_id = %question.id,
- "Saved pending question to supervisor state"
- );
- }
-}
-
-/// Clear pending question after it's answered.
-pub async fn clear_pending_question(
- pool: &PgPool,
- contract_id: Uuid,
- question_id: Uuid,
-) {
- if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await {
- tracing::warn!(
- contract_id = %contract_id,
- question_id = %question_id,
- error = %e,
- "Failed to remove pending question from supervisor state"
- );
- }
-
- // Update state back to working (if no more pending questions)
- match repository::get_supervisor_state(pool, contract_id).await {
- Ok(Some(state)) => {
- let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
- .unwrap_or_default();
- if questions.is_empty() {
- let _ = repository::update_supervisor_detailed_state(
- pool,
- contract_id,
- "working",
- Some("Resumed after user response"),
- state.progress,
- None,
- ).await;
- }
- }
- Ok(None) => {}
- Err(e) => {
- tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question");
- }
- }
-}
-
-/// Save supervisor state on phase change.
-pub async fn save_state_on_phase_change(
- pool: &PgPool,
- contract_id: Uuid,
- new_phase: &str,
-) {
- if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await {
- tracing::warn!(
- contract_id = %contract_id,
- new_phase = %new_phase,
- error = %e,
- "Failed to update supervisor state on phase change"
- );
- } else {
- tracing::info!(
- contract_id = %contract_id,
- new_phase = %new_phase,
- "Updated supervisor state on phase change"
- );
- }
-}
-
-// =============================================================================
-// Supervisor Restoration Protocol (Task 3.4)
-// =============================================================================
-
-/// Validate supervisor state consistency before restoration.
-/// Checks that spawned tasks and pending questions are in expected states.
-pub async fn validate_supervisor_state(
- pool: &PgPool,
- state: &crate::db::models::SupervisorState,
-) -> StateValidationResult {
- let mut issues = Vec::new();
-
- // Validate spawned tasks
- if !state.spawned_task_ids.is_empty() {
- match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await {
- Ok(task_statuses) => {
- for task_id in &state.spawned_task_ids {
- if !task_statuses.contains_key(task_id) {
- issues.push(format!("Spawned task {} not found in database", task_id));
- }
- }
- }
- Err(e) => {
- issues.push(format!("Failed to validate spawned tasks: {}", e));
- }
- }
- }
-
- // Validate pending questions
- let pending_questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
- .unwrap_or_default();
-
- // Check if questions are not too old (e.g., more than 24 hours)
- for question in &pending_questions {
- let age = chrono::Utc::now() - question.asked_at;
- if age.num_hours() > 24 {
- issues.push(format!(
- "Pending question {} is {} hours old, may be stale",
- question.id, age.num_hours()
- ));
- }
- }
-
- // Validate conversation history
- if let Some(history) = state.conversation_history.as_array() {
- if history.is_empty() && state.restoration_count > 0 {
- issues.push("Conversation history is empty after previous restoration".to_string());
- }
- }
-
- // Determine recovery action
- let recovery_action = if issues.is_empty() {
- StateRecoveryAction::Proceed
- } else if issues.iter().any(|i| i.contains("not found")) {
- // Missing tasks suggest corruption - use checkpoint
- StateRecoveryAction::UseCheckpoint
- } else if issues.len() > 3 {
- // Many issues suggest manual intervention needed
- StateRecoveryAction::ManualIntervention
- } else {
- // Minor issues - proceed with warnings
- StateRecoveryAction::Proceed
- };
-
- StateValidationResult {
- is_valid: issues.is_empty(),
- issues,
- recovery_action,
- }
-}
-
-/// Restore supervisor from saved state after daemon crash or task reassignment.
-/// Returns restoration context to send to the supervisor.
-pub async fn restore_supervisor(
- pool: &PgPool,
- contract_id: Uuid,
- restoration_source: &str,
-) -> Result<SupervisorRestorationContext, String> {
- // Step 1: Load supervisor state
- let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await {
- Ok(Some(s)) => s,
- Ok(None) => {
- tracing::warn!(
- contract_id = %contract_id,
- "No supervisor state found for restoration - starting fresh"
- );
- return Ok(SupervisorRestorationContext {
- success: true,
- previous_state: SupervisorStateEnum::Initializing,
- conversation_history: serde_json::json!([]),
- pending_questions: vec![],
- waiting_task_ids: vec![],
- spawned_task_ids: vec![],
- restoration_count: 0,
- restoration_context_message: "No previous state found. Starting fresh.".to_string(),
- warnings: vec!["No previous supervisor state found".to_string()],
- });
- }
- Err(e) => {
- return Err(format!("Failed to load supervisor state: {}", e));
- }
- };
-
- // Step 2: Parse previous state
- let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted);
-
- // Step 3: Validate state consistency
- let validation = validate_supervisor_state(pool, &state).await;
- let mut warnings = validation.issues.clone();
-
- // Step 4: Handle based on validation result
- let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action {
- StateRecoveryAction::Proceed => {
- // State is valid, use it
- let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
- .unwrap_or_default();
-
- let message = format!(
- "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.",
- state.state,
- questions.len(),
- state.spawned_task_ids.len(),
- state.pending_task_ids.len()
- );
-
- (state.conversation_history.clone(), questions, message)
- }
- StateRecoveryAction::UseCheckpoint => {
- // State is corrupted, try to use checkpoint
- warnings.push("State validation failed, attempting checkpoint recovery".to_string());
-
- // TODO: Implement checkpoint-based recovery
- // For now, start with empty questions but preserve conversation
- let message = "Restored from last checkpoint due to state inconsistency.".to_string();
- (state.conversation_history.clone(), vec![], message)
- }
- StateRecoveryAction::StartFresh => {
- warnings.push("Starting fresh due to unrecoverable state".to_string());
- let message = "Starting fresh due to unrecoverable state corruption.".to_string();
- (serde_json::json!([]), vec![], message)
- }
- StateRecoveryAction::ManualIntervention => {
- warnings.push("Manual intervention may be required".to_string());
- // Still try to restore but with warning
- let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone())
- .unwrap_or_default();
- let message = "Restored with warnings - manual intervention may be required.".to_string();
- (state.conversation_history.clone(), questions, message)
- }
- };
-
- // Step 5: Mark supervisor as restored
- let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await {
- Ok(s) => s,
- Err(e) => {
- return Err(format!("Failed to mark supervisor as restored: {}", e));
- }
- };
-
- // Step 6: Build restoration context
- let context = SupervisorRestorationContext {
- success: true,
- previous_state,
- conversation_history,
- pending_questions,
- waiting_task_ids: state.pending_task_ids.clone(),
- spawned_task_ids: state.spawned_task_ids.clone(),
- restoration_count: new_state.restoration_count,
- restoration_context_message: restoration_message,
- warnings,
- };
-
- tracing::info!(
- contract_id = %contract_id,
- restoration_source = %restoration_source,
- restoration_count = new_state.restoration_count,
- pending_questions_count = context.pending_questions.len(),
- waiting_tasks_count = context.waiting_task_ids.len(),
- spawned_tasks_count = context.spawned_task_ids.len(),
- "Supervisor restoration completed"
- );
-
- Ok(context)
-}
-
-/// Re-deliver pending questions to the user after restoration.
-/// This ensures questions asked before crash are shown to the user again.
-pub async fn redeliver_pending_questions(
- state: &SharedState,
- supervisor_id: Uuid,
- contract_id: Uuid,
- owner_id: Uuid,
- questions: &[PendingQuestion],
-) {
- for question in questions {
- // Add to in-memory question state
- state.add_supervisor_question(
- supervisor_id,
- contract_id,
- owner_id,
- question.question.clone(),
- question.choices.clone(),
- question.context.clone(),
- false, // Assume single select for restored questions
- question.question_type.clone(),
- );
-
- // Broadcast to WebSocket clients
- let question_data = serde_json::json!({
- "question_id": question.id.to_string(),
- "choices": question.choices,
- "context": question.context,
- "question_type": question.question_type,
- "is_restored": true,
- "originally_asked_at": question.asked_at.to_rfc3339(),
- });
-
- state.broadcast_task_output(TaskOutputNotification {
- task_id: supervisor_id,
- owner_id: Some(owner_id),
- message_type: "supervisor_question".to_string(),
- content: question.question.clone(),
- tool_name: None,
- tool_input: Some(question_data),
- is_error: None,
- cost_usd: None,
- duration_ms: None,
- is_partial: false,
- });
-
- tracing::info!(
- supervisor_id = %supervisor_id,
- question_id = %question.id,
- "Re-delivered pending question after restoration"
- );
- }
-}
-
-/// Generate restoration context message for Claude.
-/// This message is injected into the conversation to inform Claude about the restoration.
-pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String {
- let mut message = String::new();
-
- message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n");
- message.push_str(&format!("This supervisor has been restored after interruption. {}\n\n", context.restoration_context_message));
- message.push_str(&format!("Restoration count: {}\n", context.restoration_count));
-
- if !context.pending_questions.is_empty() {
- message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len()));
- for q in &context.pending_questions {
- message.push_str(&format!(" - {}: {}\n", q.id, q.question));
- }
- }
-
- if !context.waiting_task_ids.is_empty() {
- message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len()));
- }
-
- if !context.spawned_task_ids.is_empty() {
- message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len()));
- }
-
- if !context.warnings.is_empty() {
- message.push_str("\nWarnings:\n");
- for warning in &context.warnings {
- message.push_str(&format!(" - {}\n", warning));
- }
+ (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Question not found")),
+ )
+ .into_response()
}
-
- message.push_str("\n=== END RESTORATION NOTICE ===\n");
-
- message
}
// =============================================================================
-// Order Creation from Directive Tasks
+// Order creation (from directive tasks)
// =============================================================================
-/// Request to create an order from a directive task.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrderForTaskRequest {
@@ -3117,30 +496,25 @@ pub struct CreateOrderForTaskRequest {
fn default_order_priority() -> String {
"medium".to_string()
}
-
fn default_order_type() -> String {
"spike".to_string()
}
-
fn default_order_labels() -> serde_json::Value {
serde_json::json!([])
}
-/// Create an order for future work from a directive task.
-///
-/// Only spike and chore order types are allowed. The order is automatically
-/// linked to the directive associated with the calling task.
+/// Create a follow-up order from a directive task (spike/chore only).
#[utoipa::path(
post,
path = "/api/v1/mesh/supervisor/orders",
request_body = CreateOrderForTaskRequest,
responses(
(status = 201, description = "Order created"),
- (status = 400, description = "Invalid order type"),
+ (status = 400, description = "Invalid order type or no directive context"),
(status = 401, description = "Unauthorized"),
- (status = 403, description = "Forbidden - not a supervisor/directive task"),
- (status = 500, description = "Internal server error"),
+ (status = 403, description = "Not a directive task"),
),
+ security(("tool_key" = [])),
tag = "Mesh Supervisor"
)]
pub async fn create_order_for_task(
@@ -3148,14 +522,11 @@ pub async fn create_order_for_task(
headers: HeaderMap,
Json(request): Json<CreateOrderForTaskRequest>,
) -> impl IntoResponse {
- let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ let (task_id, owner_id) = match verify_task_auth(&state, &headers).await {
Ok(ids) => ids,
Err(e) => return e.into_response(),
};
- let pool = state.db_pool.as_ref().unwrap();
-
- // Validate order_type is spike or chore
if request.order_type != "spike" && request.order_type != "chore" {
return (
StatusCode::BAD_REQUEST,
@@ -3167,7 +538,8 @@ pub async fn create_order_for_task(
.into_response();
}
- // Get the task to find its directive_id
+ let pool = state.db_pool.as_ref().unwrap();
+
let task = match repository::get_task(pool, task_id).await {
Ok(Some(t)) => t,
Ok(None) => {
@@ -3178,10 +550,10 @@ pub async fn create_order_for_task(
.into_response();
}
Err(e) => {
- tracing::error!(error = %e, "Failed to get task");
+ tracing::error!(error = %e, "Failed to fetch task");
return (
StatusCode::INTERNAL_SERVER_ERROR,
- Json(ApiError::new("DB_ERROR", "Failed to get task")),
+ Json(ApiError::new("DB_ERROR", "Failed to fetch task")),
)
.into_response();
}
@@ -3192,27 +564,21 @@ pub async fn create_order_for_task(
None => {
return (
StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "NO_DIRECTIVE",
- "Task is not associated with a directive",
- )),
+ Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")),
)
.into_response();
}
};
- // Determine repository_url: use request value, or fall back to directive's repository_url
let repository_url = if request.repository_url.is_some() {
request.repository_url
} else {
- // Look up directive for its repository_url
match repository::get_directive_for_owner(pool, owner_id, directive_id).await {
Ok(Some(d)) => d.repository_url,
_ => None,
}
};
- // Create the order
let order_req = CreateOrderRequest {
title: request.title,
description: request.description,
diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs
index 5737360..312fdc7 100644
--- a/makima/src/server/handlers/mod.rs
+++ b/makima/src/server/handlers/mod.rs
@@ -14,7 +14,6 @@ pub mod directives;
pub mod file_ws;
pub mod files;
pub mod history;
-pub mod listen;
pub mod mesh;
pub mod orders;
pub mod mesh_daemon;