diff options
| author | soryu <soryu@soryu.co> | 2025-12-23 22:20:52 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2025-12-23 22:20:52 +0000 |
| commit | 72c2590571104b8d10e3f72d7a5b984d0b520c51 (patch) | |
| tree | 735aa03056a44a93b9abdf915545ad034ee2b597 /makima/src/server/handlers | |
| parent | f5222a7ae5ade5589436778cb01fc0abe625b3c3 (diff) | |
| download | soryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.tar.gz soryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.zip | |
Add conflict notification and file update WS endpoint
Diffstat (limited to 'makima/src/server/handlers')
| -rw-r--r-- | makima/src/server/handlers/chat.rs | 46 | ||||
| -rw-r--r-- | makima/src/server/handlers/file_ws.rs | 163 | ||||
| -rw-r--r-- | makima/src/server/handlers/files.rs | 57 | ||||
| -rw-r--r-- | makima/src/server/handlers/listen.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/mod.rs | 1 |
5 files changed, 254 insertions, 14 deletions
diff --git a/makima/src/server/handlers/chat.rs b/makima/src/server/handlers/chat.rs index 92c4ec8..3bdbc74 100644 --- a/makima/src/server/handlers/chat.rs +++ b/makima/src/server/handlers/chat.rs @@ -17,7 +17,7 @@ use crate::llm::{ groq::{GroqClient, GroqError, Message, ToolCallResponse}, LlmModel, ToolCall, ToolResult, AVAILABLE_TOOLS, }; -use crate::server::state::SharedState; +use crate::server::state::{FileUpdateNotification, SharedState}; /// Maximum number of tool-calling rounds to prevent infinite loops const MAX_TOOL_ROUNDS: usize = 10; @@ -385,17 +385,43 @@ pub async fn chat_handler( transcript: None, summary: current_summary.clone(), body: Some(current_body.clone()), + version: None, // Internal update, skip version check }; - if let Err(e) = repository::update_file(pool, id, update_req).await { - tracing::error!("Failed to save file changes: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Failed to save changes: {}", e) - })), - ) - .into_response(); + match repository::update_file(pool, id, update_req).await { + Ok(Some(updated_file)) => { + // Broadcast update notification for LLM changes + let mut updated_fields = vec!["body".to_string()]; + if current_summary.is_some() { + updated_fields.push("summary".to_string()); + } + state.broadcast_file_update(FileUpdateNotification { + file_id: id, + version: updated_file.version, + updated_fields, + updated_by: "llm".to_string(), + }); + } + Ok(None) => { + // File was deleted during processing + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "error": "File not found" + })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to save file changes: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "error": format!("Failed to save changes: {}", e) + })), + ) + .into_response(); + } } } diff --git a/makima/src/server/handlers/file_ws.rs b/makima/src/server/handlers/file_ws.rs new file mode 100644 index 0000000..5a44309 --- /dev/null +++ b/makima/src/server/handlers/file_ws.rs @@ -0,0 +1,163 @@ +//! WebSocket handler for file change subscriptions. +//! +//! Clients can subscribe to specific files and receive real-time notifications +//! when those files are updated by any source (user edits, LLM modifications, etc.). + +use axum::{ + extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade}, + response::Response, +}; +use futures::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use uuid::Uuid; + +use crate::server::state::SharedState; + +/// Client message for file subscription management. +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum FileClientMessage { + /// Subscribe to updates for a specific file + Subscribe { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// Unsubscribe from updates for a specific file + Unsubscribe { + #[serde(rename = "fileId")] + file_id: Uuid, + }, +} + +/// Server message for file subscription WebSocket. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum FileServerMessage { + /// Subscription confirmed + Subscribed { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// Unsubscription confirmed + Unsubscribed { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// File was updated + FileUpdated { + #[serde(rename = "fileId")] + file_id: Uuid, + version: i32, + #[serde(rename = "updatedFields")] + updated_fields: Vec<String>, + #[serde(rename = "updatedBy")] + updated_by: String, + }, + /// Error occurred + Error { code: String, message: String }, +} + +/// WebSocket upgrade handler for file subscriptions. +#[utoipa::path( + get, + path = "/api/v1/files/subscribe", + responses( + (status = 101, description = "WebSocket connection established"), + ), + tag = "Files" +)] +pub async fn file_subscription_handler( + ws: WebSocketUpgrade, + State(state): State<SharedState>, +) -> Response { + ws.on_upgrade(|socket| handle_file_subscription(socket, state)) +} + +async fn handle_file_subscription(socket: WebSocket, state: SharedState) { + let (mut sender, mut receiver) = socket.split(); + + // Set of file IDs this client is subscribed to + let mut subscriptions: HashSet<Uuid> = HashSet::new(); + + // Subscribe to the broadcast channel + let mut broadcast_rx = state.file_updates.subscribe(); + + loop { + tokio::select! { + // Handle incoming WebSocket messages from client + msg = receiver.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + match serde_json::from_str::<FileClientMessage>(&text) { + Ok(FileClientMessage::Subscribe { file_id }) => { + subscriptions.insert(file_id); + let response = FileServerMessage::Subscribed { file_id }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + tracing::debug!("Client subscribed to file {}", file_id); + } + Ok(FileClientMessage::Unsubscribe { file_id }) => { + subscriptions.remove(&file_id); + let response = FileServerMessage::Unsubscribed { file_id }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + tracing::debug!("Client unsubscribed from file {}", file_id); + } + Err(e) => { + let response = FileServerMessage::Error { + code: "PARSE_ERROR".into(), + message: e.to_string(), + }; + let json = serde_json::to_string(&response).unwrap(); + let _ = sender.send(Message::Text(json.into())).await; + } + } + } + Some(Ok(Message::Close(_))) | None => { + tracing::debug!("Client disconnected from file subscription"); + break; + } + Some(Err(e)) => { + tracing::warn!("WebSocket error: {}", e); + break; + } + _ => {} + } + } + + // Handle broadcast notifications + notification = broadcast_rx.recv() => { + match notification { + Ok(notification) => { + // Only forward if client is subscribed to this file + if subscriptions.contains(¬ification.file_id) { + let response = FileServerMessage::FileUpdated { + file_id: notification.file_id, + version: notification.version, + updated_fields: notification.updated_fields, + updated_by: notification.updated_by, + }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + // Client is too slow, skip some messages + tracing::warn!("File subscription client lagged, skipped {} messages", n); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + // Channel closed, exit + break; + } + } + } + } + } +} diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs index 746d66b..c65eed5 100644 --- a/makima/src/server/handlers/files.rs +++ b/makima/src/server/handlers/files.rs @@ -9,9 +9,9 @@ use axum::{ use uuid::Uuid; use crate::db::models::{CreateFileRequest, FileListResponse, FileSummary, UpdateFileRequest}; -use crate::db::repository; +use crate::db::repository::{self, RepositoryError}; use crate::server::messages::ApiError; -use crate::server::state::SharedState; +use crate::server::state::{FileUpdateNotification, SharedState}; /// List all files for the current owner. #[utoipa::path( @@ -148,6 +148,7 @@ pub async fn create_file( responses( (status = 200, description = "File updated", body = crate::db::models::File), (status = 404, description = "File not found", body = ApiError), + (status = 409, description = "Version conflict", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), @@ -166,14 +167,62 @@ pub async fn update_file( .into_response(); }; + // Collect which fields are being updated for broadcast + let mut updated_fields = Vec::new(); + if req.name.is_some() { + updated_fields.push("name".to_string()); + } + if req.description.is_some() { + updated_fields.push("description".to_string()); + } + if req.transcript.is_some() { + updated_fields.push("transcript".to_string()); + } + if req.summary.is_some() { + updated_fields.push("summary".to_string()); + } + if req.body.is_some() { + updated_fields.push("body".to_string()); + } + match repository::update_file(pool, id, req).await { - Ok(Some(file)) => Json(file).into_response(), + Ok(Some(file)) => { + // Broadcast update notification + state.broadcast_file_update(FileUpdateNotification { + file_id: id, + version: file.version, + updated_fields, + updated_by: "user".to_string(), + }); + Json(file).into_response() + } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "File not found")), ) .into_response(), - Err(e) => { + Err(RepositoryError::VersionConflict { expected, actual }) => { + tracing::info!( + "Version conflict on file {}: expected {}, actual {}", + id, + expected, + actual + ); + ( + StatusCode::CONFLICT, + Json(serde_json::json!({ + "code": "VERSION_CONFLICT", + "message": format!( + "File was modified by another user. Expected version {}, actual version {}", + expected, actual + ), + "expectedVersion": expected, + "actualVersion": actual, + })), + ) + .into_response() + } + Err(RepositoryError::Database(e)) => { tracing::error!("Failed to update file {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs index 5fc5cea..a26c208 100644 --- a/makima/src/server/handlers/listen.rs +++ b/makima/src/server/handlers/listen.rs @@ -467,6 +467,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { transcript: Some(final_entries.clone()), summary: None, body: None, + version: None, // Internal update, skip version check }).await { Ok(_) => { tracing::info!( diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index b13668a..c08f1bd 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -1,5 +1,6 @@ //! HTTP and WebSocket request handlers. pub mod chat; +pub mod file_ws; pub mod files; pub mod listen; |
