summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 03:26:28 +0000
committersoryu <soryu@soryu.co>2026-01-15 03:26:28 +0000
commiteeafe072bc6bb81459f7d087b48fc921afe9cc11 (patch)
tree7f835993edd732f8ff66d756391dedffe3d44e90
parentc61a2b9b9c988f5460f85980d4ddf285f1a730b5 (diff)
downloadsoryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.tar.gz
soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.zip
Automatically derive repo URL and add notifications for input
-rw-r--r--makima/frontend/src/components/SupervisorQuestionNotification.tsx135
-rw-r--r--makima/frontend/src/components/listen/ContractPickerModal.tsx118
-rw-r--r--makima/frontend/src/components/listen/ControlPanel.tsx32
-rw-r--r--makima/frontend/src/components/mesh/TaskDetail.tsx6
-rw-r--r--makima/frontend/src/contexts/SupervisorQuestionsContext.tsx94
-rw-r--r--makima/frontend/src/lib/api.ts125
-rw-r--r--makima/frontend/src/main.tsx15
-rw-r--r--makima/frontend/src/routes/mesh.tsx9
-rw-r--r--makima/frontend/tsconfig.tsbuildinfo1
-rw-r--r--makima/src/bin/makima.rs12
-rw-r--r--makima/src/daemon/api/supervisor.rs28
-rw-r--r--makima/src/daemon/cli/mod.rs3
-rw-r--r--makima/src/daemon/cli/supervisor.rs26
-rw-r--r--makima/src/daemon/task/manager.rs80
-rw-r--r--makima/src/daemon/worktree/manager.rs183
-rw-r--r--makima/src/server/handlers/mesh.rs687
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs42
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs321
-rw-r--r--makima/src/server/mod.rs6
-rw-r--r--makima/src/server/state.rs185
20 files changed, 2017 insertions, 91 deletions
diff --git a/makima/frontend/src/components/SupervisorQuestionNotification.tsx b/makima/frontend/src/components/SupervisorQuestionNotification.tsx
new file mode 100644
index 0000000..6a71de2
--- /dev/null
+++ b/makima/frontend/src/components/SupervisorQuestionNotification.tsx
@@ -0,0 +1,135 @@
+import { useState } from "react";
+import { useNavigate } from "react-router";
+import { useSupervisorQuestions } from "../contexts/SupervisorQuestionsContext";
+import type { PendingQuestion } from "../lib/api";
+
+export function SupervisorQuestionNotification() {
+ const navigate = useNavigate();
+ const { pendingQuestions, submitAnswer } = useSupervisorQuestions();
+ const [expandedQuestion, setExpandedQuestion] = useState<string | null>(null);
+ const [response, setResponse] = useState("");
+ const [submitting, setSubmitting] = useState(false);
+
+ if (pendingQuestions.length === 0) {
+ return null;
+ }
+
+ const handleGoToTask = (taskId: string) => {
+ navigate(`/mesh/${taskId}`);
+ };
+
+ const handleExpand = (questionId: string) => {
+ setExpandedQuestion(expandedQuestion === questionId ? null : questionId);
+ setResponse("");
+ };
+
+ const handleSubmit = async (question: PendingQuestion) => {
+ if (!response.trim()) return;
+
+ setSubmitting(true);
+ const success = await submitAnswer(question.questionId, response.trim());
+ setSubmitting(false);
+
+ if (success) {
+ setExpandedQuestion(null);
+ setResponse("");
+ }
+ };
+
+ const handleChoiceSelect = async (question: PendingQuestion, choice: string) => {
+ setSubmitting(true);
+ await submitAnswer(question.questionId, choice);
+ setSubmitting(false);
+ };
+
+ return (
+ <div className="fixed bottom-4 right-4 z-50 max-w-md space-y-2">
+ {pendingQuestions.map((question) => (
+ <div
+ key={question.questionId}
+ className="bg-[#0d1b2d] border border-amber-500/50 rounded-lg shadow-lg overflow-hidden"
+ >
+ {/* Header */}
+ <div className="flex items-center justify-between px-4 py-3 bg-amber-900/30">
+ <div className="flex items-center gap-2">
+ <span className="text-amber-400 text-lg">?</span>
+ <span className="font-mono text-sm text-amber-300 uppercase">
+ Supervisor Question
+ </span>
+ </div>
+ <div className="flex items-center gap-2">
+ <button
+ onClick={() => handleGoToTask(question.taskId)}
+ className="px-2 py-1 font-mono text-xs text-amber-400 hover:text-amber-300 transition-colors"
+ title="Go to task"
+ >
+ View Task
+ </button>
+ <button
+ onClick={() => handleExpand(question.questionId)}
+ className="px-2 py-1 font-mono text-xs text-amber-400 border border-amber-500/30 hover:border-amber-400/50 transition-colors uppercase"
+ >
+ {expandedQuestion === question.questionId ? "Collapse" : "Answer"}
+ </button>
+ </div>
+ </div>
+
+ {/* Question preview */}
+ <div className="px-4 py-3">
+ {question.context && (
+ <div className="text-xs text-[#8b949e] font-mono mb-1 uppercase">
+ {question.context}
+ </div>
+ )}
+ <p className="text-sm text-[#dbe7ff] font-mono">
+ {question.question}
+ </p>
+ </div>
+
+ {/* Expanded answer section */}
+ {expandedQuestion === question.questionId && (
+ <div className="px-4 pb-4 border-t border-amber-500/20 pt-3">
+ {question.choices.length > 0 ? (
+ // Choice buttons
+ <div className="space-y-2">
+ <p className="text-xs text-[#8b949e] font-mono uppercase mb-2">
+ Select an option:
+ </p>
+ {question.choices.map((choice, idx) => (
+ <button
+ key={idx}
+ onClick={() => handleChoiceSelect(question, choice)}
+ disabled={submitting}
+ className="w-full px-3 py-2 text-left font-mono text-sm text-[#dbe7ff] bg-[#0a1628] border border-[#3f6fb3] hover:border-amber-400/50 hover:bg-amber-900/20 disabled:opacity-50 transition-colors"
+ >
+ {choice}
+ </button>
+ ))}
+ </div>
+ ) : (
+ // Free-form text input
+ <div className="space-y-2">
+ <textarea
+ value={response}
+ onChange={(e) => setResponse(e.target.value)}
+ placeholder="Type your response..."
+ rows={3}
+ className="w-full px-3 py-2 bg-[#0a1628] border border-[#3f6fb3] text-[#dbe7ff] font-mono text-sm focus:outline-none focus:border-amber-400 resize-none"
+ disabled={submitting}
+ />
+ <button
+ onClick={() => handleSubmit(question)}
+ disabled={submitting || !response.trim()}
+ className="w-full px-4 py-2 font-mono text-xs text-[#0a1628] bg-amber-500 hover:bg-amber-400 disabled:bg-amber-700 disabled:cursor-not-allowed transition-colors uppercase"
+ >
+ {submitting ? "Submitting..." : "Submit Response"}
+ </button>
+ </div>
+ )}
+ </div>
+ )}
+ </div>
+ ))}
+ </div>
+ );
+}
diff --git a/makima/frontend/src/components/listen/ContractPickerModal.tsx b/makima/frontend/src/components/listen/ContractPickerModal.tsx
new file mode 100644
index 0000000..961ccba
--- /dev/null
+++ b/makima/frontend/src/components/listen/ContractPickerModal.tsx
@@ -0,0 +1,118 @@
+import { useEffect, useRef } from "react";
+import type { ContractOption } from "./ControlPanel";
+
+interface ContractPickerModalProps {
+ isOpen: boolean;
+ onClose: () => void;
+ contracts: ContractOption[];
+ selectedContractId: string | null;
+ onSelect: (contractId: string | null) => void;
+ loading?: boolean;
+}
+
+export function ContractPickerModal({
+ isOpen,
+ onClose,
+ contracts,
+ selectedContractId,
+ onSelect,
+ loading,
+}: ContractPickerModalProps) {
+ const modalRef = useRef<HTMLDivElement>(null);
+
+ useEffect(() => {
+ if (!isOpen) return;
+
+ function handleKeyDown(e: KeyboardEvent) {
+ if (e.key === "Escape") {
+ onClose();
+ }
+ }
+
+ function handleClickOutside(e: MouseEvent) {
+ if (modalRef.current && !modalRef.current.contains(e.target as Node)) {
+ onClose();
+ }
+ }
+
+ document.addEventListener("keydown", handleKeyDown);
+ document.addEventListener("mousedown", handleClickOutside);
+
+ return () => {
+ document.removeEventListener("keydown", handleKeyDown);
+ document.removeEventListener("mousedown", handleClickOutside);
+ };
+ }, [isOpen, onClose]);
+
+ if (!isOpen) return null;
+
+ const handleSelect = (contractId: string | null) => {
+ onSelect(contractId);
+ onClose();
+ };
+
+ return (
+ <div className="fixed inset-0 z-50 flex items-center justify-center bg-black/60">
+ <div
+ ref={modalRef}
+ className="panel p-4 w-[300px] max-h-[400px] flex flex-col gap-3"
+ >
+ <div className="flex items-center justify-between">
+ <h2 className="font-mono text-sm text-[#dbe7ff] uppercase tracking-wide">
+ Select Contract
+ </h2>
+ <button
+ onClick={onClose}
+ className="font-mono text-xs text-[#9bc3ff] hover:text-[#dbe7ff] transition-colors"
+ >
+ [X]
+ </button>
+ </div>
+
+ <div className="flex-1 overflow-y-auto flex flex-col gap-1 min-h-0">
+ {loading ? (
+ <div className="font-mono text-xs text-[#9bc3ff] text-center py-4">
+ Loading...
+ </div>
+ ) : (
+ <>
+ <button
+ onClick={() => handleSelect(null)}
+ className={`w-full text-left px-3 py-2 font-mono text-xs border transition-colors ${
+ selectedContractId === null
+ ? "bg-[#0f3c78]/50 border-[#3f6fb3] text-[#dbe7ff]"
+ : "bg-[#0d1b2d] border-[#0f3c78] text-[#9bc3ff] hover:border-[#3f6fb3] hover:text-[#dbe7ff]"
+ }`}
+ >
+ <span className="uppercase tracking-wide">Ephemeral</span>
+ <span className="block text-[10px] text-[#75aafc] mt-0.5">
+ Transcript not saved
+ </span>
+ </button>
+
+ {contracts.map((contract) => (
+ <button
+ key={contract.id}
+ onClick={() => handleSelect(contract.id)}
+ className={`w-full text-left px-3 py-2 font-mono text-xs border transition-colors ${
+ selectedContractId === contract.id
+ ? "bg-[#0f3c78]/50 border-[#3f6fb3] text-[#dbe7ff]"
+ : "bg-[#0d1b2d] border-[#0f3c78] text-[#9bc3ff] hover:border-[#3f6fb3] hover:text-[#dbe7ff]"
+ }`}
+ >
+ <span className="block truncate">{contract.name}</span>
+ </button>
+ ))}
+
+ {contracts.length === 0 && (
+ <div className="font-mono text-xs text-[#9bc3ff] text-center py-4">
+ No contracts available
+ </div>
+ )}
+ </>
+ )}
+ </div>
+ </div>
+ </div>
+ );
+}
diff --git a/makima/frontend/src/components/listen/ControlPanel.tsx b/makima/frontend/src/components/listen/ControlPanel.tsx
index 35834d4..f0e5702 100644
--- a/makima/frontend/src/components/listen/ControlPanel.tsx
+++ b/makima/frontend/src/components/listen/ControlPanel.tsx
@@ -1,5 +1,7 @@
+import { useState } from "react";
import { Logo } from "../Logo";
import type { MicrophoneStatus } from "../../hooks/useMicrophone";
+import { ContractPickerModal } from "./ContractPickerModal";
export interface ContractOption {
id: string;
@@ -53,9 +55,12 @@ export function ControlPanel({
onContractChange,
contractsLoading,
}: ControlPanelProps) {
+ const [isModalOpen, setIsModalOpen] = useState(false);
const statusText = getStatusText(isListening, micStatus);
const isRequesting = micStatus === "requesting";
+ const selectedContract = contracts.find((c) => c.id === selectedContractId);
+
return (
<div className="panel p-4 flex flex-col items-center justify-center gap-3">
{/* Logo button */}
@@ -147,21 +152,24 @@ export function ControlPanel({
>
New
</button>
- <select
- value={selectedContractId || ""}
- onChange={(e) => onContractChange(e.target.value || null)}
+ <button
+ onClick={() => setIsModalOpen(true)}
disabled={isListening || contractsLoading}
- className="px-3 py-1.5 font-mono text-xs text-[#dbe7ff] bg-[#0d1b2d] border border-[#0f3c78] focus:border-[#3f6fb3] transition-colors disabled:opacity-50 disabled:cursor-not-allowed uppercase tracking-wide"
- title={selectedContractId ? "Saving to selected contract" : "Transcript not saved"}
+ className="px-3 py-1.5 font-mono text-xs text-[#dbe7ff] bg-[#0d1b2d] border border-[#0f3c78] hover:border-[#3f6fb3] transition-colors disabled:opacity-50 disabled:cursor-not-allowed uppercase tracking-wide"
+ title={selectedContract ? `Saving to: ${selectedContract.name}` : "Transcript not saved"}
>
- <option value="">Ephemeral Transcript</option>
- {contracts.map((contract) => (
- <option key={contract.id} value={contract.id}>
- {contract.name}
- </option>
- ))}
- </select>
+ {selectedContract ? "Contract" : "Ephemeral"}
+ </button>
</div>
+
+ <ContractPickerModal
+ isOpen={isModalOpen}
+ onClose={() => setIsModalOpen(false)}
+ contracts={contracts}
+ selectedContractId={selectedContractId}
+ onSelect={onContractChange}
+ loading={contractsLoading}
+ />
</div>
);
}
diff --git a/makima/frontend/src/components/mesh/TaskDetail.tsx b/makima/frontend/src/components/mesh/TaskDetail.tsx
index 967b1d1..8e853e7 100644
--- a/makima/frontend/src/components/mesh/TaskDetail.tsx
+++ b/makima/frontend/src/components/mesh/TaskDetail.tsx
@@ -144,6 +144,10 @@ export function TaskDetail({
const isTaskRunning = task.status === "running" || task.status === "initializing" || task.status === "starting";
// Check if task is in a terminal state (can be continued/reopened)
const isTaskTerminal = task.status === "done" || task.status === "failed" || task.status === "merged";
+ // Check if this is a supervisor task
+ const isSupervisor = task.isSupervisor === true;
+ // Show continue for supervisors (always) or terminal states for other tasks
+ const canContinue = isSupervisor || isTaskTerminal;
// Calculate subtask statistics
const subtaskStats = useMemo(
@@ -356,7 +360,7 @@ export function TaskDetail({
)}
</div>
)}
- {isTaskTerminal && (
+ {canContinue && (
<button
onClick={() => onContinue(task.id)}
className="px-3 py-1 font-mono text-xs text-cyan-400 border border-cyan-400/30 hover:border-cyan-400/50 hover:bg-cyan-400/10 transition-colors uppercase flex items-center gap-1"
diff --git a/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx b/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx
new file mode 100644
index 0000000..aa1bb12
--- /dev/null
+++ b/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx
@@ -0,0 +1,94 @@
+import { createContext, useContext, useState, useEffect, useCallback, ReactNode } from "react";
+import { listPendingQuestions, answerQuestion, type PendingQuestion } from "../lib/api";
+import { useAuth } from "./AuthContext";
+
+interface SupervisorQuestionsContextValue {
+ pendingQuestions: PendingQuestion[];
+ loading: boolean;
+ error: string | null;
+ refreshQuestions: () => Promise<void>;
+ submitAnswer: (questionId: string, response: string) => Promise<boolean>;
+}
+
+const SupervisorQuestionsContext = createContext<SupervisorQuestionsContextValue | null>(null);
+
+export function SupervisorQuestionsProvider({ children }: { children: ReactNode }) {
+ const { isAuthenticated } = useAuth();
+ const [pendingQuestions, setPendingQuestions] = useState<PendingQuestion[]>([]);
+ const [loading, setLoading] = useState(false);
+ const [error, setError] = useState<string | null>(null);
+
+ const refreshQuestions = useCallback(async () => {
+ if (!isAuthenticated) return;
+
+ try {
+ setLoading(true);
+ const questions = await listPendingQuestions();
+ if (questions.length > 0) {
+ console.log("[SupervisorQuestions] Received questions:", questions);
+ }
+ setPendingQuestions(questions);
+ setError(null);
+ } catch (err) {
+ // Log but don't spam
+ console.warn("[SupervisorQuestions] Failed to fetch:", err);
+ setError(err instanceof Error ? err.message : "Failed to load questions");
+ } finally {
+ setLoading(false);
+ }
+ }, [isAuthenticated]);
+
+ const submitAnswer = useCallback(async (questionId: string, response: string): Promise<boolean> => {
+ try {
+ const result = await answerQuestion(questionId, response);
+ if (result.success) {
+ // Remove the question from local state
+ setPendingQuestions(prev => prev.filter(q => q.questionId !== questionId));
+ }
+ return result.success;
+ } catch (err) {
+ console.error("Failed to submit answer:", err);
+ return false;
+ }
+ }, []);
+
+ // Poll for questions every 5 seconds when authenticated
+ useEffect(() => {
+ if (!isAuthenticated) {
+ setPendingQuestions([]);
+ return;
+ }
+
+ // Initial fetch (delayed slightly to ensure auth is ready)
+ const initialTimeout = setTimeout(refreshQuestions, 500);
+
+ // Poll periodically (every 5 seconds for responsiveness)
+ const interval = setInterval(refreshQuestions, 5000);
+ return () => {
+ clearTimeout(initialTimeout);
+ clearInterval(interval);
+ };
+ }, [isAuthenticated, refreshQuestions]);
+
+ return (
+ <SupervisorQuestionsContext.Provider
+ value={{
+ pendingQuestions,
+ loading,
+ error,
+ refreshQuestions,
+ submitAnswer,
+ }}
+ >
+ {children}
+ </SupervisorQuestionsContext.Provider>
+ );
+}
+
+export function useSupervisorQuestions() {
+ const context = useContext(SupervisorQuestionsContext);
+ if (!context) {
+ throw new Error("useSupervisorQuestions must be used within SupervisorQuestionsProvider");
+ }
+ return context;
+}
diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts
index d7ac8b6..2ea1128 100644
--- a/makima/frontend/src/lib/api.ts
+++ b/makima/frontend/src/lib/api.ts
@@ -590,6 +590,9 @@ export interface Task {
version: number;
createdAt: string;
updatedAt: string;
+
+ // Supervisor flag
+ isSupervisor: boolean;
}
export interface TaskWithSubtasks extends Task {
@@ -892,6 +895,75 @@ export async function checkTargetExists(
return res.json();
}
+// =============================================================================
+// Task Recovery (Daemon Failover)
+// =============================================================================
+
+/** Request to reassign a task to a new daemon */
+export interface ReassignTaskRequest {
+ targetDaemonId?: string;
+ includeContext?: boolean;
+}
+
+/** Response from reassigning a task */
+export interface ReassignTaskResponse {
+ task: Task;
+ daemonId: string;
+ oldTaskId: string;
+ contextIncluded: boolean;
+ contextEntries: number;
+}
+
+/**
+ * Reassign a task to a new daemon after daemon disconnect.
+ * Creates a new task with conversation context, deletes the old one.
+ */
+export async function reassignTask(
+ taskId: string,
+ options?: ReassignTaskRequest
+): Promise<ReassignTaskResponse> {
+ const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/reassign`, {
+ method: "POST",
+ body: JSON.stringify(options || {}),
+ });
+ if (!res.ok) {
+ const errorText = await res.text();
+ throw new Error(`Failed to reassign task: ${errorText || res.statusText}`);
+ }
+ return res.json();
+}
+
+/** Request to continue a task */
+export interface ContinueTaskRequest {
+ targetDaemonId?: string;
+}
+
+/** Response from continuing a task */
+export interface ContinueTaskResponse {
+ task: Task;
+ daemonId: string;
+ contextEntries: number;
+}
+
+/**
+ * Continue a task after daemon disconnect by restarting it with conversation context.
+ * Unlike reassign, this keeps the same task ID.
+ */
+export async function continueTask(
+ taskId: string,
+ options?: ContinueTaskRequest
+): Promise<ContinueTaskResponse> {
+ const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/continue`, {
+ method: "POST",
+ body: JSON.stringify(options || {}),
+ });
+ if (!res.ok) {
+ const errorText = await res.text();
+ throw new Error(`Failed to continue task: ${errorText || res.statusText}`);
+ }
+ return res.json();
+}
+
export async function listSubtasks(taskId: string): Promise<TaskListResponse> {
const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/subtasks`);
if (!res.ok) {
@@ -1848,3 +1920,56 @@ export async function getTemplate(id: string): Promise<FileTemplate> {
}
return res.json();
}
+
+// =============================================================================
+// Supervisor Question Types and Functions
+// =============================================================================
+
+export interface PendingQuestion {
+ questionId: string;
+ taskId: string;
+ contractId: string;
+ question: string;
+ choices: string[];
+ context: string | null;
+ createdAt: string;
+}
+
+export interface AnswerQuestionRequest {
+ response: string;
+}
+
+export interface AnswerQuestionResponse {
+ success: boolean;
+}
+
+/**
+ * Get all pending supervisor questions for the current user.
+ */
+export async function listPendingQuestions(): Promise<PendingQuestion[]> {
+ const res = await authFetch(`${API_BASE}/api/v1/mesh/questions`);
+ if (!res.ok) {
+ throw new Error(`Failed to list questions: ${res.statusText}`);
+ }
+ return res.json();
+}
+
+/**
+ * Answer a pending supervisor question.
+ */
+export async function answerQuestion(
+ questionId: string,
+ response: string
+): Promise<AnswerQuestionResponse> {
+ const res = await authFetch(
+ `${API_BASE}/api/v1/mesh/questions/${questionId}/answer`,
+ {
+ method: "POST",
+ body: JSON.stringify({ response }),
+ }
+ );
+ if (!res.ok) {
+ throw new Error(`Failed to answer question: ${res.statusText}`);
+ }
+ return res.json();
+}
diff --git a/makima/frontend/src/main.tsx b/makima/frontend/src/main.tsx
index 496a569..5d389fc 100644
--- a/makima/frontend/src/main.tsx
+++ b/makima/frontend/src/main.tsx
@@ -3,7 +3,9 @@ import { createRoot } from "react-dom/client";
import { BrowserRouter, Routes, Route } from "react-router";
import "./index.css";
import { AuthProvider } from "./contexts/AuthContext";
+import { SupervisorQuestionsProvider } from "./contexts/SupervisorQuestionsContext";
import { GridOverlay } from "./components/GridOverlay";
+import { SupervisorQuestionNotification } from "./components/SupervisorQuestionNotification";
import { ProtectedRoute } from "./components/ProtectedRoute";
import HomePage from "./routes/_index";
import ListenPage from "./routes/listen";
@@ -17,9 +19,11 @@ import SettingsPage from "./routes/settings";
createRoot(document.getElementById("root")!).render(
<StrictMode>
<AuthProvider>
- <BrowserRouter>
- <GridOverlay />
- <Routes>
+ <SupervisorQuestionsProvider>
+ <BrowserRouter>
+ <GridOverlay />
+ <SupervisorQuestionNotification />
+ <Routes>
<Route path="/" element={<HomePage />} />
<Route path="/login" element={<LoginPage />} />
<Route
@@ -94,8 +98,9 @@ createRoot(document.getElementById("root")!).render(
</ProtectedRoute>
}
/>
- </Routes>
- </BrowserRouter>
+ </Routes>
+ </BrowserRouter>
+ </SupervisorQuestionsProvider>
</AuthProvider>
</StrictMode>
);
diff --git a/makima/frontend/src/routes/mesh.tsx b/makima/frontend/src/routes/mesh.tsx
index d067865..ed5a6d0 100644
--- a/makima/frontend/src/routes/mesh.tsx
+++ b/makima/frontend/src/routes/mesh.tsx
@@ -8,7 +8,7 @@ import { UnifiedMeshChatInput } from "../components/mesh/UnifiedMeshChatInput";
import { useTasks } from "../hooks/useTasks";
import { useTaskSubscription, type TaskUpdateEvent, type TaskOutputEvent } from "../hooks/useTaskSubscription";
import type { TaskWithSubtasks, MeshChatContext, ContractSummary, ContractWithRelations, DaemonDirectory } from "../lib/api";
-import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories } from "../lib/api";
+import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi } from "../lib/api";
import { DirectoryInput } from "../components/mesh/DirectoryInput";
import { useAuth } from "../contexts/AuthContext";
@@ -374,9 +374,10 @@ export default function MeshPage() {
const handleContinue = useCallback(
async (taskId: string) => {
try {
- // Start the task again from terminal state
- const updated = await startTaskApi(taskId);
- setTaskDetail((prev) => prev ? { ...prev, ...updated } : prev);
+ // Continue the task with conversation context from previous run
+ const result = await continueTaskApi(taskId);
+ console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`);
+ setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev);
} catch (e) {
console.error("Failed to continue task:", e);
alert(e instanceof Error ? e.message : "Failed to continue task");
diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo
new file mode 100644
index 0000000..7af14b5
--- /dev/null
+++ b/makima/frontend/tsconfig.tsbuildinfo
@@ -0,0 +1 @@
+{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"} \ No newline at end of file
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 649a8e7..35783dc 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -321,6 +321,18 @@ async fn run_supervisor(
let result = client.supervisor_status(args.contract_id).await?;
println!("{}", serde_json::to_string(&result.0)?);
}
+ SupervisorCommand::Ask(args) => {
+ let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
+ eprintln!("Asking user: {}...", args.question);
+ let choices = args
+ .choices
+ .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
+ .unwrap_or_default();
+ let result = client
+ .supervisor_ask(&args.question, choices, args.context, args.timeout)
+ .await?;
+ println!("{}", serde_json::to_string(&result.0)?);
+ }
}
Ok(())
diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs
index b691cc4..a1e8682 100644
--- a/makima/src/daemon/api/supervisor.rs
+++ b/makima/src/daemon/api/supervisor.rs
@@ -62,6 +62,17 @@ pub struct CheckpointRequest {
pub message: String,
}
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AskQuestionRequest {
+ pub question: String,
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ pub choices: Vec<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub context: Option<String>,
+ pub timeout_seconds: i32,
+}
+
// Generic response type for JSON output
#[derive(Deserialize, Serialize)]
pub struct JsonValue(pub serde_json::Value);
@@ -183,4 +194,21 @@ impl ApiClient {
self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id))
.await
}
+
+ /// Ask a question and wait for user feedback.
+ pub async fn supervisor_ask(
+ &self,
+ question: &str,
+ choices: Vec<String>,
+ context: Option<String>,
+ timeout_seconds: i32,
+ ) -> Result<JsonValue, ApiError> {
+ let req = AskQuestionRequest {
+ question: question.to_string(),
+ choices,
+ context,
+ timeout_seconds,
+ };
+ self.post("/api/v1/mesh/supervisor/questions", &req).await
+ }
}
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
index 24c19c6..66c7941 100644
--- a/makima/src/daemon/cli/mod.rs
+++ b/makima/src/daemon/cli/mod.rs
@@ -76,6 +76,9 @@ pub enum SupervisorCommand {
/// Get contract status
Status(SupervisorArgs),
+
+ /// Ask a question and wait for user feedback
+ Ask(supervisor::AskArgs),
}
/// Contract subcommands for task-contract interaction.
diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs
index 00c7ff4..017730d 100644
--- a/makima/src/daemon/cli/supervisor.rs
+++ b/makima/src/daemon/cli/supervisor.rs
@@ -42,6 +42,10 @@ pub struct SpawnArgs {
/// Checkpoint SHA to start from
#[arg(long)]
pub checkpoint: Option<String>,
+
+ /// Repository URL (local path or remote URL). If not provided, will try to detect from current directory.
+ #[arg(long)]
+ pub repo: Option<String>,
}
/// Arguments for wait command.
@@ -144,3 +148,25 @@ pub struct CheckpointArgs {
/// Checkpoint message
pub message: String,
}
+
+/// Arguments for ask command (ask user a question).
+#[derive(Args, Debug)]
+pub struct AskArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// The question to ask
+ pub question: String,
+
+ /// Optional choices (comma-separated)
+ #[arg(long)]
+ pub choices: Option<String>,
+
+ /// Context about what this relates to
+ #[arg(long)]
+ pub context: Option<String>,
+
+ /// Timeout in seconds (default: 3600 = 1 hour)
+ #[arg(long, default_value = "3600")]
+ pub timeout: i32,
+}
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 8269083..3b4ffdd 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -27,12 +27,27 @@ fn generate_tool_key() -> String {
}
/// Check if output contains an OAuth authentication error.
-fn is_oauth_auth_error(output: &str) -> bool {
+/// Only checks system/error messages, not assistant responses (which could mention auth errors conversationally).
+fn is_oauth_auth_error(output: &str, json_type: Option<&str>, is_stdout: bool) -> bool {
+ // Only check system messages or stderr output - not assistant messages
+ // which could mention auth errors in conversation
+ match json_type {
+ Some("assistant") | Some("user") | Some("result") => return false,
+ _ => {}
+ }
+
+ // For stdout JSON messages, only check system/error types
+ if is_stdout && json_type.is_none() {
+ // Non-JSON stdout output - could be startup messages, check carefully
+ // Only match very specific patterns that wouldn't appear in conversation
+ }
+
// Match various authentication error patterns from Claude Code
- if output.contains("Please run /login") {
+ // These patterns are specific enough that they shouldn't appear in normal conversation
+ if output.contains("Please run /login") && output.contains("authenticate") {
return true;
}
- if output.contains("Invalid API key") {
+ if output.contains("Invalid API key") && output.contains("ANTHROPIC_API_KEY") {
return true;
}
if output.contains("authentication_error")
@@ -41,6 +56,10 @@ fn is_oauth_auth_error(output: &str) -> bool {
{
return true;
}
+ // Check for Claude Code's specific error format
+ if output.contains("\"type\":\"error\"") && output.contains("authentication") {
+ return true;
+ }
false
}
@@ -695,6 +714,51 @@ makima supervisor checkpoints [task_id]
makima supervisor status
```
+### User Feedback
+```bash
+# Ask a free-form question
+makima supervisor ask "Your question here"
+
+# Ask with choices (comma-separated)
+makima supervisor ask "Choose an option" --choices "Option A,Option B,Option C"
+
+# Ask with context
+makima supervisor ask "Ready to proceed?" --context "After completing task X"
+
+# Ask with custom timeout (default 1 hour)
+makima supervisor ask "Question" --timeout 3600
+```
+
+## User Feedback (Ask Command)
+
+You can ask the user questions when you need clarification or approval:
+
+```bash
+# Ask a free-form question (waits for user to respond)
+makima supervisor ask "What authentication method should I use?"
+
+# Ask with predefined choices
+makima supervisor ask "Ready to create PR?" --choices "Yes,No,Need more changes"
+
+# Ask with context
+makima supervisor ask "Should I proceed?" --context "Plan phase complete"
+```
+
+The ask command will block until the user responds (or timeout). Use this to:
+- Clarify requirements before starting work
+- Get approval before creating PRs
+- Ask for guidance when tasks fail
+
+## Contract Phase Progression
+
+### For "Simple" contracts (Plan → Execute):
+1. **Plan Phase**: Review the plan document and understand the goal
+2. **Execute Phase**: Spawn tasks to implement the plan, then create PR
+3. Mark contract as complete when PR is created
+
+### For "Specification" contracts (Research → Specify → Plan → Execute → Review):
+Progress through each phase, spawning tasks as needed and asking for user feedback.
+
## Key Points
1. **Create a makima branch first** - use `branch "makima/{name}"` for the contract's work
@@ -703,6 +767,7 @@ makima supervisor status
4. **Never fire-and-forget** - always wait for each task before moving on
5. **Merge to your makima branch** - use `merge <task_id> --to "makima/{name}"` to collect completed work
6. **Create PR when done** - use `pr "makima/{name}" --title "..." --base main`
+7. **Ask when unsure** - use `ask` to get user feedback on decisions
## Standard Workflow
@@ -711,16 +776,19 @@ makima supervisor status
- `spawn` - Create task
- `wait` - Block until complete
- `merge --to "makima/{name}"` - Merge to branch
-3. `pr "makima/{name}" --title "..." --base main` - Create PR
+3. `ask "Ready to create PR?"` - Get user approval
+4. `pr "makima/{name}" --title "..." --base main` - Create PR
## Important Reminders
- **ONLY YOU can spawn tasks** - regular tasks cannot create children
- **NEVER implement anything yourself** - always spawn tasks
- **ALWAYS create a makima branch** - use `makima/{name}` naming convention
+- **ASK for feedback** when you need clarification or approval
- Tasks run independently - you just coordinate
- You will be resumed if interrupted - your conversation is preserved
- Create checkpoints before major transitions
+- **Mark contract complete** when PR is created by updating status
---
@@ -2834,6 +2902,8 @@ impl TaskManagerInner {
// Check for OAuth auth error before sending output
let content_for_auth_check = line.content.clone();
+ let json_type_for_auth_check = line.json_type.clone();
+ let is_stdout_for_auth_check = line.is_stdout;
let msg = DaemonMessage::task_output(task_id, line.content, false);
if ws_tx.send(msg).await.is_err() {
@@ -2842,7 +2912,7 @@ impl TaskManagerInner {
}
// Detect OAuth token expiration and trigger remote login flow
- if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check) {
+ if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) {
auth_error_handled = true;
tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs
index 9af5dcb..ff0e9e7 100644
--- a/makima/src/daemon/worktree/manager.rs
+++ b/makima/src/daemon/worktree/manager.rs
@@ -333,34 +333,76 @@ impl WorktreeManager {
// Create base directory
tokio::fs::create_dir_all(&self.base_dir).await?;
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- branch = %branch_name,
- base_branch = %base_branch,
- "Creating worktree from local branch"
- );
+ // Prune stale worktree entries first (handles "missing but registered" errors)
+ let _ = Command::new("git")
+ .args(["worktree", "prune"])
+ .current_dir(source_repo)
+ .output()
+ .await;
- // Create the worktree with a new branch based on the local base_branch
- let output = Command::new("git")
- .args([
- "worktree",
- "add",
- "-b",
- &branch_name,
- ])
- .arg(&worktree_path)
- .arg(base_branch)
+ // Check if the branch already exists (e.g., from a previous run of the same task)
+ let branch_exists = Command::new("git")
+ .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch_name)])
.current_dir(source_repo)
.output()
- .await?;
+ .await
+ .map(|o| o.status.success())
+ .unwrap_or(false);
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create worktree: {}",
- stderr
- )));
+ if branch_exists {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ "Branch already exists, creating worktree from existing branch"
+ );
+
+ // Use existing branch - try without force first, then with force
+ let output = Command::new("git")
+ .args(["worktree", "add", "-f"])
+ .arg(&worktree_path)
+ .arg(&branch_name)
+ .current_dir(source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree from existing branch: {}",
+ stderr
+ )));
+ }
+ } else {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ base_branch = %base_branch,
+ "Creating worktree with new branch"
+ );
+
+ // Create the worktree with a new branch based on the local base_branch
+ let output = Command::new("git")
+ .args([
+ "worktree",
+ "add",
+ "-b",
+ &branch_name,
+ ])
+ .arg(&worktree_path)
+ .arg(base_branch)
+ .current_dir(source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree: {}",
+ stderr
+ )));
+ }
}
tracing::info!(
@@ -439,35 +481,78 @@ impl WorktreeManager {
// Create base directory
tokio::fs::create_dir_all(&self.base_dir).await?;
- tracing::info!(
- task_id = %task_id,
- source_worktree = %source_worktree.display(),
- worktree_path = %worktree_path.display(),
- branch = %branch_name,
- source_commit = %source_commit,
- "Creating worktree from source task"
- );
+ // Prune stale worktree entries first (handles "missing but registered" errors)
+ let _ = Command::new("git")
+ .args(["worktree", "prune"])
+ .current_dir(&source_repo)
+ .output()
+ .await;
- // Create a new worktree based on the source commit
- let output = Command::new("git")
- .args([
- "worktree",
- "add",
- "-b",
- &branch_name,
- ])
- .arg(&worktree_path)
- .arg(&source_commit)
+ // Check if the branch already exists (e.g., from a previous run of the same task)
+ let branch_exists = Command::new("git")
+ .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch_name)])
.current_dir(&source_repo)
.output()
- .await?;
+ .await
+ .map(|o| o.status.success())
+ .unwrap_or(false);
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create worktree: {}",
- stderr
- )));
+ if branch_exists {
+ tracing::info!(
+ task_id = %task_id,
+ source_worktree = %source_worktree.display(),
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ "Branch already exists, creating worktree from existing branch"
+ );
+
+ // Use existing branch with force flag
+ let output = Command::new("git")
+ .args(["worktree", "add", "-f"])
+ .arg(&worktree_path)
+ .arg(&branch_name)
+ .current_dir(&source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree from existing branch: {}",
+ stderr
+ )));
+ }
+ } else {
+ tracing::info!(
+ task_id = %task_id,
+ source_worktree = %source_worktree.display(),
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ source_commit = %source_commit,
+ "Creating worktree from source task with new branch"
+ );
+
+ // Create a new worktree based on the source commit
+ let output = Command::new("git")
+ .args([
+ "worktree",
+ "add",
+ "-b",
+ &branch_name,
+ ])
+ .arg(&worktree_path)
+ .arg(&source_commit)
+ .current_dir(&source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree: {}",
+ stderr
+ )));
+ }
}
// Now copy uncommitted changes from source worktree
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 2d90a04..3da6fd5 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
if let Some(task_id) = state.validate_tool_key(key_str) {
return AuthSource::ToolKey(task_id);
}
- tracing::warn!("Invalid tool key provided");
+ tracing::warn!("Invalid tool key provided: {}", key_str);
}
}
@@ -1774,3 +1774,688 @@ pub async fn check_target_exists(
}))
.into_response()
}
+
+// =============================================================================
+// Task Reassignment (Daemon Failover)
+// =============================================================================
+
+/// Request to reassign a task to a new daemon after daemon disconnect.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskRequest {
+ /// Target daemon ID to reassign to. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+ /// Whether to include conversation context from previous run.
+ #[serde(default = "default_include_context")]
+ pub include_context: bool,
+}
+
+fn default_include_context() -> bool {
+ true
+}
+
+/// Response from task reassignment.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskResponse {
+ /// The new task that was created.
+ pub task: Task,
+ /// The new daemon ID.
+ pub daemon_id: Uuid,
+ /// The ID of the old task that was deleted.
+ pub old_task_id: Uuid,
+ /// Whether conversation context was included.
+ pub context_included: bool,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Build a conversation context summary from task output entries.
+/// Returns a formatted string that can be prepended to the task plan.
+fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
+ if entries.is_empty() {
+ return String::new();
+ }
+
+ let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
+ context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n");
+
+ for entry in entries.iter() {
+ match entry.message_type.as_str() {
+ "assistant" => {
+ context.push_str("Assistant: ");
+ // Truncate long messages
+ let content = if entry.content.len() > 500 {
+ format!("{}... [truncated]", &entry.content[..500])
+ } else {
+ entry.content.clone()
+ };
+ context.push_str(&content);
+ context.push_str("\n\n");
+ }
+ "tool_use" => {
+ if let Some(ref tool_name) = entry.tool_name {
+ context.push_str(&format!("[Used tool: {}]\n", tool_name));
+ }
+ }
+ "tool_result" => {
+ // Summarize tool results briefly
+ if entry.content.len() > 200 {
+ context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200]));
+ } else if !entry.content.is_empty() {
+ context.push_str(&format!("[Tool result: {}]\n", entry.content));
+ }
+ }
+ "user" => {
+ context.push_str("User: ");
+ context.push_str(&entry.content);
+ context.push_str("\n\n");
+ }
+ _ => {}
+ }
+ }
+
+ context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
+ context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
+
+ context
+}
+
+/// Reassign a task to a new daemon after the original daemon disconnected.
+///
+/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
+/// the task can be reassigned to a new daemon with the conversation context preserved.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/reassign",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ReassignTaskRequest,
+ responses(
+ (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
+ (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn reassign_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ReassignTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is in a state that can be reassigned
+ // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ let can_reassign = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting"
+ ) || {
+ // Also allow if daemon is not connected
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_reassign && task.status == "running" {
+ // Running task - check if its daemon is still connected
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to reassign.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ // Verify the requested daemon is connected and belongs to the owner
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ // Find any available daemon for this owner
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context if requested
+ let (context_str, context_entries) = if body.include_context {
+ match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ }
+ } else {
+ (String::new(), 0)
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Create a NEW task with the conversation context
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: format!("{} (resumed)", task.name),
+ description: task.description.clone(),
+ plan: updated_plan.clone(),
+ parent_task_id: task.parent_task_id,
+ is_supervisor: task.is_supervisor,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
+ copy_files: None,
+ checkpoint_sha: task.last_checkpoint_sha.clone(),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create new task for reassignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Update new task to starting and assign daemon
+ let start_update = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ version: Some(new_task.version),
+ ..Default::default()
+ };
+
+ let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "New task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update new task daemon assignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Send SpawnTask command to daemon for the new task
+ let command = DaemonCommand::SpawnTask {
+ task_id: new_task.id,
+ task_name: final_task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator: false, // New task starts fresh
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from old task's worktree
+ copy_files: None,
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ old_task_id = %id,
+ new_task_id = %new_task.id,
+ new_daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ "Reassigning task: creating new task and deleting old one"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
+ // Rollback: delete the new task we created
+ let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Delete the old task now that the new one is spawned
+ let old_task_id = id;
+ if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
+ tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
+ // Don't fail the request, the new task is already running
+ }
+
+ // Notify the contract's supervisor about the reassignment (if applicable)
+ if let Some(contract_id) = task.contract_id {
+ if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ if let Some(supervisor_task_id) = contract.supervisor_task_id {
+ // Don't notify if we're reassigning the supervisor itself
+ if supervisor_task_id != old_task_id {
+ // Find the supervisor's daemon and send a message
+ if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
+ if supervisor_task.status == "running" {
+ if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
+ // Find the daemon by its UUID
+ if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
+ let notification_msg = format!(
+ "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
+ A new task '{}' (ID: {}) has been created to continue the work. \
+ The new task has {} context entries from the previous conversation.\n\n",
+ task.name,
+ old_task_id,
+ final_task.name,
+ new_task.id,
+ context_entries
+ );
+
+ let notify_cmd = DaemonCommand::SendMessage {
+ task_id: supervisor_task_id,
+ message: notification_msg,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
+ tracing::warn!(
+ supervisor_id = %supervisor_task_id,
+ error = %e,
+ "Failed to notify supervisor about task reassignment"
+ );
+ } else {
+ tracing::info!(
+ supervisor_id = %supervisor_task_id,
+ old_task_id = %old_task_id,
+ new_task_id = %new_task.id,
+ "Notified supervisor about task reassignment"
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Broadcast task update for the new task
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: new_task.id,
+ owner_id: Some(auth.owner_id),
+ version: final_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "reassignment".to_string(),
+ });
+
+ Json(ReassignTaskResponse {
+ task: final_task,
+ daemon_id: target_daemon_id,
+ old_task_id,
+ context_included: !context_str.is_empty(),
+ context_entries,
+ })
+ .into_response()
+}
+
+// =============================================================================
+// Task Continue (Restart with Context)
+// =============================================================================
+
+/// Request to continue a task after daemon disconnect (restart in-place with context).
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskRequest {
+ /// Target daemon ID to continue on. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+}
+
+/// Response from continuing a task.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskResponse {
+ /// The continued task (same ID, updated plan with context).
+ pub task: Task,
+ /// The daemon ID running the task.
+ pub daemon_id: Uuid,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Continue a task after daemon disconnect by restarting it with conversation context.
+///
+/// Unlike reassign, this keeps the same task ID and just restarts it with the
+/// previous conversation context prepended to the plan. Useful for supervisors.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/continue",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ContinueTaskRequest,
+ responses(
+ (status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
+ (status = 400, description = "Task cannot be continued", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn continue_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ContinueTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ // Check if task can be continued (not currently running on a connected daemon)
+ let can_continue = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting" | "completed"
+ ) || {
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_continue && task.status == "running" {
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to continue.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context from task output
+ let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Update task in database: reset status, update plan with context, assign daemon
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ plan: Some(updated_plan.clone()),
+ daemon_id: Some(target_daemon_id),
+ error_message: None,
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update task for continuation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if this is an orchestrator
+ let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(subtasks) => subtasks.len(),
+ Err(_) => 0,
+ };
+ let is_orchestrator = task.depth == 0 && subtask_count > 0;
+
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ task_id = %id,
+ daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ is_supervisor = task.is_supervisor,
+ "Continuing task with conversation context"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ clear_daemon_id: true,
+ error_message: Some(format!("Continuation failed: {}", e)),
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Broadcast task update
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()],
+ updated_by: "continuation".to_string(),
+ });
+
+ Json(ContinueTaskResponse {
+ task: updated_task,
+ daemon_id: target_daemon_id,
+ context_entries,
+ })
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 178e5e1..39b12da 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -369,6 +369,18 @@ pub enum DaemonMessage {
/// Error message if operation failed
error: Option<String>,
},
+ /// Notification that a branch was created
+ BranchCreated {
+ #[serde(rename = "taskId")]
+ task_id: Option<Uuid>,
+ /// Name of the branch that was created
+ #[serde(rename = "branchName")]
+ branch_name: String,
+ /// Whether the operation succeeded
+ success: bool,
+ /// Error message if operation failed
+ error: Option<String>,
+ },
}
/// Validated daemon authentication result.
@@ -1073,6 +1085,36 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
}
+ Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => {
+ tracing::info!(
+ task_id = ?task_id,
+ branch_name = %branch_name,
+ success = success,
+ error = ?error,
+ "Branch created notification received"
+ );
+
+ // Broadcast as task output if we have a task_id
+ if let Some(tid) = task_id {
+ let output_text = if success {
+ format!("✓ Branch '{}' created successfully", branch_name)
+ } else {
+ format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default())
+ };
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id: tid,
+ owner_id: Some(owner_id),
+ message_type: "system".to_string(),
+ content: output_text,
+ tool_name: None,
+ tool_input: None,
+ is_error: Some(!success),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ }
+ }
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index ac59130..d0fa4d1 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -15,6 +15,7 @@ use uuid::Uuid;
use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
use crate::db::repository;
+use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState};
@@ -32,7 +33,7 @@ pub struct SpawnTaskRequest {
pub contract_id: Uuid,
pub parent_task_id: Option<Uuid>,
pub checkpoint_sha: Option<String>,
- /// Repository URL for the task (supervisor should provide this)
+ /// Repository URL for the task (optional - if not provided, will be looked up from contract).
pub repository_url: Option<String>,
}
@@ -55,6 +56,67 @@ pub struct ReadWorktreeFileRequest {
pub file_path: String,
}
+/// Request to ask a question and wait for user feedback.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AskQuestionRequest {
+ /// The question to ask the user
+ pub question: String,
+ /// Optional choices (if empty, free-form text response)
+ #[serde(default)]
+ pub choices: Vec<String>,
+ /// Optional context about what this relates to
+ pub context: Option<String>,
+ /// How long to wait for a response (seconds)
+ #[serde(default = "default_question_timeout")]
+ pub timeout_seconds: i32,
+}
+
+fn default_question_timeout() -> i32 {
+ 3600 // 1 hour default
+}
+
+/// Response from asking a question.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AskQuestionResponse {
+ /// The question ID for tracking
+ pub question_id: Uuid,
+ /// The user's response (None if timed out)
+ pub response: Option<String>,
+ /// Whether the question timed out
+ pub timed_out: bool,
+}
+
+/// Request to answer a supervisor question.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AnswerQuestionRequest {
+ /// The user's response
+ pub response: String,
+}
+
+/// Response to answering a question.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AnswerQuestionResponse {
+ /// Whether the answer was accepted
+ pub success: bool,
+}
+
+/// Pending question summary.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PendingQuestionSummary {
+ pub question_id: Uuid,
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub question: String,
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
/// Request to create a checkpoint.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -321,23 +383,49 @@ pub async fn spawn_task(
}
};
- // Get repository URL from the contract's primary repository
- let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await {
- Ok(repos) => {
- // Prefer primary repo, fallback to first repo
- repos.iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone())
- }
- Err(e) => {
- tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL");
+ // Get repository URL - either from request or from contract's repositories
+ let repo_url = if let Some(url) = request.repository_url.clone() {
+ if !url.trim().is_empty() {
+ Some(url)
+ } else {
None
}
+ } else {
+ None
};
- // Supervisor can override with explicit repository_url
- let repo_url = request.repository_url.clone().or(repo_url);
+ // If no repo URL provided, look it up from the contract
+ let repo_url = match repo_url {
+ Some(url) => Some(url),
+ None => {
+ match repository::list_contract_repositories(pool, request.contract_id).await {
+ Ok(repos) => {
+ // Prefer primary repo, fallback to first repo
+ let repo = repos.iter()
+ .find(|r| r.is_primary)
+ .or(repos.first());
+
+ // Use repository_url if set, otherwise use local_path
+ repo.and_then(|r| {
+ r.repository_url.clone()
+ .or_else(|| r.local_path.clone())
+ })
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to get contract repositories");
+ None
+ }
+ }
+ }
+ };
+
+ // Validate that we have a repo URL
+ if repo_url.is_none() {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")),
+ ).into_response();
+ }
// Create task request
let create_req = CreateTaskRequest {
@@ -1151,3 +1239,208 @@ pub async fn get_task_diff(
}),
).into_response()
}
+
+// =============================================================================
+// Supervisor Question Handlers
+// =============================================================================
+
+/// Ask a question and wait for user feedback.
+///
+/// The supervisor calls this to ask a question. The endpoint will poll until
+/// either the user responds or the timeout is reached.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/questions",
+ request_body = AskQuestionRequest,
+ responses(
+ (status = 200, description = "Question answered", body = AskQuestionResponse),
+ (status = 408, description = "Question timed out", body = AskQuestionResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("tool_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn ask_question(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<AskQuestionRequest>,
+) -> impl IntoResponse {
+ let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get the supervisor task to find its contract
+ let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get supervisor task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
+ ).into_response();
+ }
+ };
+
+ let Some(contract_id) = supervisor.contract_id else {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")),
+ ).into_response();
+ };
+
+ // Add the question
+ let question_id = state.add_supervisor_question(
+ supervisor_id,
+ contract_id,
+ owner_id,
+ request.question.clone(),
+ request.choices.clone(),
+ request.context.clone(),
+ );
+
+ // Poll for response with timeout
+ let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64);
+ let start = std::time::Instant::now();
+ let poll_interval = std::time::Duration::from_millis(500);
+
+ loop {
+ // Check if response has been submitted
+ if let Some(response) = state.get_question_response(question_id) {
+ // Clean up the response
+ state.cleanup_question_response(question_id);
+
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: Some(response.response),
+ timed_out: false,
+ }),
+ ).into_response();
+ }
+
+ // Check timeout
+ if start.elapsed() >= timeout_duration {
+ // Remove the pending question on timeout
+ state.remove_pending_question(question_id);
+
+ return (
+ StatusCode::REQUEST_TIMEOUT,
+ Json(AskQuestionResponse {
+ question_id,
+ response: None,
+ timed_out: true,
+ }),
+ ).into_response();
+ }
+
+ // Wait before polling again
+ tokio::time::sleep(poll_interval).await;
+ }
+}
+
+/// Get all pending questions for the current user.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/questions",
+ responses(
+ (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
+ (status = 401, description = "Unauthorized"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_pending_questions(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let questions: Vec<PendingQuestionSummary> = state
+ .get_pending_questions_for_owner(auth.owner_id)
+ .into_iter()
+ .map(|q| PendingQuestionSummary {
+ question_id: q.question_id,
+ task_id: q.task_id,
+ contract_id: q.contract_id,
+ question: q.question,
+ choices: q.choices,
+ context: q.context,
+ created_at: q.created_at,
+ })
+ .collect();
+
+ Json(questions).into_response()
+}
+
+/// Answer a pending supervisor question.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/questions/{question_id}/answer",
+ params(
+ ("question_id" = Uuid, Path, description = "Question ID")
+ ),
+ request_body = AnswerQuestionRequest,
+ responses(
+ (status = 200, description = "Question answered", body = AnswerQuestionResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 404, description = "Question not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn answer_question(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(question_id): Path<Uuid>,
+ Json(request): Json<AnswerQuestionRequest>,
+) -> impl IntoResponse {
+ // Verify the question exists and belongs to this owner
+ let question = match state.get_pending_question(question_id) {
+ Some(q) if q.owner_id == auth.owner_id => q,
+ Some(_) => {
+ return (
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
+ ).into_response();
+ }
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
+ ).into_response();
+ }
+ };
+
+ // Submit the response
+ let success = state.submit_question_response(question_id, request.response);
+
+ if success {
+ tracing::info!(
+ question_id = %question_id,
+ task_id = %question.task_id,
+ "User answered supervisor question"
+ );
+ }
+
+ Json(AnswerQuestionResponse { success }).into_response()
+}
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index a4a01e0..27ee06c 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -82,6 +82,8 @@ pub fn make_router(state: SharedState) -> Router {
.route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action))
.route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree))
.route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists))
+ .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task))
+ .route("/mesh/tasks/{id}/continue", post(mesh::continue_task))
.route("/mesh/chat", post(mesh_chat::mesh_toplevel_chat_handler))
.route(
"/mesh/chat/history",
@@ -114,6 +116,10 @@ pub fn make_router(state: SharedState) -> Router {
.route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task))
.route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff))
.route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr))
+ // Supervisor question endpoints
+ .route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question))
+ .route("/mesh/questions", get(mesh_supervisor::list_pending_questions))
+ .route("/mesh/questions/{question_id}/answer", post(mesh_supervisor::answer_question))
// Mesh WebSocket endpoints
.route("/mesh/tasks/subscribe", get(mesh_ws::task_subscription_handler))
.route("/mesh/daemons/connect", get(mesh_daemon::daemon_handler))
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 1c28544..495fc15 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -103,6 +103,54 @@ pub struct TaskCompletionNotification {
pub error_message: Option<String>,
}
+/// Notification for supervisor questions requiring user feedback.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SupervisorQuestionNotification {
+ /// Unique ID for this question
+ pub question_id: Uuid,
+ /// Supervisor task that asked the question
+ pub task_id: Uuid,
+ /// Contract this question relates to
+ pub contract_id: Uuid,
+ /// Owner ID for data isolation
+ #[serde(skip)]
+ pub owner_id: Option<Uuid>,
+ /// The question text
+ pub question: String,
+ /// Optional choices for the user (if empty, free-form text response)
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub choices: Vec<String>,
+ /// Context about what phase/action this relates to
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub context: Option<String>,
+ /// Whether this question is still pending
+ pub pending: bool,
+ /// When the question was asked
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Stored supervisor question for persistence
+#[derive(Debug, Clone)]
+pub struct PendingSupervisorQuestion {
+ pub question_id: Uuid,
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub owner_id: Uuid,
+ pub question: String,
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Response to a supervisor question
+#[derive(Debug, Clone)]
+pub struct SupervisorQuestionResponse {
+ pub question_id: Uuid,
+ pub response: String,
+ pub responded_at: chrono::DateTime<chrono::Utc>,
+}
+
/// Command sent from server to daemon.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
@@ -408,6 +456,12 @@ pub struct AppState {
pub task_output: broadcast::Sender<TaskOutputNotification>,
/// Broadcast channel for task completion notifications (for supervisors)
pub task_completions: broadcast::Sender<TaskCompletionNotification>,
+ /// Broadcast channel for supervisor question notifications
+ pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
+ /// Pending supervisor questions awaiting user response (keyed by question_id)
+ pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
+ /// Responses to supervisor questions (keyed by question_id)
+ pub question_responses: DashMap<Uuid, SupervisorQuestionResponse>,
/// Active daemon connections (keyed by connection_id)
pub daemon_connections: DashMap<String, DaemonConnectionInfo>,
/// Tool keys for orchestrator API access (key -> task_id)
@@ -435,6 +489,7 @@ impl AppState {
let (task_updates, _) = broadcast::channel(256);
let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming
let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
+ let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
// Initialize JWT verifier from environment (optional)
// Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
@@ -476,6 +531,9 @@ impl AppState {
task_updates,
task_output,
task_completions,
+ supervisor_questions,
+ pending_questions: DashMap::new(),
+ question_responses: DashMap::new(),
daemon_connections: DashMap::new(),
tool_keys: DashMap::new(),
jwt_verifier,
@@ -556,6 +614,133 @@ impl AppState {
let _ = self.task_completions.send(notification);
}
+ /// Broadcast a supervisor question notification to all subscribers.
+ ///
+ /// Used to notify frontend clients when a supervisor needs user feedback.
+ pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) {
+ let _ = self.supervisor_questions.send(notification);
+ }
+
+ /// Add a pending supervisor question and broadcast it.
+ pub fn add_supervisor_question(
+ &self,
+ task_id: Uuid,
+ contract_id: Uuid,
+ owner_id: Uuid,
+ question: String,
+ choices: Vec<String>,
+ context: Option<String>,
+ ) -> Uuid {
+ let question_id = Uuid::new_v4();
+ let now = chrono::Utc::now();
+
+ // Store the pending question
+ self.pending_questions.insert(
+ question_id,
+ PendingSupervisorQuestion {
+ question_id,
+ task_id,
+ contract_id,
+ owner_id,
+ question: question.clone(),
+ choices: choices.clone(),
+ context: context.clone(),
+ created_at: now,
+ },
+ );
+
+ // Broadcast to subscribers
+ self.broadcast_supervisor_question(SupervisorQuestionNotification {
+ question_id,
+ task_id,
+ contract_id,
+ owner_id: Some(owner_id),
+ question,
+ choices,
+ context,
+ pending: true,
+ created_at: now,
+ });
+
+ tracing::info!(
+ question_id = %question_id,
+ task_id = %task_id,
+ contract_id = %contract_id,
+ "Supervisor question added"
+ );
+
+ question_id
+ }
+
+ /// Remove a pending question (after it's been answered).
+ pub fn remove_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
+ self.pending_questions.remove(&question_id).map(|(_, q)| q)
+ }
+
+ /// Get all pending questions for an owner.
+ pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec<PendingSupervisorQuestion> {
+ self.pending_questions
+ .iter()
+ .filter(|entry| entry.value().owner_id == owner_id)
+ .map(|entry| entry.value().clone())
+ .collect()
+ }
+
+ /// Get a specific pending question.
+ pub fn get_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> {
+ self.pending_questions.get(&question_id).map(|entry| entry.value().clone())
+ }
+
+ /// Submit a response to a supervisor question.
+ pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool {
+ // Check if the question exists
+ if let Some(question) = self.pending_questions.remove(&question_id) {
+ let now = chrono::Utc::now();
+
+ // Store the response
+ self.question_responses.insert(
+ question_id,
+ SupervisorQuestionResponse {
+ question_id,
+ response: response.clone(),
+ responded_at: now,
+ },
+ );
+
+ // Broadcast that the question is no longer pending
+ self.broadcast_supervisor_question(SupervisorQuestionNotification {
+ question_id,
+ task_id: question.1.task_id,
+ contract_id: question.1.contract_id,
+ owner_id: Some(question.1.owner_id),
+ question: question.1.question,
+ choices: question.1.choices,
+ context: question.1.context,
+ pending: false,
+ created_at: question.1.created_at,
+ });
+
+ tracing::info!(
+ question_id = %question_id,
+ "Supervisor question answered"
+ );
+
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Get the response to a question (if answered).
+ pub fn get_question_response(&self, question_id: Uuid) -> Option<SupervisorQuestionResponse> {
+ self.question_responses.get(&question_id).map(|entry| entry.value().clone())
+ }
+
+ /// Clean up a question response after the supervisor has read it.
+ pub fn cleanup_question_response(&self, question_id: Uuid) {
+ self.question_responses.remove(&question_id);
+ }
+
/// Register a new daemon connection.
///
/// Returns the connection_id for later reference.