/** * MakimaAgent — Cloudflare Durable Object Agent * * Acts as an edge relay between the Makima server and downstream daemon * instances. It does NOT execute tasks itself (that requires native process * spawning, git, filesystem access). Instead it: * * 1. Maintains a persistent WebSocket to the Makima server (upstream). * 2. Accepts WebSocket connections from native daemons (downstream). * 3. Relays messages bidirectionally between upstream and downstream. * 4. Tracks task dispatch history and daemon health in SQLite. * 5. Exposes an HTTP API for status and management. */ import { Agent } from "agents"; import type { Connection, ConnectionContext, WSMessage } from "agents"; import type { Env, AgentState, DaemonMessage, DaemonCommand, TaskRecord, DownstreamDaemon, StatusResponse, TaskHistoryResponse, } from "./types"; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- const HEARTBEAT_INTERVAL_MS = 30_000; // 30s const RECONNECT_BASE_MS = 1_000; // 1s initial backoff const RECONNECT_MAX_MS = 60_000; // 60s max backoff const MAX_RECONNECT_ATTEMPTS = 20; const TASK_HISTORY_MAX = 500; // --------------------------------------------------------------------------- // MakimaAgent Durable Object // --------------------------------------------------------------------------- export class MakimaAgent extends Agent { // -- Upstream (Makima server) connection -- private upstreamWs: WebSocket | null = null; private heartbeatInterval: ReturnType | null = null; private reconnectTimeout: ReturnType | null = null; // -- Downstream (native daemon) connections -- private downstreamDaemons: Map = new Map(); // -- Bookkeeping -- private totalTasksProcessed = 0; // -- Default state for new agent instances -- initialState: AgentState = { connected: false, lastConnectedAt: null, lastHeartbeatAt: null, reconnectAttempts: 0, daemonId: null, activeTasks: [], }; // ========================================================================= // Lifecycle // ========================================================================= /** * Called once when the Durable Object is first instantiated. * Sets up the SQLite schema and establishes the upstream connection. */ async onStart(): Promise { this.initDatabase(); this.loadStats(); // Attempt upstream connection await this.connectUpstream(); } // ========================================================================= // SQLite Schema // ========================================================================= private initDatabase(): void { // Use the raw storage sql for DDL since tagged template doesn't support multi-statement this.ctx.storage.sql.exec(` CREATE TABLE IF NOT EXISTS task_history ( task_id TEXT PRIMARY KEY, task_name TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', dispatched_to TEXT, received_at TEXT NOT NULL, dispatched_at TEXT, completed_at TEXT, error TEXT ) `); this.ctx.storage.sql.exec(` CREATE TABLE IF NOT EXISTS connection_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, event TEXT NOT NULL, timestamp TEXT NOT NULL, details TEXT ) `); this.ctx.storage.sql.exec(` CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status) `); this.ctx.storage.sql.exec(` CREATE INDEX IF NOT EXISTS idx_task_history_received ON task_history(received_at DESC) `); } private loadStats(): void { const rows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`; this.totalTasksProcessed = rows[0]?.cnt ?? 0; } // ========================================================================= // Upstream (Makima Server) Connection // ========================================================================= private async connectUpstream(): Promise { const serverUrl = this.env.MAKIMA_SERVER_URL; const apiKey = this.env.MAKIMA_API_KEY; if (!serverUrl || !apiKey) { this.logConnection( "error", "Missing MAKIMA_SERVER_URL or MAKIMA_API_KEY" ); return; } // Normalize URL: ensure it ends with /ws/daemon let wsUrl = serverUrl; if (!wsUrl.endsWith("/ws/daemon")) { wsUrl = wsUrl.replace(/\/$/, "") + "/ws/daemon"; } try { // Cloudflare Workers use the standard WebSocket API via fetch upgrade const resp = await fetch(wsUrl, { headers: { Upgrade: "websocket", }, }); const ws = resp.webSocket; if (!ws) { throw new Error(`WebSocket upgrade failed (status ${resp.status})`); } ws.accept(); this.upstreamWs = ws; // Authenticate immediately const authMsg: DaemonMessage = { type: "authenticate", apiKey, machineId: `cf-agent-${this.name}`, hostname: `cloudflare-edge-${this.name}`, maxConcurrentTasks: 0, // Relay only — does not execute tasks directly }; ws.send(JSON.stringify(authMsg)); // Wire up event handlers ws.addEventListener("message", (event) => { this.handleUpstreamMessage(event); }); ws.addEventListener("close", (event) => { this.handleUpstreamClose(event.code, event.reason); }); ws.addEventListener("error", () => { this.logConnection("error", "Upstream WebSocket error"); this.handleUpstreamClose(1006, "WebSocket error"); }); // Start heartbeat this.startHeartbeat(); this.logConnection("connected", `Connected to ${wsUrl}`); } catch (err) { const msg = err instanceof Error ? err.message : String(err); this.logConnection("error", `Failed to connect: ${msg}`); this.scheduleReconnect(); } } private handleUpstreamMessage(event: MessageEvent): void { let command: DaemonCommand; try { command = JSON.parse( typeof event.data === "string" ? event.data : "" ) as DaemonCommand; } catch { this.logConnection("error", "Failed to parse upstream message"); return; } switch (command.type) { case "authenticated": this.onUpstreamAuthenticated(command.daemonId); break; case "spawnTask": this.onSpawnTask(command); break; case "pauseTask": case "resumeTask": case "interruptTask": case "sendMessage": // Forward task control commands to the appropriate downstream daemon this.forwardToDownstream(command); break; default: // Forward any unrecognized commands to all downstream daemons this.broadcastToDownstream(JSON.stringify(command)); break; } } private onUpstreamAuthenticated(daemonId: string): void { this.setState({ ...this.state, connected: true, lastConnectedAt: new Date().toISOString(), reconnectAttempts: 0, daemonId, }); this.logConnection( "authenticated", `Authenticated as daemon ${daemonId}` ); } private onSpawnTask(command: Extract): void { // Record in task history const now = new Date().toISOString(); const taskId = command.taskId; const taskName = command.taskName; this.sql`INSERT OR REPLACE INTO task_history (task_id, task_name, status, received_at) VALUES (${taskId}, ${taskName}, 'pending', ${now})`; this.totalTasksProcessed++; // Find the best downstream daemon to dispatch to const daemon = this.selectDownstreamDaemon(); if (daemon) { this.dispatchTaskToDownstream(daemon, command); } else { // No downstream daemons connected — store as pending this.logConnection( "warning", `No downstream daemons available for task ${command.taskId}` ); // Notify upstream that we can't handle this task right now this.sendUpstream({ type: "taskStatusChange", taskId: command.taskId, oldStatus: "pending", newStatus: "pending", }); } } private dispatchTaskToDownstream( daemon: DownstreamDaemon, command: Extract ): void { try { daemon.ws.send(JSON.stringify(command)); daemon.activeTasks.add(command.taskId); const now = new Date().toISOString(); const did = daemon.id; const tid = command.taskId; this.sql`UPDATE task_history SET status = 'dispatched', dispatched_to = ${did}, dispatched_at = ${now} WHERE task_id = ${tid}`; this.setState({ ...this.state, activeTasks: [...this.state.activeTasks, command.taskId], }); this.logConnection( "dispatch", `Task ${command.taskId} dispatched to daemon ${daemon.hostname}` ); } catch (err) { this.logConnection( "error", `Failed to dispatch task ${command.taskId}: ${err}` ); } } private selectDownstreamDaemon(): DownstreamDaemon | null { let best: DownstreamDaemon | null = null; let leastTasks = Infinity; for (const daemon of this.downstreamDaemons.values()) { if (daemon.activeTasks.size < daemon.maxConcurrentTasks) { if (daemon.activeTasks.size < leastTasks) { leastTasks = daemon.activeTasks.size; best = daemon; } } } return best; } private forwardToDownstream(command: DaemonCommand): void { // Find which downstream daemon has this task const taskId = "taskId" in command ? (command as { taskId: string }).taskId : null; if (!taskId) return; for (const daemon of this.downstreamDaemons.values()) { if (daemon.activeTasks.has(taskId)) { daemon.ws.send(JSON.stringify(command)); return; } } this.logConnection( "warning", `No downstream daemon found for task ${taskId}` ); } private broadcastToDownstream(message: string): void { for (const daemon of this.downstreamDaemons.values()) { try { daemon.ws.send(message); } catch { // Ignore send failures on individual daemons } } } // ========================================================================= // Downstream (Native Daemon) WebSocket Handling // ========================================================================= /** * Called by the Agents SDK when a client connects via WebSocket. * In our architecture, "clients" are native Makima daemons that connect * to this edge relay to receive task dispatches. */ onConnect(connection: Connection, ctx: ConnectionContext): void { const daemonId = crypto.randomUUID(); const daemon: DownstreamDaemon = { id: daemonId, ws: connection as unknown as WebSocket, hostname: "unknown", maxConcurrentTasks: 10, activeTasks: new Set(), lastHeartbeat: new Date().toISOString(), }; this.downstreamDaemons.set(daemonId, daemon); this.logConnection( "downstream_connect", `Downstream daemon connected: ${daemonId}` ); } /** * Called when a downstream daemon sends a message. */ onMessage(connection: Connection, message: WSMessage): void { const data = typeof message === "string" ? message : message instanceof ArrayBuffer ? new TextDecoder().decode(message) : ""; let parsed: DaemonMessage; try { parsed = JSON.parse(data) as DaemonMessage; } catch { return; } // Find which downstream daemon sent this const daemon = this.findDaemonByConnection(connection); // Handle authenticate from downstream daemons if (parsed.type === "authenticate" && daemon) { daemon.hostname = (parsed as Extract).hostname; daemon.maxConcurrentTasks = (parsed as Extract).maxConcurrentTasks; // Send back a synthetic authentication confirmation connection.send( JSON.stringify({ type: "authenticated", daemonId: daemon.id, }) ); return; } // Handle heartbeats from downstream if (parsed.type === "heartbeat" && daemon) { daemon.lastHeartbeat = new Date().toISOString(); return; } // Handle task completion from downstream if (parsed.type === "taskComplete" && daemon) { const tc = parsed as Extract; daemon.activeTasks.delete(tc.taskId); const status = tc.success ? "completed" : "failed"; const now = new Date().toISOString(); const errMsg = tc.error ?? null; const tid = tc.taskId; this.sql`UPDATE task_history SET status = ${status}, completed_at = ${now}, error = ${errMsg} WHERE task_id = ${tid}`; this.setState({ ...this.state, activeTasks: this.state.activeTasks.filter((id) => id !== tc.taskId), }); } // Forward all daemon messages to the upstream server this.sendUpstream(parsed); } /** * Called when a downstream daemon disconnects. */ onClose(connection: Connection, code: number, reason: string, wasClean: boolean): void { const daemon = this.findDaemonByConnection(connection); if (daemon) { this.logConnection( "downstream_disconnect", `Downstream daemon ${daemon.hostname} disconnected: code=${code} reason=${reason} clean=${wasClean}` ); this.downstreamDaemons.delete(daemon.id); } } onError(connectionOrError: Connection | unknown, error?: unknown): void { // Handle the overloaded signature: (connection, error) or (error) if (error !== undefined) { const connection = connectionOrError as Connection; const daemon = this.findDaemonByConnection(connection); if (daemon) { this.logConnection( "error", `Downstream daemon ${daemon.hostname} error: ${error}` ); } } else { this.logConnection("error", `Agent error: ${connectionOrError}`); } } private findDaemonByConnection(connection: Connection): DownstreamDaemon | undefined { for (const daemon of this.downstreamDaemons.values()) { if (daemon.ws === (connection as unknown as WebSocket)) { return daemon; } } return undefined; } // ========================================================================= // Heartbeat // ========================================================================= private startHeartbeat(): void { this.stopHeartbeat(); this.heartbeatInterval = setInterval(() => { this.sendHeartbeat(); }, HEARTBEAT_INTERVAL_MS); } private stopHeartbeat(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } } private sendHeartbeat(): void { const activeTasks = this.state.activeTasks; this.sendUpstream({ type: "heartbeat", activeTasks, }); this.setState({ ...this.state, lastHeartbeatAt: new Date().toISOString(), }); } // ========================================================================= // Upstream Reconnection // ========================================================================= private handleUpstreamClose(code: number, reason: string): void { this.upstreamWs = null; this.stopHeartbeat(); this.setState({ ...this.state, connected: false, }); this.logConnection( "disconnected", `Upstream disconnected: code=${code} reason=${reason}` ); this.scheduleReconnect(); } private scheduleReconnect(): void { if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); } const attempts = this.state.reconnectAttempts; if (attempts >= MAX_RECONNECT_ATTEMPTS) { this.logConnection( "error", `Max reconnect attempts (${MAX_RECONNECT_ATTEMPTS}) reached — giving up` ); return; } // Exponential backoff with jitter const delay = Math.min( RECONNECT_BASE_MS * Math.pow(2, attempts) + Math.random() * RECONNECT_BASE_MS, RECONNECT_MAX_MS ); this.setState({ ...this.state, reconnectAttempts: attempts + 1, }); this.logConnection( "reconnecting", `Scheduling reconnect in ${Math.round(delay)}ms (attempt ${attempts + 1})` ); this.reconnectTimeout = setTimeout(() => { this.connectUpstream(); }, delay); } // ========================================================================= // Upstream Send Helper // ========================================================================= private sendUpstream(message: DaemonMessage): void { if (!this.upstreamWs) return; try { this.upstreamWs.send(JSON.stringify(message)); } catch { this.logConnection("error", "Failed to send upstream message"); } } // ========================================================================= // Connection Logging // ========================================================================= private logConnection(event: string, details: string): void { try { const now = new Date().toISOString(); this.sql`INSERT INTO connection_log (event, timestamp, details) VALUES (${event}, ${now}, ${details})`; // Prune old logs (keep last 200) this.ctx.storage.sql.exec(` DELETE FROM connection_log WHERE id NOT IN ( SELECT id FROM connection_log ORDER BY id DESC LIMIT 200 ) `); } catch { // Swallow — logging should never break the agent } } // ========================================================================= // RPC Methods (callable from Agent SDK clients) // ========================================================================= /** Returns the current status of the edge relay agent. */ getStatus(): StatusResponse { const agentName = this.env.MAKIMA_AGENT_NAME ?? `makima-edge-${this.name}`; return { status: this.state.connected ? this.downstreamDaemons.size > 0 ? "ok" : "degraded" : "disconnected", agentName, upstreamConnected: this.state.connected, daemonId: this.state.daemonId, lastHeartbeat: this.state.lastHeartbeatAt, connectedDaemons: this.downstreamDaemons.size, activeTasks: this.state.activeTasks.length, totalTasksProcessed: this.totalTasksProcessed, }; } /** Returns paginated task history from the SQLite store. */ getTaskHistory(limit = 50, offset = 0): TaskHistoryResponse { const clampedLimit = Math.min(Math.max(1, limit), TASK_HISTORY_MAX); const rows = this.sql` SELECT task_id as taskId, task_name as taskName, status, dispatched_to as dispatchedTo, received_at as receivedAt, dispatched_at as dispatchedAt, completed_at as completedAt, error FROM task_history ORDER BY received_at DESC LIMIT ${clampedLimit} OFFSET ${offset} `; const totalRows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`; return { tasks: rows, total: totalRows[0]?.cnt ?? 0, limit: clampedLimit, offset, }; } /** Force a reconnection to the upstream Makima server. */ reconnect(): { success: boolean; message: string } { // Close existing connection if any if (this.upstreamWs) { try { this.upstreamWs.close(1000, "Manual reconnect requested"); } catch { // Ignore } this.upstreamWs = null; } this.stopHeartbeat(); // Reset reconnect counter this.setState({ ...this.state, reconnectAttempts: 0, connected: false, }); // Start fresh connection this.connectUpstream(); return { success: true, message: "Reconnection initiated", }; } // ========================================================================= // HTTP Request Handler // ========================================================================= /** * Handles HTTP requests routed to this Durable Object. * Provides a REST API for management and status. */ async onRequest(request: Request): Promise { const url = new URL(request.url); const path = url.pathname; // CORS headers for browser-based clients const corsHeaders: Record = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization", }; if (request.method === "OPTIONS") { return new Response(null, { status: 204, headers: corsHeaders }); } const jsonHeaders: Record = { "Content-Type": "application/json", ...corsHeaders, }; try { // GET /status — Agent status if (path === "/status" || path === "/") { return new Response(JSON.stringify(this.getStatus()), { headers: jsonHeaders, }); } // GET /tasks — Task history if (path === "/tasks" && request.method === "GET") { const limit = parseInt(url.searchParams.get("limit") ?? "50", 10); const offset = parseInt(url.searchParams.get("offset") ?? "0", 10); return new Response( JSON.stringify(this.getTaskHistory(limit, offset)), { headers: jsonHeaders } ); } // POST /reconnect — Force reconnection if (path === "/reconnect" && request.method === "POST") { return new Response(JSON.stringify(this.reconnect()), { headers: jsonHeaders, }); } // GET /logs — Connection logs if (path === "/logs" && request.method === "GET") { const logLimit = Math.min( parseInt(url.searchParams.get("limit") ?? "50", 10), 200 ); const rows = this.sql<{ id: number; event: string; timestamp: string; details: string | null; }>`SELECT id, event, timestamp, details FROM connection_log ORDER BY id DESC LIMIT ${logLimit}`; return new Response(JSON.stringify({ logs: rows }), { headers: jsonHeaders, }); } // GET /health — Simple health check if (path === "/health") { return new Response( JSON.stringify({ healthy: true, upstreamConnected: this.state.connected, }), { headers: jsonHeaders } ); } return new Response(JSON.stringify({ error: "Not found" }), { status: 404, headers: jsonHeaders, }); } catch (err) { return new Response( JSON.stringify({ error: err instanceof Error ? err.message : "Internal error", }), { status: 500, headers: jsonHeaders } ); } } }