import React, { useEffect, useRef, useState, useCallback } from 'react';

/** Props accepted by {@link StepLogFeed}. */
interface StepLogFeedProps {
  /** Task whose output is streamed; also part of the message endpoint URL. */
  taskId: string;
  /** Rendered in the header as "<stepName> - Logs". */
  stepName: string;
  /** Step state; "running" and "starting" (any casing) count as active. */
  stepStatus: string;
  /** Invoked when Escape is pressed. */
  onCollapse: () => void;
}

/** One rendered line of the feed. */
interface LogEntry {
  timestamp: string;
  content: string;
  type: 'stdout' | 'stderr' | 'system' | 'user';
}

/** Every value `LogEntry['type']` may take; used to validate wire data. */
const LOG_TYPES: ReadonlyArray<LogEntry['type']> = ['stdout', 'stderr', 'system', 'user'];

/** Delay before reconnecting after the socket closes while the step is active. */
const RECONNECT_DELAY_MS = 3000;

/** Returns `value` if it is a known log type, otherwise `undefined`. */
function toLogType(value: unknown): LogEntry['type'] | undefined {
  return LOG_TYPES.find((known) => known === value);
}

/** Returns the first candidate that is a non-empty string or non-zero number, as text. */
function firstText(...candidates: unknown[]): string | undefined {
  for (const candidate of candidates) {
    if ((typeof candidate === 'string' || typeof candidate === 'number') && candidate) {
      return String(candidate);
    }
  }
  return undefined;
}

/** Normalizes a wire timestamp to a string, falling back to the current time. */
function toTimestamp(value: unknown): string {
  if (typeof value === 'string' && value !== '') return value;
  if (typeof value === 'number') {
    const parsed = new Date(value);
    if (!Number.isNaN(parsed.getTime())) return parsed.toISOString();
  }
  return new Date().toISOString();
}

/** Builds a plain stdout entry stamped with the current time. */
function rawLine(content: string): LogEntry {
  return { timestamp: new Date().toISOString(), content, type: 'stdout' };
}

/**
 * Converts one WebSocket frame into a log entry.
 *
 * @param raw - `MessageEvent.data` as received.
 * @param taskId - Task the feed is showing; JSON objects for other tasks are dropped.
 * @returns The entry to append, or `null` when the frame should be ignored
 *   (binary frame, or a JSON object that belongs to a different task).
 */
function parseLogMessage(raw: unknown, taskId: string): LogEntry | null {
  // Binary frames cannot be rendered as text; skip them instead of pushing
  // an object into the React tree.
  if (typeof raw !== 'string') return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    // Non-JSON message, treat as raw log
    return rawLine(raw);
  }

  // Bare JSON scalars ("42", "null", ...) are still just text lines.
  if (typeof data !== 'object' || data === null) return rawLine(raw);

  // Handle different message formats from the backend
  const record = data as Record<string, unknown>;
  if (record.taskId !== taskId && record.task_id !== taskId) return null;

  return {
    timestamp: toTimestamp(record.timestamp),
    content: firstText(record.content, record.output, record.message) ?? JSON.stringify(data),
    type: toLogType(record.type) ?? toLogType(record.stream) ?? 'stdout',
  };
}

/** Formats a timestamp as a local HH:MM:SS string; returns '' when it cannot be parsed. */
function formatTimestamp(ts: string): string {
  const date = new Date(ts);
  // An unparseable date does not throw on formatting; it yields "Invalid Date".
  if (Number.isNaN(date.getTime())) return '';
  return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', second: '2-digit' });
}

