import { VNWebSocket } from './ws'

/**
 * Shape of the payload delivered to subscribers for a `TaskOutput` socket
 * message. Only `task_id` is interpreted by this module (it selects which
 * listeners receive the message); all other fields are passed through as-is.
 */
export interface TaskOutputMessage {
  task_id: string
  message_type: string
  content: string
  tool_name?: string
  tool_input?: string
  is_error?: boolean
  cost_usd?: number
  duration_ms?: number
  is_partial?: boolean
}

/** Listener invoked once for every output message of a subscribed task. */
type TaskOutputCallback = (msg: TaskOutputMessage) => void

/**
 * Multiplexes per-task output subscriptions over one `/ws/tasks` WebSocket.
 *
 * Callbacks are grouped by task id. A `SubscribeOutput` message is sent for
 * the first callback registered on a task and an `UnsubscribeOutput` message
 * once its last callback is removed. Every task that still has listeners is
 * re-subscribed each time the socket reports `open`.
 */
export class TaskOutputStream {
  private readonly ws: VNWebSocket
  /** Registered callbacks, keyed by task id. */
  private readonly listeners: Map<string, Set<TaskOutputCallback>> = new Map()
  /** True between the socket's `open` and `close` events. */
  private connected = false

  /**
   * Builds the socket URL from the current page location (`wss:` when the page
   * is served over `https:`, otherwise `ws:`) and wires the event handlers.
   * The connection itself is not started until {@link connect} is called.
   */
  constructor() {
    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
    this.ws = new VNWebSocket(`${protocol}//${window.location.host}/ws/tasks`)

    this.ws.on('message', (data: unknown) => {
      this.dispatch(data)
    })

    this.ws.on('open', () => {
      this.connected = true
      // Re-subscribe all active subscriptions on (re)connect. This also covers
      // subscriptions that were registered while the socket was not open.
      for (const taskId of this.listeners.keys()) {
        this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
      }
    })

    this.ws.on('close', () => {
      this.connected = false
    })
  }

  /** Starts the underlying socket connection. */
  connect(): void {
    this.ws.connect()
  }

  /**
   * Registers `callback` for output of `taskId`.
   *
   * A `SubscribeOutput` message is sent only for the first callback of a task,
   * and only while the socket is open; otherwise the `open` handler sends it
   * once the connection is established.
   *
   * @param taskId - Id of the task whose output should be delivered.
   * @param callback - Invoked with each matching message.
   */
  subscribe(taskId: string, callback: TaskOutputCallback): void {
    const existing = this.listeners.get(taskId)
    if (existing) {
      existing.add(callback)
      return
    }

    // Register before touching the socket so that a failing send cannot leave
    // an empty listener set behind (which would suppress later subscribes).
    this.listeners.set(taskId, new Set([callback]))
    if (this.connected) {
      this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
    }
  }

  /**
   * Removes one callback, or every callback when `callback` is omitted.
   *
   * When the task has no callbacks left it is dropped from the registry and,
   * if the socket is open, an `UnsubscribeOutput` message is sent.
   *
   * @param taskId - Id of the task to unsubscribe from.
   * @param callback - Specific listener to remove; omit to remove all.
   */
  unsubscribe(taskId: string, callback?: TaskOutputCallback): void {
    const callbacks = this.listeners.get(taskId)
    if (!callbacks) return

    if (callback) {
      callbacks.delete(callback)
      if (callbacks.size > 0) return
    }

    this.listeners.delete(taskId)
    // NOTE(review): assumes server-side subscriptions do not outlive the
    // connection, so nothing needs to be sent while disconnected; confirm.
    if (this.connected) {
      this.ws.send({ type: 'UnsubscribeOutput', task_id: taskId })
    }
  }

  /** Closes the socket and forgets every registered callback. */
  close(): void {
    this.ws.close()
    this.connected = false
    this.listeners.clear()
  }

  /**
   * Routes one incoming socket message to the callbacks of its task.
   * Messages whose `type` is not `TaskOutput`, or whose task has no
   * listeners, are ignored.
   */
  private dispatch(data: unknown): void {
    if (typeof data !== 'object' || data === null) return

    const envelope = data as { type?: unknown; payload?: unknown }
    if (envelope.type !== 'TaskOutput') return

    // The task fields are read from `payload` when present, otherwise from
    // the message object itself.
    const payload = (envelope.payload || envelope) as TaskOutputMessage
    const taskId = payload.task_id
    if (!taskId) return

    const callbacks = this.listeners.get(taskId)
    if (callbacks) {
      callbacks.forEach(cb => cb(payload))
    }
  }
}

/** Lazily created stream shared by all callers of {@link getTaskOutputStream}. */
let instance: TaskOutputStream | null = null

/**
 * Returns the shared {@link TaskOutputStream}, creating and connecting it on
 * first use.
 *
 * NOTE(review): `close()` on the returned stream does not reset this
 * singleton, so later calls keep returning the closed instance; confirm
 * whether callers are expected to call `connect()` again.
 */
export function getTaskOutputStream(): TaskOutputStream {
  if (!instance) {
    instance = new TaskOutputStream()
    instance.connect()
  }
  return instance
}