diff options
Diffstat (limited to 'makima/frontend/src/hooks/useMultiTaskSubscription.ts')
| -rw-r--r-- | makima/frontend/src/hooks/useMultiTaskSubscription.ts | 153 |
1 files changed, 150 insertions, 3 deletions
diff --git a/makima/frontend/src/hooks/useMultiTaskSubscription.ts b/makima/frontend/src/hooks/useMultiTaskSubscription.ts index 4303f1b..41489c7 100644 --- a/makima/frontend/src/hooks/useMultiTaskSubscription.ts +++ b/makima/frontend/src/hooks/useMultiTaskSubscription.ts @@ -1,5 +1,5 @@ import { useState, useCallback, useRef, useEffect, useMemo } from "react"; -import { TASK_SUBSCRIBE_ENDPOINT } from "../lib/api"; +import { TASK_SUBSCRIBE_ENDPOINT, getTaskOutput } from "../lib/api"; import type { TaskOutputEvent } from "./useTaskSubscription"; export interface MultiTaskOutputEntry extends TaskOutputEvent { @@ -7,6 +7,8 @@ export interface MultiTaskOutputEntry extends TaskOutputEvent { taskLabel: string; /** Timestamp when the entry was received */ receivedAt: number; + /** Whether this entry was backfilled from historical data (not live-streamed) */ + isBackfill?: boolean; } interface UseMultiTaskSubscriptionOptions { @@ -26,8 +28,11 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption const wsRef = useRef<WebSocket | null>(null); const reconnectTimeoutRef = useRef<number | null>(null); const subscribedTasksRef = useRef<Set<string>>(new Set()); + const backfilledTasksRef = useRef<Set<string>>(new Set()); const taskMapRef = useRef(taskMap); const enabledRef = useRef(enabled); + /** Track which task IDs have already been backfilled to avoid re-fetching */ + const backfilledTasksRef = useRef<Set<string>>(new Set()); // Keep refs in sync useEffect(() => { @@ -38,6 +43,88 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption enabledRef.current = enabled; }, [enabled]); + /** Max number of historical events to backfill per task */ + const MAX_BACKFILL_PER_TASK = 200; + + /** + * Convert a TaskEvent (from the REST API) into a MultiTaskOutputEntry. + * Only converts events with event_type === 'output'. + */ + const convertTaskEventToEntry = useCallback( + (event: TaskEvent): MultiTaskOutputEntry | null => { + if (event.eventType !== "output") return null; + const data = event.eventData; + if (!data) return null; + + return { + taskId: event.taskId, + messageType: (data.messageType as string) || "system", + content: (data.content as string) || "", + toolName: data.toolName as string | undefined, + toolInput: data.toolInput as Record<string, unknown> | undefined, + isError: data.isError as boolean | undefined, + costUsd: data.costUsd as number | undefined, + durationMs: data.durationMs as number | undefined, + isPartial: false, + taskLabel: + taskMapRef.current.get(event.taskId) || event.taskId, + receivedAt: new Date(event.createdAt).getTime(), + isBackfill: true, + }; + }, + [] + ); + + /** + * Backfill historical log entries for a task from the REST API. + * Only fetches once per task ID (tracked in backfilledTasksRef). + */ + const backfillTask = useCallback( + async (taskId: string) => { + if (backfilledTasksRef.current.has(taskId)) return; + backfilledTasksRef.current.add(taskId); + + try { + const response = await listTaskEvents(taskId); + const events = response.events; + + // The API returns events in DESC order; reverse to get chronological ASC + const chronologicalEvents = [...events].reverse(); + + // Filter to output events and convert, limiting to MAX_BACKFILL_PER_TASK + const backfillEntries: MultiTaskOutputEntry[] = []; + for (const event of chronologicalEvents) { + const entry = convertTaskEventToEntry(event); + if (entry) { + backfillEntries.push(entry); + if (backfillEntries.length >= MAX_BACKFILL_PER_TASK) break; + } + } + + if (backfillEntries.length === 0) return; + + // Prepend historical entries before any existing live entries for this task, + // maintaining overall chronological order across all tasks + setEntries((prev) => { + // Merge backfill entries with existing entries, maintaining chronological order + const merged = [...backfillEntries, ...prev]; + // Sort by receivedAt to ensure proper chronological ordering + merged.sort((a, b) => a.receivedAt - b.receivedAt); + // Trim to maxEntries + if (merged.length > maxEntries) { + return merged.slice(merged.length - maxEntries); + } + return merged; + }); + } catch (e) { + console.error(`Failed to backfill task events for ${taskId}:`, e); + // Remove from backfilled set so it can be retried + backfilledTasksRef.current.delete(taskId); + } + }, + [convertTaskEventToEntry, maxEntries] + ); + // Derive task IDs from the map, stabilized to avoid unnecessary effect triggers const taskIdsKey = useMemo(() => Array.from(taskMap.keys()).sort().join(","), [taskMap]); const taskIds = useMemo(() => Array.from(taskMap.keys()), [taskIdsKey]); // eslint-disable-line react-hooks/exhaustive-deps @@ -56,6 +143,58 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption } }, []); + const backfillTask = useCallback( + async (taskId: string, label: string) => { + if (backfilledTasksRef.current.has(taskId)) return; + backfilledTasksRef.current.add(taskId); + + try { + const response = await getTaskOutput(taskId); + if (response.entries.length === 0) return; + + const historicalEntries: MultiTaskOutputEntry[] = response.entries.map( + (entry) => ({ + taskId: entry.taskId, + messageType: entry.messageType, + content: entry.content, + toolName: entry.toolName, + toolInput: entry.toolInput, + isError: entry.isError, + costUsd: entry.costUsd, + durationMs: entry.durationMs, + isPartial: false, + taskLabel: label, + receivedAt: new Date(entry.createdAt || Date.now()).getTime(), + }) + ); + + setEntries((prev) => { + // De-duplicate by checking if content+taskId+messageType already exists + const existingKeys = new Set( + prev.map( + (e) => + `${e.taskId}:${e.messageType}:${e.content.slice(0, 100)}` + ) + ); + const newHistorical = historicalEntries.filter( + (e) => + !existingKeys.has( + `${e.taskId}:${e.messageType}:${e.content.slice(0, 100)}` + ) + ); + const combined = [...newHistorical, ...prev]; + if (combined.length > maxEntries) { + return combined.slice(combined.length - maxEntries); + } + return combined; + }); + } catch (e) { + console.error(`Failed to backfill task ${taskId}:`, e); + } + }, + [maxEntries] + ); + const connect = useCallback(() => { const currentState = wsRef.current?.readyState; if (currentState === WebSocket.OPEN || currentState === WebSocket.CONNECTING) { @@ -150,6 +289,11 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption // Set desired subscriptions and connect subscribedTasksRef.current = newTaskIds; connect(); + // Backfill all initial tasks + for (const taskId of newTaskIds) { + const label = taskMapRef.current.get(taskId) || taskId; + backfillTask(taskId, label); + } return; } @@ -160,13 +304,15 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption } } - // Subscribe to new tasks + // Subscribe to new tasks and backfill their history for (const newId of newTaskIds) { if (!subscribedTasksRef.current.has(newId)) { subscribeToTask(ws, newId); + const label = taskMapRef.current.get(newId) || newId; + backfillTask(newId, label); } } - }, [taskIds, enabled, connect, subscribeToTask, unsubscribeFromTask]); + }, [taskIds, enabled, connect, subscribeToTask, unsubscribeFromTask, backfillTask]); // Cleanup on unmount useEffect(() => { @@ -182,6 +328,7 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption const clearEntries = useCallback(() => { setEntries([]); + backfilledTasksRef.current.clear(); }, []); return { |
