diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 03:26:28 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 03:26:28 +0000 |
| commit | eeafe072bc6bb81459f7d087b48fc921afe9cc11 (patch) | |
| tree | 7f835993edd732f8ff66d756391dedffe3d44e90 | |
| parent | c61a2b9b9c988f5460f85980d4ddf285f1a730b5 (diff) | |
| download | soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.tar.gz soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.zip | |
Automatically derive repo URL and add notifications for input
20 files changed, 2017 insertions, 91 deletions
diff --git a/makima/frontend/src/components/SupervisorQuestionNotification.tsx b/makima/frontend/src/components/SupervisorQuestionNotification.tsx new file mode 100644 index 0000000..6a71de2 --- /dev/null +++ b/makima/frontend/src/components/SupervisorQuestionNotification.tsx @@ -0,0 +1,135 @@ +import { useState } from "react"; +import { useNavigate } from "react-router"; +import { useSupervisorQuestions } from "../contexts/SupervisorQuestionsContext"; +import type { PendingQuestion } from "../lib/api"; + +export function SupervisorQuestionNotification() { + const navigate = useNavigate(); + const { pendingQuestions, submitAnswer } = useSupervisorQuestions(); + const [expandedQuestion, setExpandedQuestion] = useState<string | null>(null); + const [response, setResponse] = useState(""); + const [submitting, setSubmitting] = useState(false); + + if (pendingQuestions.length === 0) { + return null; + } + + const handleGoToTask = (taskId: string) => { + navigate(`/mesh/${taskId}`); + }; + + const handleExpand = (questionId: string) => { + setExpandedQuestion(expandedQuestion === questionId ? null : questionId); + setResponse(""); + }; + + const handleSubmit = async (question: PendingQuestion) => { + if (!response.trim()) return; + + setSubmitting(true); + const success = await submitAnswer(question.questionId, response.trim()); + setSubmitting(false); + + if (success) { + setExpandedQuestion(null); + setResponse(""); + } + }; + + const handleChoiceSelect = async (question: PendingQuestion, choice: string) => { + setSubmitting(true); + await submitAnswer(question.questionId, choice); + setSubmitting(false); + }; + + return ( + <div className="fixed bottom-4 right-4 z-50 max-w-md space-y-2"> + {pendingQuestions.map((question) => ( + <div + key={question.questionId} + className="bg-[#0d1b2d] border border-amber-500/50 rounded-lg shadow-lg overflow-hidden" + > + {/* Header */} + <div className="flex items-center justify-between px-4 py-3 bg-amber-900/30"> + <div className="flex items-center gap-2"> + <span className="text-amber-400 text-lg">?</span> + <span className="font-mono text-sm text-amber-300 uppercase"> + Supervisor Question + </span> + </div> + <div className="flex items-center gap-2"> + <button + onClick={() => handleGoToTask(question.taskId)} + className="px-2 py-1 font-mono text-xs text-amber-400 hover:text-amber-300 transition-colors" + title="Go to task" + > + View Task + </button> + <button + onClick={() => handleExpand(question.questionId)} + className="px-2 py-1 font-mono text-xs text-amber-400 border border-amber-500/30 hover:border-amber-400/50 transition-colors uppercase" + > + {expandedQuestion === question.questionId ? "Collapse" : "Answer"} + </button> + </div> + </div> + + {/* Question preview */} + <div className="px-4 py-3"> + {question.context && ( + <div className="text-xs text-[#8b949e] font-mono mb-1 uppercase"> + {question.context} + </div> + )} + <p className="text-sm text-[#dbe7ff] font-mono"> + {question.question} + </p> + </div> + + {/* Expanded answer section */} + {expandedQuestion === question.questionId && ( + <div className="px-4 pb-4 border-t border-amber-500/20 pt-3"> + {question.choices.length > 0 ? ( + // Choice buttons + <div className="space-y-2"> + <p className="text-xs text-[#8b949e] font-mono uppercase mb-2"> + Select an option: + </p> + {question.choices.map((choice, idx) => ( + <button + key={idx} + onClick={() => handleChoiceSelect(question, choice)} + disabled={submitting} + className="w-full px-3 py-2 text-left font-mono text-sm text-[#dbe7ff] bg-[#0a1628] border border-[#3f6fb3] hover:border-amber-400/50 hover:bg-amber-900/20 disabled:opacity-50 transition-colors" + > + {choice} + </button> + ))} + </div> + ) : ( + // Free-form text input + <div className="space-y-2"> + <textarea + value={response} + onChange={(e) => setResponse(e.target.value)} + placeholder="Type your response..." + rows={3} + className="w-full px-3 py-2 bg-[#0a1628] border border-[#3f6fb3] text-[#dbe7ff] font-mono text-sm focus:outline-none focus:border-amber-400 resize-none" + disabled={submitting} + /> + <button + onClick={() => handleSubmit(question)} + disabled={submitting || !response.trim()} + className="w-full px-4 py-2 font-mono text-xs text-[#0a1628] bg-amber-500 hover:bg-amber-400 disabled:bg-amber-700 disabled:cursor-not-allowed transition-colors uppercase" + > + {submitting ? "Submitting..." : "Submit Response"} + </button> + </div> + )} + </div> + )} + </div> + ))} + </div> + ); +} diff --git a/makima/frontend/src/components/listen/ContractPickerModal.tsx b/makima/frontend/src/components/listen/ContractPickerModal.tsx new file mode 100644 index 0000000..961ccba --- /dev/null +++ b/makima/frontend/src/components/listen/ContractPickerModal.tsx @@ -0,0 +1,118 @@ +import { useEffect, useRef } from "react"; +import type { ContractOption } from "./ControlPanel"; + +interface ContractPickerModalProps { + isOpen: boolean; + onClose: () => void; + contracts: ContractOption[]; + selectedContractId: string | null; + onSelect: (contractId: string | null) => void; + loading?: boolean; +} + +export function ContractPickerModal({ + isOpen, + onClose, + contracts, + selectedContractId, + onSelect, + loading, +}: ContractPickerModalProps) { + const modalRef = useRef<HTMLDivElement>(null); + + useEffect(() => { + if (!isOpen) return; + + function handleKeyDown(e: KeyboardEvent) { + if (e.key === "Escape") { + onClose(); + } + } + + function handleClickOutside(e: MouseEvent) { + if (modalRef.current && !modalRef.current.contains(e.target as Node)) { + onClose(); + } + } + + document.addEventListener("keydown", handleKeyDown); + document.addEventListener("mousedown", handleClickOutside); + + return () => { + document.removeEventListener("keydown", handleKeyDown); + document.removeEventListener("mousedown", handleClickOutside); + }; + }, [isOpen, onClose]); + + if (!isOpen) return null; + + const handleSelect = (contractId: string | null) => { + onSelect(contractId); + onClose(); + }; + + return ( + <div className="fixed inset-0 z-50 flex items-center justify-center bg-black/60"> + <div + ref={modalRef} + className="panel p-4 w-[300px] max-h-[400px] flex flex-col gap-3" + > + <div className="flex items-center justify-between"> + <h2 className="font-mono text-sm text-[#dbe7ff] uppercase tracking-wide"> + Select Contract + </h2> + <button + onClick={onClose} + className="font-mono text-xs text-[#9bc3ff] hover:text-[#dbe7ff] transition-colors" + > + [X] + </button> + </div> + + <div className="flex-1 overflow-y-auto flex flex-col gap-1 min-h-0"> + {loading ? ( + <div className="font-mono text-xs text-[#9bc3ff] text-center py-4"> + Loading... + </div> + ) : ( + <> + <button + onClick={() => handleSelect(null)} + className={`w-full text-left px-3 py-2 font-mono text-xs border transition-colors ${ + selectedContractId === null + ? "bg-[#0f3c78]/50 border-[#3f6fb3] text-[#dbe7ff]" + : "bg-[#0d1b2d] border-[#0f3c78] text-[#9bc3ff] hover:border-[#3f6fb3] hover:text-[#dbe7ff]" + }`} + > + <span className="uppercase tracking-wide">Ephemeral</span> + <span className="block text-[10px] text-[#75aafc] mt-0.5"> + Transcript not saved + </span> + </button> + + {contracts.map((contract) => ( + <button + key={contract.id} + onClick={() => handleSelect(contract.id)} + className={`w-full text-left px-3 py-2 font-mono text-xs border transition-colors ${ + selectedContractId === contract.id + ? "bg-[#0f3c78]/50 border-[#3f6fb3] text-[#dbe7ff]" + : "bg-[#0d1b2d] border-[#0f3c78] text-[#9bc3ff] hover:border-[#3f6fb3] hover:text-[#dbe7ff]" + }`} + > + <span className="block truncate">{contract.name}</span> + </button> + ))} + + {contracts.length === 0 && ( + <div className="font-mono text-xs text-[#9bc3ff] text-center py-4"> + No contracts available + </div> + )} + </> + )} + </div> + </div> + </div> + ); +} diff --git a/makima/frontend/src/components/listen/ControlPanel.tsx b/makima/frontend/src/components/listen/ControlPanel.tsx index 35834d4..f0e5702 100644 --- a/makima/frontend/src/components/listen/ControlPanel.tsx +++ b/makima/frontend/src/components/listen/ControlPanel.tsx @@ -1,5 +1,7 @@ +import { useState } from "react"; import { Logo } from "../Logo"; import type { MicrophoneStatus } from "../../hooks/useMicrophone"; +import { ContractPickerModal } from "./ContractPickerModal"; export interface ContractOption { id: string; @@ -53,9 +55,12 @@ export function ControlPanel({ onContractChange, contractsLoading, }: ControlPanelProps) { + const [isModalOpen, setIsModalOpen] = useState(false); const statusText = getStatusText(isListening, micStatus); const isRequesting = micStatus === "requesting"; + const selectedContract = contracts.find((c) => c.id === selectedContractId); + return ( <div className="panel p-4 flex flex-col items-center justify-center gap-3"> {/* Logo button */} @@ -147,21 +152,24 @@ export function ControlPanel({ > New </button> - <select - value={selectedContractId || ""} - onChange={(e) => onContractChange(e.target.value || null)} + <button + onClick={() => setIsModalOpen(true)} disabled={isListening || contractsLoading} - className="px-3 py-1.5 font-mono text-xs text-[#dbe7ff] bg-[#0d1b2d] border border-[#0f3c78] focus:border-[#3f6fb3] transition-colors disabled:opacity-50 disabled:cursor-not-allowed uppercase tracking-wide" - title={selectedContractId ? "Saving to selected contract" : "Transcript not saved"} + className="px-3 py-1.5 font-mono text-xs text-[#dbe7ff] bg-[#0d1b2d] border border-[#0f3c78] hover:border-[#3f6fb3] transition-colors disabled:opacity-50 disabled:cursor-not-allowed uppercase tracking-wide" + title={selectedContract ? `Saving to: ${selectedContract.name}` : "Transcript not saved"} > - <option value="">Ephemeral Transcript</option> - {contracts.map((contract) => ( - <option key={contract.id} value={contract.id}> - {contract.name} - </option> - ))} - </select> + {selectedContract ? "Contract" : "Ephemeral"} + </button> </div> + + <ContractPickerModal + isOpen={isModalOpen} + onClose={() => setIsModalOpen(false)} + contracts={contracts} + selectedContractId={selectedContractId} + onSelect={onContractChange} + loading={contractsLoading} + /> </div> ); } diff --git a/makima/frontend/src/components/mesh/TaskDetail.tsx b/makima/frontend/src/components/mesh/TaskDetail.tsx index 967b1d1..8e853e7 100644 --- a/makima/frontend/src/components/mesh/TaskDetail.tsx +++ b/makima/frontend/src/components/mesh/TaskDetail.tsx @@ -144,6 +144,10 @@ export function TaskDetail({ const isTaskRunning = task.status === "running" || task.status === "initializing" || task.status === "starting"; // Check if task is in a terminal state (can be continued/reopened) const isTaskTerminal = task.status === "done" || task.status === "failed" || task.status === "merged"; + // Check if this is a supervisor task + const isSupervisor = task.isSupervisor === true; + // Show continue for supervisors (always) or terminal states for other tasks + const canContinue = isSupervisor || isTaskTerminal; // Calculate subtask statistics const subtaskStats = useMemo( @@ -356,7 +360,7 @@ export function TaskDetail({ )} </div> )} - {isTaskTerminal && ( + {canContinue && ( <button onClick={() => onContinue(task.id)} className="px-3 py-1 font-mono text-xs text-cyan-400 border border-cyan-400/30 hover:border-cyan-400/50 hover:bg-cyan-400/10 transition-colors uppercase flex items-center gap-1" diff --git a/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx b/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx new file mode 100644 index 0000000..aa1bb12 --- /dev/null +++ b/makima/frontend/src/contexts/SupervisorQuestionsContext.tsx @@ -0,0 +1,94 @@ +import { createContext, useContext, useState, useEffect, useCallback, ReactNode } from "react"; +import { listPendingQuestions, answerQuestion, type PendingQuestion } from "../lib/api"; +import { useAuth } from "./AuthContext"; + +interface SupervisorQuestionsContextValue { + pendingQuestions: PendingQuestion[]; + loading: boolean; + error: string | null; + refreshQuestions: () => Promise<void>; + submitAnswer: (questionId: string, response: string) => Promise<boolean>; +} + +const SupervisorQuestionsContext = createContext<SupervisorQuestionsContextValue | null>(null); + +export function SupervisorQuestionsProvider({ children }: { children: ReactNode }) { + const { isAuthenticated } = useAuth(); + const [pendingQuestions, setPendingQuestions] = useState<PendingQuestion[]>([]); + const [loading, setLoading] = useState(false); + const [error, setError] = useState<string | null>(null); + + const refreshQuestions = useCallback(async () => { + if (!isAuthenticated) return; + + try { + setLoading(true); + const questions = await listPendingQuestions(); + if (questions.length > 0) { + console.log("[SupervisorQuestions] Received questions:", questions); + } + setPendingQuestions(questions); + setError(null); + } catch (err) { + // Log but don't spam + console.warn("[SupervisorQuestions] Failed to fetch:", err); + setError(err instanceof Error ? err.message : "Failed to load questions"); + } finally { + setLoading(false); + } + }, [isAuthenticated]); + + const submitAnswer = useCallback(async (questionId: string, response: string): Promise<boolean> => { + try { + const result = await answerQuestion(questionId, response); + if (result.success) { + // Remove the question from local state + setPendingQuestions(prev => prev.filter(q => q.questionId !== questionId)); + } + return result.success; + } catch (err) { + console.error("Failed to submit answer:", err); + return false; + } + }, []); + + // Poll for questions every 5 seconds when authenticated + useEffect(() => { + if (!isAuthenticated) { + setPendingQuestions([]); + return; + } + + // Initial fetch (delayed slightly to ensure auth is ready) + const initialTimeout = setTimeout(refreshQuestions, 500); + + // Poll periodically (every 5 seconds for responsiveness) + const interval = setInterval(refreshQuestions, 5000); + return () => { + clearTimeout(initialTimeout); + clearInterval(interval); + }; + }, [isAuthenticated, refreshQuestions]); + + return ( + <SupervisorQuestionsContext.Provider + value={{ + pendingQuestions, + loading, + error, + refreshQuestions, + submitAnswer, + }} + > + {children} + </SupervisorQuestionsContext.Provider> + ); +} + +export function useSupervisorQuestions() { + const context = useContext(SupervisorQuestionsContext); + if (!context) { + throw new Error("useSupervisorQuestions must be used within SupervisorQuestionsProvider"); + } + return context; +} diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index d7ac8b6..2ea1128 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -590,6 +590,9 @@ export interface Task { version: number; createdAt: string; updatedAt: string; + + // Supervisor flag + isSupervisor: boolean; } export interface TaskWithSubtasks extends Task { @@ -892,6 +895,75 @@ export async function checkTargetExists( return res.json(); } +// ============================================================================= +// Task Recovery (Daemon Failover) +// ============================================================================= + +/** Request to reassign a task to a new daemon */ +export interface ReassignTaskRequest { + targetDaemonId?: string; + includeContext?: boolean; +} + +/** Response from reassigning a task */ +export interface ReassignTaskResponse { + task: Task; + daemonId: string; + oldTaskId: string; + contextIncluded: boolean; + contextEntries: number; +} + +/** + * Reassign a task to a new daemon after daemon disconnect. + * Creates a new task with conversation context, deletes the old one. + */ +export async function reassignTask( + taskId: string, + options?: ReassignTaskRequest +): Promise<ReassignTaskResponse> { + const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/reassign`, { + method: "POST", + body: JSON.stringify(options || {}), + }); + if (!res.ok) { + const errorText = await res.text(); + throw new Error(`Failed to reassign task: ${errorText || res.statusText}`); + } + return res.json(); +} + +/** Request to continue a task */ +export interface ContinueTaskRequest { + targetDaemonId?: string; +} + +/** Response from continuing a task */ +export interface ContinueTaskResponse { + task: Task; + daemonId: string; + contextEntries: number; +} + +/** + * Continue a task after daemon disconnect by restarting it with conversation context. + * Unlike reassign, this keeps the same task ID. + */ +export async function continueTask( + taskId: string, + options?: ContinueTaskRequest +): Promise<ContinueTaskResponse> { + const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/continue`, { + method: "POST", + body: JSON.stringify(options || {}), + }); + if (!res.ok) { + const errorText = await res.text(); + throw new Error(`Failed to continue task: ${errorText || res.statusText}`); + } + return res.json(); +} + export async function listSubtasks(taskId: string): Promise<TaskListResponse> { const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/subtasks`); if (!res.ok) { @@ -1848,3 +1920,56 @@ export async function getTemplate(id: string): Promise<FileTemplate> { } return res.json(); } + +// ============================================================================= +// Supervisor Question Types and Functions +// ============================================================================= + +export interface PendingQuestion { + questionId: string; + taskId: string; + contractId: string; + question: string; + choices: string[]; + context: string | null; + createdAt: string; +} + +export interface AnswerQuestionRequest { + response: string; +} + +export interface AnswerQuestionResponse { + success: boolean; +} + +/** + * Get all pending supervisor questions for the current user. + */ +export async function listPendingQuestions(): Promise<PendingQuestion[]> { + const res = await authFetch(`${API_BASE}/api/v1/mesh/questions`); + if (!res.ok) { + throw new Error(`Failed to list questions: ${res.statusText}`); + } + return res.json(); +} + +/** + * Answer a pending supervisor question. + */ +export async function answerQuestion( + questionId: string, + response: string +): Promise<AnswerQuestionResponse> { + const res = await authFetch( + `${API_BASE}/api/v1/mesh/questions/${questionId}/answer`, + { + method: "POST", + body: JSON.stringify({ response }), + } + ); + if (!res.ok) { + throw new Error(`Failed to answer question: ${res.statusText}`); + } + return res.json(); +} diff --git a/makima/frontend/src/main.tsx b/makima/frontend/src/main.tsx index 496a569..5d389fc 100644 --- a/makima/frontend/src/main.tsx +++ b/makima/frontend/src/main.tsx @@ -3,7 +3,9 @@ import { createRoot } from "react-dom/client"; import { BrowserRouter, Routes, Route } from "react-router"; import "./index.css"; import { AuthProvider } from "./contexts/AuthContext"; +import { SupervisorQuestionsProvider } from "./contexts/SupervisorQuestionsContext"; import { GridOverlay } from "./components/GridOverlay"; +import { SupervisorQuestionNotification } from "./components/SupervisorQuestionNotification"; import { ProtectedRoute } from "./components/ProtectedRoute"; import HomePage from "./routes/_index"; import ListenPage from "./routes/listen"; @@ -17,9 +19,11 @@ import SettingsPage from "./routes/settings"; createRoot(document.getElementById("root")!).render( <StrictMode> <AuthProvider> - <BrowserRouter> - <GridOverlay /> - <Routes> + <SupervisorQuestionsProvider> + <BrowserRouter> + <GridOverlay /> + <SupervisorQuestionNotification /> + <Routes> <Route path="/" element={<HomePage />} /> <Route path="/login" element={<LoginPage />} /> <Route @@ -94,8 +98,9 @@ createRoot(document.getElementById("root")!).render( </ProtectedRoute> } /> - </Routes> - </BrowserRouter> + </Routes> + </BrowserRouter> + </SupervisorQuestionsProvider> </AuthProvider> </StrictMode> ); diff --git a/makima/frontend/src/routes/mesh.tsx b/makima/frontend/src/routes/mesh.tsx index d067865..ed5a6d0 100644 --- a/makima/frontend/src/routes/mesh.tsx +++ b/makima/frontend/src/routes/mesh.tsx @@ -8,7 +8,7 @@ import { UnifiedMeshChatInput } from "../components/mesh/UnifiedMeshChatInput"; import { useTasks } from "../hooks/useTasks"; import { useTaskSubscription, type TaskUpdateEvent, type TaskOutputEvent } from "../hooks/useTaskSubscription"; import type { TaskWithSubtasks, MeshChatContext, ContractSummary, ContractWithRelations, DaemonDirectory } from "../lib/api"; -import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories } from "../lib/api"; +import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi } from "../lib/api"; import { DirectoryInput } from "../components/mesh/DirectoryInput"; import { useAuth } from "../contexts/AuthContext"; @@ -374,9 +374,10 @@ export default function MeshPage() { const handleContinue = useCallback( async (taskId: string) => { try { - // Start the task again from terminal state - const updated = await startTaskApi(taskId); - setTaskDetail((prev) => prev ? { ...prev, ...updated } : prev); + // Continue the task with conversation context from previous run + const result = await continueTaskApi(taskId); + console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`); + setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev); } catch (e) { console.error("Failed to continue task:", e); alert(e instanceof Error ? e.message : "Failed to continue task"); diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo new file mode 100644 index 0000000..7af14b5 --- /dev/null +++ b/makima/frontend/tsconfig.tsbuildinfo @@ -0,0 +1 @@ +{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"}
\ No newline at end of file diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 649a8e7..35783dc 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -321,6 +321,18 @@ async fn run_supervisor( let result = client.supervisor_status(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } + SupervisorCommand::Ask(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + eprintln!("Asking user: {}...", args.question); + let choices = args + .choices + .map(|c| c.split(',').map(|s| s.trim().to_string()).collect()) + .unwrap_or_default(); + let result = client + .supervisor_ask(&args.question, choices, args.context, args.timeout) + .await?; + println!("{}", serde_json::to_string(&result.0)?); + } } Ok(()) diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs index b691cc4..a1e8682 100644 --- a/makima/src/daemon/api/supervisor.rs +++ b/makima/src/daemon/api/supervisor.rs @@ -62,6 +62,17 @@ pub struct CheckpointRequest { pub message: String, } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AskQuestionRequest { + pub question: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub choices: Vec<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option<String>, + pub timeout_seconds: i32, +} + // Generic response type for JSON output #[derive(Deserialize, Serialize)] pub struct JsonValue(pub serde_json::Value); @@ -183,4 +194,21 @@ impl ApiClient { self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id)) .await } + + /// Ask a question and wait for user feedback. + pub async fn supervisor_ask( + &self, + question: &str, + choices: Vec<String>, + context: Option<String>, + timeout_seconds: i32, + ) -> Result<JsonValue, ApiError> { + let req = AskQuestionRequest { + question: question.to_string(), + choices, + context, + timeout_seconds, + }; + self.post("/api/v1/mesh/supervisor/questions", &req).await + } } diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index 24c19c6..66c7941 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -76,6 +76,9 @@ pub enum SupervisorCommand { /// Get contract status Status(SupervisorArgs), + + /// Ask a question and wait for user feedback + Ask(supervisor::AskArgs), } /// Contract subcommands for task-contract interaction. diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index 00c7ff4..017730d 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -42,6 +42,10 @@ pub struct SpawnArgs { /// Checkpoint SHA to start from #[arg(long)] pub checkpoint: Option<String>, + + /// Repository URL (local path or remote URL). If not provided, will try to detect from current directory. + #[arg(long)] + pub repo: Option<String>, } /// Arguments for wait command. @@ -144,3 +148,25 @@ pub struct CheckpointArgs { /// Checkpoint message pub message: String, } + +/// Arguments for ask command (ask user a question). +#[derive(Args, Debug)] +pub struct AskArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// The question to ask + pub question: String, + + /// Optional choices (comma-separated) + #[arg(long)] + pub choices: Option<String>, + + /// Context about what this relates to + #[arg(long)] + pub context: Option<String>, + + /// Timeout in seconds (default: 3600 = 1 hour) + #[arg(long, default_value = "3600")] + pub timeout: i32, +} diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 8269083..3b4ffdd 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -27,12 +27,27 @@ fn generate_tool_key() -> String { } /// Check if output contains an OAuth authentication error. -fn is_oauth_auth_error(output: &str) -> bool { +/// Only checks system/error messages, not assistant responses (which could mention auth errors conversationally). +fn is_oauth_auth_error(output: &str, json_type: Option<&str>, is_stdout: bool) -> bool { + // Only check system messages or stderr output - not assistant messages + // which could mention auth errors in conversation + match json_type { + Some("assistant") | Some("user") | Some("result") => return false, + _ => {} + } + + // For stdout JSON messages, only check system/error types + if is_stdout && json_type.is_none() { + // Non-JSON stdout output - could be startup messages, check carefully + // Only match very specific patterns that wouldn't appear in conversation + } + // Match various authentication error patterns from Claude Code - if output.contains("Please run /login") { + // These patterns are specific enough that they shouldn't appear in normal conversation + if output.contains("Please run /login") && output.contains("authenticate") { return true; } - if output.contains("Invalid API key") { + if output.contains("Invalid API key") && output.contains("ANTHROPIC_API_KEY") { return true; } if output.contains("authentication_error") @@ -41,6 +56,10 @@ fn is_oauth_auth_error(output: &str) -> bool { { return true; } + // Check for Claude Code's specific error format + if output.contains("\"type\":\"error\"") && output.contains("authentication") { + return true; + } false } @@ -695,6 +714,51 @@ makima supervisor checkpoints [task_id] makima supervisor status ``` +### User Feedback +```bash +# Ask a free-form question +makima supervisor ask "Your question here" + +# Ask with choices (comma-separated) +makima supervisor ask "Choose an option" --choices "Option A,Option B,Option C" + +# Ask with context +makima supervisor ask "Ready to proceed?" --context "After completing task X" + +# Ask with custom timeout (default 1 hour) +makima supervisor ask "Question" --timeout 3600 +``` + +## User Feedback (Ask Command) + +You can ask the user questions when you need clarification or approval: + +```bash +# Ask a free-form question (waits for user to respond) +makima supervisor ask "What authentication method should I use?" + +# Ask with predefined choices +makima supervisor ask "Ready to create PR?" --choices "Yes,No,Need more changes" + +# Ask with context +makima supervisor ask "Should I proceed?" --context "Plan phase complete" +``` + +The ask command will block until the user responds (or timeout). Use this to: +- Clarify requirements before starting work +- Get approval before creating PRs +- Ask for guidance when tasks fail + +## Contract Phase Progression + +### For "Simple" contracts (Plan → Execute): +1. **Plan Phase**: Review the plan document and understand the goal +2. **Execute Phase**: Spawn tasks to implement the plan, then create PR +3. Mark contract as complete when PR is created + +### For "Specification" contracts (Research → Specify → Plan → Execute → Review): +Progress through each phase, spawning tasks as needed and asking for user feedback. + ## Key Points 1. **Create a makima branch first** - use `branch "makima/{name}"` for the contract's work @@ -703,6 +767,7 @@ makima supervisor status 4. **Never fire-and-forget** - always wait for each task before moving on 5. **Merge to your makima branch** - use `merge <task_id> --to "makima/{name}"` to collect completed work 6. **Create PR when done** - use `pr "makima/{name}" --title "..." --base main` +7. **Ask when unsure** - use `ask` to get user feedback on decisions ## Standard Workflow @@ -711,16 +776,19 @@ makima supervisor status - `spawn` - Create task - `wait` - Block until complete - `merge --to "makima/{name}"` - Merge to branch -3. `pr "makima/{name}" --title "..." --base main` - Create PR +3. `ask "Ready to create PR?"` - Get user approval +4. `pr "makima/{name}" --title "..." --base main` - Create PR ## Important Reminders - **ONLY YOU can spawn tasks** - regular tasks cannot create children - **NEVER implement anything yourself** - always spawn tasks - **ALWAYS create a makima branch** - use `makima/{name}` naming convention +- **ASK for feedback** when you need clarification or approval - Tasks run independently - you just coordinate - You will be resumed if interrupted - your conversation is preserved - Create checkpoints before major transitions +- **Mark contract complete** when PR is created by updating status --- @@ -2834,6 +2902,8 @@ impl TaskManagerInner { // Check for OAuth auth error before sending output let content_for_auth_check = line.content.clone(); + let json_type_for_auth_check = line.json_type.clone(); + let is_stdout_for_auth_check = line.is_stdout; let msg = DaemonMessage::task_output(task_id, line.content, false); if ws_tx.send(msg).await.is_err() { @@ -2842,7 +2912,7 @@ impl TaskManagerInner { } // Detect OAuth token expiration and trigger remote login flow - if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check) { + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { auth_error_handled = true; tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs index 9af5dcb..ff0e9e7 100644 --- a/makima/src/daemon/worktree/manager.rs +++ b/makima/src/daemon/worktree/manager.rs @@ -333,34 +333,76 @@ impl WorktreeManager { // Create base directory tokio::fs::create_dir_all(&self.base_dir).await?; - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - branch = %branch_name, - base_branch = %base_branch, - "Creating worktree from local branch" - ); + // Prune stale worktree entries first (handles "missing but registered" errors) + let _ = Command::new("git") + .args(["worktree", "prune"]) + .current_dir(source_repo) + .output() + .await; - // Create the worktree with a new branch based on the local base_branch - let output = Command::new("git") - .args([ - "worktree", - "add", - "-b", - &branch_name, - ]) - .arg(&worktree_path) - .arg(base_branch) + // Check if the branch already exists (e.g., from a previous run of the same task) + let branch_exists = Command::new("git") + .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch_name)]) .current_dir(source_repo) .output() - .await?; + .await + .map(|o| o.status.success()) + .unwrap_or(false); - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create worktree: {}", - stderr - ))); + if branch_exists { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + branch = %branch_name, + "Branch already exists, creating worktree from existing branch" + ); + + // Use existing branch - try without force first, then with force + let output = Command::new("git") + .args(["worktree", "add", "-f"]) + .arg(&worktree_path) + .arg(&branch_name) + .current_dir(source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree from existing branch: {}", + stderr + ))); + } + } else { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + branch = %branch_name, + base_branch = %base_branch, + "Creating worktree with new branch" + ); + + // Create the worktree with a new branch based on the local base_branch + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(base_branch) + .current_dir(source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } } tracing::info!( @@ -439,35 +481,78 @@ impl WorktreeManager { // Create base directory tokio::fs::create_dir_all(&self.base_dir).await?; - tracing::info!( - task_id = %task_id, - source_worktree = %source_worktree.display(), - worktree_path = %worktree_path.display(), - branch = %branch_name, - source_commit = %source_commit, - "Creating worktree from source task" - ); + // Prune stale worktree entries first (handles "missing but registered" errors) + let _ = Command::new("git") + .args(["worktree", "prune"]) + .current_dir(&source_repo) + .output() + .await; - // Create a new worktree based on the source commit - let output = Command::new("git") - .args([ - "worktree", - "add", - "-b", - &branch_name, - ]) - .arg(&worktree_path) - .arg(&source_commit) + // Check if the branch already exists (e.g., from a previous run of the same task) + let branch_exists = Command::new("git") + .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch_name)]) .current_dir(&source_repo) .output() - .await?; + .await + .map(|o| o.status.success()) + .unwrap_or(false); - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create worktree: {}", - stderr - ))); + if branch_exists { + tracing::info!( + task_id = %task_id, + source_worktree = %source_worktree.display(), + worktree_path = %worktree_path.display(), + branch = %branch_name, + "Branch already exists, creating worktree from existing branch" + ); + + // Use existing branch with force flag + let output = Command::new("git") + .args(["worktree", "add", "-f"]) + .arg(&worktree_path) + .arg(&branch_name) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree from existing branch: {}", + stderr + ))); + } + } else { + tracing::info!( + task_id = %task_id, + source_worktree = %source_worktree.display(), + worktree_path = %worktree_path.display(), + branch = %branch_name, + source_commit = %source_commit, + "Creating worktree from source task with new branch" + ); + + // Create a new worktree based on the source commit + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(&source_commit) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } } // Now copy uncommitted changes from source worktree diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 2d90a04..3da6fd5 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource { if let Some(task_id) = state.validate_tool_key(key_str) { return AuthSource::ToolKey(task_id); } - tracing::warn!("Invalid tool key provided"); + tracing::warn!("Invalid tool key provided: {}", key_str); } } @@ -1774,3 +1774,688 @@ pub async fn check_target_exists( })) .into_response() } + +// ============================================================================= +// Task Reassignment (Daemon Failover) +// ============================================================================= + +/// Request to reassign a task to a new daemon after daemon disconnect. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskRequest { + /// Target daemon ID to reassign to. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, + /// Whether to include conversation context from previous run. + #[serde(default = "default_include_context")] + pub include_context: bool, +} + +fn default_include_context() -> bool { + true +} + +/// Response from task reassignment. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskResponse { + /// The new task that was created. + pub task: Task, + /// The new daemon ID. + pub daemon_id: Uuid, + /// The ID of the old task that was deleted. + pub old_task_id: Uuid, + /// Whether conversation context was included. + pub context_included: bool, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Build a conversation context summary from task output entries. +/// Returns a formatted string that can be prepended to the task plan. +fn build_conversation_context(entries: &[TaskOutputEntry]) -> String { + if entries.is_empty() { + return String::new(); + } + + let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); + context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n"); + + for entry in entries.iter() { + match entry.message_type.as_str() { + "assistant" => { + context.push_str("Assistant: "); + // Truncate long messages + let content = if entry.content.len() > 500 { + format!("{}... [truncated]", &entry.content[..500]) + } else { + entry.content.clone() + }; + context.push_str(&content); + context.push_str("\n\n"); + } + "tool_use" => { + if let Some(ref tool_name) = entry.tool_name { + context.push_str(&format!("[Used tool: {}]\n", tool_name)); + } + } + "tool_result" => { + // Summarize tool results briefly + if entry.content.len() > 200 { + context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200])); + } else if !entry.content.is_empty() { + context.push_str(&format!("[Tool result: {}]\n", entry.content)); + } + } + "user" => { + context.push_str("User: "); + context.push_str(&entry.content); + context.push_str("\n\n"); + } + _ => {} + } + } + + context.push_str("=== END PREVIOUS CONTEXT ===\n\n"); + context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n"); + + context +} + +/// Reassign a task to a new daemon after the original daemon disconnected. +/// +/// This endpoint is used for daemon failover - when a daemon restarts or disconnects, +/// the task can be reassigned to a new daemon with the conversation context preserved. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/reassign", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ReassignTaskRequest, + responses( + (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse), + (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn reassign_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ReassignTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is in a state that can be reassigned + // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + let can_reassign = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" + ) || { + // Also allow if daemon is not connected + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_reassign && task.status == "running" { + // Running task - check if its daemon is still connected + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to reassign.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + // Verify the requested daemon is connected and belongs to the owner + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + // Find any available daemon for this owner + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context if requested + let (context_str, context_entries) = if body.include_context { + match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + } + } else { + (String::new(), 0) + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Create a NEW task with the conversation context + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: format!("{} (resumed)", task.name), + description: task.description.clone(), + plan: updated_plan.clone(), + parent_task_id: task.parent_task_id, + is_supervisor: task.is_supervisor, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from the old task's worktree if possible + copy_files: None, + checkpoint_sha: task.last_checkpoint_sha.clone(), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create new task for reassignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Update new task to starting and assign daemon + let start_update = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(new_task.version), + ..Default::default() + }; + + let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "New task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update new task daemon assignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Send SpawnTask command to daemon for the new task + let command = DaemonCommand::SpawnTask { + task_id: new_task.id, + task_name: final_task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator: false, // New task starts fresh + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from old task's worktree + copy_files: None, + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + old_task_id = %id, + new_task_id = %new_task.id, + new_daemon_id = %target_daemon_id, + context_entries = context_entries, + "Reassigning task: creating new task and deleting old one" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for reassignment: {}", e); + // Rollback: delete the new task we created + let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Delete the old task now that the new one is spawned + let old_task_id = id; + if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await { + tracing::warn!("Failed to delete old task {}: {}", old_task_id, e); + // Don't fail the request, the new task is already running + } + + // Notify the contract's supervisor about the reassignment (if applicable) + if let Some(contract_id) = task.contract_id { + if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + if let Some(supervisor_task_id) = contract.supervisor_task_id { + // Don't notify if we're reassigning the supervisor itself + if supervisor_task_id != old_task_id { + // Find the supervisor's daemon and send a message + if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + if supervisor_task.status == "running" { + if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { + // Find the daemon by its UUID + if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { + let notification_msg = format!( + "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ + A new task '{}' (ID: {}) has been created to continue the work. \ + The new task has {} context entries from the previous conversation.\n\n", + task.name, + old_task_id, + final_task.name, + new_task.id, + context_entries + ); + + let notify_cmd = DaemonCommand::SendMessage { + task_id: supervisor_task_id, + message: notification_msg, + }; + + if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { + tracing::warn!( + supervisor_id = %supervisor_task_id, + error = %e, + "Failed to notify supervisor about task reassignment" + ); + } else { + tracing::info!( + supervisor_id = %supervisor_task_id, + old_task_id = %old_task_id, + new_task_id = %new_task.id, + "Notified supervisor about task reassignment" + ); + } + } + } + } + } + } + } + } + } + + // Broadcast task update for the new task + state.broadcast_task_update(TaskUpdateNotification { + task_id: new_task.id, + owner_id: Some(auth.owner_id), + version: final_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "reassignment".to_string(), + }); + + Json(ReassignTaskResponse { + task: final_task, + daemon_id: target_daemon_id, + old_task_id, + context_included: !context_str.is_empty(), + context_entries, + }) + .into_response() +} + +// ============================================================================= +// Task Continue (Restart with Context) +// ============================================================================= + +/// Request to continue a task after daemon disconnect (restart in-place with context). +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskRequest { + /// Target daemon ID to continue on. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, +} + +/// Response from continuing a task. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskResponse { + /// The continued task (same ID, updated plan with context). + pub task: Task, + /// The daemon ID running the task. + pub daemon_id: Uuid, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Continue a task after daemon disconnect by restarting it with conversation context. +/// +/// Unlike reassign, this keeps the same task ID and just restarts it with the +/// previous conversation context prepended to the plan. Useful for supervisors. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/continue", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ContinueTaskRequest, + responses( + (status = 200, description = "Task continued successfully", body = ContinueTaskResponse), + (status = 400, description = "Task cannot be continued", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn continue_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ContinueTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + // Check if task can be continued (not currently running on a connected daemon) + let can_continue = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" | "completed" + ) || { + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_continue && task.status == "running" { + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to continue.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context from task output + let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Update task in database: reset status, update plan with context, assign daemon + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + plan: Some(updated_plan.clone()), + daemon_id: Some(target_daemon_id), + error_message: None, + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update task for continuation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if this is an orchestrator + let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(subtasks) => subtasks.len(), + Err(_) => 0, + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id: id, + task_name: task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + task_id = %id, + daemon_id = %target_daemon_id, + context_entries = context_entries, + is_supervisor = task.is_supervisor, + "Continuing task with conversation context" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for continuation: {}", e); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("failed".to_string()), + clear_daemon_id: true, + error_message: Some(format!("Continuation failed: {}", e)), + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Broadcast task update + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()], + updated_by: "continuation".to_string(), + }); + + Json(ContinueTaskResponse { + task: updated_task, + daemon_id: target_daemon_id, + context_entries, + }) + .into_response() +} diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 178e5e1..39b12da 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -369,6 +369,18 @@ pub enum DaemonMessage { /// Error message if operation failed error: Option<String>, }, + /// Notification that a branch was created + BranchCreated { + #[serde(rename = "taskId")] + task_id: Option<Uuid>, + /// Name of the branch that was created + #[serde(rename = "branchName")] + branch_name: String, + /// Whether the operation succeeded + success: bool, + /// Error message if operation failed + error: Option<String>, + }, } /// Validated daemon authentication result. @@ -1073,6 +1085,36 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } } + Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => { + tracing::info!( + task_id = ?task_id, + branch_name = %branch_name, + success = success, + error = ?error, + "Branch created notification received" + ); + + // Broadcast as task output if we have a task_id + if let Some(tid) = task_id { + let output_text = if success { + format!("✓ Branch '{}' created successfully", branch_name) + } else { + format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default()) + }; + state.broadcast_task_output(TaskOutputNotification { + task_id: tid, + owner_id: Some(owner_id), + message_type: "system".to_string(), + content: output_text, + tool_name: None, + tool_input: None, + is_error: Some(!success), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index ac59130..d0fa4d1 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -15,6 +15,7 @@ use uuid::Uuid; use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; use crate::db::repository; +use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; use crate::server::state::{DaemonCommand, SharedState}; @@ -32,7 +33,7 @@ pub struct SpawnTaskRequest { pub contract_id: Uuid, pub parent_task_id: Option<Uuid>, pub checkpoint_sha: Option<String>, - /// Repository URL for the task (supervisor should provide this) + /// Repository URL for the task (optional - if not provided, will be looked up from contract). pub repository_url: Option<String>, } @@ -55,6 +56,67 @@ pub struct ReadWorktreeFileRequest { pub file_path: String, } +/// Request to ask a question and wait for user feedback. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AskQuestionRequest { + /// The question to ask the user + pub question: String, + /// Optional choices (if empty, free-form text response) + #[serde(default)] + pub choices: Vec<String>, + /// Optional context about what this relates to + pub context: Option<String>, + /// How long to wait for a response (seconds) + #[serde(default = "default_question_timeout")] + pub timeout_seconds: i32, +} + +fn default_question_timeout() -> i32 { + 3600 // 1 hour default +} + +/// Response from asking a question. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AskQuestionResponse { + /// The question ID for tracking + pub question_id: Uuid, + /// The user's response (None if timed out) + pub response: Option<String>, + /// Whether the question timed out + pub timed_out: bool, +} + +/// Request to answer a supervisor question. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AnswerQuestionRequest { + /// The user's response + pub response: String, +} + +/// Response to answering a question. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AnswerQuestionResponse { + /// Whether the answer was accepted + pub success: bool, +} + +/// Pending question summary. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PendingQuestionSummary { + pub question_id: Uuid, + pub task_id: Uuid, + pub contract_id: Uuid, + pub question: String, + pub choices: Vec<String>, + pub context: Option<String>, + pub created_at: chrono::DateTime<chrono::Utc>, +} + /// Request to create a checkpoint. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -321,23 +383,49 @@ pub async fn spawn_task( } }; - // Get repository URL from the contract's primary repository - let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await { - Ok(repos) => { - // Prefer primary repo, fallback to first repo - repos.iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone()) - } - Err(e) => { - tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL"); + // Get repository URL - either from request or from contract's repositories + let repo_url = if let Some(url) = request.repository_url.clone() { + if !url.trim().is_empty() { + Some(url) + } else { None } + } else { + None }; - // Supervisor can override with explicit repository_url - let repo_url = request.repository_url.clone().or(repo_url); + // If no repo URL provided, look it up from the contract + let repo_url = match repo_url { + Some(url) => Some(url), + None => { + match repository::list_contract_repositories(pool, request.contract_id).await { + Ok(repos) => { + // Prefer primary repo, fallback to first repo + let repo = repos.iter() + .find(|r| r.is_primary) + .or(repos.first()); + + // Use repository_url if set, otherwise use local_path + repo.and_then(|r| { + r.repository_url.clone() + .or_else(|| r.local_path.clone()) + }) + } + Err(e) => { + tracing::warn!(error = %e, "Failed to get contract repositories"); + None + } + } + } + }; + + // Validate that we have a repo URL + if repo_url.is_none() { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")), + ).into_response(); + } // Create task request let create_req = CreateTaskRequest { @@ -1151,3 +1239,208 @@ pub async fn get_task_diff( }), ).into_response() } + +// ============================================================================= +// Supervisor Question Handlers +// ============================================================================= + +/// Ask a question and wait for user feedback. +/// +/// The supervisor calls this to ask a question. The endpoint will poll until +/// either the user responds or the timeout is reached. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/questions", + request_body = AskQuestionRequest, + responses( + (status = 200, description = "Question answered", body = AskQuestionResponse), + (status = 408, description = "Question timed out", body = AskQuestionResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + security( + ("tool_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn ask_question( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<AskQuestionRequest>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the supervisor task to find its contract + let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get supervisor task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), + ).into_response(); + } + }; + + let Some(contract_id) = supervisor.contract_id else { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")), + ).into_response(); + }; + + // Add the question + let question_id = state.add_supervisor_question( + supervisor_id, + contract_id, + owner_id, + request.question.clone(), + request.choices.clone(), + request.context.clone(), + ); + + // Poll for response with timeout + let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64); + let start = std::time::Instant::now(); + let poll_interval = std::time::Duration::from_millis(500); + + loop { + // Check if response has been submitted + if let Some(response) = state.get_question_response(question_id) { + // Clean up the response + state.cleanup_question_response(question_id); + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + }), + ).into_response(); + } + + // Check timeout + if start.elapsed() >= timeout_duration { + // Remove the pending question on timeout + state.remove_pending_question(question_id); + + return ( + StatusCode::REQUEST_TIMEOUT, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: true, + }), + ).into_response(); + } + + // Wait before polling again + tokio::time::sleep(poll_interval).await; + } +} + +/// Get all pending questions for the current user. +#[utoipa::path( + get, + path = "/api/v1/mesh/questions", + responses( + (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error"), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_pending_questions( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let questions: Vec<PendingQuestionSummary> = state + .get_pending_questions_for_owner(auth.owner_id) + .into_iter() + .map(|q| PendingQuestionSummary { + question_id: q.question_id, + task_id: q.task_id, + contract_id: q.contract_id, + question: q.question, + choices: q.choices, + context: q.context, + created_at: q.created_at, + }) + .collect(); + + Json(questions).into_response() +} + +/// Answer a pending supervisor question. +#[utoipa::path( + post, + path = "/api/v1/mesh/questions/{question_id}/answer", + params( + ("question_id" = Uuid, Path, description = "Question ID") + ), + request_body = AnswerQuestionRequest, + responses( + (status = 200, description = "Question answered", body = AnswerQuestionResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Question not found"), + (status = 500, description = "Internal server error"), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn answer_question( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(question_id): Path<Uuid>, + Json(request): Json<AnswerQuestionRequest>, +) -> impl IntoResponse { + // Verify the question exists and belongs to this owner + let question = match state.get_pending_question(question_id) { + Some(q) if q.owner_id == auth.owner_id => q, + Some(_) => { + return ( + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Question belongs to another user")), + ).into_response(); + } + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), + ).into_response(); + } + }; + + // Submit the response + let success = state.submit_question_response(question_id, request.response); + + if success { + tracing::info!( + question_id = %question_id, + task_id = %question.task_id, + "User answered supervisor question" + ); + } + + Json(AnswerQuestionResponse { success }).into_response() +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index a4a01e0..27ee06c 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -82,6 +82,8 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action)) .route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree)) .route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists)) + .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task)) + .route("/mesh/tasks/{id}/continue", post(mesh::continue_task)) .route("/mesh/chat", post(mesh_chat::mesh_toplevel_chat_handler)) .route( "/mesh/chat/history", @@ -114,6 +116,10 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task)) .route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff)) .route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr)) + // Supervisor question endpoints + .route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question)) + .route("/mesh/questions", get(mesh_supervisor::list_pending_questions)) + .route("/mesh/questions/{question_id}/answer", post(mesh_supervisor::answer_question)) // Mesh WebSocket endpoints .route("/mesh/tasks/subscribe", get(mesh_ws::task_subscription_handler)) .route("/mesh/daemons/connect", get(mesh_daemon::daemon_handler)) diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 1c28544..495fc15 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -103,6 +103,54 @@ pub struct TaskCompletionNotification { pub error_message: Option<String>, } +/// Notification for supervisor questions requiring user feedback. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SupervisorQuestionNotification { + /// Unique ID for this question + pub question_id: Uuid, + /// Supervisor task that asked the question + pub task_id: Uuid, + /// Contract this question relates to + pub contract_id: Uuid, + /// Owner ID for data isolation + #[serde(skip)] + pub owner_id: Option<Uuid>, + /// The question text + pub question: String, + /// Optional choices for the user (if empty, free-form text response) + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub choices: Vec<String>, + /// Context about what phase/action this relates to + #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option<String>, + /// Whether this question is still pending + pub pending: bool, + /// When the question was asked + pub created_at: chrono::DateTime<chrono::Utc>, +} + +/// Stored supervisor question for persistence +#[derive(Debug, Clone)] +pub struct PendingSupervisorQuestion { + pub question_id: Uuid, + pub task_id: Uuid, + pub contract_id: Uuid, + pub owner_id: Uuid, + pub question: String, + pub choices: Vec<String>, + pub context: Option<String>, + pub created_at: chrono::DateTime<chrono::Utc>, +} + +/// Response to a supervisor question +#[derive(Debug, Clone)] +pub struct SupervisorQuestionResponse { + pub question_id: Uuid, + pub response: String, + pub responded_at: chrono::DateTime<chrono::Utc>, +} + /// Command sent from server to daemon. #[derive(Debug, Clone, serde::Serialize)] #[serde(tag = "type", rename_all = "camelCase")] @@ -408,6 +456,12 @@ pub struct AppState { pub task_output: broadcast::Sender<TaskOutputNotification>, /// Broadcast channel for task completion notifications (for supervisors) pub task_completions: broadcast::Sender<TaskCompletionNotification>, + /// Broadcast channel for supervisor question notifications + pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>, + /// Pending supervisor questions awaiting user response (keyed by question_id) + pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>, + /// Responses to supervisor questions (keyed by question_id) + pub question_responses: DashMap<Uuid, SupervisorQuestionResponse>, /// Active daemon connections (keyed by connection_id) pub daemon_connections: DashMap<String, DaemonConnectionInfo>, /// Tool keys for orchestrator API access (key -> task_id) @@ -435,6 +489,7 @@ impl AppState { let (task_updates, _) = broadcast::channel(256); let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring + let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) @@ -476,6 +531,9 @@ impl AppState { task_updates, task_output, task_completions, + supervisor_questions, + pending_questions: DashMap::new(), + question_responses: DashMap::new(), daemon_connections: DashMap::new(), tool_keys: DashMap::new(), jwt_verifier, @@ -556,6 +614,133 @@ impl AppState { let _ = self.task_completions.send(notification); } + /// Broadcast a supervisor question notification to all subscribers. + /// + /// Used to notify frontend clients when a supervisor needs user feedback. + pub fn broadcast_supervisor_question(&self, notification: SupervisorQuestionNotification) { + let _ = self.supervisor_questions.send(notification); + } + + /// Add a pending supervisor question and broadcast it. + pub fn add_supervisor_question( + &self, + task_id: Uuid, + contract_id: Uuid, + owner_id: Uuid, + question: String, + choices: Vec<String>, + context: Option<String>, + ) -> Uuid { + let question_id = Uuid::new_v4(); + let now = chrono::Utc::now(); + + // Store the pending question + self.pending_questions.insert( + question_id, + PendingSupervisorQuestion { + question_id, + task_id, + contract_id, + owner_id, + question: question.clone(), + choices: choices.clone(), + context: context.clone(), + created_at: now, + }, + ); + + // Broadcast to subscribers + self.broadcast_supervisor_question(SupervisorQuestionNotification { + question_id, + task_id, + contract_id, + owner_id: Some(owner_id), + question, + choices, + context, + pending: true, + created_at: now, + }); + + tracing::info!( + question_id = %question_id, + task_id = %task_id, + contract_id = %contract_id, + "Supervisor question added" + ); + + question_id + } + + /// Remove a pending question (after it's been answered). + pub fn remove_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> { + self.pending_questions.remove(&question_id).map(|(_, q)| q) + } + + /// Get all pending questions for an owner. + pub fn get_pending_questions_for_owner(&self, owner_id: Uuid) -> Vec<PendingSupervisorQuestion> { + self.pending_questions + .iter() + .filter(|entry| entry.value().owner_id == owner_id) + .map(|entry| entry.value().clone()) + .collect() + } + + /// Get a specific pending question. + pub fn get_pending_question(&self, question_id: Uuid) -> Option<PendingSupervisorQuestion> { + self.pending_questions.get(&question_id).map(|entry| entry.value().clone()) + } + + /// Submit a response to a supervisor question. + pub fn submit_question_response(&self, question_id: Uuid, response: String) -> bool { + // Check if the question exists + if let Some(question) = self.pending_questions.remove(&question_id) { + let now = chrono::Utc::now(); + + // Store the response + self.question_responses.insert( + question_id, + SupervisorQuestionResponse { + question_id, + response: response.clone(), + responded_at: now, + }, + ); + + // Broadcast that the question is no longer pending + self.broadcast_supervisor_question(SupervisorQuestionNotification { + question_id, + task_id: question.1.task_id, + contract_id: question.1.contract_id, + owner_id: Some(question.1.owner_id), + question: question.1.question, + choices: question.1.choices, + context: question.1.context, + pending: false, + created_at: question.1.created_at, + }); + + tracing::info!( + question_id = %question_id, + "Supervisor question answered" + ); + + true + } else { + false + } + } + + /// Get the response to a question (if answered). + pub fn get_question_response(&self, question_id: Uuid) -> Option<SupervisorQuestionResponse> { + self.question_responses.get(&question_id).map(|entry| entry.value().clone()) + } + + /// Clean up a question response after the supervisor has read it. + pub fn cleanup_question_response(&self, question_id: Uuid) { + self.question_responses.remove(&question_id); + } + /// Register a new daemon connection. /// /// Returns the connection_id for later reference. |
