diff options
| author | soryu <soryu@soryu.co> | 2026-02-22 14:39:14 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-22 14:39:14 +0000 |
| commit | 6a34a6f3c423a7c57616762eb4cea2b7da52eaf3 (patch) | |
| tree | 7c596eac896918466e7ef3f149b02333fef09212 /makima/cloudflare-agent/src/agent.ts | |
| parent | 0523765af84492640928d571f481e17b26008b13 (diff) | |
| download | soryu-6a34a6f3c423a7c57616762eb4cea2b7da52eaf3.tar.gz soryu-6a34a6f3c423a7c57616762eb4cea2b7da52eaf3.zip | |
feat: Add daemon page with download binary and Cloudflare Agent setup (#77)
* feat: soryu-co/soryu - makima: Create DaemonList and DaemonDetail page components
* feat: soryu-co/soryu - makima: Add daemon page routes, CSS styles, and navigation
* feat: soryu-co/soryu - makima: Create daemon page with download and monitoring
* WIP: heartbeat checkpoint
* WIP: heartbeat checkpoint
* feat: soryu-co/soryu - makima: Integrate Cloudflare Agent setup into daemon page
Diffstat (limited to 'makima/cloudflare-agent/src/agent.ts')
| -rw-r--r-- | makima/cloudflare-agent/src/agent.ts | 773 |
1 files changed, 773 insertions, 0 deletions
diff --git a/makima/cloudflare-agent/src/agent.ts b/makima/cloudflare-agent/src/agent.ts new file mode 100644 index 0000000..a5a7823 --- /dev/null +++ b/makima/cloudflare-agent/src/agent.ts @@ -0,0 +1,773 @@ +/** + * MakimaAgent — Cloudflare Durable Object Agent + * + * Acts as an edge relay between the Makima server and downstream daemon + * instances. It does NOT execute tasks itself (that requires native process + * spawning, git, filesystem access). Instead it: + * + * 1. Maintains a persistent WebSocket to the Makima server (upstream). + * 2. Accepts WebSocket connections from native daemons (downstream). + * 3. Relays messages bidirectionally between upstream and downstream. + * 4. Tracks task dispatch history and daemon health in SQLite. + * 5. Exposes an HTTP API for status and management. + */ + +import { Agent } from "agents"; +import type { Connection, ConnectionContext, WSMessage } from "agents"; +import type { + Env, + AgentState, + DaemonMessage, + DaemonCommand, + TaskRecord, + DownstreamDaemon, + StatusResponse, + TaskHistoryResponse, +} from "./types"; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const HEARTBEAT_INTERVAL_MS = 30_000; // 30s +const RECONNECT_BASE_MS = 1_000; // 1s initial backoff +const RECONNECT_MAX_MS = 60_000; // 60s max backoff +const MAX_RECONNECT_ATTEMPTS = 20; +const TASK_HISTORY_MAX = 500; + +// --------------------------------------------------------------------------- +// MakimaAgent Durable Object +// --------------------------------------------------------------------------- + +export class MakimaAgent extends Agent<Env, AgentState> { + // -- Upstream (Makima server) connection -- + private upstreamWs: WebSocket | null = null; + private heartbeatInterval: ReturnType<typeof setInterval> | null = null; + private reconnectTimeout: ReturnType<typeof setTimeout> | null = null; + + // -- Downstream (native daemon) connections -- + private downstreamDaemons: Map<string, DownstreamDaemon> = new Map(); + + // -- Bookkeeping -- + private totalTasksProcessed = 0; + + // -- Default state for new agent instances -- + initialState: AgentState = { + connected: false, + lastConnectedAt: null, + lastHeartbeatAt: null, + reconnectAttempts: 0, + daemonId: null, + activeTasks: [], + }; + + // ========================================================================= + // Lifecycle + // ========================================================================= + + /** + * Called once when the Durable Object is first instantiated. + * Sets up the SQLite schema and establishes the upstream connection. + */ + async onStart(): Promise<void> { + this.initDatabase(); + this.loadStats(); + + // Attempt upstream connection + await this.connectUpstream(); + } + + // ========================================================================= + // SQLite Schema + // ========================================================================= + + private initDatabase(): void { + // Use the raw storage sql for DDL since tagged template doesn't support multi-statement + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS task_history ( + task_id TEXT PRIMARY KEY, + task_name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + dispatched_to TEXT, + received_at TEXT NOT NULL, + dispatched_at TEXT, + completed_at TEXT, + error TEXT + ) + `); + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS connection_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event TEXT NOT NULL, + timestamp TEXT NOT NULL, + details TEXT + ) + `); + this.ctx.storage.sql.exec(` + CREATE INDEX IF NOT EXISTS idx_task_history_status + ON task_history(status) + `); + this.ctx.storage.sql.exec(` + CREATE INDEX IF NOT EXISTS idx_task_history_received + ON task_history(received_at DESC) + `); + } + + private loadStats(): void { + const rows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`; + this.totalTasksProcessed = rows[0]?.cnt ?? 0; + } + + // ========================================================================= + // Upstream (Makima Server) Connection + // ========================================================================= + + private async connectUpstream(): Promise<void> { + const serverUrl = this.env.MAKIMA_SERVER_URL; + const apiKey = this.env.MAKIMA_API_KEY; + + if (!serverUrl || !apiKey) { + this.logConnection( + "error", + "Missing MAKIMA_SERVER_URL or MAKIMA_API_KEY" + ); + return; + } + + // Normalize URL: ensure it ends with /ws/daemon + let wsUrl = serverUrl; + if (!wsUrl.endsWith("/ws/daemon")) { + wsUrl = wsUrl.replace(/\/$/, "") + "/ws/daemon"; + } + + try { + // Cloudflare Workers use the standard WebSocket API via fetch upgrade + const resp = await fetch(wsUrl, { + headers: { + Upgrade: "websocket", + }, + }); + + const ws = resp.webSocket; + if (!ws) { + throw new Error(`WebSocket upgrade failed (status ${resp.status})`); + } + + ws.accept(); + this.upstreamWs = ws; + + // Authenticate immediately + const authMsg: DaemonMessage = { + type: "authenticate", + apiKey, + machineId: `cf-agent-${this.name}`, + hostname: `cloudflare-edge-${this.name}`, + maxConcurrentTasks: 0, // Relay only — does not execute tasks directly + }; + ws.send(JSON.stringify(authMsg)); + + // Wire up event handlers + ws.addEventListener("message", (event) => { + this.handleUpstreamMessage(event); + }); + + ws.addEventListener("close", (event) => { + this.handleUpstreamClose(event.code, event.reason); + }); + + ws.addEventListener("error", () => { + this.logConnection("error", "Upstream WebSocket error"); + this.handleUpstreamClose(1006, "WebSocket error"); + }); + + // Start heartbeat + this.startHeartbeat(); + + this.logConnection("connected", `Connected to ${wsUrl}`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logConnection("error", `Failed to connect: ${msg}`); + this.scheduleReconnect(); + } + } + + private handleUpstreamMessage(event: MessageEvent): void { + let command: DaemonCommand; + try { + command = JSON.parse( + typeof event.data === "string" ? event.data : "" + ) as DaemonCommand; + } catch { + this.logConnection("error", "Failed to parse upstream message"); + return; + } + + switch (command.type) { + case "authenticated": + this.onUpstreamAuthenticated(command.daemonId); + break; + + case "spawnTask": + this.onSpawnTask(command); + break; + + case "pauseTask": + case "resumeTask": + case "interruptTask": + case "sendMessage": + // Forward task control commands to the appropriate downstream daemon + this.forwardToDownstream(command); + break; + + default: + // Forward any unrecognized commands to all downstream daemons + this.broadcastToDownstream(JSON.stringify(command)); + break; + } + } + + private onUpstreamAuthenticated(daemonId: string): void { + this.setState({ + ...this.state, + connected: true, + lastConnectedAt: new Date().toISOString(), + reconnectAttempts: 0, + daemonId, + }); + + this.logConnection( + "authenticated", + `Authenticated as daemon ${daemonId}` + ); + } + + private onSpawnTask(command: Extract<DaemonCommand, { type: "spawnTask" }>): void { + // Record in task history + const now = new Date().toISOString(); + const taskId = command.taskId; + const taskName = command.taskName; + this.sql`INSERT OR REPLACE INTO task_history (task_id, task_name, status, received_at) + VALUES (${taskId}, ${taskName}, 'pending', ${now})`; + this.totalTasksProcessed++; + + // Find the best downstream daemon to dispatch to + const daemon = this.selectDownstreamDaemon(); + if (daemon) { + this.dispatchTaskToDownstream(daemon, command); + } else { + // No downstream daemons connected — store as pending + this.logConnection( + "warning", + `No downstream daemons available for task ${command.taskId}` + ); + // Notify upstream that we can't handle this task right now + this.sendUpstream({ + type: "taskStatusChange", + taskId: command.taskId, + oldStatus: "pending", + newStatus: "pending", + }); + } + } + + private dispatchTaskToDownstream( + daemon: DownstreamDaemon, + command: Extract<DaemonCommand, { type: "spawnTask" }> + ): void { + try { + daemon.ws.send(JSON.stringify(command)); + daemon.activeTasks.add(command.taskId); + + const now = new Date().toISOString(); + const did = daemon.id; + const tid = command.taskId; + this.sql`UPDATE task_history + SET status = 'dispatched', dispatched_to = ${did}, dispatched_at = ${now} + WHERE task_id = ${tid}`; + + this.setState({ + ...this.state, + activeTasks: [...this.state.activeTasks, command.taskId], + }); + + this.logConnection( + "dispatch", + `Task ${command.taskId} dispatched to daemon ${daemon.hostname}` + ); + } catch (err) { + this.logConnection( + "error", + `Failed to dispatch task ${command.taskId}: ${err}` + ); + } + } + + private selectDownstreamDaemon(): DownstreamDaemon | null { + let best: DownstreamDaemon | null = null; + let leastTasks = Infinity; + + for (const daemon of this.downstreamDaemons.values()) { + if (daemon.activeTasks.size < daemon.maxConcurrentTasks) { + if (daemon.activeTasks.size < leastTasks) { + leastTasks = daemon.activeTasks.size; + best = daemon; + } + } + } + return best; + } + + private forwardToDownstream(command: DaemonCommand): void { + // Find which downstream daemon has this task + const taskId = "taskId" in command ? (command as { taskId: string }).taskId : null; + if (!taskId) return; + + for (const daemon of this.downstreamDaemons.values()) { + if (daemon.activeTasks.has(taskId)) { + daemon.ws.send(JSON.stringify(command)); + return; + } + } + + this.logConnection( + "warning", + `No downstream daemon found for task ${taskId}` + ); + } + + private broadcastToDownstream(message: string): void { + for (const daemon of this.downstreamDaemons.values()) { + try { + daemon.ws.send(message); + } catch { + // Ignore send failures on individual daemons + } + } + } + + // ========================================================================= + // Downstream (Native Daemon) WebSocket Handling + // ========================================================================= + + /** + * Called by the Agents SDK when a client connects via WebSocket. + * In our architecture, "clients" are native Makima daemons that connect + * to this edge relay to receive task dispatches. + */ + onConnect(connection: Connection, ctx: ConnectionContext): void { + const daemonId = crypto.randomUUID(); + const daemon: DownstreamDaemon = { + id: daemonId, + ws: connection as unknown as WebSocket, + hostname: "unknown", + maxConcurrentTasks: 10, + activeTasks: new Set(), + lastHeartbeat: new Date().toISOString(), + }; + this.downstreamDaemons.set(daemonId, daemon); + + this.logConnection( + "downstream_connect", + `Downstream daemon connected: ${daemonId}` + ); + } + + /** + * Called when a downstream daemon sends a message. + */ + onMessage(connection: Connection, message: WSMessage): void { + const data = + typeof message === "string" + ? message + : message instanceof ArrayBuffer + ? new TextDecoder().decode(message) + : ""; + let parsed: DaemonMessage; + try { + parsed = JSON.parse(data) as DaemonMessage; + } catch { + return; + } + + // Find which downstream daemon sent this + const daemon = this.findDaemonByConnection(connection); + + // Handle authenticate from downstream daemons + if (parsed.type === "authenticate" && daemon) { + daemon.hostname = (parsed as Extract<DaemonMessage, { type: "authenticate" }>).hostname; + daemon.maxConcurrentTasks = (parsed as Extract<DaemonMessage, { type: "authenticate" }>).maxConcurrentTasks; + // Send back a synthetic authentication confirmation + connection.send( + JSON.stringify({ + type: "authenticated", + daemonId: daemon.id, + }) + ); + return; + } + + // Handle heartbeats from downstream + if (parsed.type === "heartbeat" && daemon) { + daemon.lastHeartbeat = new Date().toISOString(); + return; + } + + // Handle task completion from downstream + if (parsed.type === "taskComplete" && daemon) { + const tc = parsed as Extract<DaemonMessage, { type: "taskComplete" }>; + daemon.activeTasks.delete(tc.taskId); + + const status = tc.success ? "completed" : "failed"; + const now = new Date().toISOString(); + const errMsg = tc.error ?? null; + const tid = tc.taskId; + this.sql`UPDATE task_history + SET status = ${status}, completed_at = ${now}, error = ${errMsg} + WHERE task_id = ${tid}`; + + this.setState({ + ...this.state, + activeTasks: this.state.activeTasks.filter((id) => id !== tc.taskId), + }); + } + + // Forward all daemon messages to the upstream server + this.sendUpstream(parsed); + } + + /** + * Called when a downstream daemon disconnects. + */ + onClose(connection: Connection, code: number, reason: string, wasClean: boolean): void { + const daemon = this.findDaemonByConnection(connection); + if (daemon) { + this.logConnection( + "downstream_disconnect", + `Downstream daemon ${daemon.hostname} disconnected: code=${code} reason=${reason} clean=${wasClean}` + ); + this.downstreamDaemons.delete(daemon.id); + } + } + + onError(connectionOrError: Connection | unknown, error?: unknown): void { + // Handle the overloaded signature: (connection, error) or (error) + if (error !== undefined) { + const connection = connectionOrError as Connection; + const daemon = this.findDaemonByConnection(connection); + if (daemon) { + this.logConnection( + "error", + `Downstream daemon ${daemon.hostname} error: ${error}` + ); + } + } else { + this.logConnection("error", `Agent error: ${connectionOrError}`); + } + } + + private findDaemonByConnection(connection: Connection): DownstreamDaemon | undefined { + for (const daemon of this.downstreamDaemons.values()) { + if (daemon.ws === (connection as unknown as WebSocket)) { + return daemon; + } + } + return undefined; + } + + // ========================================================================= + // Heartbeat + // ========================================================================= + + private startHeartbeat(): void { + this.stopHeartbeat(); + this.heartbeatInterval = setInterval(() => { + this.sendHeartbeat(); + }, HEARTBEAT_INTERVAL_MS); + } + + private stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + } + + private sendHeartbeat(): void { + const activeTasks = this.state.activeTasks; + this.sendUpstream({ + type: "heartbeat", + activeTasks, + }); + this.setState({ + ...this.state, + lastHeartbeatAt: new Date().toISOString(), + }); + } + + // ========================================================================= + // Upstream Reconnection + // ========================================================================= + + private handleUpstreamClose(code: number, reason: string): void { + this.upstreamWs = null; + this.stopHeartbeat(); + + this.setState({ + ...this.state, + connected: false, + }); + + this.logConnection( + "disconnected", + `Upstream disconnected: code=${code} reason=${reason}` + ); + + this.scheduleReconnect(); + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + } + + const attempts = this.state.reconnectAttempts; + if (attempts >= MAX_RECONNECT_ATTEMPTS) { + this.logConnection( + "error", + `Max reconnect attempts (${MAX_RECONNECT_ATTEMPTS}) reached — giving up` + ); + return; + } + + // Exponential backoff with jitter + const delay = Math.min( + RECONNECT_BASE_MS * Math.pow(2, attempts) + + Math.random() * RECONNECT_BASE_MS, + RECONNECT_MAX_MS + ); + + this.setState({ + ...this.state, + reconnectAttempts: attempts + 1, + }); + + this.logConnection( + "reconnecting", + `Scheduling reconnect in ${Math.round(delay)}ms (attempt ${attempts + 1})` + ); + + this.reconnectTimeout = setTimeout(() => { + this.connectUpstream(); + }, delay); + } + + // ========================================================================= + // Upstream Send Helper + // ========================================================================= + + private sendUpstream(message: DaemonMessage): void { + if (!this.upstreamWs) return; + try { + this.upstreamWs.send(JSON.stringify(message)); + } catch { + this.logConnection("error", "Failed to send upstream message"); + } + } + + // ========================================================================= + // Connection Logging + // ========================================================================= + + private logConnection(event: string, details: string): void { + try { + const now = new Date().toISOString(); + this.sql`INSERT INTO connection_log (event, timestamp, details) + VALUES (${event}, ${now}, ${details})`; + + // Prune old logs (keep last 200) + this.ctx.storage.sql.exec(` + DELETE FROM connection_log + WHERE id NOT IN ( + SELECT id FROM connection_log ORDER BY id DESC LIMIT 200 + ) + `); + } catch { + // Swallow — logging should never break the agent + } + } + + // ========================================================================= + // RPC Methods (callable from Agent SDK clients) + // ========================================================================= + + /** Returns the current status of the edge relay agent. */ + getStatus(): StatusResponse { + const agentName = this.env.MAKIMA_AGENT_NAME ?? `makima-edge-${this.name}`; + + return { + status: this.state.connected + ? this.downstreamDaemons.size > 0 + ? "ok" + : "degraded" + : "disconnected", + agentName, + upstreamConnected: this.state.connected, + daemonId: this.state.daemonId, + lastHeartbeat: this.state.lastHeartbeatAt, + connectedDaemons: this.downstreamDaemons.size, + activeTasks: this.state.activeTasks.length, + totalTasksProcessed: this.totalTasksProcessed, + }; + } + + /** Returns paginated task history from the SQLite store. */ + getTaskHistory(limit = 50, offset = 0): TaskHistoryResponse { + const clampedLimit = Math.min(Math.max(1, limit), TASK_HISTORY_MAX); + + const rows = this.sql<TaskRecord>` + SELECT task_id as taskId, task_name as taskName, status, + dispatched_to as dispatchedTo, received_at as receivedAt, + dispatched_at as dispatchedAt, completed_at as completedAt, error + FROM task_history + ORDER BY received_at DESC + LIMIT ${clampedLimit} OFFSET ${offset} + `; + + const totalRows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`; + + return { + tasks: rows, + total: totalRows[0]?.cnt ?? 0, + limit: clampedLimit, + offset, + }; + } + + /** Force a reconnection to the upstream Makima server. */ + reconnect(): { success: boolean; message: string } { + // Close existing connection if any + if (this.upstreamWs) { + try { + this.upstreamWs.close(1000, "Manual reconnect requested"); + } catch { + // Ignore + } + this.upstreamWs = null; + } + + this.stopHeartbeat(); + + // Reset reconnect counter + this.setState({ + ...this.state, + reconnectAttempts: 0, + connected: false, + }); + + // Start fresh connection + this.connectUpstream(); + + return { + success: true, + message: "Reconnection initiated", + }; + } + + // ========================================================================= + // HTTP Request Handler + // ========================================================================= + + /** + * Handles HTTP requests routed to this Durable Object. + * Provides a REST API for management and status. + */ + async onRequest(request: Request): Promise<Response> { + const url = new URL(request.url); + const path = url.pathname; + + // CORS headers for browser-based clients + const corsHeaders: Record<string, string> = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", + }; + + if (request.method === "OPTIONS") { + return new Response(null, { status: 204, headers: corsHeaders }); + } + + const jsonHeaders: Record<string, string> = { + "Content-Type": "application/json", + ...corsHeaders, + }; + + try { + // GET /status — Agent status + if (path === "/status" || path === "/") { + return new Response(JSON.stringify(this.getStatus()), { + headers: jsonHeaders, + }); + } + + // GET /tasks — Task history + if (path === "/tasks" && request.method === "GET") { + const limit = parseInt(url.searchParams.get("limit") ?? "50", 10); + const offset = parseInt(url.searchParams.get("offset") ?? "0", 10); + return new Response( + JSON.stringify(this.getTaskHistory(limit, offset)), + { headers: jsonHeaders } + ); + } + + // POST /reconnect — Force reconnection + if (path === "/reconnect" && request.method === "POST") { + return new Response(JSON.stringify(this.reconnect()), { + headers: jsonHeaders, + }); + } + + // GET /logs — Connection logs + if (path === "/logs" && request.method === "GET") { + const logLimit = Math.min( + parseInt(url.searchParams.get("limit") ?? "50", 10), + 200 + ); + const rows = this.sql<{ + id: number; + event: string; + timestamp: string; + details: string | null; + }>`SELECT id, event, timestamp, details + FROM connection_log + ORDER BY id DESC + LIMIT ${logLimit}`; + return new Response(JSON.stringify({ logs: rows }), { + headers: jsonHeaders, + }); + } + + // GET /health — Simple health check + if (path === "/health") { + return new Response( + JSON.stringify({ + healthy: true, + upstreamConnected: this.state.connected, + }), + { headers: jsonHeaders } + ); + } + + return new Response(JSON.stringify({ error: "Not found" }), { + status: 404, + headers: jsonHeaders, + }); + } catch (err) { + return new Response( + JSON.stringify({ + error: err instanceof Error ? err.message : "Internal error", + }), + { status: 500, headers: jsonHeaders } + ); + } + } +} |
