From d670dcb72984cfa483063d161bb468704038895c Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 21 Feb 2026 19:33:44 +0000 Subject: feat: add directive ask command, log backfill & specialized DAG steps (#75) * feat: soryu-co/soryu - makima: Add makima directive ask CLI command * feat: soryu-co/soryu - makima: Update directive skill docs and planning prompt to support asking questions * feat: soryu-co/soryu - makima: Add log stream backfill for directive tasks * feat: soryu-co/soryu - makima: Update planning prompts to inform tasks they can ask questions * WIP: heartbeat checkpoint * feat: soryu-co/soryu - makima: Add ask command to directive SKILL.md documentation * feat: soryu-co/soryu - makima: Add log stream backfill for directive task output history * feat: soryu-co/soryu - makima: Update planning prompt to tell planning tasks they can ask questions * WIP: heartbeat checkpoint * feat: soryu-co/soryu - makima: Show Planning, PR, and Cleanup tasks as specialized steps in DAG --- .../src/components/directives/DirectiveDAG.tsx | 114 +++++++++++++-- .../src/components/directives/DirectiveDetail.tsx | 91 ++++++++---- .../components/directives/OrchestratorStepNode.tsx | 161 +++++++++++++++++++++ .../frontend/src/hooks/useMultiTaskSubscription.ts | 153 +++++++++++++++++++- 4 files changed, 475 insertions(+), 44 deletions(-) create mode 100644 makima/frontend/src/components/directives/OrchestratorStepNode.tsx (limited to 'makima/frontend') diff --git a/makima/frontend/src/components/directives/DirectiveDAG.tsx b/makima/frontend/src/components/directives/DirectiveDAG.tsx index 27a80ac..8c7def9 100644 --- a/makima/frontend/src/components/directives/DirectiveDAG.tsx +++ b/makima/frontend/src/components/directives/DirectiveDAG.tsx @@ -1,9 +1,31 @@ import { useMemo } from "react"; import type { DirectiveStep } from "../../lib/api"; import { StepNode } from "./StepNode"; +import { + OrchestratorStepNode, + type OrchestratorStepType, + type OrchestratorStepStatus, +} from "./OrchestratorStepNode"; + +export interface VirtualStep { + type: OrchestratorStepType; + taskId: string; + status: OrchestratorStepStatus; + label: string; + hasQuestions?: boolean; +} + +export interface SpecializedStep { + id: string; + name: string; + type: "orchestrator" | "completion"; + taskId: string; + status: "running" | "completed"; +} interface DirectiveDAGProps { steps: DirectiveStep[]; + specializedSteps?: SpecializedStep[]; onComplete?: (stepId: string) => void; onFail?: (stepId: string) => void; onSkip?: (stepId: string) => void; @@ -13,6 +35,13 @@ interface Layer { steps: DirectiveStep[]; } +/** Types that should appear before the regular DAG steps */ +const BEFORE_TYPES = new Set([ + "planning", + "replanning", + "plan-orders", +]); + function topoSort(steps: DirectiveStep[]): Layer[] { if (steps.length === 0) return []; @@ -31,10 +60,13 @@ function topoSort(steps: DirectiveStep[]): Layer[] { })); } -export function DirectiveDAG({ steps, onComplete, onFail, onSkip }: DirectiveDAGProps) { +export function DirectiveDAG({ steps, specializedSteps, onComplete, onFail, onSkip }: DirectiveDAGProps) { const layers = useMemo(() => topoSort(steps), [steps]); - if (steps.length === 0) { + const orchestratorSteps = specializedSteps?.filter(s => s.type === "orchestrator") ?? []; + const completionSteps = specializedSteps?.filter(s => s.type === "completion") ?? []; + + if (steps.length === 0 && orchestratorSteps.length === 0 && completionSteps.length === 0) { return (
No steps yet. Add steps to build the DAG. @@ -44,6 +76,19 @@ export function DirectiveDAG({ steps, onComplete, onFail, onSkip }: DirectiveDAG return (
+ {/* Orchestrator steps (Planning/Cleanup/Orders) - rendered above regular steps */} + {orchestratorSteps.map(step => ( + + ))} + + {/* Connector line if both orchestrator step and regular steps exist */} + {orchestratorSteps.length > 0 && layers.length > 0 && ( +
+
+
+ )} + + {/* Regular step layers */} {layers.map((layer, layerIdx) => (
{layerIdx > 0 && ( @@ -52,18 +97,69 @@ export function DirectiveDAG({ steps, onComplete, onFail, onSkip }: DirectiveDAG
)}
- {layer.steps.map((step) => ( - onComplete(step.id) : undefined} - onFail={onFail ? () => onFail(step.id) : undefined} - onSkip={onSkip ? () => onSkip(step.id) : undefined} + {afterSteps.map((vs) => ( + ))}
))} + + {/* Connector line if both regular steps and completion step exist */} + {completionSteps.length > 0 && layers.length > 0 && ( +
+
+
+ )} + + {/* Completion steps (PR creation) - rendered below regular steps */} + {completionSteps.map(step => ( + + ))} +
+ ); +} + +function SpecializedStepNode({ step }: { step: SpecializedStep }) { + const themeColors = step.type === "orchestrator" + ? { + bg: "bg-[#1a1a30]", + border: "border-[rgba(117,170,252,0.3)]", + text: "text-[#75aafc]", + dot: "bg-[#75aafc]", + label: step.name.startsWith("Cleanup") ? "CLEANUP" + : step.name.startsWith("Pick up") ? "ORDERS" + : "PLANNING", + } + : { + bg: "bg-[#1a1a10]", + border: "border-yellow-900/50", + text: "text-yellow-400", + dot: "bg-yellow-400", + label: "PR", + }; + + return ( +
+ + + {themeColors.label} + + + {step.name} + + + View task +
); } diff --git a/makima/frontend/src/components/directives/DirectiveDetail.tsx b/makima/frontend/src/components/directives/DirectiveDetail.tsx index 98940d0..171654d 100644 --- a/makima/frontend/src/components/directives/DirectiveDetail.tsx +++ b/makima/frontend/src/components/directives/DirectiveDetail.tsx @@ -1,6 +1,7 @@ import { useState, useMemo, useEffect, useRef } from "react"; import type { DirectiveWithSteps, DirectiveStatus, UpdateDirectiveRequest } from "../../lib/api"; import { DirectiveDAG } from "./DirectiveDAG"; +import type { SpecializedStep } from "./DirectiveDAG"; import { DirectiveLogStream } from "./DirectiveLogStream"; import { useMultiTaskSubscription } from "../../hooks/useMultiTaskSubscription"; import { useSupervisorQuestions } from "../../contexts/SupervisorQuestionsContext"; @@ -108,6 +109,33 @@ export function DirectiveDetail({ return map; }, [taskMapKey]); // eslint-disable-line react-hooks/exhaustive-deps + // Build specialized steps for DAG visualization + const specializedSteps = useMemo(() => { + const steps: SpecializedStep[] = []; + + if (directive.orchestratorTaskId) { + steps.push({ + id: `orchestrator-${directive.orchestratorTaskId}`, + name: taskMap.get(directive.orchestratorTaskId) || "Planning", + type: "orchestrator", + taskId: directive.orchestratorTaskId, + status: "running", + }); + } + + if (directive.completionTaskId) { + steps.push({ + id: `completion-${directive.completionTaskId}`, + name: directive.prUrl ? "Updating PR" : "Creating PR", + type: "completion", + taskId: directive.completionTaskId, + status: "running", + }); + } + + return steps; + }, [directive.orchestratorTaskId, directive.completionTaskId, directive.prUrl, taskMap]); + // Subscribe to all task outputs const { connected, entries, clearEntries } = useMultiTaskSubscription({ taskMap, @@ -149,6 +177,36 @@ export function DirectiveDetail({ setEditingGoal(false); }; + // Build virtual steps for orchestrator tasks to display in the DAG + const virtualSteps = useMemo(() => { + const steps: VirtualStep[] = []; + if (directive.orchestratorTaskId) { + const hasOrchestratorQuestions = directiveQuestions.some( + (q) => q.taskId === directive.orchestratorTaskId + ); + steps.push({ + type: "planning", + taskId: directive.orchestratorTaskId, + status: "running", + label: "Planning", + hasQuestions: hasOrchestratorQuestions, + }); + } + if (directive.completionTaskId) { + const hasCompletionQuestions = directiveQuestions.some( + (q) => q.taskId === directive.completionTaskId + ); + steps.push({ + type: directive.prUrl ? "pr-update" : "pr", + taskId: directive.completionTaskId, + status: "running", + label: directive.prUrl ? "Updating PR" : "Creating PR", + hasQuestions: hasCompletionQuestions, + }); + } + return steps; + }, [directive.orchestratorTaskId, directive.completionTaskId, directive.prUrl, directiveQuestions]); + return (
{/* Header */} @@ -217,22 +275,6 @@ export function DirectiveDetail({
- {/* Orchestrator planning indicator */} - {directive.orchestratorTaskId && ( -
- - - Planning in progress... - - - View task - -
- )} - {/* PR link */} {directive.prUrl && (
@@ -251,22 +293,6 @@ export function DirectiveDetail({
)} - {/* Completion task indicator */} - {directive.completionTaskId && ( -
- - - {directive.prUrl ? "Updating PR..." : "Creating PR..."} - - - View task - -
- )} - {/* Pending Questions */} {directiveQuestions.length > 0 && (
@@ -423,6 +449,7 @@ export function DirectiveDetail({ = { + planning: { + accent: "#75aafc", + bg: "bg-[#0d1a30]", + border: "border-[#75aafc]", + text: "text-[#75aafc]", + dot: "bg-[#75aafc]", + }, + replanning: { + accent: "#75aafc", + bg: "bg-[#0d1a30]", + border: "border-[#75aafc]", + text: "text-[#75aafc]", + dot: "bg-[#75aafc]", + }, + "plan-orders": { + accent: "#c084fc", + bg: "bg-[#1a0d30]", + border: "border-[#c084fc]", + text: "text-[#c084fc]", + dot: "bg-[#c084fc]", + }, + pr: { + accent: "#34d399", + bg: "bg-[#0a1a14]", + border: "border-[#34d399]", + text: "text-[#34d399]", + dot: "bg-[#34d399]", + }, + "pr-update": { + accent: "#34d399", + bg: "bg-[#0a1a14]", + border: "border-[#34d399]", + text: "text-[#34d399]", + dot: "bg-[#34d399]", + }, + cleanup: { + accent: "#7788aa", + bg: "bg-[#141a24]", + border: "border-[#7788aa]", + text: "text-[#7788aa]", + dot: "bg-[#7788aa]", + }, + verification: { + accent: "#7788aa", + bg: "bg-[#141a24]", + border: "border-[#7788aa]", + text: "text-[#7788aa]", + dot: "bg-[#7788aa]", + }, +}; + +const TYPE_LABELS: Record = { + planning: "PLANNING", + replanning: "REPLANNING", + "plan-orders": "PLAN ORDERS", + pr: "PR", + "pr-update": "PR UPDATE", + cleanup: "CLEANUP", + verification: "VERIFICATION", +}; + +const STATUS_LABELS: Record = { + pending: "PENDING", + running: "RUNNING", + completed: "DONE", + failed: "FAILED", +}; + +export function OrchestratorStepNode({ + type, + taskId, + status, + label, + hasQuestions, +}: OrchestratorStepNodeProps) { + const colors = TYPE_COLORS[type]; + const typeLabel = TYPE_LABELS[type]; + const statusLabel = STATUS_LABELS[status]; + + return ( +
+ {/* Type badge */} +
+
+ {/* Status dot */} + {status === "running" && ( + + )} + {status === "completed" && ( + + )} + {status === "failed" && ( + + )} + {status === "pending" && ( + + )} + + {typeLabel} + +
+
+ {hasQuestions && ( + + )} + + {statusLabel} + +
+
+ + {/* Label */} + + {label} + + + {/* Task link */} + + {status === "running" ? "View running task" : "View task"} + +
+ ); +} diff --git a/makima/frontend/src/hooks/useMultiTaskSubscription.ts b/makima/frontend/src/hooks/useMultiTaskSubscription.ts index 4303f1b..41489c7 100644 --- a/makima/frontend/src/hooks/useMultiTaskSubscription.ts +++ b/makima/frontend/src/hooks/useMultiTaskSubscription.ts @@ -1,5 +1,5 @@ import { useState, useCallback, useRef, useEffect, useMemo } from "react"; -import { TASK_SUBSCRIBE_ENDPOINT } from "../lib/api"; +import { TASK_SUBSCRIBE_ENDPOINT, getTaskOutput } from "../lib/api"; import type { TaskOutputEvent } from "./useTaskSubscription"; export interface MultiTaskOutputEntry extends TaskOutputEvent { @@ -7,6 +7,8 @@ export interface MultiTaskOutputEntry extends TaskOutputEvent { taskLabel: string; /** Timestamp when the entry was received */ receivedAt: number; + /** Whether this entry was backfilled from historical data (not live-streamed) */ + isBackfill?: boolean; } interface UseMultiTaskSubscriptionOptions { @@ -26,8 +28,11 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption const wsRef = useRef(null); const reconnectTimeoutRef = useRef(null); const subscribedTasksRef = useRef>(new Set()); + const backfilledTasksRef = useRef>(new Set()); const taskMapRef = useRef(taskMap); const enabledRef = useRef(enabled); + /** Track which task IDs have already been backfilled to avoid re-fetching */ + const backfilledTasksRef = useRef>(new Set()); // Keep refs in sync useEffect(() => { @@ -38,6 +43,88 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption enabledRef.current = enabled; }, [enabled]); + /** Max number of historical events to backfill per task */ + const MAX_BACKFILL_PER_TASK = 200; + + /** + * Convert a TaskEvent (from the REST API) into a MultiTaskOutputEntry. + * Only converts events with event_type === 'output'. + */ + const convertTaskEventToEntry = useCallback( + (event: TaskEvent): MultiTaskOutputEntry | null => { + if (event.eventType !== "output") return null; + const data = event.eventData; + if (!data) return null; + + return { + taskId: event.taskId, + messageType: (data.messageType as string) || "system", + content: (data.content as string) || "", + toolName: data.toolName as string | undefined, + toolInput: data.toolInput as Record | undefined, + isError: data.isError as boolean | undefined, + costUsd: data.costUsd as number | undefined, + durationMs: data.durationMs as number | undefined, + isPartial: false, + taskLabel: + taskMapRef.current.get(event.taskId) || event.taskId, + receivedAt: new Date(event.createdAt).getTime(), + isBackfill: true, + }; + }, + [] + ); + + /** + * Backfill historical log entries for a task from the REST API. + * Only fetches once per task ID (tracked in backfilledTasksRef). + */ + const backfillTask = useCallback( + async (taskId: string) => { + if (backfilledTasksRef.current.has(taskId)) return; + backfilledTasksRef.current.add(taskId); + + try { + const response = await listTaskEvents(taskId); + const events = response.events; + + // The API returns events in DESC order; reverse to get chronological ASC + const chronologicalEvents = [...events].reverse(); + + // Filter to output events and convert, limiting to MAX_BACKFILL_PER_TASK + const backfillEntries: MultiTaskOutputEntry[] = []; + for (const event of chronologicalEvents) { + const entry = convertTaskEventToEntry(event); + if (entry) { + backfillEntries.push(entry); + if (backfillEntries.length >= MAX_BACKFILL_PER_TASK) break; + } + } + + if (backfillEntries.length === 0) return; + + // Prepend historical entries before any existing live entries for this task, + // maintaining overall chronological order across all tasks + setEntries((prev) => { + // Merge backfill entries with existing entries, maintaining chronological order + const merged = [...backfillEntries, ...prev]; + // Sort by receivedAt to ensure proper chronological ordering + merged.sort((a, b) => a.receivedAt - b.receivedAt); + // Trim to maxEntries + if (merged.length > maxEntries) { + return merged.slice(merged.length - maxEntries); + } + return merged; + }); + } catch (e) { + console.error(`Failed to backfill task events for ${taskId}:`, e); + // Remove from backfilled set so it can be retried + backfilledTasksRef.current.delete(taskId); + } + }, + [convertTaskEventToEntry, maxEntries] + ); + // Derive task IDs from the map, stabilized to avoid unnecessary effect triggers const taskIdsKey = useMemo(() => Array.from(taskMap.keys()).sort().join(","), [taskMap]); const taskIds = useMemo(() => Array.from(taskMap.keys()), [taskIdsKey]); // eslint-disable-line react-hooks/exhaustive-deps @@ -56,6 +143,58 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption } }, []); + const backfillTask = useCallback( + async (taskId: string, label: string) => { + if (backfilledTasksRef.current.has(taskId)) return; + backfilledTasksRef.current.add(taskId); + + try { + const response = await getTaskOutput(taskId); + if (response.entries.length === 0) return; + + const historicalEntries: MultiTaskOutputEntry[] = response.entries.map( + (entry) => ({ + taskId: entry.taskId, + messageType: entry.messageType, + content: entry.content, + toolName: entry.toolName, + toolInput: entry.toolInput, + isError: entry.isError, + costUsd: entry.costUsd, + durationMs: entry.durationMs, + isPartial: false, + taskLabel: label, + receivedAt: new Date(entry.createdAt || Date.now()).getTime(), + }) + ); + + setEntries((prev) => { + // De-duplicate by checking if content+taskId+messageType already exists + const existingKeys = new Set( + prev.map( + (e) => + `${e.taskId}:${e.messageType}:${e.content.slice(0, 100)}` + ) + ); + const newHistorical = historicalEntries.filter( + (e) => + !existingKeys.has( + `${e.taskId}:${e.messageType}:${e.content.slice(0, 100)}` + ) + ); + const combined = [...newHistorical, ...prev]; + if (combined.length > maxEntries) { + return combined.slice(combined.length - maxEntries); + } + return combined; + }); + } catch (e) { + console.error(`Failed to backfill task ${taskId}:`, e); + } + }, + [maxEntries] + ); + const connect = useCallback(() => { const currentState = wsRef.current?.readyState; if (currentState === WebSocket.OPEN || currentState === WebSocket.CONNECTING) { @@ -150,6 +289,11 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption // Set desired subscriptions and connect subscribedTasksRef.current = newTaskIds; connect(); + // Backfill all initial tasks + for (const taskId of newTaskIds) { + const label = taskMapRef.current.get(taskId) || taskId; + backfillTask(taskId, label); + } return; } @@ -160,13 +304,15 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption } } - // Subscribe to new tasks + // Subscribe to new tasks and backfill their history for (const newId of newTaskIds) { if (!subscribedTasksRef.current.has(newId)) { subscribeToTask(ws, newId); + const label = taskMapRef.current.get(newId) || newId; + backfillTask(newId, label); } } - }, [taskIds, enabled, connect, subscribeToTask, unsubscribeFromTask]); + }, [taskIds, enabled, connect, subscribeToTask, unsubscribeFromTask, backfillTask]); // Cleanup on unmount useEffect(() => { @@ -182,6 +328,7 @@ export function useMultiTaskSubscription(options: UseMultiTaskSubscriptionOption const clearEntries = useCallback(() => { setEntries([]); + backfilledTasksRef.current.clear(); }, []); return { -- cgit v1.2.3