diff options
| author | soryu <soryu@soryu.co> | 2025-12-23 22:20:52 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2025-12-23 22:20:52 +0000 |
| commit | 72c2590571104b8d10e3f72d7a5b984d0b520c51 (patch) | |
| tree | 735aa03056a44a93b9abdf915545ad034ee2b597 | |
| parent | f5222a7ae5ade5589436778cb01fc0abe625b3c3 (diff) | |
| download | soryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.tar.gz soryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.zip | |
Add conflict notification and file update WS endpoint
| -rw-r--r-- | makima/frontend/src/components/files/ConflictNotification.tsx | 47 | ||||
| -rw-r--r-- | makima/frontend/src/components/files/UpdateNotification.tsx | 43 | ||||
| -rw-r--r-- | makima/frontend/src/hooks/useFileSubscription.ts | 154 | ||||
| -rw-r--r-- | makima/frontend/src/hooks/useFiles.ts | 23 | ||||
| -rw-r--r-- | makima/frontend/src/lib/api.ts | 30 | ||||
| -rw-r--r-- | makima/frontend/src/routes/files.tsx | 133 | ||||
| -rw-r--r-- | makima/frontend/tsconfig.tsbuildinfo | 2 | ||||
| -rw-r--r-- | makima/migrations/20241224000000_add_file_version.sql | 17 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 7 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 132 | ||||
| -rw-r--r-- | makima/src/server/handlers/chat.rs | 46 | ||||
| -rw-r--r-- | makima/src/server/handlers/file_ws.rs | 163 | ||||
| -rw-r--r-- | makima/src/server/handlers/files.rs | 57 | ||||
| -rw-r--r-- | makima/src/server/handlers/listen.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/handlers/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 3 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 30 |
17 files changed, 840 insertions, 49 deletions
diff --git a/makima/frontend/src/components/files/ConflictNotification.tsx b/makima/frontend/src/components/files/ConflictNotification.tsx new file mode 100644 index 0000000..5d54c32 --- /dev/null +++ b/makima/frontend/src/components/files/ConflictNotification.tsx @@ -0,0 +1,47 @@ +interface ConflictNotificationProps { + onReload: () => void; + onForceOverwrite: () => void; + onDismiss: () => void; +} + +export function ConflictNotification({ + onReload, + onForceOverwrite, + onDismiss, +}: ConflictNotificationProps) { + return ( + <div className="fixed bottom-4 right-4 max-w-md p-4 bg-[#1a2332] border border-yellow-500/50 shadow-lg z-50"> + <div className="flex items-start gap-3"> + <div className="text-yellow-500 text-xl font-bold">!</div> + <div className="flex-1"> + <h3 className="font-mono text-sm text-[#9bc3ff] font-semibold mb-1"> + Conflict Detected + </h3> + <p className="font-mono text-xs text-white/70 mb-3"> + This file was modified elsewhere. Your changes could not be saved. + </p> + <div className="flex gap-2"> + <button + onClick={onReload} + className="px-3 py-1 font-mono text-xs text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] transition-colors" + > + Reload Latest + </button> + <button + onClick={onForceOverwrite} + className="px-3 py-1 font-mono text-xs text-red-400 border border-red-400/25 hover:border-red-400/50 transition-colors" + > + Force Save + </button> + <button + onClick={onDismiss} + className="px-3 py-1 font-mono text-xs text-[#555] hover:text-white/70 transition-colors" + > + Dismiss + </button> + </div> + </div> + </div> + </div> + ); +} diff --git a/makima/frontend/src/components/files/UpdateNotification.tsx b/makima/frontend/src/components/files/UpdateNotification.tsx new file mode 100644 index 0000000..92b2b15 --- /dev/null +++ b/makima/frontend/src/components/files/UpdateNotification.tsx @@ -0,0 +1,43 @@ +interface UpdateNotificationProps { + updatedBy: "user" | "llm" | "system"; + onRefresh: () => void; + onDismiss: () => void; +} + +export function UpdateNotification({ + updatedBy, + onRefresh, + onDismiss, +}: UpdateNotificationProps) { + const source = updatedBy === "llm" ? "AI assistant" : "another session"; + + return ( + <div className="fixed bottom-4 right-4 max-w-md p-4 bg-[#1a2332] border border-[#3f6fb3]/50 shadow-lg z-50"> + <div className="flex items-start gap-3"> + <div className="text-[#75aafc] text-xl font-bold">i</div> + <div className="flex-1"> + <h3 className="font-mono text-sm text-[#9bc3ff] font-semibold mb-1"> + File Updated + </h3> + <p className="font-mono text-xs text-white/70 mb-3"> + This file was updated by {source}. + </p> + <div className="flex gap-2"> + <button + onClick={onRefresh} + className="px-3 py-1 font-mono text-xs text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] transition-colors" + > + Refresh Now + </button> + <button + onClick={onDismiss} + className="px-3 py-1 font-mono text-xs text-[#555] hover:text-white/70 transition-colors" + > + Dismiss + </button> + </div> + </div> + </div> + </div> + ); +} diff --git a/makima/frontend/src/hooks/useFileSubscription.ts b/makima/frontend/src/hooks/useFileSubscription.ts new file mode 100644 index 0000000..7260b96 --- /dev/null +++ b/makima/frontend/src/hooks/useFileSubscription.ts @@ -0,0 +1,154 @@ +import { useState, useCallback, useRef, useEffect } from "react"; +import { FILE_SUBSCRIBE_ENDPOINT } from "../lib/api"; + +export interface FileUpdateEvent { + fileId: string; + version: number; + updatedFields: string[]; + updatedBy: "user" | "llm" | "system"; +} + +interface UseFileSubscriptionOptions { + fileId: string | null; + onUpdate?: (event: FileUpdateEvent) => void; + onError?: (error: string) => void; +} + +export function useFileSubscription(options: UseFileSubscriptionOptions) { + const { fileId, onUpdate, onError } = options; + const [connected, setConnected] = useState(false); + const wsRef = useRef<WebSocket | null>(null); + const reconnectTimeoutRef = useRef<number | null>(null); + const subscribedFileRef = useRef<string | null>(null); + + // Store callbacks in refs to avoid re-connecting when callbacks change + const callbacksRef = useRef({ onUpdate, onError }); + useEffect(() => { + callbacksRef.current = { onUpdate, onError }; + }, [onUpdate, onError]); + + const connect = useCallback(() => { + if (wsRef.current?.readyState === WebSocket.OPEN) return; + + try { + const ws = new WebSocket(FILE_SUBSCRIBE_ENDPOINT); + wsRef.current = ws; + + ws.onopen = () => { + setConnected(true); + // Re-subscribe if we had a subscription + if (subscribedFileRef.current) { + ws.send( + JSON.stringify({ + type: "subscribe", + fileId: subscribedFileRef.current, + }) + ); + } + }; + + ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data); + + if (message.type === "fileUpdated") { + callbacksRef.current.onUpdate?.({ + fileId: message.fileId, + version: message.version, + updatedFields: message.updatedFields, + updatedBy: message.updatedBy, + }); + } else if (message.type === "error") { + callbacksRef.current.onError?.(message.message); + } + } catch (e) { + console.error("Failed to parse file subscription message:", e); + } + }; + + ws.onerror = () => { + callbacksRef.current.onError?.("WebSocket connection error"); + }; + + ws.onclose = () => { + setConnected(false); + wsRef.current = null; + + // Attempt reconnection after 3 seconds if we still have a subscription + if (subscribedFileRef.current) { + reconnectTimeoutRef.current = window.setTimeout(() => { + connect(); + }, 3000); + } + }; + } catch (e) { + callbacksRef.current.onError?.( + e instanceof Error ? e.message : "Failed to connect" + ); + } + }, []); + + const subscribe = useCallback( + (id: string) => { + subscribedFileRef.current = id; + + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send( + JSON.stringify({ + type: "subscribe", + fileId: id, + }) + ); + } else { + connect(); + } + }, + [connect] + ); + + const unsubscribe = useCallback(() => { + if ( + subscribedFileRef.current && + wsRef.current?.readyState === WebSocket.OPEN + ) { + wsRef.current.send( + JSON.stringify({ + type: "unsubscribe", + fileId: subscribedFileRef.current, + }) + ); + } + subscribedFileRef.current = null; + }, []); + + // Auto-subscribe when fileId changes + useEffect(() => { + if (fileId) { + subscribe(fileId); + } else { + unsubscribe(); + } + + return () => { + unsubscribe(); + }; + }, [fileId, subscribe, unsubscribe]); + + // Cleanup on unmount + useEffect(() => { + return () => { + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + } + if (wsRef.current) { + wsRef.current.close(); + } + }; + }, []); + + return { + connected, + subscribe, + unsubscribe, + }; +} diff --git a/makima/frontend/src/hooks/useFiles.ts b/makima/frontend/src/hooks/useFiles.ts index aacbb6a..1998357 100644 --- a/makima/frontend/src/hooks/useFiles.ts +++ b/makima/frontend/src/hooks/useFiles.ts @@ -5,16 +5,24 @@ import { createFile, updateFile, deleteFile, + VersionConflictError, type FileSummary, type FileDetail, type CreateFileRequest, type UpdateFileRequest, } from "../lib/api"; +export interface ConflictState { + hasConflict: boolean; + expectedVersion: number; + actualVersion: number; +} + export function useFiles() { const [files, setFiles] = useState<FileSummary[]>([]); const [loading, setLoading] = useState(false); const [error, setError] = useState<string | null>(null); + const [conflict, setConflict] = useState<ConflictState | null>(null); const fetchFiles = useCallback(async () => { setLoading(true); @@ -60,11 +68,20 @@ export function useFiles() { const editFile = useCallback( async (id: string, data: UpdateFileRequest): Promise<FileDetail | null> => { setError(null); + setConflict(null); try { const file = await updateFile(id, data); await fetchFiles(); // Refresh list return file; } catch (e) { + if (e instanceof VersionConflictError) { + setConflict({ + hasConflict: true, + expectedVersion: e.expectedVersion, + actualVersion: e.actualVersion, + }); + return null; + } setError(e instanceof Error ? e.message : "Failed to update file"); return null; } @@ -72,6 +89,10 @@ export function useFiles() { [fetchFiles] ); + const clearConflict = useCallback(() => { + setConflict(null); + }, []); + const removeFile = useCallback( async (id: string): Promise<boolean> => { setError(null); @@ -96,6 +117,8 @@ export function useFiles() { files, loading, error, + conflict, + clearConflict, fetchFiles, fetchFile, saveFile, diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index 6f7071d..f1e3a9f 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -34,6 +34,7 @@ const env = detectEnvironment(); export const API_BASE = API_CONFIG[env].http; export const WS_BASE = API_CONFIG[env].ws; export const LISTEN_ENDPOINT = `${WS_BASE}/api/v1/listen`; +export const FILE_SUBSCRIBE_ENDPOINT = `${WS_BASE}/api/v1/files/subscribe`; export function getEnvironment(): Environment { return env; @@ -71,6 +72,7 @@ export interface FileSummary { description: string | null; transcriptCount: number; duration: number | null; + version: number; createdAt: string; updatedAt: string; } @@ -84,6 +86,7 @@ export interface FileDetail { location: string | null; summary: string | null; body: BodyElement[]; + version: number; createdAt: string; updatedAt: string; } @@ -106,6 +109,27 @@ export interface UpdateFileRequest { transcript?: TranscriptEntry[]; summary?: string; body?: BodyElement[]; + version?: number; +} + +// Conflict error types +export interface ConflictErrorResponse { + code: "VERSION_CONFLICT"; + message: string; + expectedVersion: number; + actualVersion: number; +} + +export class VersionConflictError extends Error { + expectedVersion: number; + actualVersion: number; + + constructor(conflict: ConflictErrorResponse) { + super(conflict.message); + this.name = "VersionConflictError"; + this.expectedVersion = conflict.expectedVersion; + this.actualVersion = conflict.actualVersion; + } } // Available LLM models @@ -170,6 +194,12 @@ export async function updateFile( headers: { "Content-Type": "application/json" }, body: JSON.stringify(data), }); + + if (res.status === 409) { + const conflict = (await res.json()) as ConflictErrorResponse; + throw new VersionConflictError(conflict); + } + if (!res.ok) { throw new Error(`Failed to update file: ${res.statusText}`); } diff --git a/makima/frontend/src/routes/files.tsx b/makima/frontend/src/routes/files.tsx index 79544c5..423baa1 100644 --- a/makima/frontend/src/routes/files.tsx +++ b/makima/frontend/src/routes/files.tsx @@ -1,33 +1,71 @@ -import { useState, useCallback, useEffect } from "react"; +import { useState, useCallback, useEffect, useRef } from "react"; import { useParams, useNavigate } from "react-router"; import { Masthead } from "../components/Masthead"; import { FileList } from "../components/files/FileList"; import { FileDetail } from "../components/files/FileDetail"; import { CliInput } from "../components/files/CliInput"; +import { ConflictNotification } from "../components/files/ConflictNotification"; +import { UpdateNotification } from "../components/files/UpdateNotification"; import { useFiles } from "../hooks/useFiles"; +import { + useFileSubscription, + type FileUpdateEvent, +} from "../hooks/useFileSubscription"; import type { FileDetail as FileDetailType, BodyElement } from "../lib/api"; export default function FilesPage() { const { id } = useParams<{ id: string }>(); const navigate = useNavigate(); - const { files, loading, error, fetchFile, editFile, removeFile, saveFile } = useFiles(); + const { files, loading, error, conflict, clearConflict, fetchFile, editFile, removeFile, saveFile } = useFiles(); const [fileDetail, setFileDetail] = useState<FileDetailType | null>(null); const [detailLoading, setDetailLoading] = useState(false); const [creating, setCreating] = useState(false); + const [remoteUpdate, setRemoteUpdate] = useState<FileUpdateEvent | null>(null); + const [hasLocalChanges, setHasLocalChanges] = useState(false); + const pendingUpdateRef = useRef(false); // Load file detail when URL has an id useEffect(() => { if (id) { setDetailLoading(true); + setHasLocalChanges(false); fetchFile(id).then((detail) => { setFileDetail(detail); setDetailLoading(false); }); } else { setFileDetail(null); + setHasLocalChanges(false); } }, [id, fetchFile]); + // Handle file update events from WebSocket + const handleFileUpdate = useCallback( + async (event: FileUpdateEvent) => { + // Ignore our own updates + if (pendingUpdateRef.current) { + pendingUpdateRef.current = false; + return; + } + + // If no local changes, auto-refresh + if (!hasLocalChanges) { + const detail = await fetchFile(event.fileId); + setFileDetail(detail); + } else { + // Show notification about remote update + setRemoteUpdate(event); + } + }, + [hasLocalChanges, fetchFile] + ); + + // Subscribe to file updates + useFileSubscription({ + fileId: id || null, + onUpdate: handleFileUpdate, + }); + const handleSelectFile = useCallback( (fileId: string) => { navigate(`/files/${fileId}`); @@ -53,11 +91,15 @@ export default function FilesPage() { const handleSave = useCallback( async (fileId: string, name: string, description: string) => { - await editFile(fileId, { name, description }); - const detail = await fetchFile(fileId); - setFileDetail(detail); + if (!fileDetail) return; + pendingUpdateRef.current = true; + const result = await editFile(fileId, { name, description, version: fileDetail.version }); + if (result) { + setFileDetail(result); + setHasLocalChanges(false); + } }, - [editFile, fetchFile] + [editFile, fileDetail] ); const handleBodyUpdate = useCallback( @@ -85,9 +127,15 @@ export default function FilesPage() { ...fileDetail, body: newBody, }); + setHasLocalChanges(true); - // Save to backend - await editFile(id, { body: newBody }); + // Save to backend with version for optimistic locking + pendingUpdateRef.current = true; + const result = await editFile(id, { body: newBody, version: fileDetail.version }); + if (result) { + setFileDetail(result); + setHasLocalChanges(false); + } } }, [fileDetail, id, editFile] @@ -106,9 +154,15 @@ export default function FilesPage() { ...fileDetail, body: newBody, }); + setHasLocalChanges(true); - // Save to backend - await editFile(id, { body: newBody }); + // Save to backend with version for optimistic locking + pendingUpdateRef.current = true; + const result = await editFile(id, { body: newBody, version: fileDetail.version }); + if (result) { + setFileDetail(result); + setHasLocalChanges(false); + } } }, [fileDetail, id, editFile] @@ -130,6 +184,47 @@ export default function FilesPage() { } }, [creating, saveFile, navigate]); + // Conflict resolution handlers + const handleConflictReload = useCallback(async () => { + if (id) { + clearConflict(); + const detail = await fetchFile(id); + setFileDetail(detail); + setHasLocalChanges(false); + } + }, [id, clearConflict, fetchFile]); + + const handleConflictForceOverwrite = useCallback(async () => { + if (id && fileDetail) { + clearConflict(); + // Fetch latest version first + const latest = await fetchFile(id); + if (latest) { + // Retry with latest version + pendingUpdateRef.current = true; + const result = await editFile(id, { body: fileDetail.body, version: latest.version }); + if (result) { + setFileDetail(result); + setHasLocalChanges(false); + } + } + } + }, [id, fileDetail, clearConflict, fetchFile, editFile]); + + // Remote update handlers + const handleRemoteUpdateRefresh = useCallback(async () => { + if (id) { + const detail = await fetchFile(id); + setFileDetail(detail); + setRemoteUpdate(null); + setHasLocalChanges(false); + } + }, [id, fetchFile]); + + const handleRemoteUpdateDismiss = useCallback(() => { + setRemoteUpdate(null); + }, []); + return ( <div className="relative z-10 h-screen flex flex-col overflow-hidden"> <Masthead showTicker={false} showNav /> @@ -172,6 +267,24 @@ export default function FilesPage() { /> )} </main> + + {/* Conflict notification */} + {conflict?.hasConflict && ( + <ConflictNotification + onReload={handleConflictReload} + onForceOverwrite={handleConflictForceOverwrite} + onDismiss={clearConflict} + /> + )} + + {/* Remote update notification */} + {remoteUpdate && ( + <UpdateNotification + updatedBy={remoteUpdate.updatedBy} + onRefresh={handleRemoteUpdateRefresh} + onDismiss={handleRemoteUpdateDismiss} + /> + )} </div> ); } diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo index b2542f9..bda8af0 100644 --- a/makima/frontend/tsconfig.tsbuildinfo +++ b/makima/frontend/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/rewritelink.tsx","./src/components/charts/chartrenderer.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/hooks/usefiles.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetextscramble.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/routes/_index.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/types/messages.ts"],"version":"5.9.3"}
\ No newline at end of file +{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/rewritelink.tsx","./src/components/charts/chartrenderer.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/updatenotification.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetextscramble.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/routes/_index.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/types/messages.ts"],"version":"5.9.3"}
\ No newline at end of file diff --git a/makima/migrations/20241224000000_add_file_version.sql b/makima/migrations/20241224000000_add_file_version.sql new file mode 100644 index 0000000..1505983 --- /dev/null +++ b/makima/migrations/20241224000000_add_file_version.sql @@ -0,0 +1,17 @@ +-- Add version column for optimistic locking +ALTER TABLE files ADD COLUMN version INTEGER NOT NULL DEFAULT 1; + +-- Create trigger function to increment version on update +CREATE OR REPLACE FUNCTION increment_file_version() +RETURNS TRIGGER AS $$ +BEGIN + NEW.version = OLD.version + 1; + RETURN NEW; +END; +$$ language 'plpgsql'; + +-- Create trigger that fires before each update +CREATE TRIGGER increment_files_version + BEFORE UPDATE ON files + FOR EACH ROW + EXECUTE FUNCTION increment_file_version(); diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 135ae75..8204b86 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -68,6 +68,8 @@ pub struct File { /// Structured body content (headings, paragraphs, charts) #[sqlx(json)] pub body: Vec<BodyElement>, + /// Version number for optimistic locking + pub version: i32, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } @@ -100,6 +102,8 @@ pub struct UpdateFileRequest { pub summary: Option<String>, /// Structured body content (optional) pub body: Option<Vec<BodyElement>>, + /// Version for optimistic locking (required for updates from frontend) + pub version: Option<i32>, } /// Response for file list endpoint. @@ -120,6 +124,8 @@ pub struct FileSummary { pub transcript_count: usize, /// Duration derived from last transcript end time pub duration: Option<f32>, + /// Version number for optimistic locking + pub version: i32, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, } @@ -137,6 +143,7 @@ impl From<File> for FileSummary { description: file.description, transcript_count: file.transcript.len(), duration: if duration > 0.0 { Some(duration) } else { None }, + version: file.version, created_at: file.created_at, updated_at: file.updated_at, } diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index f8b90b3..5b962ee 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -9,6 +9,43 @@ use super::models::{CreateFileRequest, File, UpdateFileRequest}; /// Default owner ID for anonymous users. pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002); +/// Repository error types. +#[derive(Debug)] +pub enum RepositoryError { + /// Database error + Database(sqlx::Error), + /// Version conflict (optimistic locking failure) + VersionConflict { + /// The version the client expected + expected: i32, + /// The actual current version in the database + actual: i32, + }, +} + +impl From<sqlx::Error> for RepositoryError { + fn from(e: sqlx::Error) -> Self { + RepositoryError::Database(e) + } +} + +impl std::fmt::Display for RepositoryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RepositoryError::Database(e) => write!(f, "Database error: {}", e), + RepositoryError::VersionConflict { expected, actual } => { + write!( + f, + "Version conflict: expected {}, actual {}", + expected, actual + ) + } + } + } +} + +impl std::error::Error for RepositoryError {} + /// Generate a default name based on current timestamp. fn generate_default_name() -> String { let now = Utc::now(); @@ -25,7 +62,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, r#" INSERT INTO files (owner_id, name, description, transcript, location, summary, body) VALUES ($1, $2, $3, $4, $5, NULL, $6) - RETURNING id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at "#, ) .bind(ANONYMOUS_OWNER_ID) @@ -42,7 +79,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File, pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files WHERE id = $1 AND owner_id = $2 "#, @@ -57,7 +94,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { sqlx::query_as::<_, File>( r#" - SELECT id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at + SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at FROM files WHERE owner_id = $1 ORDER BY created_at DESC @@ -68,18 +105,33 @@ pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> { .await } -/// Update a file by ID. +/// Update a file by ID with optimistic locking. +/// +/// If `req.version` is provided, the update will only succeed if the current +/// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch. +/// +/// If `req.version` is None (e.g., internal system updates), version checking is skipped. pub async fn update_file( pool: &PgPool, id: Uuid, req: UpdateFileRequest, -) -> Result<Option<File>, sqlx::Error> { +) -> Result<Option<File>, RepositoryError> { // Get the existing file first let existing = get_file(pool, id).await?; let Some(existing) = existing else { return Ok(None); }; + // Check version if provided (optimistic locking) + if let Some(expected_version) = req.version { + if existing.version != expected_version { + return Err(RepositoryError::VersionConflict { + expected: expected_version, + actual: existing.version, + }); + } + } + // Apply updates let name = req.name.unwrap_or(existing.name); let description = req.description.or(existing.description); @@ -89,23 +141,59 @@ pub async fn update_file( let body = req.body.unwrap_or(existing.body); let body_json = serde_json::to_value(&body).unwrap_or_default(); - sqlx::query_as::<_, File>( - r#" - UPDATE files - SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() - WHERE id = $1 AND owner_id = $2 - RETURNING id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at - "#, - ) - .bind(id) - .bind(ANONYMOUS_OWNER_ID) - .bind(&name) - .bind(&description) - .bind(&transcript_json) - .bind(&summary) - .bind(&body_json) - .fetch_optional(pool) - .await + // Update with version check in WHERE clause for race condition safety + let result = if req.version.is_some() { + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 AND version = $8 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(ANONYMOUS_OWNER_ID) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .bind(req.version.unwrap()) + .fetch_optional(pool) + .await? + } else { + // No version check for internal updates + sqlx::query_as::<_, File>( + r#" + UPDATE files + SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW() + WHERE id = $1 AND owner_id = $2 + RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at + "#, + ) + .bind(id) + .bind(ANONYMOUS_OWNER_ID) + .bind(&name) + .bind(&description) + .bind(&transcript_json) + .bind(&summary) + .bind(&body_json) + .fetch_optional(pool) + .await? + }; + + // If versioned update returned None, there was a race condition + if result.is_none() && req.version.is_some() { + // Re-fetch to get the actual version + if let Some(current) = get_file(pool, id).await? { + return Err(RepositoryError::VersionConflict { + expected: req.version.unwrap(), + actual: current.version, + }); + } + } + + Ok(result) } /// Delete a file by ID. diff --git a/makima/src/server/handlers/chat.rs b/makima/src/server/handlers/chat.rs index 92c4ec8..3bdbc74 100644 --- a/makima/src/server/handlers/chat.rs +++ b/makima/src/server/handlers/chat.rs @@ -17,7 +17,7 @@ use crate::llm::{ groq::{GroqClient, GroqError, Message, ToolCallResponse}, LlmModel, ToolCall, ToolResult, AVAILABLE_TOOLS, }; -use crate::server::state::SharedState; +use crate::server::state::{FileUpdateNotification, SharedState}; /// Maximum number of tool-calling rounds to prevent infinite loops const MAX_TOOL_ROUNDS: usize = 10; @@ -385,17 +385,43 @@ pub async fn chat_handler( transcript: None, summary: current_summary.clone(), body: Some(current_body.clone()), + version: None, // Internal update, skip version check }; - if let Err(e) = repository::update_file(pool, id, update_req).await { - tracing::error!("Failed to save file changes: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Failed to save changes: {}", e) - })), - ) - .into_response(); + match repository::update_file(pool, id, update_req).await { + Ok(Some(updated_file)) => { + // Broadcast update notification for LLM changes + let mut updated_fields = vec!["body".to_string()]; + if current_summary.is_some() { + updated_fields.push("summary".to_string()); + } + state.broadcast_file_update(FileUpdateNotification { + file_id: id, + version: updated_file.version, + updated_fields, + updated_by: "llm".to_string(), + }); + } + Ok(None) => { + // File was deleted during processing + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "error": "File not found" + })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to save file changes: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "error": format!("Failed to save changes: {}", e) + })), + ) + .into_response(); + } } } diff --git a/makima/src/server/handlers/file_ws.rs b/makima/src/server/handlers/file_ws.rs new file mode 100644 index 0000000..5a44309 --- /dev/null +++ b/makima/src/server/handlers/file_ws.rs @@ -0,0 +1,163 @@ +//! WebSocket handler for file change subscriptions. +//! +//! Clients can subscribe to specific files and receive real-time notifications +//! when those files are updated by any source (user edits, LLM modifications, etc.). + +use axum::{ + extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade}, + response::Response, +}; +use futures::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use uuid::Uuid; + +use crate::server::state::SharedState; + +/// Client message for file subscription management. +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum FileClientMessage { + /// Subscribe to updates for a specific file + Subscribe { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// Unsubscribe from updates for a specific file + Unsubscribe { + #[serde(rename = "fileId")] + file_id: Uuid, + }, +} + +/// Server message for file subscription WebSocket. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum FileServerMessage { + /// Subscription confirmed + Subscribed { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// Unsubscription confirmed + Unsubscribed { + #[serde(rename = "fileId")] + file_id: Uuid, + }, + /// File was updated + FileUpdated { + #[serde(rename = "fileId")] + file_id: Uuid, + version: i32, + #[serde(rename = "updatedFields")] + updated_fields: Vec<String>, + #[serde(rename = "updatedBy")] + updated_by: String, + }, + /// Error occurred + Error { code: String, message: String }, +} + +/// WebSocket upgrade handler for file subscriptions. +#[utoipa::path( + get, + path = "/api/v1/files/subscribe", + responses( + (status = 101, description = "WebSocket connection established"), + ), + tag = "Files" +)] +pub async fn file_subscription_handler( + ws: WebSocketUpgrade, + State(state): State<SharedState>, +) -> Response { + ws.on_upgrade(|socket| handle_file_subscription(socket, state)) +} + +async fn handle_file_subscription(socket: WebSocket, state: SharedState) { + let (mut sender, mut receiver) = socket.split(); + + // Set of file IDs this client is subscribed to + let mut subscriptions: HashSet<Uuid> = HashSet::new(); + + // Subscribe to the broadcast channel + let mut broadcast_rx = state.file_updates.subscribe(); + + loop { + tokio::select! { + // Handle incoming WebSocket messages from client + msg = receiver.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + match serde_json::from_str::<FileClientMessage>(&text) { + Ok(FileClientMessage::Subscribe { file_id }) => { + subscriptions.insert(file_id); + let response = FileServerMessage::Subscribed { file_id }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + tracing::debug!("Client subscribed to file {}", file_id); + } + Ok(FileClientMessage::Unsubscribe { file_id }) => { + subscriptions.remove(&file_id); + let response = FileServerMessage::Unsubscribed { file_id }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + tracing::debug!("Client unsubscribed from file {}", file_id); + } + Err(e) => { + let response = FileServerMessage::Error { + code: "PARSE_ERROR".into(), + message: e.to_string(), + }; + let json = serde_json::to_string(&response).unwrap(); + let _ = sender.send(Message::Text(json.into())).await; + } + } + } + Some(Ok(Message::Close(_))) | None => { + tracing::debug!("Client disconnected from file subscription"); + break; + } + Some(Err(e)) => { + tracing::warn!("WebSocket error: {}", e); + break; + } + _ => {} + } + } + + // Handle broadcast notifications + notification = broadcast_rx.recv() => { + match notification { + Ok(notification) => { + // Only forward if client is subscribed to this file + if subscriptions.contains(¬ification.file_id) { + let response = FileServerMessage::FileUpdated { + file_id: notification.file_id, + version: notification.version, + updated_fields: notification.updated_fields, + updated_by: notification.updated_by, + }; + let json = serde_json::to_string(&response).unwrap(); + if sender.send(Message::Text(json.into())).await.is_err() { + break; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + // Client is too slow, skip some messages + tracing::warn!("File subscription client lagged, skipped {} messages", n); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + // Channel closed, exit + break; + } + } + } + } + } +} diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs index 746d66b..c65eed5 100644 --- a/makima/src/server/handlers/files.rs +++ b/makima/src/server/handlers/files.rs @@ -9,9 +9,9 @@ use axum::{ use uuid::Uuid; use crate::db::models::{CreateFileRequest, FileListResponse, FileSummary, UpdateFileRequest}; -use crate::db::repository; +use crate::db::repository::{self, RepositoryError}; use crate::server::messages::ApiError; -use crate::server::state::SharedState; +use crate::server::state::{FileUpdateNotification, SharedState}; /// List all files for the current owner. #[utoipa::path( @@ -148,6 +148,7 @@ pub async fn create_file( responses( (status = 200, description = "File updated", body = crate::db::models::File), (status = 404, description = "File not found", body = ApiError), + (status = 409, description = "Version conflict", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), @@ -166,14 +167,62 @@ pub async fn update_file( .into_response(); }; + // Collect which fields are being updated for broadcast + let mut updated_fields = Vec::new(); + if req.name.is_some() { + updated_fields.push("name".to_string()); + } + if req.description.is_some() { + updated_fields.push("description".to_string()); + } + if req.transcript.is_some() { + updated_fields.push("transcript".to_string()); + } + if req.summary.is_some() { + updated_fields.push("summary".to_string()); + } + if req.body.is_some() { + updated_fields.push("body".to_string()); + } + match repository::update_file(pool, id, req).await { - Ok(Some(file)) => Json(file).into_response(), + Ok(Some(file)) => { + // Broadcast update notification + state.broadcast_file_update(FileUpdateNotification { + file_id: id, + version: file.version, + updated_fields, + updated_by: "user".to_string(), + }); + Json(file).into_response() + } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "File not found")), ) .into_response(), - Err(e) => { + Err(RepositoryError::VersionConflict { expected, actual }) => { + tracing::info!( + "Version conflict on file {}: expected {}, actual {}", + id, + expected, + actual + ); + ( + StatusCode::CONFLICT, + Json(serde_json::json!({ + "code": "VERSION_CONFLICT", + "message": format!( + "File was modified by another user. Expected version {}, actual version {}", + expected, actual + ), + "expectedVersion": expected, + "actualVersion": actual, + })), + ) + .into_response() + } + Err(RepositoryError::Database(e)) => { tracing::error!("Failed to update file {}: {}", id, e); ( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs index 5fc5cea..a26c208 100644 --- a/makima/src/server/handlers/listen.rs +++ b/makima/src/server/handlers/listen.rs @@ -467,6 +467,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { transcript: Some(final_entries.clone()), summary: None, body: None, + version: None, // Internal update, skip version check }).await { Ok(_) => { tracing::info!( diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index b13668a..c08f1bd 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -1,5 +1,6 @@ //! HTTP and WebSocket request handlers. pub mod chat; +pub mod file_ws; pub mod files; pub mod listen; diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index a8f98a6..f132cf4 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -17,7 +17,7 @@ use tower_http::trace::TraceLayer; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use crate::server::handlers::{chat, files, listen}; +use crate::server::handlers::{chat, file_ws, files, listen}; use crate::server::openapi::ApiDoc; use crate::server::state::SharedState; @@ -43,6 +43,7 @@ pub fn make_router(state: SharedState) -> Router { // API v1 routes let api_v1 = Router::new() .route("/listen", get(listen::websocket_handler)) + .route("/files/subscribe", get(file_ws::file_subscription_handler)) .route("/files", get(files::list_files).post(files::create_file)) .route( "/files/{id}", diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 8cdc26c..239ab77 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -2,10 +2,24 @@ use std::sync::Arc; use sqlx::PgPool; -use tokio::sync::Mutex; +use tokio::sync::{broadcast, Mutex}; +use uuid::Uuid; use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer}; +/// Notification payload for file updates (broadcast to WebSocket subscribers). +#[derive(Debug, Clone)] +pub struct FileUpdateNotification { + /// ID of the updated file + pub file_id: Uuid, + /// New version number after update + pub version: i32, + /// List of fields that were updated + pub updated_fields: Vec<String>, + /// Source of the update: "user", "llm", or "system" + pub updated_by: String, +} + /// Shared application state containing ML models and database pool. /// /// Models are wrapped in `Mutex` for thread-safe mutable access during inference. @@ -18,6 +32,8 @@ pub struct AppState { pub sortformer: Mutex<Sortformer>, /// Optional database connection pool pub db_pool: Option<PgPool>, + /// Broadcast channel for file update notifications + pub file_updates: broadcast::Sender<FileUpdateNotification>, } impl AppState { @@ -40,11 +56,15 @@ impl AppState { DiarizationConfig::callhome(), )?; + // Create broadcast channel with buffer for 256 messages + let (file_updates, _) = broadcast::channel(256); + Ok(Self { parakeet: Mutex::new(parakeet), parakeet_eou: Mutex::new(parakeet_eou), sortformer: Mutex::new(sortformer), db_pool: None, + file_updates, }) } @@ -53,6 +73,14 @@ impl AppState { self.db_pool = Some(pool); self } + + /// Broadcast a file update notification to all subscribers. + /// + /// This is a no-op if there are no subscribers (ignores send errors). + pub fn broadcast_file_update(&self, notification: FileUpdateNotification) { + // Ignore send errors - they just mean no one is listening + let _ = self.file_updates.send(notification); + } } /// Type alias for the shared application state. |
