diff options
Diffstat (limited to 'frontend/src/services/taskWs.ts')
| -rw-r--r-- | frontend/src/services/taskWs.ts | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/frontend/src/services/taskWs.ts b/frontend/src/services/taskWs.ts new file mode 100644 index 0000000..832648e --- /dev/null +++ b/frontend/src/services/taskWs.ts @@ -0,0 +1,88 @@ +import { VNWebSocket } from './ws' + +export interface TaskOutputMessage { + task_id: string + message_type: string + content: string + tool_name?: string + tool_input?: string + is_error?: boolean + cost_usd?: number + duration_ms?: number + is_partial?: boolean +} + +type TaskOutputCallback = (msg: TaskOutputMessage) => void + +export class TaskOutputStream { + private ws: VNWebSocket + private listeners: Map<string, Set<TaskOutputCallback>> = new Map() + private connected = false + + constructor() { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + this.ws = new VNWebSocket(`${protocol}//${window.location.host}/ws/tasks`) + + this.ws.on('message', (data: any) => { + if (data && data.type === 'TaskOutput') { + const payload = data.payload || data + const taskId = payload.task_id + if (taskId && this.listeners.has(taskId)) { + this.listeners.get(taskId)!.forEach(cb => cb(payload)) + } + } + }) + + this.ws.on('open', () => { + this.connected = true + // Re-subscribe all active subscriptions on reconnect + for (const taskId of this.listeners.keys()) { + this.ws.send({ type: 'SubscribeOutput', task_id: taskId }) + } + }) + + this.ws.on('close', () => { + this.connected = false + }) + } + + connect() { + this.ws.connect() + } + + subscribe(taskId: string, callback: TaskOutputCallback) { + if (!this.listeners.has(taskId)) { + this.listeners.set(taskId, new Set()) + // Only send subscribe if this is a new task subscription + this.ws.send({ type: 'SubscribeOutput', task_id: taskId }) + } + this.listeners.get(taskId)!.add(callback) + } + + unsubscribe(taskId: string, callback?: TaskOutputCallback) { + if (!this.listeners.has(taskId)) return + + if (callback) { + this.listeners.get(taskId)!.delete(callback) + if (this.listeners.get(taskId)!.size > 0) return + } + + this.listeners.delete(taskId) + this.ws.send({ type: 'UnsubscribeOutput', task_id: taskId }) + } + + close() { + this.ws.close() + this.listeners.clear() + } +} + +let instance: TaskOutputStream | null = null + +export function getTaskOutputStream(): TaskOutputStream { + if (!instance) { + instance = new TaskOutputStream() + instance.connect() + } + return instance +} |
