summaryrefslogtreecommitdiff
path: root/frontend/src/services/taskWs.ts
blob: 832648ee406d1662739cebfa0ee22a0effd2999b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import { VNWebSocket } from './ws'

/**
 * A single output message for a task, as handed to `TaskOutputCallback`
 * listeners.
 *
 * NOTE(review): apart from `task_id`, field meanings are inferred from their
 * names only — the producer of these messages is not visible from this file.
 * Confirm against the backend before relying on them.
 */
export interface TaskOutputMessage {
  /** Id of the task this message belongs to; used to route it to listeners. */
  task_id: string
  /** Kind of output carried by this message (set of values not visible here). */
  message_type: string
  content: string
  /** Presumably present only on tool-related messages — TODO confirm. */
  tool_name?: string
  tool_input?: string
  is_error?: boolean
  /** Presumably a cost in US dollars — TODO confirm. */
  cost_usd?: number
  /** Presumably a duration in milliseconds — TODO confirm. */
  duration_ms?: number
  /** Presumably marks an incomplete (streaming) chunk — TODO confirm. */
  is_partial?: boolean
}

/** Listener invoked with each output message received for a subscribed task. */
type TaskOutputCallback = (msg: TaskOutputMessage) => void

/**
 * Fans task output messages from a single `/ws/tasks` WebSocket out to
 * per-task listeners.
 *
 * Listeners are grouped by task id. The server is told about a task with a
 * `SubscribeOutput` message when its first listener is added and with an
 * `UnsubscribeOutput` message when its last listener is removed. Those
 * messages are only written while the socket is open; every `open` event
 * (first connect or any later re-open) re-sends `SubscribeOutput` for all
 * tasks that currently have listeners, so subscriptions made while
 * disconnected are not lost.
 */
export class TaskOutputStream {
  private readonly ws: VNWebSocket
  private readonly listeners: Map<string, Set<TaskOutputCallback>> = new Map()
  /** True between the socket's `open` event and its `close` event / `close()`. */
  private connected = false

  /** Builds the socket for the current page origin; call `connect()` to open it. */
  constructor() {
    // Match the page's transport security: https pages must use wss.
    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
    this.ws = new VNWebSocket(`${protocol}//${window.location.host}/ws/tasks`)

    // Wire data is untyped here; only the fields needed for routing are inspected.
    this.ws.on('message', (data: any) => {
      if (!data || data.type !== 'TaskOutput') return

      // Fields may be wrapped in `payload` or sit directly on the message.
      const payload = data.payload || data
      const taskId = payload.task_id
      const callbacks = taskId ? this.listeners.get(taskId) : undefined
      if (!callbacks) return

      for (const cb of callbacks) {
        // Isolate listeners: one throwing callback must not starve the others
        // or propagate into the socket wrapper's event dispatch.
        try {
          cb(payload)
        } catch (err) {
          console.error(`TaskOutputStream: listener for task ${taskId} threw`, err)
        }
      }
    })

    this.ws.on('open', () => {
      this.connected = true
      // (Re-)subscribe every task that currently has listeners.
      for (const taskId of this.listeners.keys()) {
        this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
      }
    })

    this.ws.on('close', () => {
      this.connected = false
    })
  }

  /** Opens the underlying socket. */
  connect() {
    this.ws.connect()
  }

  /**
   * Registers `callback` for output of `taskId`.
   *
   * @param taskId   Task whose output should be delivered.
   * @param callback Listener to invoke for each message; adding the same
   *                 function twice has no additional effect.
   */
  subscribe(taskId: string, callback: TaskOutputCallback) {
    const existing = this.listeners.get(taskId)
    if (existing) {
      existing.add(callback)
      return
    }

    // Register before touching the socket so a failing send cannot leave the
    // task with an empty listener set.
    this.listeners.set(taskId, new Set([callback]))

    // While disconnected the `open` handler will send the subscription, so
    // sending here would be redundant (or fail on a socket that is not open).
    if (this.connected) {
      this.ws.send({ type: 'SubscribeOutput', task_id: taskId })
    }
  }

  /**
   * Removes a listener for `taskId`, or all of them when `callback` is omitted.
   * Once no listeners remain the task is dropped and, if the socket is open,
   * the server is sent `UnsubscribeOutput`.
   *
   * @param taskId   Task to stop listening to; unknown ids are ignored.
   * @param callback Specific listener to remove; omit to remove every listener.
   */
  unsubscribe(taskId: string, callback?: TaskOutputCallback) {
    const callbacks = this.listeners.get(taskId)
    if (!callbacks) return

    if (callback) {
      callbacks.delete(callback)
      if (callbacks.size > 0) return
    }

    this.listeners.delete(taskId)
    // Nothing to cancel on a socket that is not open: the task simply will not
    // be re-subscribed on the next `open`.
    if (this.connected) {
      this.ws.send({ type: 'UnsubscribeOutput', task_id: taskId })
    }
  }

  /** Closes the socket and forgets all listeners. */
  close() {
    this.ws.close()
    // Do not rely on the `close` event arriving before the next call.
    this.connected = false
    this.listeners.clear()
  }
}

/** Lazily created, module-wide stream shared by all callers. */
let instance: TaskOutputStream | null = null

/**
 * Returns the shared `TaskOutputStream`, creating and connecting it on the
 * first call. Later calls return that same object.
 */
export function getTaskOutputStream(): TaskOutputStream {
  if (instance !== null) {
    return instance
  }

  const created = new TaskOutputStream()
  // Publish before connecting so the shared reference is set even if
  // connect() fails.
  instance = created
  created.connect()
  return created
}