summaryrefslogtreecommitdiff
path: root/frontend/src/services/taskWs.ts
diff options
context:
space:
mode:
Diffstat (limited to 'frontend/src/services/taskWs.ts')
-rw-r--r--frontend/src/services/taskWs.ts88
1 files changed, 88 insertions, 0 deletions
diff --git a/frontend/src/services/taskWs.ts b/frontend/src/services/taskWs.ts
new file mode 100644
index 0000000..832648e
--- /dev/null
+++ b/frontend/src/services/taskWs.ts
@@ -0,0 +1,88 @@
+import { VNWebSocket } from './ws'
+
+export interface TaskOutputMessage {
+ task_id: string
+ message_type: string
+ content: string
+ tool_name?: string
+ tool_input?: string
+ is_error?: boolean
+ cost_usd?: number
+ duration_ms?: number
+ is_partial?: boolean
+}
+
+type TaskOutputCallback = (msg: TaskOutputMessage) => void
+
+export class TaskOutputStream {
+ private ws: VNWebSocket
+ private listeners: Map<string, Set<TaskOutputCallback>> = new Map()
+ private connected = false
+
+ constructor() {
+ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
+ this.ws = new VNWebSocket(`${protocol}//${window.location.host}/ws/tasks`)
+
+ this.ws.on('message', (data: any) => {
+ if (data && data.type === 'TaskOutput') {
+ const payload = data.payload || data
+ const taskId = payload.task_id
+ if (taskId && this.listeners.has(taskId)) {
+ this.listeners.get(taskId)!.forEach(cb => cb(payload))
+ }
+ }
+ })
+
+ this.ws.on('open', () => {
+ this.connected = true
+ // Re-subscribe all active subscriptions on reconnect
+ for (const taskId of this.listeners.keys()) {
+ this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
+ }
+ })
+
+ this.ws.on('close', () => {
+ this.connected = false
+ })
+ }
+
+ connect() {
+ this.ws.connect()
+ }
+
+ subscribe(taskId: string, callback: TaskOutputCallback) {
+ if (!this.listeners.has(taskId)) {
+ this.listeners.set(taskId, new Set())
+ // Only send subscribe if this is a new task subscription
+ this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
+ }
+ this.listeners.get(taskId)!.add(callback)
+ }
+
+ unsubscribe(taskId: string, callback?: TaskOutputCallback) {
+ if (!this.listeners.has(taskId)) return
+
+ if (callback) {
+ this.listeners.get(taskId)!.delete(callback)
+ if (this.listeners.get(taskId)!.size > 0) return
+ }
+
+ this.listeners.delete(taskId)
+ this.ws.send({ type: 'UnsubscribeOutput', task_id: taskId })
+ }
+
+ close() {
+ this.ws.close()
+ this.listeners.clear()
+ }
+}
+
+let instance: TaskOutputStream | null = null
+
+export function getTaskOutputStream(): TaskOutputStream {
+ if (!instance) {
+ instance = new TaskOutputStream()
+ instance.connect()
+ }
+ return instance
+}