diff options
| author | soryu <soryu@soryu.co> | 2026-05-18 01:21:30 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-18 01:21:30 +0100 |
| commit | f240675da99bc7705e473b8f70a2628812aa4c10 (patch) | |
| tree | 3ee2d24b431ccb8cd1a3013c86b34a5782a3e224 /makima/src/daemon/tui/ws_client.rs | |
| parent | 0d996cf7590e3e52f424859c7d6f0e68640f119e (diff) | |
| download | soryu-master.tar.gz soryu-master.zip | |
The contracts table, supervisor task type, and all their backing
machinery have been inert for several PRs. The directives system reads
its own active contract body for spec text, and PR #135 removed the
last LLM surface that spawned supervisors.
This PR wipes the dead surface in one shot — the user authorised a DB
wipe, so the migration drops every legacy table with CASCADE rather
than carrying forward stub rows. Net change: −12k LOC across handlers,
repository, state, models, the TUI, and the listen module.
What's gone:
- contracts, contract_chat_*, contract_events, contract_repositories,
contract_type_templates tables.
- supervisor_states, supervisor_heartbeats tables.
- mesh_chat_conversations, mesh_chat_messages tables.
- tasks.contract_id/is_supervisor/supervisor_task_id/supervisor_worktree_task_id columns.
- directive_steps.contract_id/contract_type columns.
- files.contract_id/contract_phase columns.
- history_events.contract_id/phase columns.
- The Contract/Supervisor/MeshChat handler + model + repository
surface, plus the daemon TUI views that read them.
- The standalone listen.rs websocket handler (orphaned with the LLM).
What stays:
- mesh_supervisor handler: trimmed to just the questions + orders
backchannel used by `makima directive ask` / `create-order` (kept
the URL prefix for CLI client compat).
- directive_documents (the user-facing "contracts" surface).
- pending_questions in-memory state for the directive Ask flow.
cargo check, cargo test --lib (68 passed), tsc, and vite build all
clean.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'makima/src/daemon/tui/ws_client.rs')
| -rw-r--r-- | makima/src/daemon/tui/ws_client.rs | 353 |
1 files changed, 0 insertions, 353 deletions
diff --git a/makima/src/daemon/tui/ws_client.rs b/makima/src/daemon/tui/ws_client.rs deleted file mode 100644 index 3462467..0000000 --- a/makima/src/daemon/tui/ws_client.rs +++ /dev/null @@ -1,353 +0,0 @@ -//! TUI WebSocket client for task output streaming. -//! -//! Uses a dedicated async thread to handle WebSocket communication, -//! bridging async/sync worlds via channels. - -use std::sync::mpsc as std_mpsc; -use std::thread; -use std::time::Duration; - -use serde::{Deserialize, Serialize}; -use tokio::runtime::Runtime; -use tokio::sync::mpsc as tokio_mpsc; -use uuid::Uuid; - -/// Commands sent from TUI to WebSocket client -#[derive(Debug, Clone)] -pub enum WsCommand { - /// Subscribe to task output - Subscribe { task_id: Uuid }, - /// Unsubscribe from task output - Unsubscribe { task_id: Uuid }, - /// Shutdown the WebSocket client - Shutdown, -} - -/// Events sent from WebSocket client to TUI -#[derive(Debug, Clone)] -pub enum WsEvent { - /// WebSocket connected - Connected, - /// WebSocket disconnected - Disconnected, - /// WebSocket reconnecting - Reconnecting { attempt: u32 }, - /// Subscription confirmed - Subscribed { task_id: Uuid }, - /// Unsubscription confirmed - Unsubscribed { task_id: Uuid }, - /// Task output received - TaskOutput(TaskOutputEvent), - /// Error occurred - Error { message: String }, -} - -/// Task output event from server -#[derive(Debug, Clone)] -pub struct TaskOutputEvent { - pub task_id: Uuid, - pub message_type: String, - pub content: String, - pub tool_name: Option<String>, - pub tool_input: Option<serde_json::Value>, - pub is_error: Option<bool>, - pub cost_usd: Option<f64>, - pub duration_ms: Option<u64>, - pub is_partial: bool, -} - -/// Messages sent to the WebSocket server -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type", rename_all = "camelCase")] -enum ClientMessage { - SubscribeOutput { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - UnsubscribeOutput { - #[serde(rename = "taskId")] - task_id: Uuid, - }, -} - -/// Messages received from the WebSocket server -#[derive(Debug, Clone, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -enum ServerMessage { - OutputSubscribed { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - OutputUnsubscribed { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - TaskOutput { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "messageType")] - message_type: String, - content: String, - #[serde(rename = "toolName")] - tool_name: Option<String>, - #[serde(rename = "toolInput")] - tool_input: Option<serde_json::Value>, - #[serde(rename = "isError")] - is_error: Option<bool>, - #[serde(rename = "costUsd")] - cost_usd: Option<f64>, - #[serde(rename = "durationMs")] - duration_ms: Option<u64>, - #[serde(rename = "isPartial")] - is_partial: bool, - }, - Error { - code: String, - message: String, - }, - // Other message types we don't care about - #[serde(other)] - Other, -} - -/// TUI WebSocket client handle -pub struct TuiWsClient { - /// Command sender to WebSocket thread - command_tx: tokio_mpsc::Sender<WsCommand>, - /// Event receiver from WebSocket thread - event_rx: std_mpsc::Receiver<WsEvent>, -} - -impl TuiWsClient { - /// Start a new WebSocket client in a dedicated thread - pub fn start(api_url: String, api_key: String) -> Self { - let (command_tx, command_rx) = tokio_mpsc::channel(32); - let (event_tx, event_rx) = std_mpsc::channel(); - - // Spawn as daemon thread so it doesn't block process exit - thread::Builder::new() - .name("ws-client".to_string()) - .spawn(move || { - let rt = match Runtime::new() { - Ok(rt) => rt, - Err(e) => { - let _ = event_tx.send(WsEvent::Error { - message: format!("Failed to create tokio runtime: {}", e), - }); - return; - } - }; - rt.block_on(run_ws_client(api_url, api_key, command_rx, event_tx)); - }) - .ok(); - - Self { - command_tx, - event_rx, - } - } - - /// Send a command to the WebSocket client (non-blocking) - pub fn send(&self, command: WsCommand) { - // Use try_send to avoid blocking on shutdown - let _ = self.command_tx.try_send(command); - } - - /// Subscribe to task output - pub fn subscribe(&self, task_id: Uuid) { - self.send(WsCommand::Subscribe { task_id }); - } - - /// Unsubscribe from task output - pub fn unsubscribe(&self, task_id: Uuid) { - self.send(WsCommand::Unsubscribe { task_id }); - } - - /// Shutdown the WebSocket client - pub fn shutdown(&self) { - self.send(WsCommand::Shutdown); - } - - /// Try to receive an event (non-blocking) - pub fn try_recv(&self) -> Option<WsEvent> { - self.event_rx.try_recv().ok() - } - - /// Receive an event with timeout - pub fn recv_timeout(&self, timeout: Duration) -> Option<WsEvent> { - self.event_rx.recv_timeout(timeout).ok() - } -} - -impl Drop for TuiWsClient { - fn drop(&mut self) { - // Try to send shutdown command, but don't wait - let _ = self.command_tx.try_send(WsCommand::Shutdown); - } -} - -/// WebSocket client main loop -async fn run_ws_client( - api_url: String, - api_key: String, - mut command_rx: tokio_mpsc::Receiver<WsCommand>, - event_tx: std_mpsc::Sender<WsEvent>, -) { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::{connect_async, tungstenite::client::IntoClientRequest, tungstenite::Message}; - - // Build WebSocket URL from HTTP URL - let ws_url = api_url - .replace("https://", "wss://") - .replace("http://", "ws://"); - let ws_url = format!("{}/api/v1/mesh/tasks/subscribe", ws_url); - - let mut reconnect_attempt = 0u32; - let max_reconnect_delay = Duration::from_secs(30); - let initial_delay = Duration::from_secs(1); - - loop { - // Build request with API key header - let mut request = match ws_url.clone().into_client_request() { - Ok(r) => r, - Err(e) => { - let _ = event_tx.send(WsEvent::Error { - message: format!("Invalid URL: {}", e), - }); - return; - } - }; - - // Send both headers - server will try tool key first, then API key - if let Ok(header_value) = api_key.parse() { - request.headers_mut().insert("x-makima-tool-key", header_value); - } - if let Ok(header_value) = api_key.parse() { - request.headers_mut().insert("x-makima-api-key", header_value); - } - - if reconnect_attempt > 0 { - let _ = event_tx.send(WsEvent::Reconnecting { - attempt: reconnect_attempt, - }); - - // Exponential backoff - let delay = std::cmp::min( - initial_delay * 2u32.saturating_pow(reconnect_attempt - 1), - max_reconnect_delay, - ); - tokio::time::sleep(delay).await; - } - - // Try to connect - let (ws_stream, _) = match connect_async(request).await { - Ok(result) => { - reconnect_attempt = 0; - let _ = event_tx.send(WsEvent::Connected); - result - } - Err(e) => { - reconnect_attempt += 1; - let _ = event_tx.send(WsEvent::Error { - message: format!("Connection failed: {}", e), - }); - continue; - } - }; - - let (mut write, mut read) = ws_stream.split(); - - // Main message loop - loop { - tokio::select! { - // Handle commands from TUI - cmd = command_rx.recv() => { - match cmd { - Some(WsCommand::Subscribe { task_id }) => { - let msg = ClientMessage::SubscribeOutput { task_id }; - if let Ok(json) = serde_json::to_string(&msg) { - let _ = write.send(Message::Text(json)).await; - } - } - Some(WsCommand::Unsubscribe { task_id }) => { - let msg = ClientMessage::UnsubscribeOutput { task_id }; - if let Ok(json) = serde_json::to_string(&msg) { - let _ = write.send(Message::Text(json)).await; - } - } - Some(WsCommand::Shutdown) | None => { - let _ = write.close().await; - return; - } - } - } - - // Handle messages from server - msg = read.next() => { - match msg { - Some(Ok(Message::Text(text))) => { - if let Ok(server_msg) = serde_json::from_str::<ServerMessage>(&text) { - match server_msg { - ServerMessage::OutputSubscribed { task_id } => { - let _ = event_tx.send(WsEvent::Subscribed { task_id }); - } - ServerMessage::OutputUnsubscribed { task_id } => { - let _ = event_tx.send(WsEvent::Unsubscribed { task_id }); - } - ServerMessage::TaskOutput { - task_id, - message_type, - content, - tool_name, - tool_input, - is_error, - cost_usd, - duration_ms, - is_partial, - } => { - let _ = event_tx.send(WsEvent::TaskOutput(TaskOutputEvent { - task_id, - message_type, - content, - tool_name, - tool_input, - is_error, - cost_usd, - duration_ms, - is_partial, - })); - } - ServerMessage::Error { code, message } => { - let _ = event_tx.send(WsEvent::Error { - message: format!("{}: {}", code, message), - }); - } - ServerMessage::Other => { - // Ignore other message types - } - } - } - } - Some(Ok(Message::Ping(data))) => { - let _ = write.send(Message::Pong(data)).await; - } - Some(Ok(Message::Close(_))) | None => { - let _ = event_tx.send(WsEvent::Disconnected); - reconnect_attempt += 1; - break; // Reconnect - } - Some(Err(e)) => { - let _ = event_tx.send(WsEvent::Error { - message: format!("WebSocket error: {}", e), - }); - let _ = event_tx.send(WsEvent::Disconnected); - reconnect_attempt += 1; - break; // Reconnect - } - _ => {} - } - } - } - } - } -} |
