summaryrefslogtreecommitdiff
path: root/makima/src/server/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/state.rs')
-rw-r--r--makima/src/server/state.rs185
1 files changed, 185 insertions, 0 deletions
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 1c28544..495fc15 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -103,6 +103,54 @@ pub struct TaskCompletionNotification {
pub error_message: Option<String>,
}
+/// Notification for supervisor questions requiring user feedback.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorQuestionNotification {
+ /// Unique ID for this question
+ pub question_id: Uuid,
+ /// Supervisor task that asked the question
+ pub task_id: Uuid,
+ /// Contract this question relates to
+ pub contract_id: Uuid,
+ /// Owner ID for data isolation
+ #[serde(skip)]
+ pub owner_id: Option<Uuid>,
+ /// The question text
+ pub question: String,
+ /// Optional choices for the user (if empty, free-form text response)
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub choices: Vec<String>,
+ /// Context about what phase/action this relates to
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub context: Option<String>,
+ /// Whether this question is still pending
+ pub pending: bool,
+ /// When the question was asked
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Stored supervisor question for persistence
+#[derive(Debug, Clone)]
+pub struct PendingSupervisorQuestion {
+ pub question_id: Uuid,
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub owner_id: Uuid,
+ pub question: String,
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Response to a supervisor question
+#[derive(Debug, Clone)]
+pub struct SupervisorQuestionResponse {
+ pub question_id: Uuid,
+ pub response: String,
+ pub responded_at: chrono::DateTime<chrono::Utc>,
+}
+
/// Command sent from server to daemon.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
@@ -408,6 +456,12 @@ pub struct AppState {
pub task_output: broadcast::Sender<TaskOutputNotification>,
/// Broadcast channel for task completion notifications (for supervisors)
pub task_completions: broadcast::Sender<TaskCompletionNotification>,
+ /// Broadcast channel for supervisor question notifications
+ pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
+ /// Pending supervisor questions awaiting user response (keyed by question_id)
+ pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
+ /// Responses to supervisor questions (keyed by question_id)
+ pub question_responses: DashMap<Uuid, SupervisorQuestionResponse>,
/// Active daemon connections (keyed by connection_id)
pub daemon_connections: DashMap<String, DaemonConnectionInfo>,
/// Tool keys for orchestrator API access (key -> task_id)
@@ -435,6 +489,7 @@ impl AppState {
let (task_updates, _) = broadcast::channel(256);
let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming
let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
+ let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
// Initialize JWT verifier from environment (optional)
// Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
@@ -476,6 +531,9 @@ impl AppState {
task_updates,
task_output,
task_completions,
+ supervisor_questions,
+ pending_questions: DashMap::new(),
+ question_responses: DashMap::new(),
daemon_connections: DashMap::new(),
tool_keys: DashMap::new(),
jwt_verifier,
@@ -556,6 +614,133 @@ impl AppState {
let _ = self.task_completions.send(notification);
}
+ /// Broadcast a supervisor question notification to all subscribers.
+ ///
+ /// Used to notify frontend clients when a supervisor needs user feedback.
+ pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) {
+ let _ = self.supervisor_questions.send(notification);
+ }
+
+ /// Add a pending supervisor question and broadcast it.
+ pub fn add_supervisor_question(
+ &self,
+ task_id: Uuid,
+ contract_id: Uuid,
+ owner_id: Uuid,
+ question: String,
+ choices: Vec<String>,
+ context: Option<String>,
+ ) -> Uuid {
+ let question_id = Uuid::new_v4();
+ let now = chrono::Utc::now();
+
+ // Store the pending question
+ self.pending_questions.insert(
+ question_id,
+ PendingSupervisorQuestion {
+ question_id,
+ task_id,
+ contract_id,
+ owner_id,
+ question: question.clone(),
+ choices: choices.clone(),
+ context: context.clone(),
+ created_at: now,
+ },
+ );
+
+ // Broadcast to subscribers
+ self.broadcast_supervisor_question(SupervisorQuestionNotification {
+ question_id,
+ task_id,
+ contract_id,
+ owner_id: Some(owner_id),
+ question,
+ choices,
+ context,
+ pending: true,
+ created_at: now,
+ });
+
+ tracing::info!(
+ question_id = %question_id,
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Supervisor question added"
+ );
+
+ question_id
+ }
+
+ /// Remove a pending question (after it's been answered).
+ pub fn remove_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
+ self.pending_questions.remove(&question_id).map(|(_, q)| q)
+ }
+
+ /// Get all pending questions for an owner.
+ pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec<PendingSupervisorQuestion> {
+ self.pending_questions
+ .iter()
+ .filter(|entry| entry.value().owner_id == owner_id)
+ .map(|entry| entry.value().clone())
+ .collect()
+ }
+
+ /// Get a specific pending question.
+ pub fn get_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
+ self.pending_questions.get(&question_id).map(|entry| entry.value().clone())
+ }
+
+ /// Submit a response to a supervisor question.
+ pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool {
+ // Check if the question exists
+ if let Some(question) = self.pending_questions.remove(&question_id) {
+ let now = chrono::Utc::now();
+
+ // Store the response
+ self.question_responses.insert(
+ question_id,
+ SupervisorQuestionResponse {
+ question_id,
+ response: response.clone(),
+ responded_at: now,
+ },
+ );
+
+ // Broadcast that the question is no longer pending
+ self.broadcast_supervisor_question(SupervisorQuestionNotification {
+ question_id,
+ task_id: question.1.task_id,
+ contract_id: question.1.contract_id,
+ owner_id: Some(question.1.owner_id),
+ question: question.1.question,
+ choices: question.1.choices,
+ context: question.1.context,
+ pending: false,
+ created_at: question.1.created_at,
+ });
+
+ tracing::info!(
+ question_id = %question_id,
+ "Supervisor question answered"
+ );
+
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Get the response to a question (if answered).
+ pub fn get_question_response(&self, question_id: Uuid) -> Option<SupervisorQuestionResponse> {
+ self.question_responses.get(&question_id).map(|entry| entry.value().clone())
+ }
+
+ /// Clean up a question response after the supervisor has read it.
+ pub fn cleanup_question_response(&self, question_id: Uuid) {
+ self.question_responses.remove(&question_id);
+ }
+
/// Register a new daemon connection.
///
/// Returns the connection_id for later reference.