/** POSTs `message` to the task's message endpoint. */
function postTaskMessage(taskId: string, message: string): Promise<Response> {
  return fetch(`/api/v1/mesh/tasks/${encodeURIComponent(taskId)}/message`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
}

/** Extracts a human-readable reason from a failed response. */
async function describeFailure(response: Response): Promise<string> {
  let body: unknown;
  try {
    body = await response.json();
  } catch {
    body = { message: response.statusText };
  }
  if (typeof body === 'object' && body !== null) {
    const { message, error } = body as { message?: unknown; error?: unknown };
    const detail = firstText(message, error);
    if (detail) return detail;
  }
  return `HTTP ${response.status}`;
}

/**
 * Live log feed for an expanded step row.
 * Connects via WebSocket to stream task output and allows
 * sending messages (comments) and interrupting the task.
 *
 * While the step is active the socket is re-opened after
 * {@link RECONNECT_DELAY_MS} whenever it closes. Pressing Escape calls
 * `onCollapse`.
 */
export function StepLogFeed({ taskId, stepName, stepStatus, onCollapse }: StepLogFeedProps): React.ReactElement {
  const [logs, setLogs] = useState<LogEntry[]>([]);
  const [message, setMessage] = useState('');
  const [sending, setSending] = useState(false);
  const [connected, setConnected] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const logsEndRef = useRef<HTMLDivElement>(null);
  const wsRef = useRef<WebSocket | null>(null);
  const logContainerRef = useRef<HTMLDivElement>(null);
  const inputRef = useRef<HTMLTextAreaElement>(null);
  // Set after a successful send; consumed once the input is enabled again.
  const refocusAfterSendRef = useRef(false);

  const isActive = ['running', 'starting'].includes(stepStatus.toLowerCase());

  // Auto-scroll to bottom when new logs arrive
  useEffect(() => {
    logsEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [logs]);

  // Connect to WebSocket for live streaming
  useEffect(() => {
    if (!taskId) return undefined;

    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const wsUrl = `${protocol}//${window.location.host}/api/v1/mesh/tasks/subscribe`;
    let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
    let shouldReconnect = true;

    // A fresh connection attempt starts out disconnected.
    setConnected(false);

    function connect(): void {
      let socket: WebSocket;
      try {
        socket = new WebSocket(wsUrl);
      } catch {
        setError('Failed to connect to log stream');
        return;
      }
      wsRef.current = socket;

      // Events can still arrive from a socket after it was replaced or torn
      // down; those must not touch the ref or state of its successor.
      const isCurrent = (): boolean => wsRef.current === socket;

      socket.addEventListener('open', () => {
        if (!isCurrent()) return;
        setConnected(true);
        setError(null);
        // Subscribe to this specific task
        socket.send(JSON.stringify({ type: 'subscribe', taskId }));
      });

      socket.addEventListener('message', (evt: MessageEvent) => {
        if (!isCurrent()) return;
        const entry = parseLogMessage(evt.data, taskId);
        if (entry) setLogs((prev) => [...prev, entry]);
      });

      socket.addEventListener('close', () => {
        if (!isCurrent()) return;
        wsRef.current = null;
        setConnected(false);
        if (shouldReconnect && isActive) {
          reconnectTimer = setTimeout(connect, RECONNECT_DELAY_MS);
        }
      });

      socket.addEventListener('error', () => {
        if (!isCurrent()) return;
        setConnected(false);
        setError('WebSocket connection failed');
      });
    }

    connect();

    return () => {
      shouldReconnect = false;
      clearTimeout(reconnectTimer);
      // Clear the ref before closing so the socket's late events are ignored.
      const socket = wsRef.current;
      wsRef.current = null;
      socket?.close();
    };
  }, [taskId, isActive]);

  // Keyboard shortcut: Escape to collapse
  useEffect(() => {
    const handler = (e: KeyboardEvent): void => {
      if (e.key === 'Escape') {
        onCollapse();
      }
    };
    document.addEventListener('keydown', handler);
    return () => document.removeEventListener('keydown', handler);
  }, [onCollapse]);

  // Restore focus after a send. The input is disabled while `sending` is
  // true and a disabled control cannot take focus, so wait until it is
  // enabled again.
  useEffect(() => {
    if (!sending && refocusAfterSendRef.current) {
      refocusAfterSendRef.current = false;
      inputRef.current?.focus();
    }
  }, [sending]);

  // Send a message/comment to the task
  const handleSendMessage = useCallback(async (): Promise<void> => {
    const text = message.trim();
    if (!text || !taskId || sending) return;

    setSending(true);
    try {
      const response = await postTaskMessage(taskId, text);
      if (!response.ok) {
        throw new Error(await describeFailure(response));
      }
      // Add as a user message in the log
      setLogs((prev) => [...prev, { timestamp: new Date().toISOString(), content: text, type: 'user' }]);
      setMessage('');
      refocusAfterSendRef.current = true;
    } catch (err) {
      setError(err instanceof Error ? err.message : 'Failed to send message');
    } finally {
      setSending(false);
    }
  }, [message, taskId, sending]);

  const handleKeyDown = useCallback(
    (e: React.KeyboardEvent<HTMLTextAreaElement>): void => {
      if (e.key === 'Enter' && !e.shiftKey) {
        e.preventDefault();
        void handleSendMessage();
      }
      // Prevent Escape from bubbling when input is focused
      if (e.key === 'Escape') {
        e.stopPropagation();
        inputRef.current?.blur();
      }
    },
    [handleSendMessage],
  );

  // Interrupt the running task
  const handleInterrupt = useCallback(async (): Promise<void> => {
    if (!taskId) return;
    try {
      // Send a special interrupt message
      const response = await postTaskMessage(taskId, '/interrupt');
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      setLogs((prev) => [
        ...prev,
        { timestamp: new Date().toISOString(), content: 'Interrupt signal sent', type: 'system' },
      ]);
    } catch (err) {
      setError(err instanceof Error ? err.message : 'Failed to interrupt');
    }
  }, [taskId]);

  // NOTE(review): the markup below uses bare elements wired to the refs and
  // handlers defined above. Class names/styling, the Interrupt button label,
  // the choice of <textarea> (picked because Shift+Enter is deliberately let
  // through), and any header collapse button are assumptions to confirm
  // against the design.
  return (
    <div>
      {/* Header */}
      <div>
        <div>
          <span>{stepName} - Logs</span> <span>{connected ? 'Live' : 'Disconnected'}</span>
        </div>
        <div>
          {isActive && (
            <button type="button" onClick={() => void handleInterrupt()}>
              Interrupt
            </button>
          )}
        </div>
      </div>
      {/* Log content */}
      <div ref={logContainerRef}>
        {logs.length === 0 && !error && (
          <div>{isActive ? 'Waiting for log output...' : 'No logs available for this step.'}</div>
        )}
        {error && <div>{error}</div>}
        {logs.map((entry, idx) => (
          // Entries are append-only, so the index is a stable key.
          <div key={idx}>
            <span>{formatTimestamp(entry.timestamp)}</span> <span>{entry.content}</span>
          </div>
        ))}
        {/* Scroll anchor for the auto-scroll effect */}
        <div ref={logsEndRef} />
      </div>
      {/* Message input (comment/interrupt controls) */}
      {isActive && (
        <div>
          <textarea
            ref={inputRef}
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            onKeyDown={handleKeyDown}
            disabled={sending}
          />
        </div>
      )}
    </div>
  );
}