diff options
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 12 | ||||
| -rw-r--r-- | makima/src/server/handlers/files.rs | 207 | ||||
| -rw-r--r-- | makima/src/server/handlers/history.rs | 268 | ||||
| -rw-r--r-- | makima/src/server/handlers/listen.rs | 783 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 247 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 419 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 2980 | ||||
| -rw-r--r-- | makima/src/server/handlers/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/messages.rs | 5 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 88 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 34 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 249 |
12 files changed, 234 insertions, 5059 deletions
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 35a46a0..410b2b3 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -1019,12 +1019,10 @@ pub async fn cleanup_directive( // Create the cleanup task (following pick_up_orders pattern) let req = CreateTaskRequest { - contract_id: None, name: format!("Cleanup: {}", directive.title), description: Some("Directive cleanup — verify merged branches and remove merged steps".to_string()), plan: prompt, parent_task_id: None, - is_supervisor: false, priority: 0, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), @@ -1037,7 +1035,6 @@ pub async fn cleanup_directive( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, directive_id: Some(directive.id), directive_step_id: None, }; @@ -1330,12 +1327,10 @@ pub async fn pick_up_orders( // Create the planning task let req = CreateTaskRequest { - contract_id: None, name: format!("Pick up orders: {}", directive.title), description: Some("Directive order pickup planning task".to_string()), plan, parent_task_id: None, - is_supervisor: false, priority: 0, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), @@ -1348,7 +1343,6 @@ pub async fn pick_up_orders( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, directive_id: Some(directive.id), directive_step_id: None, }; @@ -1907,12 +1901,10 @@ pub async fn pick_up_dog_orders( // Create the planning task let req = CreateTaskRequest { - contract_id: None, name: format!("Pick up DOG orders: {}", directive.title), description: Some("Directive order group pickup planning task".to_string()), plan, parent_task_id: None, - is_supervisor: false, priority: 0, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), @@ -1925,7 +1917,6 @@ pub async fn pick_up_dog_orders( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, directive_id: Some(directive.id), directive_step_id: None, }; @@ -2209,12 +2200,10 @@ pub async fn create_directive_task( let base_branch = req.base_branch.or_else(|| directive.base_branch.clone()); let create_req = CreateTaskRequest { - contract_id: None, name: req.name, description: None, plan: req.plan, parent_task_id: None, - is_supervisor: false, priority: 0, repository_url: repo_url, base_branch, @@ -2227,7 +2216,6 @@ pub async fn create_directive_task( checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, directive_id: Some(directive.id), // No directive_step_id — this is what makes the task "ephemeral": // it lives under the directive folder but isn't part of the DAG. diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs index 711be41..023b9ff 100644 --- a/makima/src/server/handlers/files.rs +++ b/makima/src/server/handlers/files.rs @@ -145,26 +145,7 @@ pub async fn create_file( .into_response(); }; - // Verify the contract exists and belongs to the owner - match repository::get_contract_for_owner(pool, req.contract_id, auth.owner_id).await { - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to verify contract: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - Ok(Some(_)) => {} // Contract exists, proceed - } - + // Legacy contract scope removed; files are owner-scoped only now. match repository::create_file_for_owner(pool, auth.owner_id, req).await { Ok(file) => (StatusCode::CREATED, Json(file)).into_response(), Err(e) => { @@ -336,189 +317,3 @@ pub async fn delete_file( } } -/// Sync a file from its linked repository file. -/// -/// This endpoint triggers an async sync operation. The file must have a -/// repo_file_path set, and its contract must have a linked repository. -/// A connected daemon will read the file and update the file content. -#[utoipa::path( - post, - path = "/api/v1/files/{id}/sync-from-repo", - params( - ("id" = Uuid, Path, description = "File ID") - ), - responses( - (status = 202, description = "Sync operation started"), - (status = 400, description = "File not linked to repository", body = ApiError), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 404, description = "File not found", body = ApiError), - (status = 503, description = "No daemon available", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Files" -)] -pub async fn sync_file_from_repo( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, - Path(id): Path<Uuid>, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get the file and verify it has a repo_file_path - let file = match repository::get_file_for_owner(pool, id, auth.owner_id).await { - Ok(Some(f)) => f, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "File not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get file {}: {}", id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Check if file has a repo path and contract_id - let contract_id = match file.contract_id { - Some(id) => id, - None => { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NO_CONTRACT", - "File is not associated with a contract", - )), - ) - .into_response(); - } - }; - - let repo_file_path = match file.repo_file_path { - Some(ref path) if !path.is_empty() => path.clone(), - _ => { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NOT_LINKED", - "File is not linked to a repository file", - )), - ) - .into_response(); - } - }; - - // Get contract repositories - let repositories = match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos, - Err(e) => { - tracing::error!("Failed to get contract repositories: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Check if contract has repositories - if repositories.is_empty() { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NO_REPOSITORY", - "Contract has no linked repositories", - )), - ) - .into_response(); - } - - // Use the first repository's local path - let repo = &repositories[0]; - let repo_local_path = match &repo.local_path { - Some(path) if !path.is_empty() => path.clone(), - _ => { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NO_LOCAL_PATH", - "Repository has no local path configured", - )), - ) - .into_response(); - } - }; - - // Find a connected daemon for this owner - let daemon_id = state - .daemon_connections - .iter() - .find(|entry| entry.value().owner_id == auth.owner_id) - .map(|entry| entry.value().id); - - let daemon_id = match daemon_id { - Some(id) => id, - None => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemon connected. Start a daemon to sync files from repository.", - )), - ) - .into_response(); - } - }; - - // Send ReadRepoFile command to daemon - // Use the file ID as the request_id so we can match the response - let command = DaemonCommand::ReadRepoFile { - request_id: id, - contract_id, - file_path: repo_file_path, - repo_path: repo_local_path, - }; - - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - tracing::error!("Failed to send ReadRepoFile command: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DAEMON_ERROR", e)), - ) - .into_response(); - } - - // Update status to indicate sync in progress - if let Err(e) = sqlx::query("UPDATE files SET repo_sync_status = 'syncing' WHERE id = $1") - .bind(id) - .execute(pool) - .await - { - tracing::warn!("Failed to update repo_sync_status: {}", e); - } - - // Return 202 Accepted - the sync happens asynchronously - ( - StatusCode::ACCEPTED, - Json(serde_json::json!({ - "message": "Sync operation started", - "fileId": id, - })), - ) - .into_response() -} diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs index bee6b02..46be7ac 100644 --- a/makima/src/server/handlers/history.rs +++ b/makima/src/server/handlers/history.rs @@ -10,10 +10,7 @@ use uuid::Uuid; use crate::{ db::{ - models::{ - flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, - SupervisorConversationResponse, TaskConversationResponse, TaskReference, - }, + models::{flexible_datetime, ConversationMessage, HistoryQueryFilters, TaskConversationResponse}, repository, }, server::{auth::Authenticated, messages::ApiError, state::SharedState}, @@ -32,7 +29,6 @@ pub struct TaskConversationParams { #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct TimelineQueryFilters { - pub contract_id: Option<Uuid>, pub task_id: Option<Uuid>, pub include_subtasks: Option<bool>, #[serde(default, deserialize_with = "flexible_datetime::deserialize")] @@ -42,231 +38,6 @@ pub struct TimelineQueryFilters { pub limit: Option<i32>, } -/// GET /api/v1/contracts/{id}/history -/// Returns contract history timeline with filtering and pagination -#[utoipa::path( - get, - path = "/api/v1/contracts/{id}/history", - params( - ("id" = Uuid, Path, description = "Contract ID"), - ("phase" = Option<String>, Query, description = "Filter by phase"), - ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"), - ("from" = Option<String>, Query, description = "Start date filter"), - ("to" = Option<String>, Query, description = "End date filter"), - ("limit" = Option<i32>, Query, description = "Limit results"), - ), - responses( - (status = 200, description = "Contract history", body = ContractHistoryResponse), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 403, description = "Forbidden", body = ApiError), - (status = 404, description = "Contract not found", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "History" -)] -pub async fn get_contract_history( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - Query(filters): Query<HistoryQueryFilters>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Verify contract exists and user has access - let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get history events - match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await { - Ok((events, total_count)) => { - Json(ContractHistoryResponse { - contract_id, - entries: events, - total_count, - cursor: None, // TODO: implement cursor pagination - }) - .into_response() - } - Err(e) => { - tracing::error!("Failed to get contract history: {}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response() - } - } -} - -/// GET /api/v1/contracts/{id}/supervisor/conversation -/// Returns full supervisor conversation with spawned task references -#[utoipa::path( - get, - path = "/api/v1/contracts/{id}/supervisor/conversation", - params( - ("id" = Uuid, Path, description = "Contract ID") - ), - responses( - (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 403, description = "Forbidden", body = ApiError), - (status = 404, description = "Supervisor not found", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "History" -)] -pub async fn get_supervisor_conversation( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get contract for phase info and ownership check - let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get the supervisor state - let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Parse conversation history from JSONB - let messages: Vec<ConversationMessage> = supervisor_state - .conversation_history - .as_array() - .map(|arr| { - arr.iter() - .enumerate() - .map(|(i, v)| ConversationMessage { - id: i.to_string(), - role: v - .get("role") - .and_then(|r| r.as_str()) - .unwrap_or("user") - .to_string(), - content: v - .get("content") - .and_then(|c| c.as_str()) - .unwrap_or("") - .to_string(), - timestamp: supervisor_state.last_activity, - tool_calls: None, - tool_name: None, - tool_input: None, - tool_result: None, - is_error: None, - token_count: None, - cost_usd: None, - }) - .collect() - }) - .unwrap_or_default(); - - // Get spawned tasks - let tasks = match repository::list_tasks_by_contract(pool, contract_id, auth.owner_id).await { - Ok(t) => t, - Err(e) => { - tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e); - Vec::new() - } - }; - - let spawned_tasks: Vec<TaskReference> = tasks - .into_iter() - .filter(|t| !t.is_supervisor) - .map(|t| TaskReference { - task_id: t.id, - task_name: t.name, - status: t.status, - created_at: t.created_at, - completed_at: t.completed_at, - }) - .collect(); - - Json(SupervisorConversationResponse { - contract_id, - supervisor_task_id: supervisor_state.task_id, - phase: contract.phase, - last_activity: supervisor_state.last_activity, - pending_task_ids: supervisor_state.pending_task_ids, - messages, - spawned_tasks, - }) - .into_response() -} - -/// GET /api/v1/mesh/tasks/{id}/conversation -/// Returns task conversation history #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/conversation", @@ -364,28 +135,16 @@ pub async fn get_task_conversation( } /// GET /api/v1/timeline -/// Returns unified timeline for authenticated user +/// Returns unified task-history timeline for the authenticated user. #[utoipa::path( get, path = "/api/v1/timeline", - params( - ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"), - ("task_id" = Option<Uuid>, Query, description = "Filter by task"), - ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"), - ("from" = Option<String>, Query, description = "Start date filter"), - ("to" = Option<String>, Query, description = "End date filter"), - ("limit" = Option<i32>, Query, description = "Limit results"), - ), responses( - (status = 200, description = "Timeline events", body = ContractHistoryResponse), + (status = 200, description = "Timeline events"), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) ), + security(("bearer_auth" = []), ("api_key" = [])), tag = "History" )] pub async fn get_timeline( @@ -402,7 +161,6 @@ pub async fn get_timeline( }; let history_filters = HistoryQueryFilters { - phase: None, event_types: None, from: filters.from, to: filters.to, @@ -410,24 +168,18 @@ pub async fn get_timeline( cursor: None, }; - let result = if let Some(contract_id) = filters.contract_id { - repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await - } else if let Some(task_id) = filters.task_id { + let result = if let Some(task_id) = filters.task_id { repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await } else { repository::get_timeline(pool, auth.owner_id, &history_filters).await }; match result { - Ok((events, total_count)) => { - Json(ContractHistoryResponse { - contract_id: filters.contract_id.unwrap_or_default(), - entries: events, - total_count, - cursor: None, - }) - .into_response() - } + Ok((events, total_count)) => Json(serde_json::json!({ + "entries": events, + "totalCount": total_count, + })) + .into_response(), Err(e) => { tracing::error!("Failed to get timeline: {}", e); ( diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs deleted file mode 100644 index e1bc30e..0000000 --- a/makima/src/server/handlers/listen.rs +++ /dev/null @@ -1,783 +0,0 @@ -//! WebSocket handler for streaming speech-to-text with sliding window optimization. - -use axum::{ - extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade}, - response::Response, -}; -use futures::{SinkExt, StreamExt}; -use tokio::sync::mpsc; -use uuid::Uuid; - -use crate::audio::{resample_and_mixdown, TARGET_CHANNELS, TARGET_SAMPLE_RATE}; -use crate::db::models::{TranscriptEntry, UpdateFileRequest}; -use crate::db::repository; -use crate::listen::{align_speakers, samples_per_chunk, DialogueSegment, TimestampMode}; -use crate::server::messages::{ - AudioEncoding, ClientMessage, ServerMessage, StartMessage, TranscriptMessage, -}; -use crate::server::state::{MlModels, SharedState}; - -/// Chunk size in milliseconds for triggering transcription processing. -const STREAM_CHUNK_MS: u32 = 5_000; - -/// Maximum window size in seconds for sliding window processing. -const MAX_WINDOW_SECONDS: f32 = 30.0; - -/// Maximum window size in samples at 16kHz. -const MAX_WINDOW_SAMPLES: usize = (MAX_WINDOW_SECONDS as usize) * (TARGET_SAMPLE_RATE as usize); - -/// EOU chunk size in samples (160ms at 16kHz). -const EOU_CHUNK_SIZE: usize = 2560; - -/// Context overlap in seconds to keep when trimming finalized audio. -const CONTEXT_OVERLAP_SECONDS: f32 = 2.0; - -/// WebSocket upgrade handler for STT streaming. -/// -/// This endpoint accepts WebSocket connections for real-time speech-to-text -/// transcription with speaker diarization. -#[utoipa::path( - get, - path = "/api/v1/listen", - responses( - (status = 101, description = "WebSocket connection established"), - ), - tag = "Listen" -)] -pub async fn websocket_handler( - ws: WebSocketUpgrade, - State(state): State<SharedState>, -) -> Response { - ws.on_upgrade(|socket| handle_socket(socket, state)) -} - -async fn handle_socket(socket: WebSocket, state: SharedState) { - let session_id = Uuid::new_v4().to_string(); - tracing::info!(session_id = %session_id, "New WebSocket connection"); - - // Split socket for concurrent read/write - let (mut sender, mut receiver) = socket.split(); - - // Channel for sending responses back to client - let (response_tx, mut response_rx) = mpsc::channel::<ServerMessage>(32); - - // Spawn task to forward responses to WebSocket - let sender_task = tokio::spawn(async move { - while let Some(msg) = response_rx.recv().await { - let json = match serde_json::to_string(&msg) { - Ok(j) => j, - Err(e) => { - tracing::error!("Failed to serialize message: {}", e); - continue; - } - }; - if sender.send(Message::Text(json.into())).await.is_err() { - break; - } - } - }); - - // Lazy-load ML models on first Listen connection - let ml_models = match state.get_ml_models().await { - Ok(models) => models, - Err(e) => { - tracing::error!(session_id = %session_id, error = %e, "Failed to load ML models"); - let _ = response_tx - .send(ServerMessage::Error { - code: "MODEL_LOAD_ERROR".into(), - message: format!("Failed to load ML models: {}", e), - }) - .await; - drop(response_tx); - let _ = sender_task.await; - return; - } - }; - - // Send ready message - let _ = response_tx - .send(ServerMessage::Ready { - session_id: session_id.clone(), - }) - .await; - - // Audio format state - let mut audio_format: Option<StartMessage> = None; - - // Main audio buffer for transcription (accumulates resampled 16kHz mono audio) - let mut audio_buffer: Vec<f32> = Vec::new(); - - // EOU detection buffer (resampled audio for utterance detection) - let mut eou_buffer: Vec<f32> = Vec::new(); - let mut last_eou_text: String = String::new(); - let mut utterance_ended: bool = false; - - // Tracking state - let mut last_sent_end_time: f32 = 0.0; - let mut last_processed_len: usize = 0; - let mut audio_offset: f32 = 0.0; // Time offset from trimmed audio - let mut finalized_segments: Vec<DialogueSegment> = Vec::new(); - - // File persistence state - let mut file_id: Option<Uuid> = None; - let mut transcript_entries: Vec<TranscriptEntry> = Vec::new(); - let mut transcript_counter: u32 = 0; - - // Auth state (set when Start message includes valid auth_token and contract_id) - let mut authenticated_owner_id: Option<Uuid> = None; - let mut target_contract_id: Option<Uuid> = None; - - // Reset Sortformer state for new session - { - let mut sortformer = ml_models.sortformer.lock().await; - sortformer.reset_state(); - } - - // Process incoming messages - while let Some(msg_result) = receiver.next().await { - let msg = match msg_result { - Ok(m) => m, - Err(e) => { - tracing::error!("WebSocket error: {}", e); - break; - } - }; - - match msg { - Message::Text(text) => { - // Parse JSON control messages - match serde_json::from_str::<ClientMessage>(&text) { - Ok(ClientMessage::Start(start)) => { - tracing::info!( - session_id = %session_id, - sample_rate = start.sample_rate, - channels = start.channels, - encoding = ?start.encoding, - contract_id = ?start.contract_id, - has_auth = start.auth_token.is_some(), - "Session started" - ); - - // Validate auth and contract if provided - if let (Some(token), Some(contract_id_str)) = (&start.auth_token, &start.contract_id) { - // Parse contract ID - if let Ok(contract_id) = Uuid::parse_str(contract_id_str) { - // Validate JWT token - if let Some(ref verifier) = state.jwt_verifier { - match verifier.verify(token) { - Ok(claims) => { - authenticated_owner_id = Some(claims.sub); - target_contract_id = Some(contract_id); - tracing::info!( - session_id = %session_id, - owner_id = %claims.sub, - contract_id = %contract_id, - "Authenticated session - transcripts will be saved to contract" - ); - } - Err(e) => { - tracing::warn!( - session_id = %session_id, - error = %e, - "Invalid auth token - transcripts will not be saved" - ); - } - } - } else { - tracing::debug!( - session_id = %session_id, - "No JWT verifier configured - transcripts will not be saved" - ); - } - } else { - tracing::warn!( - session_id = %session_id, - contract_id = contract_id_str, - "Invalid contract ID format" - ); - } - } - - audio_format = Some(start); - audio_buffer.clear(); - eou_buffer.clear(); - last_eou_text.clear(); - utterance_ended = false; - last_sent_end_time = 0.0; - last_processed_len = 0; - audio_offset = 0.0; - finalized_segments.clear(); - file_id = None; - authenticated_owner_id = authenticated_owner_id; // Keep from above - target_contract_id = target_contract_id; // Keep from above - - // Reset models for new session - let mut sortformer = ml_models.sortformer.lock().await; - sortformer.reset_state(); - } - Ok(ClientMessage::Stop(stop)) => { - tracing::info!( - session_id = %session_id, - reason = ?stop.reason, - audio_buffer_len = audio_buffer.len(), - "Session stopped by client" - ); - - if audio_format.is_some() { - if !audio_buffer.is_empty() { - tracing::debug!( - session_id = %session_id, - samples = audio_buffer.len(), - "Processing final audio buffer" - ); - - // Process remaining audio with sliding window - match process_audio_window(&audio_buffer, audio_offset, ml_models).await { - Ok(segments) => { - tracing::debug!( - session_id = %session_id, - total_segments = segments.len(), - finalized_count = finalized_segments.len(), - last_sent_end = last_sent_end_time, - "Final transcription complete" - ); - - // Combine finalized segments with new segments - let mut all_segments = finalized_segments.clone(); - - // Add segments from current window that weren't finalized - for seg in &segments { - // Adjust timestamps with offset - let adjusted_seg = DialogueSegment { - speaker: seg.speaker.clone(), - start: seg.start + audio_offset, - end: seg.end + audio_offset, - text: seg.text.clone(), - }; - - // Only add if not already finalized - if !finalized_segments.iter().any(|f| - (f.start - adjusted_seg.start).abs() < 0.1 && - f.text == adjusted_seg.text - ) { - all_segments.push(adjusted_seg); - } - } - - // Sort by start time - all_segments.sort_by(|a, b| a.start.partial_cmp(&b.start).unwrap()); - - // Send any NEW segments as interim first - for seg in &all_segments { - if seg.end > last_sent_end_time { - let _ = response_tx - .send(ServerMessage::Transcript(TranscriptMessage { - speaker: seg.speaker.clone(), - start: seg.start, - end: seg.end, - text: seg.text.clone(), - is_final: false, - })) - .await; - } - } - - // Send ALL segments as final - for seg in &all_segments { - let _ = response_tx - .send(ServerMessage::Transcript(TranscriptMessage { - speaker: seg.speaker.clone(), - start: seg.start, - end: seg.end, - text: seg.text.clone(), - is_final: true, - })) - .await; - } - } - Err(e) => { - tracing::error!( - session_id = %session_id, - error = %e, - "Final transcription failed" - ); - let _ = response_tx - .send(ServerMessage::Error { - code: "TRANSCRIPTION_ERROR".into(), - message: e.to_string(), - }) - .await; - } - } - } - } - - let _ = response_tx - .send(ServerMessage::Stopped { - reason: stop.reason.unwrap_or_else(|| "client_requested".into()), - }) - .await; - break; - } - Err(e) => { - tracing::warn!(session_id = %session_id, error = %e, "Failed to parse message"); - let _ = response_tx - .send(ServerMessage::Error { - code: "PARSE_ERROR".into(), - message: format!("Failed to parse message: {}", e), - }) - .await; - } - } - } - Message::Binary(data) => { - let Some(ref format) = audio_format else { - let _ = response_tx - .send(ServerMessage::Error { - code: "NO_FORMAT".into(), - message: "Received audio before start message".into(), - }) - .await; - continue; - }; - - // Decode binary audio data to f32 samples - let samples = decode_audio_chunk(&data, format); - - // Resample to 16kHz mono for all processing - let resampled = if format.sample_rate != TARGET_SAMPLE_RATE || format.channels != TARGET_CHANNELS { - resample_and_mixdown(&samples, format.sample_rate, format.channels) - } else { - samples - }; - - audio_buffer.extend(&resampled); - eou_buffer.extend(&resampled); - - // Process EOU detection in 160ms chunks - while eou_buffer.len() >= EOU_CHUNK_SIZE { - let chunk: Vec<f32> = eou_buffer.drain(..EOU_CHUNK_SIZE).collect(); - - let mut eou = ml_models.parakeet_eou.lock().await; - if let Ok(text) = eou.transcribe(&chunk, false) { - // Detect utterance boundary (sentence-ending punctuation) - if !text.is_empty() && text != last_eou_text { - if last_eou_text.ends_with('.') - || last_eou_text.ends_with('?') - || last_eou_text.ends_with('!') - { - utterance_ended = true; - tracing::debug!( - session_id = %session_id, - "Utterance boundary detected via EOU" - ); - } - last_eou_text = text; - } - } - } - - // Calculate if we should process (utterance ended OR enough new audio) - let chunk_samples = samples_per_chunk(TARGET_SAMPLE_RATE, STREAM_CHUNK_MS); - let new_audio_len = audio_buffer.len() - last_processed_len; - let should_process = utterance_ended || new_audio_len >= chunk_samples; - - if should_process { - tracing::debug!( - session_id = %session_id, - total_samples = audio_buffer.len(), - new_samples = new_audio_len, - utterance_ended = utterance_ended, - audio_offset = audio_offset, - "Processing audio with sliding window" - ); - - match process_audio_window(&audio_buffer, audio_offset, ml_models).await { - Ok(segments) => { - tracing::debug!( - session_id = %session_id, - total_segments = segments.len(), - last_sent_end = last_sent_end_time, - "Transcription produced segments" - ); - - // Send segments with adjusted timestamps - for seg in &segments { - let adjusted_start = seg.start + audio_offset; - let adjusted_end = seg.end + audio_offset; - if adjusted_end > last_sent_end_time { - // Create file on first transcript if authenticated with contract - if file_id.is_none() { - if let (Some(owner_id), Some(contract_id), Some(pool)) = - (authenticated_owner_id, target_contract_id, &state.db_pool) - { - let create_req = crate::db::models::CreateFileRequest { - contract_id, - name: None, // Auto-generated - description: Some("Live transcription".to_string()), - transcript: vec![], - location: None, - body: vec![], - repo_file_path: None, - contract_phase: None, // Will be looked up from contract - }; - match repository::create_file_for_owner(pool, owner_id, create_req).await { - Ok(file) => { - file_id = Some(file.id); - tracing::info!( - session_id = %session_id, - file_id = %file.id, - contract_id = %contract_id, - "Created file for session in contract" - ); - } - Err(e) => { - tracing::warn!( - session_id = %session_id, - error = %e, - "Failed to create file for session" - ); - } - } - } - } - - // Track transcript entry - transcript_counter += 1; - transcript_entries.push(TranscriptEntry { - id: format!("{}-{}", session_id, transcript_counter), - speaker: seg.speaker.clone(), - start: adjusted_start, - end: adjusted_end, - text: seg.text.clone(), - is_final: false, - }); - - let _ = response_tx - .send(ServerMessage::Transcript(TranscriptMessage { - speaker: seg.speaker.clone(), - start: adjusted_start, - end: adjusted_end, - text: seg.text.clone(), - is_final: false, - })) - .await; - last_sent_end_time = adjusted_end; - } - } - - // If utterance ended, finalize and trim - if utterance_ended && segments.len() > 1 { - // Finalize all but the last segment - let to_finalize = &segments[..segments.len() - 1]; - for seg in to_finalize { - finalized_segments.push(DialogueSegment { - speaker: seg.speaker.clone(), - start: seg.start + audio_offset, - end: seg.end + audio_offset, - text: seg.text.clone(), - }); - } - - // Trim audio buffer - if let Some(last_finalized) = to_finalize.last() { - let trim_to_time = (last_finalized.end - CONTEXT_OVERLAP_SECONDS).max(0.0); - let trim_samples = (trim_to_time * TARGET_SAMPLE_RATE as f32) as usize; - - if trim_samples > 0 && trim_samples < audio_buffer.len() { - audio_buffer.drain(..trim_samples); - audio_offset += trim_to_time; - tracing::debug!( - session_id = %session_id, - trimmed_samples = trim_samples, - new_offset = audio_offset, - remaining_samples = audio_buffer.len(), - "Trimmed audio buffer after finalization" - ); - } - } - } - - last_processed_len = audio_buffer.len(); - utterance_ended = false; - } - Err(e) => { - tracing::error!(session_id = %session_id, error = %e, "Transcription error"); - let _ = response_tx - .send(ServerMessage::Error { - code: "TRANSCRIPTION_ERROR".into(), - message: e.to_string(), - }) - .await; - } - } - } - } - Message::Close(_) => { - tracing::info!(session_id = %session_id, "WebSocket closed by client"); - break; - } - _ => {} - } - } - - // Save final transcript to file if we have one - if let Some(fid) = file_id { - if let Some(ref pool) = state.db_pool { - // Deduplicate transcript entries before saving - let deduplicated = deduplicate_transcripts(&transcript_entries); - - // Mark all entries as final - let final_entries: Vec<TranscriptEntry> = deduplicated - .into_iter() - .map(|mut entry| { - entry.is_final = true; - entry - }) - .collect(); - - match repository::update_file(pool, fid, UpdateFileRequest { - name: None, - description: None, - transcript: Some(final_entries.clone()), - summary: None, - body: None, - version: None, // Internal update, skip version check - repo_file_path: None, - }).await { - Ok(_) => { - tracing::info!( - session_id = %session_id, - file_id = %fid, - original_count = transcript_entries.len(), - deduplicated_count = final_entries.len(), - "Saved final transcript to file" - ); - - // Send TranscriptSaved message to client - if let Some(contract_id) = target_contract_id { - let _ = response_tx - .send(ServerMessage::TranscriptSaved { - file_id: fid.to_string(), - contract_id: contract_id.to_string(), - }) - .await; - } - } - Err(e) => { - tracing::error!( - session_id = %session_id, - file_id = %fid, - error = %e, - "Failed to save final transcript to file" - ); - } - } - } - } - - // Cleanup - drop(response_tx); - let _ = sender_task.await; - tracing::info!(session_id = %session_id, "WebSocket connection closed"); -} - -/// Decode binary audio chunk to f32 samples based on encoding format. -fn decode_audio_chunk(data: &[u8], format: &StartMessage) -> Vec<f32> { - match format.encoding { - AudioEncoding::Pcm32f => data - .chunks_exact(4) - .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])) - .collect(), - AudioEncoding::Pcm16 | AudioEncoding::Raw => data - .chunks_exact(2) - .map(|chunk| { - let sample = i16::from_le_bytes([chunk[0], chunk[1]]); - sample as f32 / 32768.0 - }) - .collect(), - } -} - -/// Deduplicate transcript entries by removing entries with similar times and text. -/// -/// Entries are considered duplicates if any of these are true: -/// - Start times are within 1.5 seconds AND text is similar (same, substring, or high overlap) -/// - Time ranges overlap significantly AND text is similar -/// - Text is identical regardless of timing -fn deduplicate_transcripts(entries: &[TranscriptEntry]) -> Vec<TranscriptEntry> { - if entries.is_empty() { - return vec![]; - } - - // Sort by start time - let mut sorted: Vec<TranscriptEntry> = entries.to_vec(); - sorted.sort_by(|a, b| a.start.partial_cmp(&b.start).unwrap_or(std::cmp::Ordering::Equal)); - - let mut result: Vec<TranscriptEntry> = Vec::new(); - - for entry in sorted { - // Normalize text for comparison - let entry_text_normalized = normalize_text(&entry.text); - - // Check if this entry is a duplicate of any existing entry - let duplicate_idx = result.iter().position(|existing| { - let existing_text_normalized = normalize_text(&existing.text); - - // Check if same speaker - let same_speaker = existing.speaker == entry.speaker; - - // Check if start times are identical or very close - let start_identical = (existing.start - entry.start).abs() < 0.1; - let start_close = (existing.start - entry.start).abs() < 1.5; - - // Check if time ranges overlap - let time_overlap = existing.start < entry.end && entry.start < existing.end; - - // Check various text similarity conditions - let text_identical = existing_text_normalized == entry_text_normalized; - let text_contained = existing_text_normalized.contains(&entry_text_normalized) - || entry_text_normalized.contains(&existing_text_normalized); - let text_similar = text_similarity(&existing_text_normalized, &entry_text_normalized) > 0.7; - - // Duplicate conditions: - // 1. Same speaker + identical start time (different end times = same segment refined) - // 2. Same speaker + close start + similar text - // 3. Same speaker + overlapping time + similar text - // 4. Identical text (likely a re-transcription) - (same_speaker && start_identical) - || (same_speaker && start_close && (text_identical || text_contained || text_similar)) - || (same_speaker && time_overlap && (text_identical || text_contained)) - || text_identical - }); - - match duplicate_idx { - Some(idx) => { - // If the new entry has longer text, update the existing one - if entry.text.len() > result[idx].text.len() { - result[idx].text = entry.text.clone(); - result[idx].end = result[idx].end.max(entry.end); - } else { - // Extend end time if needed - result[idx].end = result[idx].end.max(entry.end); - } - } - None => { - result.push(entry); - } - } - } - - // Second pass: merge adjacent segments with same speaker and similar text - let mut merged: Vec<TranscriptEntry> = Vec::new(); - for entry in result { - if let Some(last) = merged.last_mut() { - // Check if this should be merged with the previous entry - let same_speaker = last.speaker == entry.speaker; - let adjacent = (entry.start - last.end).abs() < 0.5; - let text_overlap = normalize_text(&last.text).contains(&normalize_text(&entry.text)) - || normalize_text(&entry.text).contains(&normalize_text(&last.text)); - - if same_speaker && adjacent && text_overlap { - // Merge: keep longer text, extend time range - if entry.text.len() > last.text.len() { - last.text = entry.text; - } - last.end = last.end.max(entry.end); - continue; - } - } - merged.push(entry); - } - - // Reassign IDs to be sequential - for (i, entry) in merged.iter_mut().enumerate() { - let parts: Vec<&str> = entry.id.split('-').collect(); - if let Some(session_prefix) = parts.first() { - entry.id = format!("{}-{}", session_prefix, i + 1); - } - } - - merged -} - -/// Normalize text for comparison by lowercasing and collapsing whitespace. -fn normalize_text(text: &str) -> String { - text.to_lowercase() - .split_whitespace() - .collect::<Vec<_>>() - .join(" ") -} - -/// Calculate text similarity as a ratio of shared words. -fn text_similarity(a: &str, b: &str) -> f32 { - if a.is_empty() || b.is_empty() { - return 0.0; - } - - let words_a: std::collections::HashSet<&str> = a.split_whitespace().collect(); - let words_b: std::collections::HashSet<&str> = b.split_whitespace().collect(); - - let intersection = words_a.intersection(&words_b).count(); - let union = words_a.union(&words_b).count(); - - if union == 0 { - 0.0 - } else { - intersection as f32 / union as f32 - } -} - -/// Process audio using sliding window through STT and streaming diarization models. -/// -/// Only processes the last MAX_WINDOW_SECONDS of audio to maintain constant -/// processing time regardless of total audio length. -async fn process_audio_window( - samples: &[f32], - _audio_offset: f32, - ml_models: &MlModels, -) -> Result<Vec<DialogueSegment>, Box<dyn std::error::Error + Send + Sync>> { - // Apply sliding window - only process the last 30 seconds - let window_start = samples.len().saturating_sub(MAX_WINDOW_SAMPLES); - let window = &samples[window_start..]; - - tracing::trace!( - total_samples = samples.len(), - window_samples = window.len(), - window_start = window_start, - "Using sliding window for processing" - ); - - // Acquire model locks and run inference - let mut parakeet = ml_models.parakeet.lock().await; - let mut sortformer = ml_models.sortformer.lock().await; - - // Run streaming diarization (maintains speaker cache across calls) - let diarization_segments = - sortformer.diarize_streaming(window.to_vec(), TARGET_SAMPLE_RATE, TARGET_CHANNELS)?; - - // Run transcription - let transcription = parakeet.transcribe_samples( - window.to_vec(), - TARGET_SAMPLE_RATE, - TARGET_CHANNELS, - Some(TimestampMode::Sentences), - )?; - - // Align speakers with transcription - let aligned = align_speakers(&transcription.tokens, &diarization_segments); - - // Adjust timestamps for window offset within the buffer - let window_offset = window_start as f32 / TARGET_SAMPLE_RATE as f32; - let adjusted: Vec<DialogueSegment> = aligned - .into_iter() - .map(|seg| DialogueSegment { - speaker: seg.speaker, - start: seg.start + window_offset, - end: seg.end + window_offset, - text: seg.text, - }) - .collect(); - - Ok(adjusted) -} diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index be5387e..6ba4c8b 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -274,35 +274,16 @@ pub async fn create_task( let _ = repository::record_history_event( pool, auth.owner_id, - task.contract_id, Some(task.id), "task", Some("created"), - None, serde_json::json!({ "name": &task.name, - "isSupervisor": task.is_supervisor, }), ).await; - // Notify supervisor of new task creation if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_created( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - ).await; - } - }); - } - } + // Supervisor notification on task creation removed alongside + // legacy contracts. (StatusCode::CREATED, Json(task)).into_response() } Err(e) => { @@ -352,26 +333,6 @@ pub async fn update_task( .into_response(); }; - // Check if trying to set a supervisor task to a terminal status - if let Some(ref new_status) = req.status { - let terminal_statuses = ["done", "failed", "merged"]; - if terminal_statuses.contains(&new_status.as_str()) { - // Get the task to check if it's a supervisor - if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { - if task.is_supervisor { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "SUPERVISOR_CANNOT_COMPLETE", - "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", - )), - ) - .into_response(); - } - } - } - } - // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { @@ -410,26 +371,9 @@ pub async fn update_task( updated_by: "user".to_string(), }); - // Notify supervisor of status change if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_update( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - &task_clone.status, - &updated_fields_clone, - ).await; - } - }); - } - } + // Supervisor notification on task update removed alongside + // legacy contracts. + let _ = updated_fields_clone; Json(task).into_response() } @@ -657,15 +601,10 @@ pub async fn start_task( .into_response(); } - // Get local_only and auto_merge_local flags from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // local_only / auto_merge_local used to come from the parent contract. + // With legacy contracts removed they default to false; the directive + // lifecycle handles its own completion now. + let (local_only, auto_merge_local) = (false, false); // Get list of daemons that have previously failed this task let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); @@ -708,8 +647,7 @@ pub async fn start_task( task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, - is_supervisor = task.is_supervisor, - "Starting task with orchestrator/supervisor determination" + "Starting task" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command @@ -755,8 +693,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -764,13 +700,11 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( task_id = %id, - is_supervisor = task.is_supervisor, is_orchestrator = is_orchestrator, daemon_id = %target_daemon_id, "Sending SpawnTask command to daemon" @@ -811,8 +745,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -820,7 +752,6 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -1128,11 +1059,10 @@ pub async fn send_message( // Check if task is in a state that can receive messages // Allow "running" and "starting" (to handle race between status update and message send) - // Also allow AUTH_CODE messages and supervisor tasks regardless of status + // Also allow AUTH_CODE messages regardless of status let is_auth_code = req.message.starts_with("AUTH_CODE:"); - let is_supervisor = task.is_supervisor; let can_receive_message = task.status == "running" || task.status == "starting"; - if !can_receive_message && !is_auth_code && !is_supervisor { + if !can_receive_message && !is_auth_code { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( @@ -1147,27 +1077,8 @@ pub async fn send_message( } // Find the daemon running this task - // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id - } else if is_supervisor { - // Supervisor without daemon - find one - match state.daemon_connections - .iter() - .find(|d| d.value().owner_id == auth.owner_id) - { - Some(entry) => entry.value().id, - None => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemon available. Please start a daemon.", - )), - ) - .into_response(); - } - } } else { return ( StatusCode::SERVICE_UNAVAILABLE, @@ -1206,15 +1117,7 @@ pub async fn send_message( }; if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + let (local_only, auto_merge_local) = (false, false); // Send spawn command to new daemon let spawn_cmd = DaemonCommand::SpawnTask { @@ -1231,8 +1134,6 @@ pub async fn send_message( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -1240,7 +1141,6 @@ pub async fn send_message( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: updated_task.directive_id, }; @@ -2433,13 +2333,11 @@ pub async fn commit_worktree( // Task Patches // ============================================================================= -/// Query parameters for listing task patches +/// Query parameters for listing task patches (legacy contract scope +/// removed; query is currently empty). #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ListPatchesQuery { - /// Contract ID to scope the patches - pub contract_id: Uuid, -} +pub struct ListPatchesQuery {} /// Patch summary for API response #[derive(Debug, serde::Serialize, utoipa::ToSchema)] @@ -2453,8 +2351,6 @@ pub struct PatchSummary { pub description: Option<String>, /// Task ID pub task_id: Uuid, - /// Contract ID - pub contract_id: Uuid, /// Number of files in the patch pub files_count: i32, /// Total lines added (estimated from patch size) @@ -2523,14 +2419,8 @@ pub async fn list_task_patches( } }; - // Verify task belongs to the specified contract - if task.contract_id != Some(query.contract_id) { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")), - ) - .into_response(); - } + // Legacy contract verification removed; checkpoint patches are + // accessible to any owner of the task. // Get checkpoint patches for this task let patches = match repository::list_checkpoint_patches(pool, id).await { @@ -2586,7 +2476,6 @@ pub async fn list_task_patches( name, description, task_id: p.task_id, - contract_id: query.contract_id, files_count: p.files_count, lines_added, lines_removed, @@ -3040,12 +2929,10 @@ pub async fn reassign_task( // Create a NEW task with the conversation context let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: format!("{} (resumed)", task.name), description: task.description.clone(), plan: updated_plan.clone(), parent_task_id: task.parent_task_id, - is_supervisor: task.is_supervisor, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3058,7 +2945,6 @@ pub async fn reassign_task( checkpoint_sha: task.last_checkpoint_sha.clone(), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3126,15 +3012,8 @@ pub async fn reassign_task( } }; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { @@ -3151,8 +3030,6 @@ pub async fn reassign_task( completion_action: task.completion_action.clone(), continue_from_task_id: Some(id), // Continue from old task's worktree copy_files: None, - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3160,7 +3037,6 @@ pub async fn reassign_task( patch_base_sha, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3190,56 +3066,10 @@ pub async fn reassign_task( // Don't fail the request, the new task is already running } - // Notify the contract's supervisor about the reassignment (if applicable) - if let Some(contract_id) = task.contract_id { - if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - if let Some(supervisor_task_id) = contract.supervisor_task_id { - // Don't notify if we're reassigning the supervisor itself - if supervisor_task_id != old_task_id { - // Find the supervisor's daemon and send a message - if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { - if supervisor_task.status == "running" { - if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { - // Find the daemon by its UUID - if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { - let notification_msg = format!( - "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ - A new task '{}' (ID: {}) has been created to continue the work. \ - The new task has {} context entries from the previous conversation.\n\n", - task.name, - old_task_id, - final_task.name, - new_task.id, - context_entries - ); - - let notify_cmd = DaemonCommand::SendMessage { - task_id: supervisor_task_id, - message: notification_msg, - }; - - if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { - tracing::warn!( - supervisor_id = %supervisor_task_id, - error = %e, - "Failed to notify supervisor about task reassignment" - ); - } else { - tracing::info!( - supervisor_id = %supervisor_task_id, - old_task_id = %old_task_id, - new_task_id = %new_task.id, - "Notified supervisor about task reassignment" - ); - } - } - } - } - } - } - } - } - } + // Supervisor reassignment notification removed alongside legacy + // contracts. The directive reconciler picks up reassigned tasks on + // its next tick. + let _ = context_entries; // Broadcast task update for the new task state.broadcast_task_update(TaskUpdateNotification { @@ -3467,15 +3297,8 @@ pub async fn continue_task( }; let is_orchestrator = task.depth == 0 && subtask_count > 0; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { @@ -3492,8 +3315,6 @@ pub async fn continue_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3501,7 +3322,6 @@ pub async fn continue_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3509,7 +3329,6 @@ pub async fn continue_task( task_id = %id, daemon_id = %target_daemon_id, context_entries = context_entries, - is_supervisor = task.is_supervisor, "Continuing task with conversation context" ); @@ -3820,12 +3639,10 @@ pub async fn fork_task( // Create the new forked task let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: req.new_task_name.clone(), description: task.description.clone(), plan: req.new_task_plan.clone(), parent_task_id: None, // Forked tasks are independent - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3838,7 +3655,6 @@ pub async fn fork_task( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3980,12 +3796,10 @@ pub async fn resume_from_checkpoint( }); let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: task_name, description: task.description.clone(), plan: req.plan, parent_task_id: None, - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3998,7 +3812,6 @@ pub async fn resume_from_checkpoint( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4318,12 +4131,10 @@ pub async fn branch_task( // Create the branched task (anonymous - no contract_id) let create_req = CreateTaskRequest { - contract_id: None, // Anonymous task name: task_name, description: Some(format!("Branched from task: {}", source_task.name)), plan: req.message, parent_task_id: None, - is_supervisor: false, priority: source_task.priority, repository_url: source_task.repository_url.clone(), base_branch: source_task.base_branch.clone(), @@ -4336,7 +4147,6 @@ pub async fn branch_task( checkpoint_sha: None, branched_from_task_id: Some(source_task_id), conversation_history, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4357,11 +4167,9 @@ pub async fn branch_task( let _ = repository::record_history_event( pool, auth.owner_id, - None, // No contract for anonymous tasks Some(task.id), "task", Some("branched"), - None, serde_json::json!({ "name": &task.name, "sourceTaskId": source_task_id, @@ -4425,8 +4233,6 @@ pub async fn branch_task( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, - contract_id: None, - is_supervisor: false, autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), @@ -4434,7 +4240,6 @@ pub async fn branch_task( patch_base_sha, local_only: false, // No contract, so not local_only auto_merge_local: false, // No contract, so no auto_merge_local - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, }; diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 19d2166..9900385 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -262,27 +262,6 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, - /// Enhanced supervisor heartbeat with detailed state - SupervisorHeartbeat { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "contractId")] - contract_id: Uuid, - /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted - state: String, - /// Current contract phase - phase: String, - /// Description of current activity - #[serde(rename = "currentActivity")] - current_activity: Option<String>, - /// Progress percentage (0-100) - progress: u8, - /// Task IDs the supervisor is waiting on - #[serde(rename = "pendingTaskIds")] - pending_task_ids: Vec<Uuid>, - /// Timestamp of this heartbeat - timestamp: DateTime<Utc>, - }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -618,96 +597,6 @@ struct DaemonAuthResult { /// Automatically create a PR when all non-supervisor tasks for a contract are done. /// Only applies to remote-repo contracts in the "execute" phase. /// Fires as a best-effort operation — errors are logged but not propagated. -async fn auto_create_pr_if_ready( - pool: &sqlx::PgPool, - state: &SharedState, - contract_id: Uuid, - owner_id: Uuid, -) { - // 1. Load contract — must be remote (not local_only) and in execute phase - let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(c)) => c, - _ => return, - }; - if contract.local_only || contract.phase != "execute" { - return; - } - - // 2. Load non-supervisor tasks — all must be done - let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(t) => t, - _ => return, - }; - let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect(); - if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") { - return; - } - - // 3. Check pull-request deliverable not already complete - let completed_deliverables = contract.get_completed_deliverables(&contract.phase); - if completed_deliverables.contains(&"pull-request".to_string()) { - return; - } - - // 4. Check at least one repository has a remote URL - let repos = match repository::list_contract_repositories(pool, contract_id).await { - Ok(r) => r, - _ => return, - }; - if !repos.iter().any(|r| r.repository_url.is_some()) { - return; - } - - // 5. Load supervisor task - let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await { - Ok(Some(s)) => s, - _ => return, - }; - - // Need supervisor's daemon_id to send command - let daemon_id = match supervisor.daemon_id { - Some(id) => id, - None => return, - }; - - // 6. Construct branch name - let sanitized_name: String = supervisor - .name - .chars() - .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' }) - .collect::<String>() - .to_lowercase(); - let short_id = &supervisor.id.to_string()[..8]; - let branch = format!("makima/{}-{}", sanitized_name, short_id); - - // 7. Send CreatePR command to supervisor's daemon - let command = DaemonCommand::CreatePR { - task_id: supervisor.id, - title: contract.name.clone(), - body: contract.description.clone(), - base_branch: supervisor.base_branch.clone(), - branch, - }; - - match state.send_daemon_command(daemon_id, command).await { - Ok(()) => { - tracing::info!( - contract_id = %contract_id, - supervisor_id = %supervisor.id, - "Auto-PR: sent CreatePR command to supervisor daemon" - ); - } - Err(e) => { - tracing::warn!( - contract_id = %contract_id, - error = %e, - "Auto-PR: failed to send CreatePR command" - ); - } - } -} - -/// Validate an API key and return (user_id, owner_id). async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> { let key_hash = hash_api_key(key); @@ -983,83 +872,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } - Ok(DaemonMessage::SupervisorHeartbeat { - task_id, - contract_id, - state: supervisor_state, - phase, - current_activity, - progress, - pending_task_ids, - timestamp: _, - }) => { - tracing::debug!( - task_id = %task_id, - contract_id = %contract_id, - state = %supervisor_state, - phase = %phase, - progress = progress, - "Supervisor heartbeat received" - ); - - // Store heartbeat in database and update supervisor state (Task 3.3) - if let Some(ref pool) = state.db_pool { - let pool = pool.clone(); - let pending_ids = pending_task_ids.clone(); - let activity = current_activity.clone(); - let state_str = supervisor_state.clone(); - let phase_str = phase.clone(); - tokio::spawn(async move { - // Store the heartbeat record - if let Err(e) = repository::create_supervisor_heartbeat( - &pool, - task_id, - contract_id, - &state_str, - &phase_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to store supervisor heartbeat" - ); - } - - // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) - if let Err(e) = repository::update_supervisor_heartbeat_state( - &pool, - contract_id, - &state_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::debug!( - contract_id = %contract_id, - error = %e, - "Failed to update supervisor state from heartbeat (may not exist yet)" - ); - } - - // Also update the daemon heartbeat - if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { - if let Some(daemon_id) = task.daemon_id { - if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { - tracing::warn!( - daemon_id = %daemon_id, - error = %e, - "Failed to update daemon heartbeat from supervisor" - ); - } - } - } - }); - } - } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { @@ -1120,136 +932,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) - if updated_task.is_supervisor && new_status_owned == "running" { - if let Some(contract_id) = updated_task.contract_id { - // Check if supervisor state already exists (restoration scenario) - match repository::get_supervisor_state(&pool, contract_id).await { - Ok(Some(existing_state)) => { - // State exists - this is a restoration - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - existing_state = %existing_state.state, - restoration_count = existing_state.restoration_count, - "Supervisor starting with existing state - restoration in progress" - ); - - // Mark as restored (increments restoration_count) - match repository::mark_supervisor_restored( - &pool, - contract_id, - "daemon_restart", - ).await { - Ok(restored_state) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - restoration_count = restored_state.restoration_count, - "Supervisor restoration marked" - ); - - // Check for pending questions to re-deliver - if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>( - restored_state.pending_questions.clone() - ) { - if !questions.is_empty() { - tracing::info!( - contract_id = %contract_id, - question_count = questions.len(), - "Pending questions found for re-delivery" - ); - // Questions will be re-delivered by the supervisor when it restores - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to mark supervisor as restored" - ); - } - } - } - Ok(None) => { - // No existing state - fresh start - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( - &pool, - contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, - ).await { - Ok(_) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - phase = %contract.phase, - "Initialized fresh supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to initialize supervisor state" - ); - } - } - } - Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to get contract for supervisor state" - ); - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to check existing supervisor state" - ); - } - } - } - } - // Record history event when task starts running if new_status_owned == "running" { let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some("started"), - None, serde_json::json!({ "name": &updated_task.name, - "isSupervisor": updated_task.is_supervisor, }), ).await; } @@ -1329,51 +1021,19 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Notify supervisor if this task belongs to a contract - if let Some(contract_id) = updated_task.contract_id { - // Don't notify for supervisor tasks (they don't report to themselves) - if !updated_task.is_supervisor { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - // action_directive used to come from - // compute_action_directive (now removed alongside the - // LLM module). Passing None preserves the existing - // supervisor protocol; the auto-PR path below still - // fires when every task is done. - state.notify_supervisor_of_task_completion( - supervisor.id, - supervisor.daemon_id, - updated_task.id, - &updated_task.name, - &updated_task.status, - updated_task.progress_summary.as_deref(), - updated_task.error_message.as_deref(), - None, - ).await; - } - } - } - - // Auto-create PR if all tasks are done and repo is remote - if updated_task.status == "done" { - if let Some(contract_id) = updated_task.contract_id { - let pool_c = pool.clone(); - let state_c = state.clone(); - tokio::spawn(async move { - auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await; - }); - } - } + // Supervisor notification + auto-PR removed alongside + // legacy contracts. Directive completion is handled + // by the directive reconciler. + let _ = owner_id; // Record history event for task completion let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some(subtype), - None, serde_json::json!({ "name": &updated_task.name, "status": &updated_task.status, @@ -1962,16 +1622,13 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); // Record history event for checkpoint - // Get task to get contract_id if let Ok(Some(task)) = repository::get_task(pool, task_id).await { let _ = repository::record_history_event( pool, task.owner_id, - task.contract_id, Some(task_id), "checkpoint", Some("created"), - None, serde_json::json!({ "checkpointNumber": checkpoint.checkpoint_number, "commitSha": &sha, @@ -2103,28 +1760,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re conflicts: conflicts.clone(), }); - // On successful merge, notify supervisor to check if all merges complete - if success { - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = format!( - "[INFO] Merge completed: {}\n\ - Check if all tasks are merged with `makima supervisor tasks`.\n\ - If ready, create PR with `makima supervisor pr`.", - message - ); - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } - } } Ok(DaemonMessage::PRCreated { task_id, @@ -2150,52 +1785,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re pr_number, }); - // Notify supervisor of PR result (both success and failure) - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = if success { - // Get contract to determine next action - let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { - match (contract.contract_type.as_str(), contract.phase.as_str()) { - ("simple", "execute") => { - "Mark contract complete with `makima supervisor complete`".to_string() - } - ("specification", "execute") => { - "Advance to review phase with `makima supervisor advance-phase review`".to_string() - } - _ => "Check contract status with `makima supervisor status`".to_string() - } - } else { - "Check contract status with `makima supervisor status`".to_string() - }; - - format!( - "[ACTION REQUIRED] PR created successfully!\n\ - PR: {}\n\n\ - Next step: {}", - pr_url.as_deref().unwrap_or(&message), - next_action - ) - } else { - format!( - "[ERROR] PR creation failed for task {}:\n\ - {}\n\n\ - Please fix the issue and retry with `makima supervisor pr`.", - task_id, - message - ) - }; - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } } Ok(DaemonMessage::GitConfigInherited { success, diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index ebde52b..4a9a00b 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1,7 +1,13 @@ -//! HTTP handlers for supervisor-specific mesh operations. +//! Question + order backchannel for directive-spawned tasks. //! -//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate -//! contract work: spawning tasks, waiting for completion, reading worktree files, etc. +//! Originally a much larger handler that orchestrated contract-supervisor +//! task trees (spawn / wait / merge / PR / etc.). Legacy contracts and +//! supervisor tasks have been removed; what remains is the in-memory +//! question machinery (`makima directive ask`) and order creation +//! (`makima directive create-order`). +//! +//! Module name is kept as `mesh_supervisor` for route-path stability — +//! the CLI client still hits `/api/v1/mesh/supervisor/...` endpoints. use axum::{ extract::{Path, State}, @@ -9,238 +15,38 @@ use axum::{ response::IntoResponse, Json, }; -use base64::Engine; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; -use crate::db::models::{CreateOrderRequest, CreateTaskRequest, PendingQuestion, Task, TaskSummary, UpdateTaskRequest}; +use crate::db::models::CreateOrderRequest; use crate::db::repository; -use sqlx::PgPool; use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; -use crate::server::state::{DaemonCommand, SharedState, TaskOutputNotification, TaskUpdateNotification}; - -// ============================================================================= -// Request/Response Types -// ============================================================================= - -/// Request to spawn a new task from supervisor. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct SpawnTaskRequest { - pub name: String, - pub plan: String, - pub contract_id: Uuid, - pub parent_task_id: Option<Uuid>, - pub checkpoint_sha: Option<String>, - /// Repository URL for the task (optional - if not provided, will be looked up from contract). - pub repository_url: Option<String>, -} - -/// Request to wait for task completion. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct WaitForTaskRequest { - #[serde(default = "default_timeout")] - pub timeout_seconds: i32, -} - -fn default_timeout() -> i32 { - 300 -} - -/// Request to read a file from task worktree. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ReadWorktreeFileRequest { - pub file_path: String, -} - -/// Request to ask a question and wait for user feedback. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AskQuestionRequest { - /// The question to ask the user - pub question: String, - /// Optional choices (if empty, free-form text response) - #[serde(default)] - pub choices: Vec<String>, - /// Optional context about what this relates to - pub context: Option<String>, - /// How long to wait for a response (seconds) - #[serde(default = "default_question_timeout")] - pub timeout_seconds: i32, - /// When true, the request will block indefinitely until user responds (no timeout) - #[serde(default)] - pub phaseguard: bool, - /// When true, allow selecting multiple choices (response will be comma-separated) - #[serde(default)] - pub multi_select: bool, - /// When true, return immediately without waiting for response - #[serde(default)] - pub non_blocking: bool, - /// Question type: general, phase_confirmation, or contract_complete - #[serde(default = "default_question_type")] - pub question_type: String, -} - -fn default_question_type() -> String { - "general".to_string() -} - -fn default_question_timeout() -> i32 { - 3600 // 1 hour default -} - -/// Response from asking a question. -#[derive(Debug, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AskQuestionResponse { - /// The question ID for tracking - pub question_id: Uuid, - /// The user's response (None if timed out) - pub response: Option<String>, - /// Whether the question timed out - pub timed_out: bool, - /// Whether the question is still pending (server-side timeout reached but question not removed). - /// The client should poll the poll endpoint to continue waiting. - #[serde(default)] - pub still_pending: bool, -} - -/// Request to answer a supervisor question. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AnswerQuestionRequest { - /// The user's response - pub response: String, -} - -/// Response to answering a question. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct AnswerQuestionResponse { - /// Whether the answer was accepted - pub success: bool, -} - -/// Pending question summary. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct PendingQuestionSummary { - pub question_id: Uuid, - pub task_id: Uuid, - pub contract_id: Uuid, - /// Directive this question relates to (if from a directive task) - #[serde(skip_serializing_if = "Option::is_none")] - pub directive_id: Option<Uuid>, - pub question: String, - pub choices: Vec<String>, - pub context: Option<String>, - pub created_at: chrono::DateTime<chrono::Utc>, - /// Whether multiple choices can be selected - #[serde(default)] - pub multi_select: bool, - /// Question type: general, phase_confirmation, or contract_complete - #[serde(default)] - pub question_type: String, -} - -/// Request to create a checkpoint. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateCheckpointRequest { - pub message: String, -} - -/// Response for task tree. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TaskTreeResponse { - pub tasks: Vec<TaskSummary>, - pub supervisor_task_id: Option<Uuid>, - pub total_count: usize, -} - -/// Response for wait operation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct WaitResponse { - pub task_id: Uuid, - pub status: String, - pub completed: bool, - pub output_summary: Option<String>, -} - -/// Response for read file operation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ReadFileResponse { - pub task_id: Uuid, - pub file_path: String, - pub content: String, - pub exists: bool, -} - -/// Response for checkpoint operations. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CheckpointResponse { - pub task_id: Uuid, - pub checkpoint_number: i32, - pub commit_sha: String, - pub message: String, -} - -/// Task checkpoint info. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TaskCheckpoint { - pub id: Uuid, - pub task_id: Uuid, - pub checkpoint_number: i32, - pub commit_sha: String, - pub branch_name: String, - pub message: String, - pub files_changed: Option<serde_json::Value>, - pub lines_added: i32, - pub lines_removed: i32, - pub created_at: chrono::DateTime<chrono::Utc>, -} - -/// Response for list checkpoints. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CheckpointListResponse { - pub task_id: Uuid, - pub checkpoints: Vec<TaskCheckpoint>, -} +use crate::server::state::SharedState; // ============================================================================= -// Helper Functions +// Auth helper // ============================================================================= -/// Verify the request comes from a supervisor task and extract ownership info. -async fn verify_supervisor_auth( +/// Verify the request comes from a directive task (tool-key auth) and +/// return the calling task id + owner id. +async fn verify_task_auth( state: &SharedState, headers: &HeaderMap, - contract_id: Option<Uuid>, ) -> Result<(Uuid, Uuid), (StatusCode, Json<ApiError>)> { let auth = extract_auth(state, headers); - let task_id = match auth { AuthSource::ToolKey(task_id) => task_id, _ => { return Err(( StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), + Json(ApiError::new("UNAUTHORIZED", "These endpoints require tool key auth")), )); } }; - // Get the task to verify it's a supervisor and get owner_id let pool = state.db_pool.as_ref().ok_or_else(|| { ( StatusCode::SERVICE_UNAVAILABLE, @@ -251,10 +57,10 @@ async fn verify_supervisor_auth( let task = repository::get_task(pool, task_id) .await .map_err(|e| { - tracing::error!(error = %e, "Failed to get supervisor task"); + tracing::error!(error = %e, "Failed to load task"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), + Json(ApiError::new("DB_ERROR", "Failed to load task")), ) })? .ok_or_else(|| { @@ -264,1411 +70,113 @@ async fn verify_supervisor_auth( ) })?; - // Verify task is a supervisor or a directive task - if !task.is_supervisor && task.directive_id.is_none() { + // Only directive-attached tasks may use this backchannel. + if task.directive_id.is_none() { return Err(( StatusCode::FORBIDDEN, - Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor or directive tasks can use these endpoints")), + Json(ApiError::new( + "NOT_DIRECTIVE_TASK", + "Only directive-attached tasks can use these endpoints", + )), )); } - // If contract_id provided, verify the supervisor belongs to that contract - if let Some(cid) = contract_id { - if task.contract_id != Some(cid) { - return Err(( - StatusCode::FORBIDDEN, - Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), - )); - } - } - Ok((task_id, task.owner_id)) } -/// Try to start a pending task on an available daemon. -/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. -/// For retried tasks, excludes daemons that previously failed the task and includes -/// checkpoint patch data for worktree recovery. -pub async fn try_start_pending_task( - state: &SharedState, - contract_id: Uuid, - owner_id: Uuid, -) -> Result<Option<Task>, String> { - let pool = state.db_pool.as_ref().ok_or("Database not configured")?; - - // Get pending tasks for this contract (includes interrupted tasks awaiting retry) - let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get pending tasks: {}", e))?; - - if pending_tasks.is_empty() { - return Ok(None); - } - - // Get contract to check local_only flag - let contract = repository::get_contract_for_owner(pool, contract_id, owner_id) - .await - .map_err(|e| format!("Failed to get contract: {}", e))? - .ok_or_else(|| "Contract not found".to_string())?; - - // Try each pending task until we find one we can start - for task in &pending_tasks { - // Get excluded daemon IDs for this task (daemons that have already failed it) - let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); - - // Get available daemons excluding failed ones for this task - let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids) - .await - .map_err(|e| format!("Failed to get available daemons: {}", e))?; - - // Find a daemon with capacity - let available_daemon = daemons.iter().find(|d| { - d.current_task_count < d.max_concurrent_tasks - && state.daemon_connections.contains_key(&d.connection_id) - }); - - let daemon = match available_daemon { - Some(d) => d, - None => continue, // Try next task - }; - - // Get repo URL from task or contract - let repo_url = if let Some(url) = &task.repository_url { - Some(url.clone()) - } else { - match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos - .iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), - Err(_) => None, - } - }; - - // Update task with daemon assignment - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(task.version), - ..Default::default() - }; - - let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => continue, // Task was modified concurrently, try next - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); - continue; // Try next task - } - }; - - // For retried tasks, fetch checkpoint patch for worktree recovery - let (patch_data, patch_base_sha) = if task.retry_count > 0 { - // This is a retry - try to restore from checkpoint - match repository::get_latest_checkpoint_patch(pool, task.id).await { - Ok(Some(patch)) => { - tracing::info!( - task_id = %task.id, - retry_count = task.retry_count, - patch_size = patch.patch_size_bytes, - base_sha = %patch.base_commit_sha, - "Including checkpoint patch for task retry recovery" - ); - let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); - (Some(encoded), Some(patch.base_commit_sha)) - } - Ok(None) => { - tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry"); - (None, None) - } - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry"); - (None, None) - } - } - } else { - (None, None) - }; - - // Send spawn command - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url, - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, - autonomous_loop: updated_task.is_supervisor, - resume_session: task.retry_count > 0, // Use --continue for retried tasks - conversation_history: None, - patch_data, - patch_base_sha, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - // For retried tasks, use their own worktree (they already have state from previous attempt) - supervisor_worktree_task_id: None, - directive_id: updated_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); - // Rollback - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; - continue; // Try next task - } - - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); - return Ok(Some(updated_task)); - } - - // No tasks could be started - Ok(None) -} - -// ============================================================================= -// Contract Task Handlers -// ============================================================================= - -/// List all tasks in a contract's tree. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", - params( - ("contract_id" = Uuid, Path, description = "Contract ID") - ), - responses( - (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn list_contract_tasks( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get all tasks for this contract - match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(tasks) => { - let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); - let summaries: Vec<TaskSummary> = tasks.into_iter().map(TaskSummary::from).collect(); - let total_count = summaries.len(); - - ( - StatusCode::OK, - Json(TaskTreeResponse { - tasks: summaries, - supervisor_task_id, - total_count, - }), - ).into_response() - } - Err(e) => { - tracing::error!(error = %e, "Failed to list contract tasks"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to list tasks")), - ).into_response() - } - } -} - -/// Get full task tree structure for a contract. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", - params( - ("contract_id" = Uuid, Path, description = "Contract ID") - ), - responses( - (status = 200, description = "Task tree structure", body = TaskTreeResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn get_contract_tree( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - // Same as list_contract_tasks for now - can add tree structure later - list_contract_tasks(State(state), Path(contract_id), headers).await -} - -// ============================================================================= -// Task Spawn Handler -// ============================================================================= - -/// Spawn a new task (supervisor only). -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks", - request_body = SpawnTaskRequest, - responses( - (status = 201, description = "Task created", body = Task), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn spawn_task( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<SpawnTaskRequest>, -) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Verify contract exists and get local_only flag - let contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get contract"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get contract")), - ).into_response(); - } - }; - - // Get repository URL - either from request or from contract's repositories - let repo_url = if let Some(url) = request.repository_url.clone() { - if !url.trim().is_empty() { - Some(url) - } else { - None - } - } else { - None - }; - - // If no repo URL provided, look it up from the contract - let repo_url = match repo_url { - Some(url) => Some(url), - None => { - match repository::list_contract_repositories(pool, request.contract_id).await { - Ok(repos) => { - // Prefer primary repo, fallback to first repo - let repo = repos.iter() - .find(|r| r.is_primary) - .or(repos.first()); - - // Use repository_url if set, otherwise use local_path - repo.and_then(|r| { - r.repository_url.clone() - .or_else(|| r.local_path.clone()) - }) - } - Err(e) => { - tracing::warn!(error = %e, "Failed to get contract repositories"); - None - } - } - } - }; - - // Validate that we have a repo URL - if repo_url.is_none() { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")), - ).into_response(); - } - - // Create task request - // All tasks share the supervisor's worktree - let supervisor_worktree_task_id = Some(supervisor_id); - - let create_req = CreateTaskRequest { - name: request.name.clone(), - description: None, - plan: request.plan.clone(), - repository_url: repo_url.clone(), - contract_id: Some(request.contract_id), - parent_task_id: request.parent_task_id, - is_supervisor: false, - checkpoint_sha: request.checkpoint_sha.clone(), - merge_mode: Some("manual".to_string()), - priority: 0, - base_branch: None, - target_branch: None, - target_repo_path: None, - completion_action: None, - continue_from_task_id: None, - copy_files: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id, - directive_id: None, - directive_step_id: None, - }; - - // Create task in DB - let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { - Ok(t) => t, - Err(e) => { - tracing::error!(error = %e, "Failed to create task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to create task")), - ).into_response(); - } - }; - - tracing::info!( - supervisor_id = %supervisor_id, - task_id = %task.id, - task_name = %task.name, - "Supervisor spawned new task" - ); - - // Record history event for task spawned by supervisor - let _ = repository::record_history_event( - pool, - owner_id, - task.contract_id, - Some(task.id), - "task", - Some("spawned"), - None, - serde_json::json!({ - "name": &task.name, - "spawnedBy": supervisor_id.to_string(), - }), - ).await; - - // Broadcast task creation notification to WebSocket subscribers - state.broadcast_task_update(TaskUpdateNotification { - task_id: task.id, - owner_id: Some(owner_id), - version: task.version, - status: task.status.clone(), - updated_fields: vec!["created".to_string()], - updated_by: "supervisor".to_string(), - }); - - // Start task on a daemon - // Find a daemon that belongs to this owner - let mut updated_task = task; - for entry in state.daemon_connections.iter() { - let daemon = entry.value(); - if daemon.owner_id == owner_id { - // IMPORTANT: Update database FIRST to assign daemon_id before sending command - // This prevents race conditions where the task starts but daemon_id is not set - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(updated_task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, updated_task.id, owner_id, update_req).await { - Ok(Some(t)) => { - updated_task = t; - } - Ok(None) => { - tracing::warn!(task_id = %updated_task.id, "Task not found when updating daemon_id"); - break; - } - Err(e) => { - tracing::error!(task_id = %updated_task.id, error = %e, "Failed to update task with daemon_id"); - break; - } - } - - // Send spawn command to daemon - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url: repo_url.clone(), - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: false, - autonomous_loop: false, - resume_session: false, - conversation_history: None, - patch_data: None, - patch_base_sha: None, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - // All tasks share the supervisor's worktree - supervisor_worktree_task_id: Some(supervisor_id), - directive_id: updated_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); - // Rollback: clear daemon_id and reset status since command failed - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, updated_task.id, owner_id, rollback_req).await; - } else { - tracing::info!(task_id = %updated_task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); - - // Save state: task spawn is a key save point (Task 3.3) - save_state_on_task_spawn(pool, request.contract_id, updated_task.id).await; - - // Broadcast task status update notification to WebSocket subscribers - state.broadcast_task_update(TaskUpdateNotification { - task_id: updated_task.id, - owner_id: Some(owner_id), - version: updated_task.version, - status: "starting".to_string(), - updated_fields: vec!["status".to_string(), "daemon_id".to_string()], - updated_by: "supervisor".to_string(), - }); - - } - break; - } - } - - (StatusCode::CREATED, Json(updated_task)).into_response() -} - // ============================================================================= -// Wait for Task Handler +// Question types // ============================================================================= -/// Wait for a task to complete. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", - params( - ("task_id" = Uuid, Path, description = "Task ID to wait for") - ), - request_body = WaitForTaskRequest, - responses( - (status = 200, description = "Task completed or timed out", body = WaitResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn wait_for_task( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<WaitForTaskRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Verify task belongs to same owner - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Check if already done - if task.status == "done" || task.status == "failed" || task.status == "merged" { - return ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: task.status, - completed: true, - output_summary: None, - }), - ).into_response(); - } - - // Get contract_id for pending task scheduling - let contract_id = task.contract_id; - - // Subscribe to task completions - let mut rx = state.task_completions.subscribe(); - let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); - - // Wait for completion or timeout, periodically trying to start pending tasks - let result = tokio::time::timeout(timeout, async { - let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); - pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - tokio::select! { - // Check for task completion notifications - recv_result = rx.recv() => { - match recv_result { - Ok(notification) => { - if notification.task_id == task_id { - return Some(notification); - } - } - Err(_) => { - // Channel closed or lagged - check DB directly - if let Ok(Some(t)) = repository::get_task(pool, task_id).await { - if t.status == "done" || t.status == "failed" || t.status == "merged" { - return Some(crate::server::state::TaskCompletionNotification { - task_id: t.id, - owner_id: Some(t.owner_id), - contract_id: t.contract_id, - parent_task_id: t.parent_task_id, - status: t.status, - output_summary: None, - worktree_path: None, - error_message: t.error_message, - }); - } - } - } - } - } - // Periodically try to start pending tasks - _ = pending_check_interval.tick() => { - if let Some(cid) = contract_id { - match try_start_pending_task(&state, cid, owner_id).await { - Ok(Some(started_task)) => { - tracing::debug!( - task_id = %started_task.id, - task_name = %started_task.name, - "Started pending task while waiting" - ); - } - Ok(None) => { - // No pending tasks or no capacity - that's fine - } - Err(e) => { - tracing::warn!(error = %e, "Error trying to start pending task"); - } - } - } - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: notification.status, - completed: true, - output_summary: notification.output_summary, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout - check final status - let final_status = repository::get_task(pool, task_id) - .await - .ok() - .flatten() - .map(|t| t.status) - .unwrap_or_else(|| "unknown".to_string()); - - ( - StatusCode::OK, - Json(WaitResponse { - task_id, - status: final_status.clone(), - completed: final_status == "done" || final_status == "failed" || final_status == "merged", - output_summary: None, - }), - ).into_response() - } - } -} - -// ============================================================================= -// Read Worktree File Handler -// ============================================================================= - -/// Read a file from a task's worktree. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - request_body = ReadWorktreeFileRequest, - responses( - (status = 200, description = "File content", body = ReadFileResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn read_worktree_file( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<ReadWorktreeFileRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get task to verify ownership - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // TODO: Implement file reading via worktree path - // For now, return not implemented - supervisor should use local file access via worktree - let _ = (task, request); - - ( - StatusCode::NOT_IMPLEMENTED, - Json(ApiError::new( - "NOT_IMPLEMENTED", - "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.", - )), - ).into_response() -} - -// ============================================================================= -// Checkpoint Handlers -// ============================================================================= - -/// Create a git checkpoint for a task. -#[utoipa::path( - post, - path = "/api/v1/mesh/tasks/{task_id}/checkpoint", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - request_body = CreateCheckpointRequest, - responses( - (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - can only create checkpoint for own task"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - (status = 503, description = "Task has no assigned daemon"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_checkpoint( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<CreateCheckpointRequest>, -) -> impl IntoResponse { - let auth = extract_auth(&state, &headers); - - let task_id_from_auth = match auth { - AuthSource::ToolKey(tid) => tid, - _ => { - return ( - StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Tool key required")), - ).into_response(); - } - }; - - // Can only create checkpoint for own task - if task_id_from_auth != task_id { - return ( - StatusCode::FORBIDDEN, - Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), - ).into_response(); - } - - let pool = state.db_pool.as_ref().unwrap(); - - // Get task and daemon_id - let task = match repository::get_task(pool, task_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Send CreateCheckpoint command to daemon - let cmd = DaemonCommand::CreateCheckpoint { - task_id, - message: request.message.clone(), - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreateCheckpoint command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Return accepted - the checkpoint result will be delivered via WebSocket - // and stored in the database by the daemon message handler - ( - StatusCode::ACCEPTED, - Json(CheckpointResponse { - task_id, - checkpoint_number: 0, // Will be assigned by DB on actual creation - commit_sha: "pending".to_string(), - message: request.message, - }), - ).into_response() -} - -/// List checkpoints for a task. -#[utoipa::path( - get, - path = "/api/v1/mesh/tasks/{task_id}/checkpoints", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - responses( - (status = 200, description = "List of checkpoints", body = CheckpointListResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn list_checkpoints( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let auth = extract_auth(&state, &headers); - - let _task_id_from_auth = match auth { - AuthSource::ToolKey(tid) => tid, - _ => { - return ( - StatusCode::UNAUTHORIZED, - Json(ApiError::new("UNAUTHORIZED", "Tool key required")), - ).into_response(); - } - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get checkpoints from DB - match repository::list_task_checkpoints(pool, task_id).await { - Ok(checkpoints) => { - let checkpoint_list: Vec<TaskCheckpoint> = checkpoints - .into_iter() - .map(|c| TaskCheckpoint { - id: c.id, - task_id: c.task_id, - checkpoint_number: c.checkpoint_number, - commit_sha: c.commit_sha, - branch_name: c.branch_name, - message: c.message, - files_changed: c.files_changed, - lines_added: c.lines_added.unwrap_or(0), - lines_removed: c.lines_removed.unwrap_or(0), - created_at: c.created_at, - }) - .collect(); - - ( - StatusCode::OK, - Json(CheckpointListResponse { - task_id, - checkpoints: checkpoint_list, - }), - ).into_response() - } - Err(e) => { - tracing::error!(error = %e, "Failed to list checkpoints"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), - ).into_response() - } - } -} - -// ============================================================================= -// Git Operations - Request/Response Types -// ============================================================================= - -/// Request to create a new branch. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreateBranchRequest { - pub branch_name: String, - pub from_ref: Option<String>, +pub struct AskQuestionRequest { + pub question: String, + #[serde(default)] + pub choices: Vec<String>, + pub context: Option<String>, + #[serde(default = "default_question_timeout")] + pub timeout_seconds: i32, + /// When true the request blocks until the user responds (no + /// timeout) — the CLI reconnects via the poll endpoint if the + /// server-side timeout is reached. + #[serde(default)] + pub phaseguard: bool, + #[serde(default)] + pub multi_select: bool, + /// Return immediately without waiting for a response. + #[serde(default)] + pub non_blocking: bool, + /// Question type: general, phase_confirmation, contract_complete. + #[serde(default = "default_question_type")] + pub question_type: String, } -/// Response for branch creation. -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct CreateBranchResponse { - pub success: bool, - pub branch_name: String, - pub message: String, +fn default_question_type() -> String { + "general".to_string() } -/// Request to merge task changes. -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MergeTaskRequest { - pub target_branch: Option<String>, - #[serde(default)] - pub squash: bool, +fn default_question_timeout() -> i32 { + 3600 } -/// Response for merge operation. -#[derive(Debug, Serialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct MergeTaskResponse { - pub task_id: Uuid, - pub success: bool, - pub message: String, - pub commit_sha: Option<String>, - pub conflicts: Option<Vec<String>>, +pub struct AskQuestionResponse { + pub question_id: Uuid, + pub response: Option<String>, + pub timed_out: bool, + /// Server-side timeout was reached but the question is still + /// pending. CLI should re-poll via `/poll`. + #[serde(default)] + pub still_pending: bool, } -/// Request to create a pull request. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreatePRRequest { - pub branch: String, - pub title: String, - pub body: Option<String>, +pub struct AnswerQuestionRequest { + pub response: String, } -/// Response for PR creation. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct CreatePRResponse { - pub task_id: Uuid, +pub struct AnswerQuestionResponse { pub success: bool, - pub message: String, - pub pr_url: Option<String>, - pub pr_number: Option<i32>, } -/// Response for task diff. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct TaskDiffResponse { +pub struct PendingQuestionSummary { + pub question_id: Uuid, pub task_id: Uuid, - pub success: bool, - pub diff: Option<String>, - pub error: Option<String>, -} - -// ============================================================================= -// Git Operations - Handlers -// ============================================================================= - -/// Create a new branch from supervisor's worktree. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/branches", - request_body = CreateBranchRequest, - responses( - (status = 201, description = "Branch created", body = CreateBranchResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_branch( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<CreateBranchRequest>, -) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - // Find daemon running supervisor - let daemon_id = { - let pool = state.db_pool.as_ref().unwrap(); - match repository::get_task(pool, supervisor_id).await { - Ok(Some(task)) => task.daemon_id, - _ => None, - } - }; - - let Some(daemon_id) = daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), - ).into_response(); - }; - - // Send CreateBranch command to daemon - let cmd = DaemonCommand::CreateBranch { - task_id: supervisor_id, - branch_name: request.branch_name.clone(), - from_ref: request.from_ref, - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreateBranch command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Note: Real implementation would wait for daemon response - // For now, return success immediately - daemon will send response via WebSocket - ( - StatusCode::CREATED, - Json(CreateBranchResponse { - success: true, - branch_name: request.branch_name, - message: "Branch creation command sent".to_string(), - }), - ).into_response() -} - -/// Merge a task's changes to a target branch. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", - params( - ("task_id" = Uuid, Path, description = "Task ID to merge") - ), - request_body = MergeTaskRequest, - responses( - (status = 200, description = "Merge initiated", body = MergeTaskResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn merge_task( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, - Json(request): Json<MergeTaskRequest>, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the target task - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Get daemon running the task - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Subscribe to merge results BEFORE sending the command - let mut rx = state.merge_results.subscribe(); - - // Send MergeTaskToTarget command to daemon - let cmd = DaemonCommand::MergeTaskToTarget { - task_id, - target_branch: request.target_branch, - squash: request.squash, - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Wait for the merge result with a timeout (60 seconds should be plenty for a merge) - let timeout = tokio::time::Duration::from_secs(60); - let result = tokio::time::timeout(timeout, async { - loop { - match rx.recv().await { - Ok(notification) => { - if notification.task_id == task_id { - return Some(notification); - } - // Not our task, keep waiting - } - Err(_) => { - // Channel closed or lagged - return None; - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - ( - StatusCode::OK, - Json(MergeTaskResponse { - task_id, - success: notification.success, - message: notification.message, - commit_sha: notification.commit_sha, - conflicts: notification.conflicts, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout or channel error - return error status - ( - StatusCode::GATEWAY_TIMEOUT, - Json(MergeTaskResponse { - task_id, - success: false, - message: "Merge operation timed out waiting for daemon response".to_string(), - commit_sha: None, - conflicts: None, - }), - ).into_response() - } - } -} - -/// Create a pull request for a task's changes. -#[utoipa::path( - post, - path = "/api/v1/mesh/supervisor/pr", - request_body = CreatePRRequest, - responses( - (status = 201, description = "PR created", body = CreatePRResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn create_pr( - State(state): State<SharedState>, - headers: HeaderMap, - Json(request): Json<CreatePRRequest>, -) -> impl IntoResponse { - let (supervisor_id, _owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the supervisor's own task to find daemon and base_branch - let task = match repository::get_task(pool, supervisor_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get supervisor task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), - ).into_response(); - } - }; - - // Get daemon running the supervisor - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), - ).into_response(); - }; - - // Subscribe to PR results BEFORE sending the command - let mut rx = state.pr_results.subscribe(); - - // Send CreatePR command to daemon using the supervisor's task ID - // (the branch is in the supervisor's worktree) - // Pass base_branch from task if available, otherwise daemon will auto-detect - let cmd = DaemonCommand::CreatePR { - task_id: supervisor_id, - title: request.title.clone(), - body: request.body.clone(), - base_branch: task.base_branch.clone(), - branch: request.branch.clone(), - }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send CreatePR command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation) - let timeout = tokio::time::Duration::from_secs(60); - let result = tokio::time::timeout(timeout, async { - loop { - match rx.recv().await { - Ok(notification) => { - if notification.task_id == supervisor_id { - return Some(notification); - } - // Not our task, keep waiting - } - Err(_) => { - // Channel closed or lagged - return None; - } - } - } - }).await; - - match result { - Ok(Some(notification)) => { - let status = if notification.success { - StatusCode::CREATED - } else { - StatusCode::INTERNAL_SERVER_ERROR - }; - ( - status, - Json(CreatePRResponse { - task_id: supervisor_id, - success: notification.success, - message: notification.message, - pr_url: notification.pr_url, - pr_number: notification.pr_number, - }), - ).into_response() - } - Ok(None) | Err(_) => { - // Timeout or channel error - return error status - ( - StatusCode::GATEWAY_TIMEOUT, - Json(CreatePRResponse { - task_id: supervisor_id, - success: false, - message: "PR creation timed out waiting for daemon response".to_string(), - pr_url: None, - pr_number: None, - }), - ).into_response() - } - } -} - -/// Get the diff for a task's changes. -#[utoipa::path( - get, - path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", - params( - ("task_id" = Uuid, Path, description = "Task ID") - ), - responses( - (status = 200, description = "Task diff", body = TaskDiffResponse), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error"), - ), - tag = "Mesh Supervisor" -)] -pub async fn get_task_diff( - State(state): State<SharedState>, - Path(task_id): Path<Uuid>, - headers: HeaderMap, -) -> impl IntoResponse { - let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; - - let pool = state.db_pool.as_ref().unwrap(); - - // Get the target task - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Task not found")), - ).into_response(); - } - Err(e) => { - tracing::error!(error = %e, "Failed to get task"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), - ).into_response(); - } - }; - - // Get daemon running the task - let Some(daemon_id) = task.daemon_id else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), - ).into_response(); - }; - - // Send GetTaskDiff command to daemon - let cmd = DaemonCommand::GetTaskDiff { task_id }; - - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::error!(error = %e, "Failed to send GetTaskDiff command"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), - ).into_response(); - } - - ( - StatusCode::OK, - Json(TaskDiffResponse { - task_id, - success: true, - diff: None, - error: Some("Diff command sent - response will be streamed".to_string()), - }), - ).into_response() + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_id: Option<Uuid>, + pub question: String, + pub choices: Vec<String>, + pub context: Option<String>, + pub created_at: chrono::DateTime<chrono::Utc>, + #[serde(default)] + pub multi_select: bool, + #[serde(default)] + pub question_type: String, } // ============================================================================= -// Supervisor Question Handlers +// Question handlers // ============================================================================= -/// Ask a question and wait for user feedback. -/// -/// The supervisor calls this to ask a question. The endpoint will poll until -/// either the user responds or the timeout is reached. +/// Ask the user a question from a directive task. Blocks until the user +/// answers, the timeout fires, or `non_blocking` returns immediately. #[utoipa::path( post, path = "/api/v1/mesh/supervisor/questions", request_body = AskQuestionRequest, responses( - (status = 200, description = "Question answered", body = AskQuestionResponse), - (status = 408, description = "Question timed out", body = AskQuestionResponse), + (status = 200, description = "Question asked", body = AskQuestionResponse), (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - (status = 500, description = "Internal server error"), - ), - security( - ("tool_key" = []) + (status = 403, description = "Not a directive task"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn ask_question( @@ -1676,67 +184,49 @@ pub async fn ask_question( headers: HeaderMap, Json(request): Json<AskQuestionRequest>, ) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; let pool = state.db_pool.as_ref().unwrap(); - // Get the supervisor task to find its contract - let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + // Pull the directive_id off the calling task so subscribers can + // route the question to the right directive view. + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ).into_response(); + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); } Err(e) => { - tracing::error!(error = %e, "Failed to get supervisor task"); + tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), - ).into_response(); + Json(ApiError::new("DB_ERROR", "Failed to fetch task")), + ) + .into_response(); } }; - // Determine context: contract or directive - let contract_id = supervisor.contract_id; - let directive_id = supervisor.directive_id; + let directive_id = task.directive_id; - if contract_id.is_none() && directive_id.is_none() { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")), - ).into_response(); - } - - let is_directive_context = directive_id.is_some() && contract_id.is_none(); - - // For directive context, check reconcile_mode to determine behavior - let directive_reconcile_mode: String = if let Some(did) = directive_id { - if is_directive_context { - match repository::get_directive_for_owner(pool, owner_id, did).await { - Ok(Some(d)) => d.reconcile_mode.clone(), - Ok(None) => "auto".to_string(), - Err(e) => { - tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check"); - "auto".to_string() - } - } - } else { - "auto".to_string() - } - } else { - "auto".to_string() + // Reconcile mode controls block-vs-timeout behaviour on directive + // tasks: semi-auto / manual block indefinitely (effectively + // phaseguard); auto times out after 30s. + let reconcile_mode: String = match directive_id { + Some(did) => match repository::get_directive_for_owner(pool, owner_id, did).await { + Ok(Some(d)) => d.reconcile_mode.clone(), + _ => "auto".to_string(), + }, + None => "auto".to_string(), }; - // Add the question (use Uuid::nil() for contract_id in directive-only context) - let effective_contract_id = contract_id.unwrap_or(Uuid::nil()); - let question_id = state.add_supervisor_question_with_directive( - supervisor_id, - effective_contract_id, + let question_id = state.add_supervisor_question( + task_id, directive_id, owner_id, request.question.clone(), @@ -1746,60 +236,6 @@ pub async fn ask_question( request.question_type.clone(), ); - // Save state: question asked is a key save point (Task 3.3) - // Only for contract context — directive tasks don't use supervisor_states table - if let Some(cid) = contract_id { - let pending_question = PendingQuestion { - id: question_id, - question: request.question.clone(), - choices: request.choices.clone(), - context: request.context.clone(), - question_type: request.question_type.clone(), - asked_at: chrono::Utc::now(), - }; - save_state_on_question_asked(pool, cid, pending_question).await; - } - - // Broadcast question as task output entry for the task's chat - let question_data = serde_json::json!({ - "question_id": question_id.to_string(), - "choices": request.choices, - "context": request.context, - "multi_select": request.multi_select, - "question_type": request.question_type, - }); - state.broadcast_task_output(TaskOutputNotification { - task_id: supervisor_id, - owner_id: Some(owner_id), - message_type: "supervisor_question".to_string(), - content: request.question.clone(), - tool_name: None, - tool_input: Some(question_data.clone()), - is_error: None, - cost_usd: None, - duration_ms: None, - is_partial: false, - }); - - // Persist to database so it appears when reloading the page - // Use event_type "output" with messageType "supervisor_question" to match TaskOutputEntry format - if let Some(pool) = state.db_pool.as_ref() { - let event_data = serde_json::json!({ - "messageType": "supervisor_question", - "content": request.question, - "toolInput": question_data, - }); - let _ = repository::create_task_event( - pool, - supervisor_id, - "output", - None, - None, - Some(event_data), - ).await; - } - - // If non_blocking mode, return immediately if request.non_blocking { return ( StatusCode::OK, @@ -1809,41 +245,28 @@ pub async fn ask_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Determine if we should block indefinitely (phaseguard or directive reconcile mode) - let use_phaseguard = request.phaseguard || (is_directive_context && (directive_reconcile_mode == "semi-auto" || directive_reconcile_mode == "manual")); - - // Poll for response with timeout - // - Phaseguard: block indefinitely until user responds - // - Directive tasks without reconcile mode: 30s default timeout - // - Contract tasks: use requested timeout_seconds + // Determine block behaviour. + let use_phaseguard = + request.phaseguard || reconcile_mode == "semi-auto" || reconcile_mode == "manual"; let timeout_secs = if use_phaseguard { - // Cap at 5 minutes per HTTP request (well under Claude Code's 10-min limit). - // The CLI will automatically reconnect via the poll endpoint. 300 - } else if is_directive_context && directive_reconcile_mode == "auto" { + } else if reconcile_mode == "auto" { 30 } else { request.timeout_seconds.max(1) as u64 }; + let timeout_duration = std::time::Duration::from_secs(timeout_secs); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { - // Check if response has been submitted if let Some(response) = state.get_question_response(question_id) { - // Clean up the response state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state (Task 3.3) - // Skip for directive context — no supervisor_states for directives - if let Some(cid) = contract_id { - clear_pending_question(pool, cid, question_id).await; - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1852,14 +275,12 @@ pub async fn ask_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Check timeout if start.elapsed() >= timeout_duration { if use_phaseguard { - // Phaseguard/reconcile: DON'T remove the pending question. - // Return still_pending so the CLI can reconnect via the poll endpoint. return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1868,18 +289,10 @@ pub async fn ask_question( timed_out: false, still_pending: true, }), - ).into_response(); + ) + .into_response(); } - - // Non-phaseguard: remove the pending question on timeout state.remove_pending_question(question_id); - - // Clear pending question from supervisor state on timeout (Task 3.3) - // Skip for directive context — no supervisor_states for directives - if let Some(cid) = contract_id { - clear_pending_question(pool, cid, question_id).await; - } - return ( StatusCode::REQUEST_TIMEOUT, Json(AskQuestionResponse { @@ -1888,34 +301,25 @@ pub async fn ask_question( timed_out: true, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Wait before polling again tokio::time::sleep(poll_interval).await; } } -/// Poll for a question response by question_id. -/// -/// Used by the CLI to reconnect after a still_pending response from ask_question. -/// Blocks for up to 5 minutes polling every 500ms. Returns still_pending if timeout -/// is reached without a response, allowing the CLI to reconnect again. +/// Re-poll a question by id. Used by the CLI to reconnect after +/// `still_pending` from `ask_question`. Blocks up to 5 minutes. #[utoipa::path( get, path = "/api/v1/mesh/supervisor/questions/{question_id}/poll", - params( - ("question_id" = Uuid, Path, description = "The question ID to poll for"), - ), + params(("question_id" = Uuid, Path, description = "Question id")), responses( - (status = 200, description = "Question answered or still pending", body = AskQuestionResponse), - (status = 404, description = "Question not found"), - (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor"), - ), - security( - ("tool_key" = []) + (status = 200, description = "Answered or still pending", body = AskQuestionResponse), + (status = 404, description = "Not found"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn poll_question( @@ -1923,23 +327,16 @@ pub async fn poll_question( headers: HeaderMap, Path(question_id): Path<Uuid>, ) -> impl IntoResponse { - let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { - Ok(ids) => ids, - Err(e) => return e.into_response(), - }; + if verify_task_auth(&state, &headers).await.is_err() { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ) + .into_response(); + } - // Check if a response is already available if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state for contract context - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { - if let Some(cid) = task.contract_id { - clear_pending_question(pool, cid, question_id).await; - } - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1948,35 +345,25 @@ pub async fn poll_question( timed_out: false, still_pending: false, }), - ).into_response(); + ) + .into_response(); } - // Check if the question exists at all (pending or response) - if !state.has_pending_question(question_id) { + if state.get_pending_question(question_id).is_none() { return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), - ).into_response(); + Json(ApiError::new("NOT_FOUND", "Question not found")), + ) + .into_response(); } - // Block for up to 5 minutes polling every 500ms - let timeout_duration = std::time::Duration::from_secs(300); + let timeout = std::time::Duration::from_secs(300); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); loop { - // Check if response has been submitted if let Some(response) = state.get_question_response(question_id) { state.cleanup_question_response(question_id); - - // Clear pending question from supervisor state for contract context - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, supervisor_id, owner_id).await { - if let Some(cid) = task.contract_id { - clear_pending_question(pool, cid, question_id).await; - } - } - return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1985,19 +372,10 @@ pub async fn poll_question( timed_out: false, still_pending: false, }), - ).into_response(); - } - - // Check if the question was removed (e.g., task deleted) - if !state.has_pending_question(question_id) { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question no longer exists")), - ).into_response(); + ) + .into_response(); } - - // Check timeout - if start.elapsed() >= timeout_duration { + if start.elapsed() >= timeout { return ( StatusCode::OK, Json(AskQuestionResponse { @@ -2006,27 +384,21 @@ pub async fn poll_question( timed_out: false, still_pending: true, }), - ).into_response(); + ) + .into_response(); } - - // Wait before polling again tokio::time::sleep(poll_interval).await; } } -/// Get all pending questions for the current user. +/// List currently-pending questions for the caller. #[utoipa::path( get, path = "/api/v1/mesh/questions", responses( - (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error"), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) + (status = 200, description = "Pending questions", body = Vec<PendingQuestionSummary>), ), + security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn list_pending_questions( @@ -2039,7 +411,6 @@ pub async fn list_pending_questions( .map(|q| PendingQuestionSummary { question_id: q.question_id, task_id: q.task_id, - contract_id: q.contract_id, directive_id: q.directive_id, question: q.question, choices: q.choices, @@ -2053,1051 +424,59 @@ pub async fn list_pending_questions( Json(questions).into_response() } -/// Answer a pending supervisor question. +/// Answer a pending question. #[utoipa::path( post, path = "/api/v1/mesh/questions/{question_id}/answer", - params( - ("question_id" = Uuid, Path, description = "Question ID") - ), + params(("question_id" = Uuid, Path, description = "Question id")), request_body = AnswerQuestionRequest, responses( - (status = 200, description = "Question answered", body = AnswerQuestionResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Question not found"), - (status = 500, description = "Internal server error"), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) + (status = 200, description = "Answered", body = AnswerQuestionResponse), + (status = 404, description = "Not found"), ), + security(("bearer_auth" = []), ("api_key" = [])), tag = "Mesh" )] pub async fn answer_question( State(state): State<SharedState>, Authenticated(auth): Authenticated, Path(question_id): Path<Uuid>, - Json(request): Json<AnswerQuestionRequest>, + Json(req): Json<AnswerQuestionRequest>, ) -> impl IntoResponse { - // Verify the question exists and belongs to this owner + // Ownership check: only the owner of the question can answer it. let question = match state.get_pending_question(question_id) { - Some(q) if q.owner_id == auth.owner_id => q, - Some(_) => { - return ( - StatusCode::FORBIDDEN, - Json(ApiError::new("FORBIDDEN", "Question belongs to another user")), - ).into_response(); - } - None => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), - ).into_response(); - } - }; - - // Submit the response - let success = state.submit_question_response(question_id, request.response.clone()); - - if success { - tracing::info!( - question_id = %question_id, - task_id = %question.task_id, - "User answered supervisor question" - ); - - // Send the response to the task as a message - // This will auto-resume the task if it was paused (phaseguard) - let pool = state.db_pool.as_ref().unwrap(); - if let Ok(Some(task)) = repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await { - if let Some(daemon_id) = task.daemon_id { - // Format the response message - let response_msg = format!( - "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n", - question.question, - request.response - ); - let cmd = DaemonCommand::SendMessage { - task_id: question.task_id, - message: response_msg, - }; - if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { - tracing::warn!( - task_id = %question.task_id, - error = %e, - "Failed to send response message to task" - ); - } else { - tracing::info!( - task_id = %question.task_id, - "Sent response message to task (will auto-resume if paused)" - ); - } - } - } - } - - Json(AnswerQuestionResponse { success }).into_response() -} - -// ============================================================================= -// Supervisor Resume and Conversation Rewind -// ============================================================================= - -/// Response for supervisor resume -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ResumeSupervisorResponse { - pub supervisor_task_id: Uuid, - pub daemon_id: Option<Uuid>, - pub resumed_from: ResumedFromInfo, - pub status: String, - /// Restoration context (Task 3.4) - pub restoration: Option<RestorationInfo>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ResumedFromInfo { - pub phase: String, - pub last_activity: chrono::DateTime<chrono::Utc>, - pub message_count: i32, -} - -/// Information about supervisor restoration (Task 3.4) -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct RestorationInfo { - /// Previous state before restoration - pub previous_state: String, - /// How many times this supervisor has been restored - pub restoration_count: i32, - /// Number of pending questions to re-deliver - pub pending_questions_count: usize, - /// Number of tasks being waited on - pub waiting_tasks_count: usize, - /// Number of tasks spawned before crash - pub spawned_tasks_count: usize, - /// Any warnings from state validation - pub warnings: Vec<String>, -} - -/// Resume interrupted supervisor with specified mode. -/// -/// POST /api/v1/contracts/{id}/supervisor/resume -#[utoipa::path( - post, - path = "/api/v1/contracts/{id}/supervisor/resume", - params( - ("id" = Uuid, Path, description = "Contract ID") - ), - request_body = crate::db::models::ResumeSupervisorRequest, - responses( - (status = 200, description = "Supervisor resumed", body = ResumeSupervisorResponse), - (status = 400, description = "Invalid request", body = ApiError), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 404, description = "Contract or supervisor not found", body = ApiError), - (status = 409, description = "Supervisor is already running", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh Supervisor" -)] -pub async fn resume_supervisor( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - auth: crate::server::auth::Authenticated, - Json(req): Json<crate::db::models::ResumeSupervisorRequest>, -) -> impl IntoResponse { - let crate::server::auth::Authenticated(auth_info) = auth; - - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get contract and verify ownership - let contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get existing supervisor state - let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new( - "NO_SUPERVISOR_STATE", - "No supervisor state found - supervisor may not have been started", - )), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor state: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Get supervisor task - let supervisor_task = match repository::get_task_for_owner(pool, supervisor_state.task_id, auth_info.owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor task: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - // Check if already running - but only if daemon is actually connected - // (daemon disconnect handler may not have updated status yet) - if supervisor_task.status == "running" { - let daemon_connected = supervisor_task - .daemon_id - .map(|d| state.is_daemon_connected(d)) - .unwrap_or(false); - - if daemon_connected { - return ( - StatusCode::CONFLICT, - Json(ApiError::new("ALREADY_RUNNING", "Supervisor is already running")), - ) - .into_response(); - } - // Daemon not connected - allow resume (treat as interrupted) - tracing::info!( - supervisor_task_id = %supervisor_task.id, - daemon_id = ?supervisor_task.daemon_id, - "Supervisor status is 'running' but daemon is not connected, allowing resume" - ); - } - - // Calculate message count from conversation history - let message_count = supervisor_state - .conversation_history - .as_array() - .map(|arr| arr.len() as i32) - .unwrap_or(0); - - // Find a connected daemon for this owner - let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) { - Some(id) => id, + Some(q) => q, None => { return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemons connected for your account. Cannot resume supervisor.", - )), - ) - .into_response(); - } - }; - - // Track response values (may be updated by resume modes) - let mut response_daemon_id = supervisor_task.daemon_id; - let mut response_status = "pending".to_string(); - - // Based on resume mode, handle differently - match req.resume_mode.as_str() { - "continue" => { - // Update task status to starting and assign daemon - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2") - .bind(target_daemon_id) - .bind(supervisor_state.task_id) - .execute(pool) - .await - { - tracing::error!("Failed to update task for resume: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - - // Fetch latest checkpoint patch for worktree recovery - let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, supervisor_state.task_id).await { - Ok(Some(patch)) => { - tracing::info!( - task_id = %supervisor_state.task_id, - patch_size = patch.patch_size_bytes, - base_sha = %patch.base_commit_sha, - "Including checkpoint patch for worktree recovery" - ); - // Encode patch as base64 for JSON transport - let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data); - (Some(encoded), Some(patch.base_commit_sha)) - } - Ok(None) => { - tracing::debug!(task_id = %supervisor_state.task_id, "No checkpoint patch found"); - (None, None) - } - Err(e) => { - tracing::warn!(task_id = %supervisor_state.task_id, error = %e, "Failed to fetch checkpoint patch"); - (None, None) - } - }; - - // Send SpawnTask with resume_session=true to use Claude's --continue - // Include conversation_history as fallback if worktree doesn't exist on target daemon - let command = DaemonCommand::SpawnTask { - task_id: supervisor_state.task_id, - task_name: supervisor_task.name.clone(), - plan: supervisor_task.plan.clone(), - repo_url: supervisor_task.repository_url.clone(), - base_branch: supervisor_task.base_branch.clone(), - target_branch: supervisor_task.target_branch.clone(), - parent_task_id: supervisor_task.parent_task_id, - depth: supervisor_task.depth, - is_orchestrator: false, - target_repo_path: supervisor_task.target_repo_path.clone(), - completion_action: supervisor_task.completion_action.clone(), - continue_from_task_id: supervisor_task.continue_from_task_id, - copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: supervisor_task.contract_id, - is_supervisor: true, - autonomous_loop: false, - resume_session: true, // Use --continue to preserve conversation - conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing - patch_data, - patch_base_sha, - local_only: contract.local_only, - auto_merge_local: contract.auto_merge_local, - supervisor_worktree_task_id: None, // Supervisor uses its own worktree - directive_id: supervisor_task.directive_id, - }; - - if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { - // Rollback status on failure - let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1") - .bind(supervisor_state.task_id) - .execute(pool) - .await; - tracing::error!("Failed to send SpawnTask to daemon: {}", e); - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))), - ) - .into_response(); - } - - tracing::info!( - contract_id = %contract_id, - supervisor_task_id = %supervisor_state.task_id, - daemon_id = %target_daemon_id, - message_count = message_count, - "Supervisor resumed with --continue (resume_session=true)" - ); - - // Update response values for successful resume - response_daemon_id = Some(target_daemon_id); - response_status = "starting".to_string(); - } - "restart_phase" => { - // Clear conversation but keep phase progress - if let Err(e) = repository::update_supervisor_conversation( - pool, - contract_id, - serde_json::json!([]), - ) - .await - { - tracing::error!("Failed to clear conversation: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - - if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1") - .bind(supervisor_state.task_id) - .execute(pool) - .await - { - tracing::error!("Failed to update task status: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - } - "from_checkpoint" => { - // This would require more complex handling with checkpoint system - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NOT_IMPLEMENTED", - "from_checkpoint mode not yet implemented", - )), - ) - .into_response(); - } - _ => { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "INVALID_RESUME_MODE", - "Invalid resume_mode. Use: continue, restart_phase, or from_checkpoint", - )), - ) - .into_response(); - } - } - - tracing::info!( - contract_id = %contract_id, - supervisor_task_id = %supervisor_state.task_id, - resume_mode = %req.resume_mode, - message_count = message_count, - "Supervisor resume requested" - ); - - // Build restoration info (Task 3.4) - let pending_questions: Vec<PendingQuestion> = serde_json::from_value( - supervisor_state.pending_questions.clone() - ).unwrap_or_default(); - - let restoration_info = RestorationInfo { - previous_state: supervisor_state.state.clone(), - restoration_count: supervisor_state.restoration_count, - pending_questions_count: pending_questions.len(), - waiting_tasks_count: supervisor_state.pending_task_ids.len(), - spawned_tasks_count: supervisor_state.spawned_task_ids.len(), - warnings: vec![], // Could add validation warnings here - }; - - // Re-deliver pending questions if any (Task 3.4) - if !pending_questions.is_empty() { - redeliver_pending_questions( - &state, - supervisor_state.task_id, - contract_id, - auth_info.owner_id, - &pending_questions, - ).await; - } - - Json(ResumeSupervisorResponse { - supervisor_task_id: supervisor_state.task_id, - daemon_id: response_daemon_id, - resumed_from: ResumedFromInfo { - phase: contract.phase, - last_activity: supervisor_state.last_activity, - message_count, - }, - status: response_status, - restoration: Some(restoration_info), - }) - .into_response() -} - -/// Response for conversation rewind -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct RewindConversationResponse { - pub contract_id: Uuid, - pub messages_removed: i32, - pub new_message_count: i32, - pub code_rewound: bool, -} - -/// Rewind supervisor conversation to specified point. -/// -/// POST /api/v1/contracts/{id}/supervisor/conversation/rewind -#[utoipa::path( - post, - path = "/api/v1/contracts/{id}/supervisor/conversation/rewind", - params( - ("id" = Uuid, Path, description = "Contract ID") - ), - request_body = crate::db::models::RewindConversationRequest, - responses( - (status = 200, description = "Conversation rewound", body = RewindConversationResponse), - (status = 400, description = "Invalid request", body = ApiError), - (status = 401, description = "Unauthorized", body = ApiError), - (status = 404, description = "Contract or supervisor not found", body = ApiError), - (status = 503, description = "Database not configured", body = ApiError), - (status = 500, description = "Internal server error", body = ApiError), - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh Supervisor" -)] -pub async fn rewind_conversation( - State(state): State<SharedState>, - Path(contract_id): Path<Uuid>, - auth: crate::server::auth::Authenticated, - Json(req): Json<crate::db::models::RewindConversationRequest>, -) -> impl IntoResponse { - let crate::server::auth::Authenticated(auth_info) = auth; - - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), - ) - .into_response(); - }; - - // Get contract and verify ownership - let _contract = match repository::get_contract_for_owner(pool, contract_id, auth_info.owner_id).await { - Ok(Some(c)) => c, - Ok(None) => { - return ( StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Contract not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get contract {}: {}", contract_id, e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), + Json(ApiError::new("NOT_FOUND", "Question not found")), ) .into_response(); } }; - - // Get supervisor state - let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(ApiError::new("NOT_FOUND", "Supervisor state not found")), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to get supervisor state: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), - ) - .into_response(); - } - }; - - let conversation = supervisor_state - .conversation_history - .as_array() - .cloned() - .unwrap_or_default(); - - let original_count = conversation.len() as i32; - - // Determine how many messages to keep - let new_count = if let Some(by_count) = req.by_message_count { - (original_count - by_count).max(0) - } else if let Some(ref to_id) = req.to_message_id { - // Find message by ID and keep up to and including it - let index = conversation - .iter() - .position(|msg| msg.get("id").and_then(|v| v.as_str()) == Some(to_id.as_str())) - .map(|i| i as i32) - .unwrap_or(original_count - 1); - (index + 1).min(original_count).max(0) - } else { - // Default to removing last message - (original_count - 1).max(0) - }; - - // Truncate conversation - let new_conversation: Vec<serde_json::Value> = conversation - .into_iter() - .take(new_count as usize) - .collect(); - - // Update the conversation - if let Err(e) = repository::update_supervisor_conversation( - pool, - contract_id, - serde_json::Value::Array(new_conversation), - ) - .await - { - tracing::error!("Failed to update conversation: {}", e); + if question.owner_id != auth.owner_id { return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", e.to_string())), + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Not your question")), ) .into_response(); } - tracing::info!( - contract_id = %contract_id, - original_count = original_count, - new_count = new_count, - messages_removed = original_count - new_count, - "Conversation rewound" - ); - - Json(RewindConversationResponse { - contract_id, - messages_removed: original_count - new_count, - new_message_count: new_count, - code_rewound: req.rewind_code.unwrap_or(false), // TODO: implement code rewind - }) - .into_response() -} - -// ============================================================================= -// Supervisor State Persistence Helpers (Task 3.3) -// ============================================================================= - -use crate::db::models::{ - SupervisorRestorationContext, SupervisorStateEnum, - StateValidationResult, StateRecoveryAction, -}; - -/// Save supervisor state on task spawn. -/// This is called when a supervisor spawns a new task. -pub async fn save_state_on_task_spawn( - pool: &PgPool, - contract_id: Uuid, - spawned_task_id: Uuid, -) { - if let Err(e) = repository::add_supervisor_spawned_task(pool, contract_id, spawned_task_id).await { - tracing::warn!( - contract_id = %contract_id, - spawned_task_id = %spawned_task_id, - error = %e, - "Failed to save spawned task to supervisor state" - ); + if state.submit_question_response(question_id, req.response) { + Json(AnswerQuestionResponse { success: true }).into_response() } else { - tracing::debug!( - contract_id = %contract_id, - spawned_task_id = %spawned_task_id, - "Saved spawned task to supervisor state" - ); - } - - // Also update state to working - if let Err(e) = repository::update_supervisor_detailed_state( - pool, - contract_id, - "working", - Some(&format!("Spawned task {}", spawned_task_id)), - 0, // Progress resets when spawning new work - None, - ).await { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to update supervisor state on task spawn"); - } -} - -/// Save supervisor state on question asked. -/// This is called when a supervisor asks a question and is waiting for user input. -pub async fn save_state_on_question_asked( - pool: &PgPool, - contract_id: Uuid, - question: PendingQuestion, -) { - let question_json = match serde_json::to_value(&[&question]) { - Ok(v) => v, - Err(e) => { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to serialize pending question"); - return; - } - }; - - if let Err(e) = repository::add_supervisor_pending_question(pool, contract_id, question_json).await { - tracing::warn!( - contract_id = %contract_id, - question_id = %question.id, - error = %e, - "Failed to save pending question to supervisor state" - ); - } else { - tracing::debug!( - contract_id = %contract_id, - question_id = %question.id, - "Saved pending question to supervisor state" - ); - } -} - -/// Clear pending question after it's answered. -pub async fn clear_pending_question( - pool: &PgPool, - contract_id: Uuid, - question_id: Uuid, -) { - if let Err(e) = repository::remove_supervisor_pending_question(pool, contract_id, question_id).await { - tracing::warn!( - contract_id = %contract_id, - question_id = %question_id, - error = %e, - "Failed to remove pending question from supervisor state" - ); - } - - // Update state back to working (if no more pending questions) - match repository::get_supervisor_state(pool, contract_id).await { - Ok(Some(state)) => { - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - if questions.is_empty() { - let _ = repository::update_supervisor_detailed_state( - pool, - contract_id, - "working", - Some("Resumed after user response"), - state.progress, - None, - ).await; - } - } - Ok(None) => {} - Err(e) => { - tracing::warn!(contract_id = %contract_id, error = %e, "Failed to check supervisor state after clearing question"); - } - } -} - -/// Save supervisor state on phase change. -pub async fn save_state_on_phase_change( - pool: &PgPool, - contract_id: Uuid, - new_phase: &str, -) { - if let Err(e) = repository::update_supervisor_phase(pool, contract_id, new_phase).await { - tracing::warn!( - contract_id = %contract_id, - new_phase = %new_phase, - error = %e, - "Failed to update supervisor state on phase change" - ); - } else { - tracing::info!( - contract_id = %contract_id, - new_phase = %new_phase, - "Updated supervisor state on phase change" - ); - } -} - -// ============================================================================= -// Supervisor Restoration Protocol (Task 3.4) -// ============================================================================= - -/// Validate supervisor state consistency before restoration. -/// Checks that spawned tasks and pending questions are in expected states. -pub async fn validate_supervisor_state( - pool: &PgPool, - state: &crate::db::models::SupervisorState, -) -> StateValidationResult { - let mut issues = Vec::new(); - - // Validate spawned tasks - if !state.spawned_task_ids.is_empty() { - match repository::validate_spawned_tasks(pool, &state.spawned_task_ids).await { - Ok(task_statuses) => { - for task_id in &state.spawned_task_ids { - if !task_statuses.contains_key(task_id) { - issues.push(format!("Spawned task {} not found in database", task_id)); - } - } - } - Err(e) => { - issues.push(format!("Failed to validate spawned tasks: {}", e)); - } - } - } - - // Validate pending questions - let pending_questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - - // Check if questions are not too old (e.g., more than 24 hours) - for question in &pending_questions { - let age = chrono::Utc::now() - question.asked_at; - if age.num_hours() > 24 { - issues.push(format!( - "Pending question {} is {} hours old, may be stale", - question.id, age.num_hours() - )); - } - } - - // Validate conversation history - if let Some(history) = state.conversation_history.as_array() { - if history.is_empty() && state.restoration_count > 0 { - issues.push("Conversation history is empty after previous restoration".to_string()); - } - } - - // Determine recovery action - let recovery_action = if issues.is_empty() { - StateRecoveryAction::Proceed - } else if issues.iter().any(|i| i.contains("not found")) { - // Missing tasks suggest corruption - use checkpoint - StateRecoveryAction::UseCheckpoint - } else if issues.len() > 3 { - // Many issues suggest manual intervention needed - StateRecoveryAction::ManualIntervention - } else { - // Minor issues - proceed with warnings - StateRecoveryAction::Proceed - }; - - StateValidationResult { - is_valid: issues.is_empty(), - issues, - recovery_action, - } -} - -/// Restore supervisor from saved state after daemon crash or task reassignment. -/// Returns restoration context to send to the supervisor. -pub async fn restore_supervisor( - pool: &PgPool, - contract_id: Uuid, - restoration_source: &str, -) -> Result<SupervisorRestorationContext, String> { - // Step 1: Load supervisor state - let state = match repository::get_supervisor_state_for_restoration(pool, contract_id).await { - Ok(Some(s)) => s, - Ok(None) => { - tracing::warn!( - contract_id = %contract_id, - "No supervisor state found for restoration - starting fresh" - ); - return Ok(SupervisorRestorationContext { - success: true, - previous_state: SupervisorStateEnum::Initializing, - conversation_history: serde_json::json!([]), - pending_questions: vec![], - waiting_task_ids: vec![], - spawned_task_ids: vec![], - restoration_count: 0, - restoration_context_message: "No previous state found. Starting fresh.".to_string(), - warnings: vec!["No previous supervisor state found".to_string()], - }); - } - Err(e) => { - return Err(format!("Failed to load supervisor state: {}", e)); - } - }; - - // Step 2: Parse previous state - let previous_state: SupervisorStateEnum = state.state.parse().unwrap_or(SupervisorStateEnum::Interrupted); - - // Step 3: Validate state consistency - let validation = validate_supervisor_state(pool, &state).await; - let mut warnings = validation.issues.clone(); - - // Step 4: Handle based on validation result - let (conversation_history, pending_questions, restoration_message) = match validation.recovery_action { - StateRecoveryAction::Proceed => { - // State is valid, use it - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - - let message = format!( - "Restored from {} state. {} pending questions, {} spawned tasks, {} waiting tasks.", - state.state, - questions.len(), - state.spawned_task_ids.len(), - state.pending_task_ids.len() - ); - - (state.conversation_history.clone(), questions, message) - } - StateRecoveryAction::UseCheckpoint => { - // State is corrupted, try to use checkpoint - warnings.push("State validation failed, attempting checkpoint recovery".to_string()); - - // TODO: Implement checkpoint-based recovery - // For now, start with empty questions but preserve conversation - let message = "Restored from last checkpoint due to state inconsistency.".to_string(); - (state.conversation_history.clone(), vec![], message) - } - StateRecoveryAction::StartFresh => { - warnings.push("Starting fresh due to unrecoverable state".to_string()); - let message = "Starting fresh due to unrecoverable state corruption.".to_string(); - (serde_json::json!([]), vec![], message) - } - StateRecoveryAction::ManualIntervention => { - warnings.push("Manual intervention may be required".to_string()); - // Still try to restore but with warning - let questions: Vec<PendingQuestion> = serde_json::from_value(state.pending_questions.clone()) - .unwrap_or_default(); - let message = "Restored with warnings - manual intervention may be required.".to_string(); - (state.conversation_history.clone(), questions, message) - } - }; - - // Step 5: Mark supervisor as restored - let new_state = match repository::mark_supervisor_restored(pool, contract_id, restoration_source).await { - Ok(s) => s, - Err(e) => { - return Err(format!("Failed to mark supervisor as restored: {}", e)); - } - }; - - // Step 6: Build restoration context - let context = SupervisorRestorationContext { - success: true, - previous_state, - conversation_history, - pending_questions, - waiting_task_ids: state.pending_task_ids.clone(), - spawned_task_ids: state.spawned_task_ids.clone(), - restoration_count: new_state.restoration_count, - restoration_context_message: restoration_message, - warnings, - }; - - tracing::info!( - contract_id = %contract_id, - restoration_source = %restoration_source, - restoration_count = new_state.restoration_count, - pending_questions_count = context.pending_questions.len(), - waiting_tasks_count = context.waiting_task_ids.len(), - spawned_tasks_count = context.spawned_task_ids.len(), - "Supervisor restoration completed" - ); - - Ok(context) -} - -/// Re-deliver pending questions to the user after restoration. -/// This ensures questions asked before crash are shown to the user again. -pub async fn redeliver_pending_questions( - state: &SharedState, - supervisor_id: Uuid, - contract_id: Uuid, - owner_id: Uuid, - questions: &[PendingQuestion], -) { - for question in questions { - // Add to in-memory question state - state.add_supervisor_question( - supervisor_id, - contract_id, - owner_id, - question.question.clone(), - question.choices.clone(), - question.context.clone(), - false, // Assume single select for restored questions - question.question_type.clone(), - ); - - // Broadcast to WebSocket clients - let question_data = serde_json::json!({ - "question_id": question.id.to_string(), - "choices": question.choices, - "context": question.context, - "question_type": question.question_type, - "is_restored": true, - "originally_asked_at": question.asked_at.to_rfc3339(), - }); - - state.broadcast_task_output(TaskOutputNotification { - task_id: supervisor_id, - owner_id: Some(owner_id), - message_type: "supervisor_question".to_string(), - content: question.question.clone(), - tool_name: None, - tool_input: Some(question_data), - is_error: None, - cost_usd: None, - duration_ms: None, - is_partial: false, - }); - - tracing::info!( - supervisor_id = %supervisor_id, - question_id = %question.id, - "Re-delivered pending question after restoration" - ); - } -} - -/// Generate restoration context message for Claude. -/// This message is injected into the conversation to inform Claude about the restoration. -pub fn generate_restoration_context_message(context: &SupervisorRestorationContext) -> String { - let mut message = String::new(); - - message.push_str("=== SUPERVISOR RESTORATION NOTICE ===\n\n"); - message.push_str(&format!("This supervisor has been restored after interruption. {}\n\n", context.restoration_context_message)); - message.push_str(&format!("Restoration count: {}\n", context.restoration_count)); - - if !context.pending_questions.is_empty() { - message.push_str(&format!("\nPending questions ({}): These have been re-delivered to the user.\n", context.pending_questions.len())); - for q in &context.pending_questions { - message.push_str(&format!(" - {}: {}\n", q.id, q.question)); - } - } - - if !context.waiting_task_ids.is_empty() { - message.push_str(&format!("\nWaiting on {} task(s) to complete. Check their status before continuing.\n", context.waiting_task_ids.len())); - } - - if !context.spawned_task_ids.is_empty() { - message.push_str(&format!("\n{} task(s) were spawned before interruption. Their status may need verification.\n", context.spawned_task_ids.len())); - } - - if !context.warnings.is_empty() { - message.push_str("\nWarnings:\n"); - for warning in &context.warnings { - message.push_str(&format!(" - {}\n", warning)); - } + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found")), + ) + .into_response() } - - message.push_str("\n=== END RESTORATION NOTICE ===\n"); - - message } // ============================================================================= -// Order Creation from Directive Tasks +// Order creation (from directive tasks) // ============================================================================= -/// Request to create an order from a directive task. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateOrderForTaskRequest { @@ -3117,30 +496,25 @@ pub struct CreateOrderForTaskRequest { fn default_order_priority() -> String { "medium".to_string() } - fn default_order_type() -> String { "spike".to_string() } - fn default_order_labels() -> serde_json::Value { serde_json::json!([]) } -/// Create an order for future work from a directive task. -/// -/// Only spike and chore order types are allowed. The order is automatically -/// linked to the directive associated with the calling task. +/// Create a follow-up order from a directive task (spike/chore only). #[utoipa::path( post, path = "/api/v1/mesh/supervisor/orders", request_body = CreateOrderForTaskRequest, responses( (status = 201, description = "Order created"), - (status = 400, description = "Invalid order type"), + (status = 400, description = "Invalid order type or no directive context"), (status = 401, description = "Unauthorized"), - (status = 403, description = "Forbidden - not a supervisor/directive task"), - (status = 500, description = "Internal server error"), + (status = 403, description = "Not a directive task"), ), + security(("tool_key" = [])), tag = "Mesh Supervisor" )] pub async fn create_order_for_task( @@ -3148,14 +522,11 @@ pub async fn create_order_for_task( headers: HeaderMap, Json(request): Json<CreateOrderForTaskRequest>, ) -> impl IntoResponse { - let (task_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + let (task_id, owner_id) = match verify_task_auth(&state, &headers).await { Ok(ids) => ids, Err(e) => return e.into_response(), }; - let pool = state.db_pool.as_ref().unwrap(); - - // Validate order_type is spike or chore if request.order_type != "spike" && request.order_type != "chore" { return ( StatusCode::BAD_REQUEST, @@ -3167,7 +538,8 @@ pub async fn create_order_for_task( .into_response(); } - // Get the task to find its directive_id + let pool = state.db_pool.as_ref().unwrap(); + let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { @@ -3178,10 +550,10 @@ pub async fn create_order_for_task( .into_response(); } Err(e) => { - tracing::error!(error = %e, "Failed to get task"); + tracing::error!(error = %e, "Failed to fetch task"); return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError::new("DB_ERROR", "Failed to get task")), + Json(ApiError::new("DB_ERROR", "Failed to fetch task")), ) .into_response(); } @@ -3192,27 +564,21 @@ pub async fn create_order_for_task( None => { return ( StatusCode::BAD_REQUEST, - Json(ApiError::new( - "NO_DIRECTIVE", - "Task is not associated with a directive", - )), + Json(ApiError::new("NO_DIRECTIVE", "Task is not directive-attached")), ) .into_response(); } }; - // Determine repository_url: use request value, or fall back to directive's repository_url let repository_url = if request.repository_url.is_some() { request.repository_url } else { - // Look up directive for its repository_url match repository::get_directive_for_owner(pool, owner_id, directive_id).await { Ok(Some(d)) => d.repository_url, _ => None, } }; - // Create the order let order_req = CreateOrderRequest { title: request.title, description: request.description, diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index 5737360..312fdc7 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -14,7 +14,6 @@ pub mod directives; pub mod file_ws; pub mod files; pub mod history; -pub mod listen; pub mod mesh; pub mod orders; pub mod mesh_daemon; diff --git a/makima/src/server/messages.rs b/makima/src/server/messages.rs index cecb622..5fa8c24 100644 --- a/makima/src/server/messages.rs +++ b/makima/src/server/messages.rs @@ -25,9 +25,6 @@ pub struct StartMessage { pub channels: u16, /// Audio encoding format pub encoding: AudioEncoding, - /// Optional contract ID to save transcript to (requires auth_token) - #[serde(skip_serializing_if = "Option::is_none")] - pub contract_id: Option<String>, /// Optional auth token (JWT) for authenticated sessions #[serde(skip_serializing_if = "Option::is_none")] pub auth_token: Option<String>, @@ -77,8 +74,6 @@ pub enum ServerMessage { TranscriptSaved { /// The ID of the file where the transcript was saved file_id: String, - /// The ID of the contract the file belongs to - contract_id: String, }, /// Error occurred during processing Error { code: String, message: String }, diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index bd48a8f..62ad1c7 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -18,7 +18,7 @@ use tower_http::trace::TraceLayer; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use crate::server::handlers::{api_keys, daemon_download, directive_documents, directives, file_ws, files, history, listen, mesh, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, users, versions}; +use crate::server::handlers::{api_keys, daemon_download, directive_documents, directives, file_ws, files, history, mesh, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, users, versions}; use crate::server::openapi::ApiDoc; use crate::server::state::SharedState; @@ -43,7 +43,6 @@ async fn health_check() -> impl IntoResponse { pub fn make_router(state: SharedState) -> Router { // API v1 routes let api_v1 = Router::new() - .route("/listen", get(listen::websocket_handler)) .route("/speak", get(speak::websocket_handler)) // Listen/transcript-analysis endpoints removed in Phase 5 with the // contracts subsystem. @@ -55,7 +54,6 @@ pub fn make_router(state: SharedState) -> Router { .put(files::update_file) .delete(files::delete_file), ) - .route("/files/{id}/sync-from-repo", post(files::sync_file_from_repo)) // Version history endpoints .route("/files/{id}/versions", get(versions::list_versions)) .route("/files/{id}/versions/{version}", get(versions::get_version)) @@ -103,9 +101,7 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/merge/abort", post(mesh_merge::merge_abort)) .route("/mesh/tasks/{id}/merge/skip", post(mesh_merge::merge_skip)) .route("/mesh/tasks/{id}/merge/check", get(mesh_merge::merge_check)) - // Checkpoint endpoints - .route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint)) - .route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints)) + // Task conversation history. .route("/mesh/tasks/{id}/conversation", get(history::get_task_conversation)) // Resume and rewind endpoints .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task)) @@ -114,20 +110,11 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint)) // Task branching endpoint .route("/mesh/tasks/{id}/branch", post(mesh::branch_task)) - // Supervisor endpoints (for supervisor.sh) - .route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks)) - .route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree)) - .route("/mesh/supervisor/tasks", post(mesh_supervisor::spawn_task)) - .route("/mesh/supervisor/tasks/{task_id}/wait", post(mesh_supervisor::wait_for_task)) - .route("/mesh/supervisor/tasks/{task_id}/read-file", post(mesh_supervisor::read_worktree_file)) - // Supervisor git operations - .route("/mesh/supervisor/branches", post(mesh_supervisor::create_branch)) - .route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task)) - .route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff)) - .route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr)) - // Supervisor order creation endpoint + // Directive backchannel — used by `makima directive ask` and + // `makima directive create-order`. The /supervisor/ path is + // kept for CLI client backwards compat (see mesh_supervisor.rs + // module docstring). .route("/mesh/supervisor/orders", post(mesh_supervisor::create_order_for_task)) - // Supervisor question endpoints .route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question)) .route("/mesh/supervisor/questions/{question_id}/poll", get(mesh_supervisor::poll_question)) .route("/mesh/questions", get(mesh_supervisor::list_pending_questions)) @@ -315,9 +302,6 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; /// Interval for checkpoint patch cleanup (hourly) const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; -// Retry orchestrator checks for pending tasks every 30 seconds -const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30; - /// Run the HTTP server with graceful shutdown support. /// /// # Arguments @@ -455,63 +439,9 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } }); - // Clone state and pool for retry orchestrator - let retry_pool = pool.clone(); - let retry_state = state.clone(); - - // Spawn retry orchestrator - periodically retries pending tasks on available daemons - tokio::spawn(async move { - let mut interval = tokio::time::interval( - std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS) - ); - loop { - interval.tick().await; - - // Get all contracts with pending tasks awaiting retry - match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await { - Ok(contract_owners) => { - for (contract_id, owner_id) in contract_owners { - // Try to start a pending task for this contract - match handlers::mesh_supervisor::try_start_pending_task( - &retry_state, - contract_id, - owner_id, - ).await { - Ok(Some(task)) => { - tracing::info!( - task_id = %task.id, - contract_id = %contract_id, - retry_count = task.retry_count, - "Retry orchestrator started pending task" - ); - } - Ok(None) => { - // No tasks could be started (no available daemons, etc.) - } - Err(e) => { - tracing::warn!( - contract_id = %contract_id, - error = %e, - "Retry orchestrator failed to start pending task" - ); - } - } - } - } - Err(e) => { - tracing::warn!( - error = %e, - "Retry orchestrator failed to query pending task contracts" - ); - } - } - } - }); - - tracing::info!( - "Retry orchestrator started (interval: {}s)", - RETRY_ORCHESTRATOR_INTERVAL_SECS - ); + // Retry orchestrator (contract-keyed) removed alongside legacy + // contracts — the directive system has its own reconciler that + // handles pending directive tasks. // Spawn directive orchestrator - automates directive lifecycle let directive_pool = pool.clone(); diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 13ba787..51ce01d 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -3,35 +3,30 @@ use utoipa::OpenApi; use crate::db::models::{ - AddLocalRepositoryRequest, AddRemoteRepositoryRequest, BranchInfo, BranchListResponse, BranchTaskRequest, BranchTaskResponse, - ChangePhaseRequest, - Contract, ContractEvent, - ContractListResponse, ContractRepository, ContractSummary, ContractWithRelations, CleanupResponse, - CreateContractRequest, CreateDirectiveRequest, CreateDirectiveStepRequest, CreateFileRequest, - CreateManagedRepositoryRequest, CreateOrderRequest, CreateTaskRequest, + CreateDirectiveRequest, CreateDirectiveStepRequest, CreateFileRequest, + CreateOrderRequest, CreateTaskRequest, Daemon, DaemonDirectoriesResponse, DaemonDirectory, DaemonListResponse, Directive, DirectiveDocument, DirectiveListResponse, DirectiveRevision, DirectiveStep, DirectiveSummary, DirectiveWithSteps, File, FileListResponse, FileSummary, LinkDirectiveRequest, MergeCommitRequest, MergeCompleteCheckResponse, MergeResolveRequest, MergeResultResponse, - MergeSkipRequest, MergeStartRequest, MergeStatusResponse, MeshChatConversation, - MeshChatHistoryResponse, MeshChatMessageRecord, + MergeSkipRequest, MergeStartRequest, MergeStatusResponse, Order, OrderListResponse, OrderListQuery, RepositoryHistoryEntry, RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskSummary, TaskWithSubtasks, TranscriptEntry, - UpdateContractRequest, UpdateDirectiveRequest, UpdateDirectiveStepRequest, + UpdateDirectiveRequest, UpdateDirectiveStepRequest, UpdateFileRequest, UpdateOrderRequest, UpdateTaskRequest, }; use crate::server::auth::{ ApiKey, ApiKeyInfoResponse, CreateApiKeyRequest, CreateApiKeyResponse, RefreshApiKeyRequest, RefreshApiKeyResponse, RevokeApiKeyResponse, }; -use crate::server::handlers::{api_keys, directive_documents, directives, files, listen, mesh, mesh_merge, orders, repository_history, users}; +use crate::server::handlers::{api_keys, directive_documents, directives, files, mesh, mesh_merge, orders, repository_history, users}; use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage, TranscriptMessage}; #[derive(OpenApi)] @@ -43,13 +38,11 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage license(name = "MIT"), ), paths( - listen::websocket_handler, files::list_files, files::get_file, files::create_file, files::update_file, files::delete_file, - files::sync_file_from_repo, // Mesh endpoints mesh::list_tasks, mesh::get_task, @@ -170,10 +163,6 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage DaemonDirectoriesResponse, DaemonDirectory, mesh::TaskPatchDataResponse, - MeshChatConversation, - MeshChatMessageRecord, - MeshChatHistoryResponse, - // Contract chat / discuss schemas removed in Phase 5. // Merge schemas BranchInfo, BranchListResponse, @@ -201,19 +190,6 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage users::DeleteAccountResponse, users::UserSettingsResponse, users::UpdateUserSettingsRequest, - // Contract schemas - Contract, - ContractSummary, - ContractListResponse, - ContractWithRelations, - ContractRepository, - ContractEvent, - CreateContractRequest, - UpdateContractRequest, - AddRemoteRepositoryRequest, - AddLocalRepositoryRequest, - CreateManagedRepositoryRequest, - ChangePhaseRequest, // Directive schemas Directive, DirectiveStep, diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index e267da1..9e06b4c 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -140,10 +140,8 @@ pub struct PrResultNotification { pub struct SupervisorQuestionNotification { /// Unique ID for this question pub question_id: Uuid, - /// Supervisor task that asked the question + /// Task that asked the question pub task_id: Uuid, - /// Contract this question relates to (Uuid::nil() for directive context) - pub contract_id: Uuid, /// Directive this question relates to (if from a directive task) #[serde(skip_serializing_if = "Option::is_none")] pub directive_id: Option<Uuid>, @@ -172,7 +170,6 @@ pub struct SupervisorQuestionNotification { pub struct PendingSupervisorQuestion { pub question_id: Uuid, pub task_id: Uuid, - pub contract_id: Uuid, /// Directive this question relates to (if from a directive task) pub directive_id: Option<Uuid>, pub owner_id: Uuid, @@ -285,12 +282,6 @@ pub enum DaemonCommand { /// Files to copy from parent task's worktree #[serde(rename = "copyFiles")] copy_files: Option<Vec<String>>, - /// Contract ID if this task is associated with a contract - #[serde(rename = "contractId")] - contract_id: Option<Uuid>, - /// Whether this task is a supervisor (long-running contract orchestrator) - #[serde(rename = "isSupervisor")] - is_supervisor: bool, /// Whether to run in autonomous loop mode #[serde(rename = "autonomousLoop", default)] autonomous_loop: bool, @@ -306,15 +297,12 @@ pub enum DaemonCommand { /// Commit SHA to apply the patch on top of #[serde(rename = "patchBaseSha", default, skip_serializing_if = "Option::is_none")] patch_base_sha: Option<String>, - /// Whether the contract is in local-only mode (skips automatic completion actions) + /// Whether to skip automatic completion actions (local-only mode). #[serde(rename = "localOnly", default)] local_only: bool, /// Whether to auto-merge to target branch locally when local_only mode is enabled #[serde(rename = "autoMergeLocal", default)] auto_merge_local: bool, - /// Task ID to share worktree with (supervisor's task ID). If Some, use that task's worktree instead of creating a new one. - #[serde(rename = "supervisorWorktreeTaskId", default, skip_serializing_if = "Option::is_none")] - supervisor_worktree_task_id: Option<Uuid>, /// Directive ID if this task is associated with a directive #[serde(rename = "directiveId", default, skip_serializing_if = "Option::is_none")] directive_id: Option<Uuid>, @@ -906,29 +894,12 @@ impl AppState { let _ = self.pr_results.send(notification); } - /// Add a pending supervisor question and broadcast it. + /// Add a pending question and broadcast it. Questions live in + /// memory only; they're a back-channel for directive tasks to + /// pause for clarification (used by `makima directive ask`). pub fn add_supervisor_question( &self, task_id: Uuid, - contract_id: Uuid, - owner_id: Uuid, - question: String, - choices: Vec<String>, - context: Option<String>, - multi_select: bool, - question_type: String, - ) -> Uuid { - self.add_supervisor_question_with_directive( - task_id, contract_id, None, owner_id, - question, choices, context, multi_select, question_type, - ) - } - - /// Add a pending supervisor question with optional directive context and broadcast it. - pub fn add_supervisor_question_with_directive( - &self, - task_id: Uuid, - contract_id: Uuid, directive_id: Option<Uuid>, owner_id: Uuid, question: String, @@ -940,13 +911,11 @@ impl AppState { let question_id = Uuid::new_v4(); let now = chrono::Utc::now(); - // Store the pending question self.pending_questions.insert( question_id, PendingSupervisorQuestion { question_id, task_id, - contract_id, directive_id, owner_id, question: question.clone(), @@ -958,11 +927,9 @@ impl AppState { }, ); - // Broadcast to subscribers self.broadcast_supervisor_question(SupervisorQuestionNotification { question_id, task_id, - contract_id, directive_id, owner_id: Some(owner_id), question, @@ -976,10 +943,9 @@ impl AppState { tracing::info!( question_id = %question_id, task_id = %task_id, - contract_id = %contract_id, directive_id = ?directive_id, question_type = %question_type, - "Supervisor question added" + "Question added" ); question_id @@ -1029,7 +995,6 @@ impl AppState { self.broadcast_supervisor_question(SupervisorQuestionNotification { question_id, task_id: question.1.task_id, - contract_id: question.1.contract_id, directive_id: question.1.directive_id, owner_id: Some(question.1.owner_id), question: question.1.question, @@ -1093,38 +1058,6 @@ impl AppState { count } - /// Remove all pending questions for a specific contract. - /// - /// This should be called when a contract is deleted to clean up orphaned questions. - /// Returns the number of questions removed. - pub fn remove_pending_questions_for_contract(&self, contract_id: Uuid) -> usize { - // Collect question IDs to remove - let question_ids: Vec<Uuid> = self - .pending_questions - .iter() - .filter(|entry| entry.value().contract_id == contract_id) - .map(|entry| entry.value().question_id) - .collect(); - - let count = question_ids.len(); - - // Remove pending questions and their responses - for question_id in question_ids { - self.pending_questions.remove(&question_id); - self.question_responses.remove(&question_id); - } - - if count > 0 { - tracing::info!( - contract_id = %contract_id, - count = count, - "Cleaned up pending questions for deleted contract" - ); - } - - count - } - /// Register a new daemon connection. /// /// Returns the connection_id for later reference. @@ -1329,176 +1262,6 @@ impl AppState { .map(|entry| entry.value().clone()) } - // ========================================================================= - // Supervisor Notifications - // ========================================================================= - - /// Notify a contract's supervisor task about an event. - /// - /// This sends a message to the supervisor's stdin so it can react to changes - /// in tasks or contract state. - pub async fn notify_supervisor( - &self, - supervisor_task_id: Uuid, - supervisor_daemon_id: Option<Uuid>, - message: &str, - ) -> Result<(), String> { - // Only send if we have a daemon ID - let daemon_id = match supervisor_daemon_id { - Some(id) => id, - None => { - tracing::debug!( - supervisor_task_id = %supervisor_task_id, - "Supervisor has no daemon assigned, skipping notification" - ); - return Ok(()); - } - }; - - let command = DaemonCommand::SendMessage { - task_id: supervisor_task_id, - message: message.to_string(), - }; - - self.send_daemon_command(daemon_id, command).await - } - - /// Format and send a task completion notification to a supervisor. - /// - /// If `action_directive` is provided, it will be appended to the message - /// as an [ACTION REQUIRED] block to prompt the supervisor to take action. - pub async fn notify_supervisor_of_task_completion( - &self, - supervisor_task_id: Uuid, - supervisor_daemon_id: Option<Uuid>, - completed_task_id: Uuid, - completed_task_name: &str, - status: &str, - progress_summary: Option<&str>, - error_message: Option<&str>, - action_directive: Option<&str>, - ) { - let mut message = format!( - "TASK_COMPLETED task_id={} name=\"{}\" status={}", - completed_task_id, completed_task_name, status - ); - - if let Some(summary) = progress_summary { - // Escape newlines in summary - let escaped = summary.replace('\n', "\\n"); - message.push_str(&format!(" summary=\"{}\"", escaped)); - } - - if let Some(err) = error_message { - let escaped = err.replace('\n', "\\n"); - message.push_str(&format!(" error=\"{}\"", escaped)); - } - - // Append action directive if provided - if let Some(directive) = action_directive { - message.push_str("\n\n"); - message.push_str(directive); - } - - if let Err(e) = self.notify_supervisor( - supervisor_task_id, - supervisor_daemon_id, - &message, - ).await { - tracing::warn!( - supervisor_task_id = %supervisor_task_id, - completed_task_id = %completed_task_id, - "Failed to notify supervisor of task completion: {}", - e - ); - } - } - - /// Format and send a task status change notification to a supervisor. - pub async fn notify_supervisor_of_task_update( - &self, - supervisor_task_id: Uuid, - supervisor_daemon_id: Option<Uuid>, - updated_task_id: Uuid, - updated_task_name: &str, - new_status: &str, - updated_fields: &[String], - ) { - let message = format!( - "TASK_UPDATED task_id={} name=\"{}\" status={} fields={}", - updated_task_id, - updated_task_name, - new_status, - updated_fields.join(",") - ); - - if let Err(e) = self.notify_supervisor( - supervisor_task_id, - supervisor_daemon_id, - &message, - ).await { - tracing::warn!( - supervisor_task_id = %supervisor_task_id, - updated_task_id = %updated_task_id, - "Failed to notify supervisor of task update: {}", - e - ); - } - } - - /// Format and send a contract phase change notification to a supervisor. - pub async fn notify_supervisor_of_phase_change( - &self, - supervisor_task_id: Uuid, - supervisor_daemon_id: Option<Uuid>, - contract_id: Uuid, - new_phase: &str, - ) { - let message = format!( - "PHASE_CHANGED contract_id={} phase={}", - contract_id, new_phase - ); - - if let Err(e) = self.notify_supervisor( - supervisor_task_id, - supervisor_daemon_id, - &message, - ).await { - tracing::warn!( - supervisor_task_id = %supervisor_task_id, - contract_id = %contract_id, - "Failed to notify supervisor of phase change: {}", - e - ); - } - } - - /// Format and send a new task created notification to a supervisor. - pub async fn notify_supervisor_of_task_created( - &self, - supervisor_task_id: Uuid, - supervisor_daemon_id: Option<Uuid>, - new_task_id: Uuid, - new_task_name: &str, - ) { - let message = format!( - "TASK_CREATED task_id={} name=\"{}\"", - new_task_id, new_task_name - ); - - if let Err(e) = self.notify_supervisor( - supervisor_task_id, - supervisor_daemon_id, - &message, - ).await { - tracing::warn!( - supervisor_task_id = %supervisor_task_id, - new_task_id = %new_task_id, - "Failed to notify supervisor of task creation: {}", - e - ); - } - } } /// Type alias for the shared application state. |
