summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2025-12-23 22:20:52 +0000
committersoryu <soryu@soryu.co>2025-12-23 22:20:52 +0000
commit72c2590571104b8d10e3f72d7a5b984d0b520c51 (patch)
tree735aa03056a44a93b9abdf915545ad034ee2b597
parentf5222a7ae5ade5589436778cb01fc0abe625b3c3 (diff)
downloadsoryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.tar.gz
soryu-72c2590571104b8d10e3f72d7a5b984d0b520c51.zip
Add conflict notification and file update WS endpoint
-rw-r--r--makima/frontend/src/components/files/ConflictNotification.tsx47
-rw-r--r--makima/frontend/src/components/files/UpdateNotification.tsx43
-rw-r--r--makima/frontend/src/hooks/useFileSubscription.ts154
-rw-r--r--makima/frontend/src/hooks/useFiles.ts23
-rw-r--r--makima/frontend/src/lib/api.ts30
-rw-r--r--makima/frontend/src/routes/files.tsx133
-rw-r--r--makima/frontend/tsconfig.tsbuildinfo2
-rw-r--r--makima/migrations/20241224000000_add_file_version.sql17
-rw-r--r--makima/src/db/models.rs7
-rw-r--r--makima/src/db/repository.rs132
-rw-r--r--makima/src/server/handlers/chat.rs46
-rw-r--r--makima/src/server/handlers/file_ws.rs163
-rw-r--r--makima/src/server/handlers/files.rs57
-rw-r--r--makima/src/server/handlers/listen.rs1
-rw-r--r--makima/src/server/handlers/mod.rs1
-rw-r--r--makima/src/server/mod.rs3
-rw-r--r--makima/src/server/state.rs30
17 files changed, 840 insertions, 49 deletions
diff --git a/makima/frontend/src/components/files/ConflictNotification.tsx b/makima/frontend/src/components/files/ConflictNotification.tsx
new file mode 100644
index 0000000..5d54c32
--- /dev/null
+++ b/makima/frontend/src/components/files/ConflictNotification.tsx
@@ -0,0 +1,47 @@
+interface ConflictNotificationProps {
+ onReload: () => void;
+ onForceOverwrite: () => void;
+ onDismiss: () => void;
+}
+
+export function ConflictNotification({
+ onReload,
+ onForceOverwrite,
+ onDismiss,
+}: ConflictNotificationProps) {
+ return (
+ <div className="fixed bottom-4 right-4 max-w-md p-4 bg-[#1a2332] border border-yellow-500/50 shadow-lg z-50">
+ <div className="flex items-start gap-3">
+ <div className="text-yellow-500 text-xl font-bold">!</div>
+ <div className="flex-1">
+ <h3 className="font-mono text-sm text-[#9bc3ff] font-semibold mb-1">
+ Conflict Detected
+ </h3>
+ <p className="font-mono text-xs text-white/70 mb-3">
+ This file was modified elsewhere. Your changes could not be saved.
+ </p>
+ <div className="flex gap-2">
+ <button
+ onClick={onReload}
+ className="px-3 py-1 font-mono text-xs text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] transition-colors"
+ >
+ Reload Latest
+ </button>
+ <button
+ onClick={onForceOverwrite}
+ className="px-3 py-1 font-mono text-xs text-red-400 border border-red-400/25 hover:border-red-400/50 transition-colors"
+ >
+ Force Save
+ </button>
+ <button
+ onClick={onDismiss}
+ className="px-3 py-1 font-mono text-xs text-[#555] hover:text-white/70 transition-colors"
+ >
+ Dismiss
+ </button>
+ </div>
+ </div>
+ </div>
+ </div>
+ );
+}
diff --git a/makima/frontend/src/components/files/UpdateNotification.tsx b/makima/frontend/src/components/files/UpdateNotification.tsx
new file mode 100644
index 0000000..92b2b15
--- /dev/null
+++ b/makima/frontend/src/components/files/UpdateNotification.tsx
@@ -0,0 +1,43 @@
+interface UpdateNotificationProps {
+ updatedBy: "user" | "llm" | "system";
+ onRefresh: () => void;
+ onDismiss: () => void;
+}
+
+export function UpdateNotification({
+ updatedBy,
+ onRefresh,
+ onDismiss,
+}: UpdateNotificationProps) {
+ const source = updatedBy === "llm" ? "AI assistant" : "another session";
+
+ return (
+ <div className="fixed bottom-4 right-4 max-w-md p-4 bg-[#1a2332] border border-[#3f6fb3]/50 shadow-lg z-50">
+ <div className="flex items-start gap-3">
+ <div className="text-[#75aafc] text-xl font-bold">i</div>
+ <div className="flex-1">
+ <h3 className="font-mono text-sm text-[#9bc3ff] font-semibold mb-1">
+ File Updated
+ </h3>
+ <p className="font-mono text-xs text-white/70 mb-3">
+ This file was updated by {source}.
+ </p>
+ <div className="flex gap-2">
+ <button
+ onClick={onRefresh}
+ className="px-3 py-1 font-mono text-xs text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] transition-colors"
+ >
+ Refresh Now
+ </button>
+ <button
+ onClick={onDismiss}
+ className="px-3 py-1 font-mono text-xs text-[#555] hover:text-white/70 transition-colors"
+ >
+ Dismiss
+ </button>
+ </div>
+ </div>
+ </div>
+ </div>
+ );
+}
diff --git a/makima/frontend/src/hooks/useFileSubscription.ts b/makima/frontend/src/hooks/useFileSubscription.ts
new file mode 100644
index 0000000..7260b96
--- /dev/null
+++ b/makima/frontend/src/hooks/useFileSubscription.ts
@@ -0,0 +1,154 @@
+import { useState, useCallback, useRef, useEffect } from "react";
+import { FILE_SUBSCRIBE_ENDPOINT } from "../lib/api";
+
+export interface FileUpdateEvent {
+ fileId: string;
+ version: number;
+ updatedFields: string[];
+ updatedBy: "user" | "llm" | "system";
+}
+
+interface UseFileSubscriptionOptions {
+ fileId: string | null;
+ onUpdate?: (event: FileUpdateEvent) => void;
+ onError?: (error: string) => void;
+}
+
+export function useFileSubscription(options: UseFileSubscriptionOptions) {
+ const { fileId, onUpdate, onError } = options;
+ const [connected, setConnected] = useState(false);
+ const wsRef = useRef<WebSocket | null>(null);
+ const reconnectTimeoutRef = useRef<number | null>(null);
+ const subscribedFileRef = useRef<string | null>(null);
+
+ // Store callbacks in refs to avoid re-connecting when callbacks change
+ const callbacksRef = useRef({ onUpdate, onError });
+ useEffect(() => {
+ callbacksRef.current = { onUpdate, onError };
+ }, [onUpdate, onError]);
+
+ const connect = useCallback(() => {
+ if (wsRef.current?.readyState === WebSocket.OPEN) return;
+
+ try {
+ const ws = new WebSocket(FILE_SUBSCRIBE_ENDPOINT);
+ wsRef.current = ws;
+
+ ws.onopen = () => {
+ setConnected(true);
+ // Re-subscribe if we had a subscription
+ if (subscribedFileRef.current) {
+ ws.send(
+ JSON.stringify({
+ type: "subscribe",
+ fileId: subscribedFileRef.current,
+ })
+ );
+ }
+ };
+
+ ws.onmessage = (event) => {
+ try {
+ const message = JSON.parse(event.data);
+
+ if (message.type === "fileUpdated") {
+ callbacksRef.current.onUpdate?.({
+ fileId: message.fileId,
+ version: message.version,
+ updatedFields: message.updatedFields,
+ updatedBy: message.updatedBy,
+ });
+ } else if (message.type === "error") {
+ callbacksRef.current.onError?.(message.message);
+ }
+ } catch (e) {
+ console.error("Failed to parse file subscription message:", e);
+ }
+ };
+
+ ws.onerror = () => {
+ callbacksRef.current.onError?.("WebSocket connection error");
+ };
+
+ ws.onclose = () => {
+ setConnected(false);
+ wsRef.current = null;
+
+ // Attempt reconnection after 3 seconds if we still have a subscription
+ if (subscribedFileRef.current) {
+ reconnectTimeoutRef.current = window.setTimeout(() => {
+ connect();
+ }, 3000);
+ }
+ };
+ } catch (e) {
+ callbacksRef.current.onError?.(
+ e instanceof Error ? e.message : "Failed to connect"
+ );
+ }
+ }, []);
+
+ const subscribe = useCallback(
+ (id: string) => {
+ subscribedFileRef.current = id;
+
+ if (wsRef.current?.readyState === WebSocket.OPEN) {
+ wsRef.current.send(
+ JSON.stringify({
+ type: "subscribe",
+ fileId: id,
+ })
+ );
+ } else {
+ connect();
+ }
+ },
+ [connect]
+ );
+
+ const unsubscribe = useCallback(() => {
+ if (
+ subscribedFileRef.current &&
+ wsRef.current?.readyState === WebSocket.OPEN
+ ) {
+ wsRef.current.send(
+ JSON.stringify({
+ type: "unsubscribe",
+ fileId: subscribedFileRef.current,
+ })
+ );
+ }
+ subscribedFileRef.current = null;
+ }, []);
+
+ // Auto-subscribe when fileId changes
+ useEffect(() => {
+ if (fileId) {
+ subscribe(fileId);
+ } else {
+ unsubscribe();
+ }
+
+ return () => {
+ unsubscribe();
+ };
+ }, [fileId, subscribe, unsubscribe]);
+
+ // Cleanup on unmount
+ useEffect(() => {
+ return () => {
+ if (reconnectTimeoutRef.current) {
+ clearTimeout(reconnectTimeoutRef.current);
+ }
+ if (wsRef.current) {
+ wsRef.current.close();
+ }
+ };
+ }, []);
+
+ return {
+ connected,
+ subscribe,
+ unsubscribe,
+ };
+}
diff --git a/makima/frontend/src/hooks/useFiles.ts b/makima/frontend/src/hooks/useFiles.ts
index aacbb6a..1998357 100644
--- a/makima/frontend/src/hooks/useFiles.ts
+++ b/makima/frontend/src/hooks/useFiles.ts
@@ -5,16 +5,24 @@ import {
createFile,
updateFile,
deleteFile,
+ VersionConflictError,
type FileSummary,
type FileDetail,
type CreateFileRequest,
type UpdateFileRequest,
} from "../lib/api";
+export interface ConflictState {
+ hasConflict: boolean;
+ expectedVersion: number;
+ actualVersion: number;
+}
+
export function useFiles() {
const [files, setFiles] = useState<FileSummary[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
+ const [conflict, setConflict] = useState<ConflictState | null>(null);
const fetchFiles = useCallback(async () => {
setLoading(true);
@@ -60,11 +68,20 @@ export function useFiles() {
const editFile = useCallback(
async (id: string, data: UpdateFileRequest): Promise<FileDetail | null> => {
setError(null);
+ setConflict(null);
try {
const file = await updateFile(id, data);
await fetchFiles(); // Refresh list
return file;
} catch (e) {
+ if (e instanceof VersionConflictError) {
+ setConflict({
+ hasConflict: true,
+ expectedVersion: e.expectedVersion,
+ actualVersion: e.actualVersion,
+ });
+ return null;
+ }
setError(e instanceof Error ? e.message : "Failed to update file");
return null;
}
@@ -72,6 +89,10 @@ export function useFiles() {
[fetchFiles]
);
+ const clearConflict = useCallback(() => {
+ setConflict(null);
+ }, []);
+
const removeFile = useCallback(
async (id: string): Promise<boolean> => {
setError(null);
@@ -96,6 +117,8 @@ export function useFiles() {
files,
loading,
error,
+ conflict,
+ clearConflict,
fetchFiles,
fetchFile,
saveFile,
diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts
index 6f7071d..f1e3a9f 100644
--- a/makima/frontend/src/lib/api.ts
+++ b/makima/frontend/src/lib/api.ts
@@ -34,6 +34,7 @@ const env = detectEnvironment();
export const API_BASE = API_CONFIG[env].http;
export const WS_BASE = API_CONFIG[env].ws;
export const LISTEN_ENDPOINT = `${WS_BASE}/api/v1/listen`;
+export const FILE_SUBSCRIBE_ENDPOINT = `${WS_BASE}/api/v1/files/subscribe`;
export function getEnvironment(): Environment {
return env;
@@ -71,6 +72,7 @@ export interface FileSummary {
description: string | null;
transcriptCount: number;
duration: number | null;
+ version: number;
createdAt: string;
updatedAt: string;
}
@@ -84,6 +86,7 @@ export interface FileDetail {
location: string | null;
summary: string | null;
body: BodyElement[];
+ version: number;
createdAt: string;
updatedAt: string;
}
@@ -106,6 +109,27 @@ export interface UpdateFileRequest {
transcript?: TranscriptEntry[];
summary?: string;
body?: BodyElement[];
+ version?: number;
+}
+
+// Conflict error types
+export interface ConflictErrorResponse {
+ code: "VERSION_CONFLICT";
+ message: string;
+ expectedVersion: number;
+ actualVersion: number;
+}
+
+export class VersionConflictError extends Error {
+ expectedVersion: number;
+ actualVersion: number;
+
+ constructor(conflict: ConflictErrorResponse) {
+ super(conflict.message);
+ this.name = "VersionConflictError";
+ this.expectedVersion = conflict.expectedVersion;
+ this.actualVersion = conflict.actualVersion;
+ }
}
// Available LLM models
@@ -170,6 +194,12 @@ export async function updateFile(
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
});
+
+ if (res.status === 409) {
+ const conflict = (await res.json()) as ConflictErrorResponse;
+ throw new VersionConflictError(conflict);
+ }
+
if (!res.ok) {
throw new Error(`Failed to update file: ${res.statusText}`);
}
diff --git a/makima/frontend/src/routes/files.tsx b/makima/frontend/src/routes/files.tsx
index 79544c5..423baa1 100644
--- a/makima/frontend/src/routes/files.tsx
+++ b/makima/frontend/src/routes/files.tsx
@@ -1,33 +1,71 @@
-import { useState, useCallback, useEffect } from "react";
+import { useState, useCallback, useEffect, useRef } from "react";
import { useParams, useNavigate } from "react-router";
import { Masthead } from "../components/Masthead";
import { FileList } from "../components/files/FileList";
import { FileDetail } from "../components/files/FileDetail";
import { CliInput } from "../components/files/CliInput";
+import { ConflictNotification } from "../components/files/ConflictNotification";
+import { UpdateNotification } from "../components/files/UpdateNotification";
import { useFiles } from "../hooks/useFiles";
+import {
+ useFileSubscription,
+ type FileUpdateEvent,
+} from "../hooks/useFileSubscription";
import type { FileDetail as FileDetailType, BodyElement } from "../lib/api";
export default function FilesPage() {
const { id } = useParams<{ id: string }>();
const navigate = useNavigate();
- const { files, loading, error, fetchFile, editFile, removeFile, saveFile } = useFiles();
+ const { files, loading, error, conflict, clearConflict, fetchFile, editFile, removeFile, saveFile } = useFiles();
const [fileDetail, setFileDetail] = useState<FileDetailType | null>(null);
const [detailLoading, setDetailLoading] = useState(false);
const [creating, setCreating] = useState(false);
+ const [remoteUpdate, setRemoteUpdate] = useState<FileUpdateEvent | null>(null);
+ const [hasLocalChanges, setHasLocalChanges] = useState(false);
+ const pendingUpdateRef = useRef(false);
// Load file detail when URL has an id
useEffect(() => {
if (id) {
setDetailLoading(true);
+ setHasLocalChanges(false);
fetchFile(id).then((detail) => {
setFileDetail(detail);
setDetailLoading(false);
});
} else {
setFileDetail(null);
+ setHasLocalChanges(false);
}
}, [id, fetchFile]);
+ // Handle file update events from WebSocket
+ const handleFileUpdate = useCallback(
+ async (event: FileUpdateEvent) => {
+ // Ignore our own updates
+ if (pendingUpdateRef.current) {
+ pendingUpdateRef.current = false;
+ return;
+ }
+
+ // If no local changes, auto-refresh
+ if (!hasLocalChanges) {
+ const detail = await fetchFile(event.fileId);
+ setFileDetail(detail);
+ } else {
+ // Show notification about remote update
+ setRemoteUpdate(event);
+ }
+ },
+ [hasLocalChanges, fetchFile]
+ );
+
+ // Subscribe to file updates
+ useFileSubscription({
+ fileId: id || null,
+ onUpdate: handleFileUpdate,
+ });
+
const handleSelectFile = useCallback(
(fileId: string) => {
navigate(`/files/${fileId}`);
@@ -53,11 +91,15 @@ export default function FilesPage() {
const handleSave = useCallback(
async (fileId: string, name: string, description: string) => {
- await editFile(fileId, { name, description });
- const detail = await fetchFile(fileId);
- setFileDetail(detail);
+ if (!fileDetail) return;
+ pendingUpdateRef.current = true;
+ const result = await editFile(fileId, { name, description, version: fileDetail.version });
+ if (result) {
+ setFileDetail(result);
+ setHasLocalChanges(false);
+ }
},
- [editFile, fetchFile]
+ [editFile, fileDetail]
);
const handleBodyUpdate = useCallback(
@@ -85,9 +127,15 @@ export default function FilesPage() {
...fileDetail,
body: newBody,
});
+ setHasLocalChanges(true);
- // Save to backend
- await editFile(id, { body: newBody });
+ // Save to backend with version for optimistic locking
+ pendingUpdateRef.current = true;
+ const result = await editFile(id, { body: newBody, version: fileDetail.version });
+ if (result) {
+ setFileDetail(result);
+ setHasLocalChanges(false);
+ }
}
},
[fileDetail, id, editFile]
@@ -106,9 +154,15 @@ export default function FilesPage() {
...fileDetail,
body: newBody,
});
+ setHasLocalChanges(true);
- // Save to backend
- await editFile(id, { body: newBody });
+ // Save to backend with version for optimistic locking
+ pendingUpdateRef.current = true;
+ const result = await editFile(id, { body: newBody, version: fileDetail.version });
+ if (result) {
+ setFileDetail(result);
+ setHasLocalChanges(false);
+ }
}
},
[fileDetail, id, editFile]
@@ -130,6 +184,47 @@ export default function FilesPage() {
}
}, [creating, saveFile, navigate]);
+ // Conflict resolution handlers
+ const handleConflictReload = useCallback(async () => {
+ if (id) {
+ clearConflict();
+ const detail = await fetchFile(id);
+ setFileDetail(detail);
+ setHasLocalChanges(false);
+ }
+ }, [id, clearConflict, fetchFile]);
+
+ const handleConflictForceOverwrite = useCallback(async () => {
+ if (id && fileDetail) {
+ clearConflict();
+ // Fetch latest version first
+ const latest = await fetchFile(id);
+ if (latest) {
+ // Retry with latest version
+ pendingUpdateRef.current = true;
+ const result = await editFile(id, { body: fileDetail.body, version: latest.version });
+ if (result) {
+ setFileDetail(result);
+ setHasLocalChanges(false);
+ }
+ }
+ }
+ }, [id, fileDetail, clearConflict, fetchFile, editFile]);
+
+ // Remote update handlers
+ const handleRemoteUpdateRefresh = useCallback(async () => {
+ if (id) {
+ const detail = await fetchFile(id);
+ setFileDetail(detail);
+ setRemoteUpdate(null);
+ setHasLocalChanges(false);
+ }
+ }, [id, fetchFile]);
+
+ const handleRemoteUpdateDismiss = useCallback(() => {
+ setRemoteUpdate(null);
+ }, []);
+
return (
<div className="relative z-10 h-screen flex flex-col overflow-hidden">
<Masthead showTicker={false} showNav />
@@ -172,6 +267,24 @@ export default function FilesPage() {
/>
)}
</main>
+
+ {/* Conflict notification */}
+ {conflict?.hasConflict && (
+ <ConflictNotification
+ onReload={handleConflictReload}
+ onForceOverwrite={handleConflictForceOverwrite}
+ onDismiss={clearConflict}
+ />
+ )}
+
+ {/* Remote update notification */}
+ {remoteUpdate && (
+ <UpdateNotification
+ updatedBy={remoteUpdate.updatedBy}
+ onRefresh={handleRemoteUpdateRefresh}
+ onDismiss={handleRemoteUpdateDismiss}
+ />
+ )}
</div>
);
}
diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo
index b2542f9..bda8af0 100644
--- a/makima/frontend/tsconfig.tsbuildinfo
+++ b/makima/frontend/tsconfig.tsbuildinfo
@@ -1 +1 @@
-{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/rewritelink.tsx","./src/components/charts/chartrenderer.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/hooks/usefiles.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetextscramble.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/routes/_index.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/types/messages.ts"],"version":"5.9.3"} \ No newline at end of file
+{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/rewritelink.tsx","./src/components/charts/chartrenderer.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/updatenotification.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetextscramble.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/routes/_index.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/types/messages.ts"],"version":"5.9.3"} \ No newline at end of file
diff --git a/makima/migrations/20241224000000_add_file_version.sql b/makima/migrations/20241224000000_add_file_version.sql
new file mode 100644
index 0000000..1505983
--- /dev/null
+++ b/makima/migrations/20241224000000_add_file_version.sql
@@ -0,0 +1,17 @@
+-- Add version column for optimistic locking
+ALTER TABLE files ADD COLUMN version INTEGER NOT NULL DEFAULT 1;
+
+-- Create trigger function to increment version on update
+CREATE OR REPLACE FUNCTION increment_file_version()
+RETURNS TRIGGER AS $$
+BEGIN
+ NEW.version = OLD.version + 1;
+ RETURN NEW;
+END;
+$$ language 'plpgsql';
+
+-- Create trigger that fires before each update
+CREATE TRIGGER increment_files_version
+ BEFORE UPDATE ON files
+ FOR EACH ROW
+ EXECUTE FUNCTION increment_file_version();
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 135ae75..8204b86 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -68,6 +68,8 @@ pub struct File {
/// Structured body content (headings, paragraphs, charts)
#[sqlx(json)]
pub body: Vec<BodyElement>,
+ /// Version number for optimistic locking
+ pub version: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -100,6 +102,8 @@ pub struct UpdateFileRequest {
pub summary: Option<String>,
/// Structured body content (optional)
pub body: Option<Vec<BodyElement>>,
+ /// Version for optimistic locking (required for updates from frontend)
+ pub version: Option<i32>,
}
/// Response for file list endpoint.
@@ -120,6 +124,8 @@ pub struct FileSummary {
pub transcript_count: usize,
/// Duration derived from last transcript end time
pub duration: Option<f32>,
+ /// Version number for optimistic locking
+ pub version: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -137,6 +143,7 @@ impl From<File> for FileSummary {
description: file.description,
transcript_count: file.transcript.len(),
duration: if duration > 0.0 { Some(duration) } else { None },
+ version: file.version,
created_at: file.created_at,
updated_at: file.updated_at,
}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index f8b90b3..5b962ee 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -9,6 +9,43 @@ use super::models::{CreateFileRequest, File, UpdateFileRequest};
/// Default owner ID for anonymous users.
pub const ANONYMOUS_OWNER_ID: Uuid = Uuid::from_u128(0x00000000_0000_0000_0000_000000000002);
+/// Repository error types.
+#[derive(Debug)]
+pub enum RepositoryError {
+ /// Database error
+ Database(sqlx::Error),
+ /// Version conflict (optimistic locking failure)
+ VersionConflict {
+ /// The version the client expected
+ expected: i32,
+ /// The actual current version in the database
+ actual: i32,
+ },
+}
+
+impl From<sqlx::Error> for RepositoryError {
+ fn from(e: sqlx::Error) -> Self {
+ RepositoryError::Database(e)
+ }
+}
+
+impl std::fmt::Display for RepositoryError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ RepositoryError::Database(e) => write!(f, "Database error: {}", e),
+ RepositoryError::VersionConflict { expected, actual } => {
+ write!(
+ f,
+ "Version conflict: expected {}, actual {}",
+ expected, actual
+ )
+ }
+ }
+ }
+}
+
+impl std::error::Error for RepositoryError {}
+
/// Generate a default name based on current timestamp.
fn generate_default_name() -> String {
let now = Utc::now();
@@ -25,7 +62,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
r#"
INSERT INTO files (owner_id, name, description, transcript, location, summary, body)
VALUES ($1, $2, $3, $4, $5, NULL, $6)
- RETURNING id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
"#,
)
.bind(ANONYMOUS_OWNER_ID)
@@ -42,7 +79,7 @@ pub async fn create_file(pool: &PgPool, req: CreateFileRequest) -> Result<File,
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
WHERE id = $1 AND owner_id = $2
"#,
@@ -57,7 +94,7 @@ pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Err
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
sqlx::query_as::<_, File>(
r#"
- SELECT id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at
+ SELECT id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
FROM files
WHERE owner_id = $1
ORDER BY created_at DESC
@@ -68,18 +105,33 @@ pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
.await
}
-/// Update a file by ID.
+/// Update a file by ID with optimistic locking.
+///
+/// If `req.version` is provided, the update will only succeed if the current
+/// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch.
+///
+/// If `req.version` is None (e.g., internal system updates), version checking is skipped.
pub async fn update_file(
pool: &PgPool,
id: Uuid,
req: UpdateFileRequest,
-) -> Result<Option<File>, sqlx::Error> {
+) -> Result<Option<File>, RepositoryError> {
// Get the existing file first
let existing = get_file(pool, id).await?;
let Some(existing) = existing else {
return Ok(None);
};
+ // Check version if provided (optimistic locking)
+ if let Some(expected_version) = req.version {
+ if existing.version != expected_version {
+ return Err(RepositoryError::VersionConflict {
+ expected: expected_version,
+ actual: existing.version,
+ });
+ }
+ }
+
// Apply updates
let name = req.name.unwrap_or(existing.name);
let description = req.description.or(existing.description);
@@ -89,23 +141,59 @@ pub async fn update_file(
let body = req.body.unwrap_or(existing.body);
let body_json = serde_json::to_value(&body).unwrap_or_default();
- sqlx::query_as::<_, File>(
- r#"
- UPDATE files
- SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
- WHERE id = $1 AND owner_id = $2
- RETURNING id, owner_id, name, description, transcript, location, summary, body, created_at, updated_at
- "#,
- )
- .bind(id)
- .bind(ANONYMOUS_OWNER_ID)
- .bind(&name)
- .bind(&description)
- .bind(&transcript_json)
- .bind(&summary)
- .bind(&body_json)
- .fetch_optional(pool)
- .await
+ // Update with version check in WHERE clause for race condition safety
+ let result = if req.version.is_some() {
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2 AND version = $8
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(ANONYMOUS_OWNER_ID)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .bind(req.version.unwrap())
+ .fetch_optional(pool)
+ .await?
+ } else {
+ // No version check for internal updates
+ sqlx::query_as::<_, File>(
+ r#"
+ UPDATE files
+ SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id, owner_id, name, description, transcript, location, summary, body, version, created_at, updated_at
+ "#,
+ )
+ .bind(id)
+ .bind(ANONYMOUS_OWNER_ID)
+ .bind(&name)
+ .bind(&description)
+ .bind(&transcript_json)
+ .bind(&summary)
+ .bind(&body_json)
+ .fetch_optional(pool)
+ .await?
+ };
+
+ // If versioned update returned None, there was a race condition
+ if result.is_none() && req.version.is_some() {
+ // Re-fetch to get the actual version
+ if let Some(current) = get_file(pool, id).await? {
+ return Err(RepositoryError::VersionConflict {
+ expected: req.version.unwrap(),
+ actual: current.version,
+ });
+ }
+ }
+
+ Ok(result)
}
/// Delete a file by ID.
diff --git a/makima/src/server/handlers/chat.rs b/makima/src/server/handlers/chat.rs
index 92c4ec8..3bdbc74 100644
--- a/makima/src/server/handlers/chat.rs
+++ b/makima/src/server/handlers/chat.rs
@@ -17,7 +17,7 @@ use crate::llm::{
groq::{GroqClient, GroqError, Message, ToolCallResponse},
LlmModel, ToolCall, ToolResult, AVAILABLE_TOOLS,
};
-use crate::server::state::SharedState;
+use crate::server::state::{FileUpdateNotification, SharedState};
/// Maximum number of tool-calling rounds to prevent infinite loops
const MAX_TOOL_ROUNDS: usize = 10;
@@ -385,17 +385,43 @@ pub async fn chat_handler(
transcript: None,
summary: current_summary.clone(),
body: Some(current_body.clone()),
+ version: None, // Internal update, skip version check
};
- if let Err(e) = repository::update_file(pool, id, update_req).await {
- tracing::error!("Failed to save file changes: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(serde_json::json!({
- "error": format!("Failed to save changes: {}", e)
- })),
- )
- .into_response();
+ match repository::update_file(pool, id, update_req).await {
+ Ok(Some(updated_file)) => {
+ // Broadcast update notification for LLM changes
+ let mut updated_fields = vec!["body".to_string()];
+ if current_summary.is_some() {
+ updated_fields.push("summary".to_string());
+ }
+ state.broadcast_file_update(FileUpdateNotification {
+ file_id: id,
+ version: updated_file.version,
+ updated_fields,
+ updated_by: "llm".to_string(),
+ });
+ }
+ Ok(None) => {
+ // File was deleted during processing
+ return (
+ StatusCode::NOT_FOUND,
+ Json(serde_json::json!({
+ "error": "File not found"
+ })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to save file changes: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(serde_json::json!({
+ "error": format!("Failed to save changes: {}", e)
+ })),
+ )
+ .into_response();
+ }
}
}
diff --git a/makima/src/server/handlers/file_ws.rs b/makima/src/server/handlers/file_ws.rs
new file mode 100644
index 0000000..5a44309
--- /dev/null
+++ b/makima/src/server/handlers/file_ws.rs
@@ -0,0 +1,163 @@
+//! WebSocket handler for file change subscriptions.
+//!
+//! Clients can subscribe to specific files and receive real-time notifications
+//! when those files are updated by any source (user edits, LLM modifications, etc.).
+
+use axum::{
+ extract::{ws::Message, ws::WebSocket, State, WebSocketUpgrade},
+ response::Response,
+};
+use futures::{SinkExt, StreamExt};
+use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
+use uuid::Uuid;
+
+use crate::server::state::SharedState;
+
+/// Client message for file subscription management.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum FileClientMessage {
+ /// Subscribe to updates for a specific file
+ Subscribe {
+ #[serde(rename = "fileId")]
+ file_id: Uuid,
+ },
+ /// Unsubscribe from updates for a specific file
+ Unsubscribe {
+ #[serde(rename = "fileId")]
+ file_id: Uuid,
+ },
+}
+
+/// Server message for file subscription WebSocket.
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum FileServerMessage {
+ /// Subscription confirmed
+ Subscribed {
+ #[serde(rename = "fileId")]
+ file_id: Uuid,
+ },
+ /// Unsubscription confirmed
+ Unsubscribed {
+ #[serde(rename = "fileId")]
+ file_id: Uuid,
+ },
+ /// File was updated
+ FileUpdated {
+ #[serde(rename = "fileId")]
+ file_id: Uuid,
+ version: i32,
+ #[serde(rename = "updatedFields")]
+ updated_fields: Vec<String>,
+ #[serde(rename = "updatedBy")]
+ updated_by: String,
+ },
+ /// Error occurred
+ Error { code: String, message: String },
+}
+
+/// WebSocket upgrade handler for file subscriptions.
+#[utoipa::path(
+ get,
+ path = "/api/v1/files/subscribe",
+ responses(
+ (status = 101, description = "WebSocket connection established"),
+ ),
+ tag = "Files"
+)]
+pub async fn file_subscription_handler(
+ ws: WebSocketUpgrade,
+ State(state): State<SharedState>,
+) -> Response {
+ ws.on_upgrade(|socket| handle_file_subscription(socket, state))
+}
+
+async fn handle_file_subscription(socket: WebSocket, state: SharedState) {
+ let (mut sender, mut receiver) = socket.split();
+
+ // Set of file IDs this client is subscribed to
+ let mut subscriptions: HashSet<Uuid> = HashSet::new();
+
+ // Subscribe to the broadcast channel
+ let mut broadcast_rx = state.file_updates.subscribe();
+
+ loop {
+ tokio::select! {
+ // Handle incoming WebSocket messages from client
+ msg = receiver.next() => {
+ match msg {
+ Some(Ok(Message::Text(text))) => {
+ match serde_json::from_str::<FileClientMessage>(&text) {
+ Ok(FileClientMessage::Subscribe { file_id }) => {
+ subscriptions.insert(file_id);
+ let response = FileServerMessage::Subscribed { file_id };
+ let json = serde_json::to_string(&response).unwrap();
+ if sender.send(Message::Text(json.into())).await.is_err() {
+ break;
+ }
+ tracing::debug!("Client subscribed to file {}", file_id);
+ }
+ Ok(FileClientMessage::Unsubscribe { file_id }) => {
+ subscriptions.remove(&file_id);
+ let response = FileServerMessage::Unsubscribed { file_id };
+ let json = serde_json::to_string(&response).unwrap();
+ if sender.send(Message::Text(json.into())).await.is_err() {
+ break;
+ }
+ tracing::debug!("Client unsubscribed from file {}", file_id);
+ }
+ Err(e) => {
+ let response = FileServerMessage::Error {
+ code: "PARSE_ERROR".into(),
+ message: e.to_string(),
+ };
+ let json = serde_json::to_string(&response).unwrap();
+ let _ = sender.send(Message::Text(json.into())).await;
+ }
+ }
+ }
+ Some(Ok(Message::Close(_))) | None => {
+ tracing::debug!("Client disconnected from file subscription");
+ break;
+ }
+ Some(Err(e)) => {
+ tracing::warn!("WebSocket error: {}", e);
+ break;
+ }
+ _ => {}
+ }
+ }
+
+ // Handle broadcast notifications
+ notification = broadcast_rx.recv() => {
+ match notification {
+ Ok(notification) => {
+ // Only forward if client is subscribed to this file
+ if subscriptions.contains(&notification.file_id) {
+ let response = FileServerMessage::FileUpdated {
+ file_id: notification.file_id,
+ version: notification.version,
+ updated_fields: notification.updated_fields,
+ updated_by: notification.updated_by,
+ };
+ let json = serde_json::to_string(&response).unwrap();
+ if sender.send(Message::Text(json.into())).await.is_err() {
+ break;
+ }
+ }
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+ // Client is too slow, skip some messages
+ tracing::warn!("File subscription client lagged, skipped {} messages", n);
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+ // Channel closed, exit
+ break;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs
index 746d66b..c65eed5 100644
--- a/makima/src/server/handlers/files.rs
+++ b/makima/src/server/handlers/files.rs
@@ -9,9 +9,9 @@ use axum::{
use uuid::Uuid;
use crate::db::models::{CreateFileRequest, FileListResponse, FileSummary, UpdateFileRequest};
-use crate::db::repository;
+use crate::db::repository::{self, RepositoryError};
use crate::server::messages::ApiError;
-use crate::server::state::SharedState;
+use crate::server::state::{FileUpdateNotification, SharedState};
/// List all files for the current owner.
#[utoipa::path(
@@ -148,6 +148,7 @@ pub async fn create_file(
responses(
(status = 200, description = "File updated", body = crate::db::models::File),
(status = 404, description = "File not found", body = ApiError),
+ (status = 409, description = "Version conflict", body = ApiError),
(status = 503, description = "Database not configured", body = ApiError),
(status = 500, description = "Internal server error", body = ApiError),
),
@@ -166,14 +167,62 @@ pub async fn update_file(
.into_response();
};
+ // Collect which fields are being updated for broadcast
+ let mut updated_fields = Vec::new();
+ if req.name.is_some() {
+ updated_fields.push("name".to_string());
+ }
+ if req.description.is_some() {
+ updated_fields.push("description".to_string());
+ }
+ if req.transcript.is_some() {
+ updated_fields.push("transcript".to_string());
+ }
+ if req.summary.is_some() {
+ updated_fields.push("summary".to_string());
+ }
+ if req.body.is_some() {
+ updated_fields.push("body".to_string());
+ }
+
match repository::update_file(pool, id, req).await {
- Ok(Some(file)) => Json(file).into_response(),
+ Ok(Some(file)) => {
+ // Broadcast update notification
+ state.broadcast_file_update(FileUpdateNotification {
+ file_id: id,
+ version: file.version,
+ updated_fields,
+ updated_by: "user".to_string(),
+ });
+ Json(file).into_response()
+ }
Ok(None) => (
StatusCode::NOT_FOUND,
Json(ApiError::new("NOT_FOUND", "File not found")),
)
.into_response(),
- Err(e) => {
+ Err(RepositoryError::VersionConflict { expected, actual }) => {
+ tracing::info!(
+ "Version conflict on file {}: expected {}, actual {}",
+ id,
+ expected,
+ actual
+ );
+ (
+ StatusCode::CONFLICT,
+ Json(serde_json::json!({
+ "code": "VERSION_CONFLICT",
+ "message": format!(
+ "File was modified by another user. Expected version {}, actual version {}",
+ expected, actual
+ ),
+ "expectedVersion": expected,
+ "actualVersion": actual,
+ })),
+ )
+ .into_response()
+ }
+ Err(RepositoryError::Database(e)) => {
tracing::error!("Failed to update file {}: {}", id, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs
index 5fc5cea..a26c208 100644
--- a/makima/src/server/handlers/listen.rs
+++ b/makima/src/server/handlers/listen.rs
@@ -467,6 +467,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) {
transcript: Some(final_entries.clone()),
summary: None,
body: None,
+ version: None, // Internal update, skip version check
}).await {
Ok(_) => {
tracing::info!(
diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs
index b13668a..c08f1bd 100644
--- a/makima/src/server/handlers/mod.rs
+++ b/makima/src/server/handlers/mod.rs
@@ -1,5 +1,6 @@
//! HTTP and WebSocket request handlers.
pub mod chat;
+pub mod file_ws;
pub mod files;
pub mod listen;
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index a8f98a6..f132cf4 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -17,7 +17,7 @@ use tower_http::trace::TraceLayer;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
-use crate::server::handlers::{chat, files, listen};
+use crate::server::handlers::{chat, file_ws, files, listen};
use crate::server::openapi::ApiDoc;
use crate::server::state::SharedState;
@@ -43,6 +43,7 @@ pub fn make_router(state: SharedState) -> Router {
// API v1 routes
let api_v1 = Router::new()
.route("/listen", get(listen::websocket_handler))
+ .route("/files/subscribe", get(file_ws::file_subscription_handler))
.route("/files", get(files::list_files).post(files::create_file))
.route(
"/files/{id}",
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 8cdc26c..239ab77 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -2,10 +2,24 @@
use std::sync::Arc;
use sqlx::PgPool;
-use tokio::sync::Mutex;
+use tokio::sync::{broadcast, Mutex};
+use uuid::Uuid;
use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer};
+/// Notification payload for file updates (broadcast to WebSocket subscribers).
+#[derive(Debug, Clone)]
+pub struct FileUpdateNotification {
+ /// ID of the updated file
+ pub file_id: Uuid,
+ /// New version number after update
+ pub version: i32,
+ /// List of fields that were updated
+ pub updated_fields: Vec<String>,
+ /// Source of the update: "user", "llm", or "system"
+ pub updated_by: String,
+}
+
/// Shared application state containing ML models and database pool.
///
/// Models are wrapped in `Mutex` for thread-safe mutable access during inference.
@@ -18,6 +32,8 @@ pub struct AppState {
pub sortformer: Mutex<Sortformer>,
/// Optional database connection pool
pub db_pool: Option<PgPool>,
+ /// Broadcast channel for file update notifications
+ pub file_updates: broadcast::Sender<FileUpdateNotification>,
}
impl AppState {
@@ -40,11 +56,15 @@ impl AppState {
DiarizationConfig::callhome(),
)?;
+ // Create broadcast channel with buffer for 256 messages
+ let (file_updates, _) = broadcast::channel(256);
+
Ok(Self {
parakeet: Mutex::new(parakeet),
parakeet_eou: Mutex::new(parakeet_eou),
sortformer: Mutex::new(sortformer),
db_pool: None,
+ file_updates,
})
}
@@ -53,6 +73,14 @@ impl AppState {
self.db_pool = Some(pool);
self
}
+
+ /// Broadcast a file update notification to all subscribers.
+ ///
+ /// This is a no-op if there are no subscribers (ignores send errors).
+ pub fn broadcast_file_update(&self, notification: FileUpdateNotification) {
+ // Ignore send errors - they just mean no one is listening
+ let _ = self.file_updates.send(notification);
+ }
}
/// Type alias for the shared application state.