From eeafe072bc6bb81459f7d087b48fc921afe9cc11 Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 15 Jan 2026 03:26:28 +0000 Subject: Automatically derive repo URL and add notifications for input --- makima/src/server/state.rs | 185 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) (limited to 'makima/src/server/state.rs') diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 1c28544..495fc15 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -103,6 +103,54 @@ pub struct TaskCompletionNotification { pub error_message: Option, } +/// Notification for supervisor questions requiring user feedback. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorQuestionNotification { + /// Unique ID for this question + pub question_id: Uuid, + /// Supervisor task that asked the question + pub task_id: Uuid, + /// Contract this question relates to + pub contract_id: Uuid, + /// Owner ID for data isolation + #[serde(skip)] + pub owner_id: Option, + /// The question text + pub question: String, + /// Optional choices for the user (if empty, free-form text response) + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub choices: Vec, + /// Context about what phase/action this relates to + #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option, + /// Whether this question is still pending + pub pending: bool, + /// When the question was asked + pub created_at: chrono::DateTime, +} + +/// Stored supervisor question for persistence +#[derive(Debug, Clone)] +pub struct PendingSupervisorQuestion { + pub question_id: Uuid, + pub task_id: Uuid, + pub contract_id: Uuid, + pub owner_id: Uuid, + pub question: String, + pub choices: Vec, + pub context: Option, + pub created_at: chrono::DateTime, +} + +/// Response to a supervisor question +#[derive(Debug, Clone)] +pub struct SupervisorQuestionResponse { + pub question_id: Uuid, + pub response: String, + pub responded_at: chrono::DateTime, +} + /// Command sent from server to daemon. #[derive(Debug, Clone, serde::Serialize)] #[serde(tag = "type", rename_all = "camelCase")] @@ -408,6 +456,12 @@ pub struct AppState { pub task_output: broadcast::Sender, /// Broadcast channel for task completion notifications (for supervisors) pub task_completions: broadcast::Sender, + /// Broadcast channel for supervisor question notifications + pub supervisor_questions: broadcast::Sender, + /// Pending supervisor questions awaiting user response (keyed by question_id) + pub pending_questions: DashMap, + /// Responses to supervisor questions (keyed by question_id) + pub question_responses: DashMap, /// Active daemon connections (keyed by connection_id) pub daemon_connections: DashMap, /// Tool keys for orchestrator API access (key -> task_id) @@ -435,6 +489,7 @@ impl AppState { let (task_updates, _) = broadcast::channel(256); let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring + let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) @@ -476,6 +531,9 @@ impl AppState { task_updates, task_output, task_completions, + supervisor_questions, + pending_questions: DashMap::new(), + question_responses: DashMap::new(), daemon_connections: DashMap::new(), tool_keys: DashMap::new(), jwt_verifier, @@ -556,6 +614,133 @@ impl AppState { let _ = self.task_completions.send(notification); } + /// Broadcast a supervisor question notification to all subscribers. + /// + /// Used to notify frontend clients when a supervisor needs user feedback. + pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) { + let _ = self.supervisor_questions.send(notification); + } + + /// Add a pending supervisor question and broadcast it. + pub fn add_supervisor_question( + &self, + task_id: Uuid, + contract_id: Uuid, + owner_id: Uuid, + question: String, + choices: Vec, + context: Option, + ) -> Uuid { + let question_id = Uuid::new_v4(); + let now = chrono::Utc::now(); + + // Store the pending question + self.pending_questions.insert( + question_id, + PendingSupervisorQuestion { + question_id, + task_id, + contract_id, + owner_id, + question: question.clone(), + choices: choices.clone(), + context: context.clone(), + created_at: now, + }, + ); + + // Broadcast to subscribers + self.broadcast_supervisor_question(SupervisorQuestionNotification { + question_id, + task_id, + contract_id, + owner_id: Some(owner_id), + question, + choices, + context, + pending: true, + created_at: now, + }); + + tracing::info!( + question_id = %question_id, + task_id = %task_id, + contract_id = %contract_id, + "Supervisor question added" + ); + + question_id + } + + /// Remove a pending question (after it's been answered). + pub fn remove_pending_question(&self, question_id: Uuid) -> Option { + self.pending_questions.remove(&question_id).map(|(_, q)| q) + } + + /// Get all pending questions for an owner. + pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec { + self.pending_questions + .iter() + .filter(|entry| entry.value().owner_id == owner_id) + .map(|entry| entry.value().clone()) + .collect() + } + + /// Get a specific pending question. + pub fn get_pending_question(&self, question_id: Uuid) -> Option { + self.pending_questions.get(&question_id).map(|entry| entry.value().clone()) + } + + /// Submit a response to a supervisor question. + pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool { + // Check if the question exists + if let Some(question) = self.pending_questions.remove(&question_id) { + let now = chrono::Utc::now(); + + // Store the response + self.question_responses.insert( + question_id, + SupervisorQuestionResponse { + question_id, + response: response.clone(), + responded_at: now, + }, + ); + + // Broadcast that the question is no longer pending + self.broadcast_supervisor_question(SupervisorQuestionNotification { + question_id, + task_id: question.1.task_id, + contract_id: question.1.contract_id, + owner_id: Some(question.1.owner_id), + question: question.1.question, + choices: question.1.choices, + context: question.1.context, + pending: false, + created_at: question.1.created_at, + }); + + tracing::info!( + question_id = %question_id, + "Supervisor question answered" + ); + + true + } else { + false + } + } + + /// Get the response to a question (if answered). + pub fn get_question_response(&self, question_id: Uuid) -> Option { + self.question_responses.get(&question_id).map(|entry| entry.value().clone()) + } + + /// Clean up a question response after the supervisor has read it. + pub fn cleanup_question_response(&self, question_id: Uuid) { + self.question_responses.remove(&question_id); + } + /// Register a new daemon connection. /// /// Returns the connection_id for later reference. -- cgit v1.2.3