/**
* MakimaAgent — Cloudflare Durable Object Agent
*
* Acts as an edge relay between the Makima server and downstream daemon
* instances. It does NOT execute tasks itself (that requires native process
* spawning, git, filesystem access). Instead it:
*
* 1. Maintains a persistent WebSocket to the Makima server (upstream).
* 2. Accepts WebSocket connections from native daemons (downstream).
* 3. Relays messages bidirectionally between upstream and downstream.
* 4. Tracks task dispatch history and daemon health in SQLite.
* 5. Exposes an HTTP API for status and management.
*/
import { Agent } from "agents";
import type { Connection, ConnectionContext, WSMessage } from "agents";
import type {
Env,
AgentState,
DaemonMessage,
DaemonCommand,
TaskRecord,
DownstreamDaemon,
StatusResponse,
TaskHistoryResponse,
} from "./types";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
const HEARTBEAT_INTERVAL_MS = 30_000; // 30s
const RECONNECT_BASE_MS = 1_000; // 1s initial backoff
const RECONNECT_MAX_MS = 60_000; // 60s max backoff
const MAX_RECONNECT_ATTEMPTS = 20;
const TASK_HISTORY_MAX = 500;
// ---------------------------------------------------------------------------
// MakimaAgent Durable Object
// ---------------------------------------------------------------------------
export class MakimaAgent extends Agent<Env, AgentState> {
// -- Upstream (Makima server) connection --
private upstreamWs: WebSocket | null = null;
private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
// -- Downstream (native daemon) connections --
private downstreamDaemons: Map<string, DownstreamDaemon> = new Map();
// -- Bookkeeping --
private totalTasksProcessed = 0;
// -- Default state for new agent instances --
initialState: AgentState = {
connected: false,
lastConnectedAt: null,
lastHeartbeatAt: null,
reconnectAttempts: 0,
daemonId: null,
activeTasks: [],
};
// =========================================================================
// Lifecycle
// =========================================================================
/**
* Called once when the Durable Object is first instantiated.
* Sets up the SQLite schema and establishes the upstream connection.
*/
async onStart(): Promise<void> {
this.initDatabase();
this.loadStats();
// Attempt upstream connection
await this.connectUpstream();
}
// =========================================================================
// SQLite Schema
// =========================================================================
private initDatabase(): void {
// Use the raw storage sql for DDL since tagged template doesn't support multi-statement
this.ctx.storage.sql.exec(`
CREATE TABLE IF NOT EXISTS task_history (
task_id TEXT PRIMARY KEY,
task_name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
dispatched_to TEXT,
received_at TEXT NOT NULL,
dispatched_at TEXT,
completed_at TEXT,
error TEXT
)
`);
this.ctx.storage.sql.exec(`
CREATE TABLE IF NOT EXISTS connection_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event TEXT NOT NULL,
timestamp TEXT NOT NULL,
details TEXT
)
`);
this.ctx.storage.sql.exec(`
CREATE INDEX IF NOT EXISTS idx_task_history_status
ON task_history(status)
`);
this.ctx.storage.sql.exec(`
CREATE INDEX IF NOT EXISTS idx_task_history_received
ON task_history(received_at DESC)
`);
}
private loadStats(): void {
const rows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`;
this.totalTasksProcessed = rows[0]?.cnt ?? 0;
}
// =========================================================================
// Upstream (Makima Server) Connection
// =========================================================================
private async connectUpstream(): Promise<void> {
const serverUrl = this.env.MAKIMA_SERVER_URL;
const apiKey = this.env.MAKIMA_API_KEY;
if (!serverUrl || !apiKey) {
this.logConnection(
"error",
"Missing MAKIMA_SERVER_URL or MAKIMA_API_KEY"
);
return;
}
// Normalize URL: ensure it ends with /ws/daemon
let wsUrl = serverUrl;
if (!wsUrl.endsWith("/ws/daemon")) {
wsUrl = wsUrl.replace(/\/$/, "") + "/ws/daemon";
}
try {
// Cloudflare Workers use the standard WebSocket API via fetch upgrade
const resp = await fetch(wsUrl, {
headers: {
Upgrade: "websocket",
},
});
const ws = resp.webSocket;
if (!ws) {
throw new Error(`WebSocket upgrade failed (status ${resp.status})`);
}
ws.accept();
this.upstreamWs = ws;
// Authenticate immediately
const authMsg: DaemonMessage = {
type: "authenticate",
apiKey,
machineId: `cf-agent-${this.name}`,
hostname: `cloudflare-edge-${this.name}`,
maxConcurrentTasks: 0, // Relay only — does not execute tasks directly
};
ws.send(JSON.stringify(authMsg));
// Wire up event handlers
ws.addEventListener("message", (event) => {
this.handleUpstreamMessage(event);
});
ws.addEventListener("close", (event) => {
this.handleUpstreamClose(event.code, event.reason);
});
ws.addEventListener("error", () => {
this.logConnection("error", "Upstream WebSocket error");
this.handleUpstreamClose(1006, "WebSocket error");
});
// Start heartbeat
this.startHeartbeat();
this.logConnection("connected", `Connected to ${wsUrl}`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
this.logConnection("error", `Failed to connect: ${msg}`);
this.scheduleReconnect();
}
}
private handleUpstreamMessage(event: MessageEvent): void {
let command: DaemonCommand;
try {
command = JSON.parse(
typeof event.data === "string" ? event.data : ""
) as DaemonCommand;
} catch {
this.logConnection("error", "Failed to parse upstream message");
return;
}
switch (command.type) {
case "authenticated":
this.onUpstreamAuthenticated(command.daemonId);
break;
case "spawnTask":
this.onSpawnTask(command);
break;
case "pauseTask":
case "resumeTask":
case "interruptTask":
case "sendMessage":
// Forward task control commands to the appropriate downstream daemon
this.forwardToDownstream(command);
break;
default:
// Forward any unrecognized commands to all downstream daemons
this.broadcastToDownstream(JSON.stringify(command));
break;
}
}
private onUpstreamAuthenticated(daemonId: string): void {
this.setState({
...this.state,
connected: true,
lastConnectedAt: new Date().toISOString(),
reconnectAttempts: 0,
daemonId,
});
this.logConnection(
"authenticated",
`Authenticated as daemon ${daemonId}`
);
}
private onSpawnTask(command: Extract<DaemonCommand, { type: "spawnTask" }>): void {
// Record in task history
const now = new Date().toISOString();
const taskId = command.taskId;
const taskName = command.taskName;
this.sql`INSERT OR REPLACE INTO task_history (task_id, task_name, status, received_at)
VALUES (${taskId}, ${taskName}, 'pending', ${now})`;
this.totalTasksProcessed++;
// Find the best downstream daemon to dispatch to
const daemon = this.selectDownstreamDaemon();
if (daemon) {
this.dispatchTaskToDownstream(daemon, command);
} else {
// No downstream daemons connected — store as pending
this.logConnection(
"warning",
`No downstream daemons available for task ${command.taskId}`
);
// Notify upstream that we can't handle this task right now
this.sendUpstream({
type: "taskStatusChange",
taskId: command.taskId,
oldStatus: "pending",
newStatus: "pending",
});
}
}
private dispatchTaskToDownstream(
daemon: DownstreamDaemon,
command: Extract<DaemonCommand, { type: "spawnTask" }>
): void {
try {
daemon.ws.send(JSON.stringify(command));
daemon.activeTasks.add(command.taskId);
const now = new Date().toISOString();
const did = daemon.id;
const tid = command.taskId;
this.sql`UPDATE task_history
SET status = 'dispatched', dispatched_to = ${did}, dispatched_at = ${now}
WHERE task_id = ${tid}`;
this.setState({
...this.state,
activeTasks: [...this.state.activeTasks, command.taskId],
});
this.logConnection(
"dispatch",
`Task ${command.taskId} dispatched to daemon ${daemon.hostname}`
);
} catch (err) {
this.logConnection(
"error",
`Failed to dispatch task ${command.taskId}: ${err}`
);
}
}
private selectDownstreamDaemon(): DownstreamDaemon | null {
let best: DownstreamDaemon | null = null;
let leastTasks = Infinity;
for (const daemon of this.downstreamDaemons.values()) {
if (daemon.activeTasks.size < daemon.maxConcurrentTasks) {
if (daemon.activeTasks.size < leastTasks) {
leastTasks = daemon.activeTasks.size;
best = daemon;
}
}
}
return best;
}
private forwardToDownstream(command: DaemonCommand): void {
// Find which downstream daemon has this task
const taskId = "taskId" in command ? (command as { taskId: string }).taskId : null;
if (!taskId) return;
for (const daemon of this.downstreamDaemons.values()) {
if (daemon.activeTasks.has(taskId)) {
daemon.ws.send(JSON.stringify(command));
return;
}
}
this.logConnection(
"warning",
`No downstream daemon found for task ${taskId}`
);
}
private broadcastToDownstream(message: string): void {
for (const daemon of this.downstreamDaemons.values()) {
try {
daemon.ws.send(message);
} catch {
// Ignore send failures on individual daemons
}
}
}
// =========================================================================
// Downstream (Native Daemon) WebSocket Handling
// =========================================================================
/**
* Called by the Agents SDK when a client connects via WebSocket.
* In our architecture, "clients" are native Makima daemons that connect
* to this edge relay to receive task dispatches.
*/
onConnect(connection: Connection, ctx: ConnectionContext): void {
const daemonId = crypto.randomUUID();
const daemon: DownstreamDaemon = {
id: daemonId,
ws: connection as unknown as WebSocket,
hostname: "unknown",
maxConcurrentTasks: 10,
activeTasks: new Set(),
lastHeartbeat: new Date().toISOString(),
};
this.downstreamDaemons.set(daemonId, daemon);
this.logConnection(
"downstream_connect",
`Downstream daemon connected: ${daemonId}`
);
}
/**
* Called when a downstream daemon sends a message.
*/
onMessage(connection: Connection, message: WSMessage): void {
const data =
typeof message === "string"
? message
: message instanceof ArrayBuffer
? new TextDecoder().decode(message)
: "";
let parsed: DaemonMessage;
try {
parsed = JSON.parse(data) as DaemonMessage;
} catch {
return;
}
// Find which downstream daemon sent this
const daemon = this.findDaemonByConnection(connection);
// Handle authenticate from downstream daemons
if (parsed.type === "authenticate" && daemon) {
daemon.hostname = (parsed as Extract<DaemonMessage, { type: "authenticate" }>).hostname;
daemon.maxConcurrentTasks = (parsed as Extract<DaemonMessage, { type: "authenticate" }>).maxConcurrentTasks;
// Send back a synthetic authentication confirmation
connection.send(
JSON.stringify({
type: "authenticated",
daemonId: daemon.id,
})
);
return;
}
// Handle heartbeats from downstream
if (parsed.type === "heartbeat" && daemon) {
daemon.lastHeartbeat = new Date().toISOString();
return;
}
// Handle task completion from downstream
if (parsed.type === "taskComplete" && daemon) {
const tc = parsed as Extract<DaemonMessage, { type: "taskComplete" }>;
daemon.activeTasks.delete(tc.taskId);
const status = tc.success ? "completed" : "failed";
const now = new Date().toISOString();
const errMsg = tc.error ?? null;
const tid = tc.taskId;
this.sql`UPDATE task_history
SET status = ${status}, completed_at = ${now}, error = ${errMsg}
WHERE task_id = ${tid}`;
this.setState({
...this.state,
activeTasks: this.state.activeTasks.filter((id) => id !== tc.taskId),
});
}
// Forward all daemon messages to the upstream server
this.sendUpstream(parsed);
}
/**
* Called when a downstream daemon disconnects.
*/
onClose(connection: Connection, code: number, reason: string, wasClean: boolean): void {
const daemon = this.findDaemonByConnection(connection);
if (daemon) {
this.logConnection(
"downstream_disconnect",
`Downstream daemon ${daemon.hostname} disconnected: code=${code} reason=${reason} clean=${wasClean}`
);
this.downstreamDaemons.delete(daemon.id);
}
}
onError(connectionOrError: Connection | unknown, error?: unknown): void {
// Handle the overloaded signature: (connection, error) or (error)
if (error !== undefined) {
const connection = connectionOrError as Connection;
const daemon = this.findDaemonByConnection(connection);
if (daemon) {
this.logConnection(
"error",
`Downstream daemon ${daemon.hostname} error: ${error}`
);
}
} else {
this.logConnection("error", `Agent error: ${connectionOrError}`);
}
}
private findDaemonByConnection(connection: Connection): DownstreamDaemon | undefined {
for (const daemon of this.downstreamDaemons.values()) {
if (daemon.ws === (connection as unknown as WebSocket)) {
return daemon;
}
}
return undefined;
}
// =========================================================================
// Heartbeat
// =========================================================================
private startHeartbeat(): void {
this.stopHeartbeat();
this.heartbeatInterval = setInterval(() => {
this.sendHeartbeat();
}, HEARTBEAT_INTERVAL_MS);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
private sendHeartbeat(): void {
const activeTasks = this.state.activeTasks;
this.sendUpstream({
type: "heartbeat",
activeTasks,
});
this.setState({
...this.state,
lastHeartbeatAt: new Date().toISOString(),
});
}
// =========================================================================
// Upstream Reconnection
// =========================================================================
private handleUpstreamClose(code: number, reason: string): void {
this.upstreamWs = null;
this.stopHeartbeat();
this.setState({
...this.state,
connected: false,
});
this.logConnection(
"disconnected",
`Upstream disconnected: code=${code} reason=${reason}`
);
this.scheduleReconnect();
}
private scheduleReconnect(): void {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
const attempts = this.state.reconnectAttempts;
if (attempts >= MAX_RECONNECT_ATTEMPTS) {
this.logConnection(
"error",
`Max reconnect attempts (${MAX_RECONNECT_ATTEMPTS}) reached — giving up`
);
return;
}
// Exponential backoff with jitter
const delay = Math.min(
RECONNECT_BASE_MS * Math.pow(2, attempts) +
Math.random() * RECONNECT_BASE_MS,
RECONNECT_MAX_MS
);
this.setState({
...this.state,
reconnectAttempts: attempts + 1,
});
this.logConnection(
"reconnecting",
`Scheduling reconnect in ${Math.round(delay)}ms (attempt ${attempts + 1})`
);
this.reconnectTimeout = setTimeout(() => {
this.connectUpstream();
}, delay);
}
// =========================================================================
// Upstream Send Helper
// =========================================================================
private sendUpstream(message: DaemonMessage): void {
if (!this.upstreamWs) return;
try {
this.upstreamWs.send(JSON.stringify(message));
} catch {
this.logConnection("error", "Failed to send upstream message");
}
}
// =========================================================================
// Connection Logging
// =========================================================================
private logConnection(event: string, details: string): void {
try {
const now = new Date().toISOString();
this.sql`INSERT INTO connection_log (event, timestamp, details)
VALUES (${event}, ${now}, ${details})`;
// Prune old logs (keep last 200)
this.ctx.storage.sql.exec(`
DELETE FROM connection_log
WHERE id NOT IN (
SELECT id FROM connection_log ORDER BY id DESC LIMIT 200
)
`);
} catch {
// Swallow — logging should never break the agent
}
}
// =========================================================================
// RPC Methods (callable from Agent SDK clients)
// =========================================================================
/** Returns the current status of the edge relay agent. */
getStatus(): StatusResponse {
const agentName = this.env.MAKIMA_AGENT_NAME ?? `makima-edge-${this.name}`;
return {
status: this.state.connected
? this.downstreamDaemons.size > 0
? "ok"
: "degraded"
: "disconnected",
agentName,
upstreamConnected: this.state.connected,
daemonId: this.state.daemonId,
lastHeartbeat: this.state.lastHeartbeatAt,
connectedDaemons: this.downstreamDaemons.size,
activeTasks: this.state.activeTasks.length,
totalTasksProcessed: this.totalTasksProcessed,
};
}
/** Returns paginated task history from the SQLite store. */
getTaskHistory(limit = 50, offset = 0): TaskHistoryResponse {
const clampedLimit = Math.min(Math.max(1, limit), TASK_HISTORY_MAX);
const rows = this.sql<TaskRecord>`
SELECT task_id as taskId, task_name as taskName, status,
dispatched_to as dispatchedTo, received_at as receivedAt,
dispatched_at as dispatchedAt, completed_at as completedAt, error
FROM task_history
ORDER BY received_at DESC
LIMIT ${clampedLimit} OFFSET ${offset}
`;
const totalRows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM task_history`;
return {
tasks: rows,
total: totalRows[0]?.cnt ?? 0,
limit: clampedLimit,
offset,
};
}
/** Force a reconnection to the upstream Makima server. */
reconnect(): { success: boolean; message: string } {
// Close existing connection if any
if (this.upstreamWs) {
try {
this.upstreamWs.close(1000, "Manual reconnect requested");
} catch {
// Ignore
}
this.upstreamWs = null;
}
this.stopHeartbeat();
// Reset reconnect counter
this.setState({
...this.state,
reconnectAttempts: 0,
connected: false,
});
// Start fresh connection
this.connectUpstream();
return {
success: true,
message: "Reconnection initiated",
};
}
// =========================================================================
// HTTP Request Handler
// =========================================================================
/**
* Handles HTTP requests routed to this Durable Object.
* Provides a REST API for management and status.
*/
async onRequest(request: Request): Promise<Response> {
const url = new URL(request.url);
const path = url.pathname;
// CORS headers for browser-based clients
const corsHeaders: Record<string, string> = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
};
if (request.method === "OPTIONS") {
return new Response(null, { status: 204, headers: corsHeaders });
}
const jsonHeaders: Record<string, string> = {
"Content-Type": "application/json",
...corsHeaders,
};
try {
// GET /status — Agent status
if (path === "/status" || path === "/") {
return new Response(JSON.stringify(this.getStatus()), {
headers: jsonHeaders,
});
}
// GET /tasks — Task history
if (path === "/tasks" && request.method === "GET") {
const limit = parseInt(url.searchParams.get("limit") ?? "50", 10);
const offset = parseInt(url.searchParams.get("offset") ?? "0", 10);
return new Response(
JSON.stringify(this.getTaskHistory(limit, offset)),
{ headers: jsonHeaders }
);
}
// POST /reconnect — Force reconnection
if (path === "/reconnect" && request.method === "POST") {
return new Response(JSON.stringify(this.reconnect()), {
headers: jsonHeaders,
});
}
// GET /logs — Connection logs
if (path === "/logs" && request.method === "GET") {
const logLimit = Math.min(
parseInt(url.searchParams.get("limit") ?? "50", 10),
200
);
const rows = this.sql<{
id: number;
event: string;
timestamp: string;
details: string | null;
}>`SELECT id, event, timestamp, details
FROM connection_log
ORDER BY id DESC
LIMIT ${logLimit}`;
return new Response(JSON.stringify({ logs: rows }), {
headers: jsonHeaders,
});
}
// GET /health — Simple health check
if (path === "/health") {
return new Response(
JSON.stringify({
healthy: true,
upstreamConnected: this.state.connected,
}),
{ headers: jsonHeaders }
);
}
return new Response(JSON.stringify({ error: "Not found" }), {
status: 404,
headers: jsonHeaders,
});
} catch (err) {
return new Response(
JSON.stringify({
error: err instanceof Error ? err.message : "Internal error",
}),
{ status: 500, headers: jsonHeaders }
);
}
}
}