diff options
| author | soryu <soryu@soryu.co> | 2026-02-11 00:37:53 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-11 00:37:53 +0000 |
| commit | 6355e8babac39aec1fca738721c3e82c29603070 (patch) | |
| tree | b2f81d368352b20e4bfcfaffd0c365bc20d13f95 | |
| parent | 15b6e5fba161a194fe5427d7d29b0c4286423260 (diff) | |
| download | soryu-makima/makima--add-an-optional-memory-system-for-directiv-91b9fb4f.tar.gz soryu-makima/makima--add-an-optional-memory-system-for-directiv-91b9fb4f.zip | |
feat: makima: Add an optional memory system for directives: Create useMultiTaskSubscription hook for multi-output WebSocket streamingmakima/makima--add-an-optional-memory-system-for-directiv-91b9fb4f
| -rw-r--r-- | makima/frontend/package-lock.json | 15 | ||||
| -rw-r--r-- | makima/frontend/src/hooks/useMultiTaskSubscription.ts | 479 |
2 files changed, 480 insertions, 14 deletions
diff --git a/makima/frontend/package-lock.json b/makima/frontend/package-lock.json index 38adfc4..f1d54d6 100644 --- a/makima/frontend/package-lock.json +++ b/makima/frontend/package-lock.json @@ -55,7 +55,6 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.28.5.tgz", "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "dev": true, - "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -962,7 +961,6 @@ "resolved": "https://registry.npmjs.org/@react-router/dev/-/dev-7.11.0.tgz", "integrity": "sha512-g1ou5Zw3r4mCU0L+EXH4vRtAiyt8qz1JOvL1k+PW4rZ4+71h5nBy/fLgD7cg5BnzQZmjRO1PzCgpF5BIrlKYxQ==", "dev": true, - "peer": true, "dependencies": { "@babel/core": "^7.27.7", "@babel/generator": "^7.27.5", @@ -1894,7 +1892,6 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-19.2.7.tgz", "integrity": "sha512-MWtvHrGZLFttgeEj28VXHxpmwYbor/ATPYbBfSFZEIRK0ecCFLl2Qo55z52Hss+UV9CRN7trSeq1zbgx7YDWWg==", "devOptional": true, - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -2043,7 +2040,6 @@ "url": "https://github.com/sponsors/ai" } ], - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -2242,7 +2238,6 @@ "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", "license": "ISC", - "peer": true, "engines": { "node": ">=12" } @@ -2937,7 +2932,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, - "peer": true, "engines": { "node": ">=12" }, @@ -3009,7 +3003,6 @@ "version": "19.2.3", "resolved": "https://registry.npmjs.org/react/-/react-19.2.3.tgz", "integrity": "sha512-Ku/hhYbVjOQnXDZFv2+RibmLFGwFdeeKHFcOTlrt7xplBnya5OGn/hIRDsqDiSUcfORsDC7MPxwork8jBwsIWA==", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -3018,7 +3011,6 @@ "version": "19.2.3", "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.3.tgz", "integrity": "sha512-yELu4WmLPw5Mr/lmeEpox5rw3RETacE++JgHqQzd2dg+YbJuat3jH4ingc+WPZhxaoFzdv9y33G+F7Nl5O0GBg==", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -3036,7 +3028,6 @@ "version": "9.2.0", "resolved": "https://registry.npmjs.org/react-redux/-/react-redux-9.2.0.tgz", "integrity": "sha512-ROY9fvHhwOD9ySfrF0wmvu//bKCQ6AeZZq1nJNtbDC+kk5DuSuNX/n6YWYF/SYy7bSba4D4FSz8DJeKY/S/r+g==", - "peer": true, "dependencies": { "@types/use-sync-external-store": "^0.0.6", "use-sync-external-store": "^1.4.0" @@ -3068,7 +3059,6 @@ "version": "7.11.0", "resolved": "https://registry.npmjs.org/react-router/-/react-router-7.11.0.tgz", "integrity": "sha512-uI4JkMmjbWCZc01WVP2cH7ZfSzH91JAZUDd7/nIprDgWxBV1TkkmLToFh7EbMTcMak8URFRa2YoBL/W8GWnCTQ==", - "peer": true, "dependencies": { "cookie": "^1.0.1", "set-cookie-parser": "^2.6.0" @@ -3128,8 +3118,7 @@ "node_modules/redux": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/redux/-/redux-5.0.1.tgz", - "integrity": "sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==", - "peer": true + "integrity": "sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==" }, "node_modules/redux-thunk": { "version": "3.1.0", @@ -3266,7 +3255,6 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3358,7 +3346,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-6.4.1.tgz", "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==", "dev": true, - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", diff --git a/makima/frontend/src/hooks/useMultiTaskSubscription.ts b/makima/frontend/src/hooks/useMultiTaskSubscription.ts new file mode 100644 index 0000000..e555d56 --- /dev/null +++ b/makima/frontend/src/hooks/useMultiTaskSubscription.ts @@ -0,0 +1,479 @@ +import { useState, useCallback, useRef, useEffect, useMemo } from "react"; +import { TASK_SUBSCRIBE_ENDPOINT, getTaskOutput } from "../lib/api"; +import type { TaskUpdateEvent, TaskOutputEvent } from "./useTaskSubscription"; + +/** + * Color palette for task streams - 10 distinct colors that work well on dark backgrounds. + */ +const TASK_COLORS = [ + "#58a6ff", // blue + "#3fb950", // green + "#d2a8ff", // purple + "#f0883e", // orange + "#f778ba", // pink + "#79c0ff", // light blue + "#7ee787", // light green + "#ffa657", // light orange + "#ff7b72", // red/coral + "#d8c96a", // yellow +]; + +/** + * A single combined output entry tagged with task metadata. + */ +export interface MultiTaskOutputEntry { + taskId: string; + taskName: string; + color: string; + event: TaskOutputEvent; + timestamp: number; +} + +/** + * Options for the multi-task subscription hook. + */ +export interface UseMultiTaskSubscriptionOptions { + /** Map of taskId -> display name */ + tasks: Map<string, string>; + /** Set of task IDs to show (all if null/undefined) */ + visibleTaskIds?: Set<string> | null; + onOutput?: (entry: MultiTaskOutputEntry) => void; + onUpdate?: (event: TaskUpdateEvent) => void; +} + +/** + * Return value from useMultiTaskSubscription. + */ +export interface UseMultiTaskSubscriptionReturn { + connected: boolean; + /** All output entries (combined stream) */ + entries: MultiTaskOutputEntry[]; + /** Filtered entries based on visibleTaskIds */ + filteredEntries: MultiTaskOutputEntry[]; + /** Add a new task to subscribe to */ + addTask: (taskId: string, name: string) => void; + /** Remove a task subscription */ + removeTask: (taskId: string) => void; + /** Clear output for a specific task */ + clearTask: (taskId: string) => void; + /** Clear all output */ + clearAll: () => void; +} + +/** + * Hook that subscribes to output from multiple tasks simultaneously over a + * single WebSocket connection. Routes output events to per-task buffers and + * maintains a combined/interleaved output stream with task labels. + * + * Also subscribes to all task updates (subscribeAll) to detect when tasks + * start/stop running. + * + * When a task is added, fetches initial output history via getTaskOutput. + */ +export function useMultiTaskSubscription( + options: UseMultiTaskSubscriptionOptions +): UseMultiTaskSubscriptionReturn { + const { tasks, visibleTaskIds, onOutput, onUpdate } = options; + + const [connected, setConnected] = useState(false); + const [entries, setEntries] = useState<MultiTaskOutputEntry[]>([]); + + const wsRef = useRef<WebSocket | null>(null); + const reconnectTimeoutRef = useRef<number | null>(null); + + // Track which task IDs have active output subscriptions on the WS + const subscribedOutputIdsRef = useRef<Set<string>>(new Set()); + // Track whether we've sent subscribeAll + const subscribedAllRef = useRef(false); + // Track task metadata: id -> { name, color, colorIndex } + const taskMetaRef = useRef< + Map<string, { name: string; color: string; colorIndex: number }> + >(new Map()); + // Round-robin color index counter + const colorIndexRef = useRef(0); + // Track tasks for which we've already fetched initial history + const fetchedHistoryRef = useRef<Set<string>>(new Set()); + + // Store callbacks in refs to avoid re-connecting when callbacks change + const callbacksRef = useRef({ onOutput, onUpdate }); + useEffect(() => { + callbacksRef.current = { onOutput, onUpdate }; + }, [onOutput, onUpdate]); + + /** + * Get or assign a color for a task. + */ + const getTaskColor = useCallback((taskId: string): string => { + const existing = taskMetaRef.current.get(taskId); + if (existing) return existing.color; + const idx = colorIndexRef.current; + colorIndexRef.current = (idx + 1) % TASK_COLORS.length; + return TASK_COLORS[idx]; + }, []); + + /** + * Get task name from our metadata, falling back to truncated ID. + */ + const getTaskName = useCallback((taskId: string): string => { + const meta = taskMetaRef.current.get(taskId); + return meta?.name ?? taskId.slice(0, 8); + }, []); + + /** + * Send a WebSocket message if connected. + */ + const wsSend = useCallback((msg: Record<string, unknown>) => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify(msg)); + } + }, []); + + /** + * Subscribe output for a single task ID on the WebSocket. + */ + const subscribeOutputForTask = useCallback( + (taskId: string) => { + subscribedOutputIdsRef.current.add(taskId); + wsSend({ type: "subscribeOutput", taskId }); + }, + [wsSend] + ); + + /** + * Unsubscribe output for a single task ID on the WebSocket. + */ + const unsubscribeOutputForTask = useCallback( + (taskId: string) => { + subscribedOutputIdsRef.current.delete(taskId); + wsSend({ type: "unsubscribeOutput", taskId }); + }, + [wsSend] + ); + + /** + * Fetch initial output history for a task and prepend to entries. + */ + const fetchInitialHistory = useCallback( + async (taskId: string) => { + if (fetchedHistoryRef.current.has(taskId)) return; + fetchedHistoryRef.current.add(taskId); + + try { + const response = await getTaskOutput(taskId); + if (response.entries.length === 0) return; + + const name = getTaskName(taskId); + const color = getTaskColor(taskId); + + const historyEntries: MultiTaskOutputEntry[] = response.entries.map( + (entry) => ({ + taskId, + taskName: name, + color, + event: { + taskId: entry.taskId, + messageType: entry.messageType, + content: entry.content, + toolName: entry.toolName ?? undefined, + toolInput: entry.toolInput as + | Record<string, unknown> + | undefined, + isError: entry.isError ?? undefined, + costUsd: entry.costUsd ?? undefined, + durationMs: entry.durationMs ?? undefined, + isPartial: false, // Historical entries are always complete + }, + timestamp: new Date(entry.createdAt).getTime(), + }) + ); + + setEntries((prev) => { + // Merge history entries with existing, sort by timestamp + const merged = [...historyEntries, ...prev]; + merged.sort((a, b) => a.timestamp - b.timestamp); + return merged; + }); + } catch (err) { + console.error( + `Failed to fetch output history for task ${taskId}:`, + err + ); + } + }, + [getTaskName, getTaskColor] + ); + + /** + * Establish the WebSocket connection. + */ + const connect = useCallback(() => { + const currentState = wsRef.current?.readyState; + if ( + currentState === WebSocket.OPEN || + currentState === WebSocket.CONNECTING + ) { + return; + } + + // Clear any stale CLOSING socket + if (wsRef.current && currentState === WebSocket.CLOSING) { + wsRef.current = null; + } + + try { + const ws = new WebSocket(TASK_SUBSCRIBE_ENDPOINT); + wsRef.current = ws; + + ws.onopen = () => { + setConnected(true); + + // Re-subscribe to all task updates + if (subscribedAllRef.current) { + ws.send(JSON.stringify({ type: "subscribeAll" })); + } + + // Re-subscribe output for all tracked tasks + for (const taskId of subscribedOutputIdsRef.current) { + ws.send( + JSON.stringify({ type: "subscribeOutput", taskId }) + ); + } + }; + + ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data); + + switch (message.type) { + case "taskUpdated": { + const updateEvent: TaskUpdateEvent = { + taskId: message.taskId, + version: message.version, + status: message.status, + updatedFields: message.updatedFields, + updatedBy: message.updatedBy, + }; + callbacksRef.current.onUpdate?.(updateEvent); + break; + } + case "taskOutput": { + const outputTaskId: string = message.taskId; + // Only process output for tasks we're tracking + if (!subscribedOutputIdsRef.current.has(outputTaskId)) break; + + const outputEvent: TaskOutputEvent = { + taskId: outputTaskId, + messageType: message.messageType, + content: message.content, + toolName: message.toolName, + toolInput: message.toolInput, + isError: message.isError, + costUsd: message.costUsd, + durationMs: message.durationMs, + isPartial: message.isPartial, + }; + + const name = getTaskName(outputTaskId); + const color = getTaskColor(outputTaskId); + + const entry: MultiTaskOutputEntry = { + taskId: outputTaskId, + taskName: name, + color, + event: outputEvent, + timestamp: Date.now(), + }; + + setEntries((prev) => [...prev, entry]); + callbacksRef.current.onOutput?.(entry); + break; + } + case "error": + console.error("Multi-task subscription error:", message.message); + break; + // Acknowledgement messages + case "subscribed": + case "unsubscribed": + case "subscribedAll": + case "unsubscribedAll": + case "outputSubscribed": + case "outputUnsubscribed": + break; + } + } catch (e) { + console.error( + "Failed to parse multi-task subscription message:", + e + ); + } + }; + + ws.onerror = () => { + console.error("Multi-task WebSocket connection error"); + }; + + ws.onclose = () => { + setConnected(false); + wsRef.current = null; + + // Attempt reconnection if we still have active subscriptions + if ( + subscribedOutputIdsRef.current.size > 0 || + subscribedAllRef.current + ) { + reconnectTimeoutRef.current = window.setTimeout(() => { + connect(); + }, 3000); + } + }; + } catch (e) { + console.error( + "Failed to connect multi-task WebSocket:", + e instanceof Error ? e.message : e + ); + } + }, [getTaskName, getTaskColor]); + + /** + * Ensure the WebSocket is connected and subscribeAll is active. + */ + const ensureConnected = useCallback(() => { + if (!subscribedAllRef.current) { + subscribedAllRef.current = true; + } + + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsSend({ type: "subscribeAll" }); + } else { + connect(); + } + }, [connect, wsSend]); + + // ─── Public API ─────────────────────────────────────────────────────────── + + /** + * Add a new task to subscribe to. + */ + const addTask = useCallback( + (taskId: string, name: string) => { + const color = getTaskColor(taskId); + const colorIndex = colorIndexRef.current; + taskMetaRef.current.set(taskId, { name, color, colorIndex }); + + ensureConnected(); + subscribeOutputForTask(taskId); + + // Fetch initial history in the background + fetchInitialHistory(taskId); + }, + [ + getTaskColor, + ensureConnected, + subscribeOutputForTask, + fetchInitialHistory, + ] + ); + + /** + * Remove a task subscription and optionally its output. + */ + const removeTask = useCallback( + (taskId: string) => { + unsubscribeOutputForTask(taskId); + taskMetaRef.current.delete(taskId); + fetchedHistoryRef.current.delete(taskId); + + // Remove entries for this task + setEntries((prev) => prev.filter((e) => e.taskId !== taskId)); + }, + [unsubscribeOutputForTask] + ); + + /** + * Clear output for a specific task (keep subscription active). + */ + const clearTask = useCallback((taskId: string) => { + setEntries((prev) => prev.filter((e) => e.taskId !== taskId)); + }, []); + + /** + * Clear all output (keep subscriptions active). + */ + const clearAll = useCallback(() => { + setEntries([]); + }, []); + + // ─── Sync tasks from options ────────────────────────────────────────────── + + // Serialize the tasks map to a stable string for dependency tracking + const tasksKey = useMemo(() => { + const sorted = [...tasks.entries()].sort(([a], [b]) => + a.localeCompare(b) + ); + return JSON.stringify(sorted); + }, [tasks]); + + useEffect(() => { + const currentIds = new Set(tasks.keys()); + const subscribedIds = subscribedOutputIdsRef.current; + + // Add new tasks + for (const [taskId, name] of tasks) { + if (!subscribedIds.has(taskId)) { + addTask(taskId, name); + } else { + // Update name if changed + const meta = taskMetaRef.current.get(taskId); + if (meta && meta.name !== name) { + meta.name = name; + // Update existing entries with the new name + setEntries((prev) => + prev.map((e) => + e.taskId === taskId ? { ...e, taskName: name } : e + ) + ); + } + } + } + + // Remove tasks no longer in the map + for (const taskId of subscribedIds) { + if (!currentIds.has(taskId)) { + removeTask(taskId); + } + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [tasksKey]); + + // ─── Filtered entries ───────────────────────────────────────────────────── + + const filteredEntries = useMemo(() => { + if (!visibleTaskIds) return entries; + return entries.filter((e) => visibleTaskIds.has(e.taskId)); + }, [entries, visibleTaskIds]); + + // ─── Cleanup on unmount ─────────────────────────────────────────────────── + + useEffect(() => { + return () => { + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + } + if (wsRef.current) { + wsRef.current.close(); + } + subscribedOutputIdsRef.current.clear(); + subscribedAllRef.current = false; + }; + }, []); + + return { + connected, + entries, + filteredEntries, + addTask, + removeTask, + clearTask, + clearAll, + }; +} |
