diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 19:50:27 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 75d9644d44ba998a32ed14c072e883a75145ab72 (patch) | |
| tree | b82dee94632fd40764a92a9b11da24ef21600ed5 | |
| parent | 6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff) | |
| download | soryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip | |
Add autopilot panel and retry system
| -rw-r--r-- | makima/frontend/src/components/contracts/AutopilotPanel.tsx | 208 | ||||
| -rw-r--r-- | makima/frontend/src/components/contracts/ContractDetail.tsx | 7 | ||||
| -rw-r--r-- | makima/frontend/src/lib/api.ts | 114 | ||||
| -rw-r--r-- | makima/frontend/tsconfig.tsbuildinfo | 2 | ||||
| -rw-r--r-- | makima/migrations/20250119000000_add_task_retry_tracking.sql | 22 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 19 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 92 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 83 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 155 |
9 files changed, 607 insertions, 95 deletions
diff --git a/makima/frontend/src/components/contracts/AutopilotPanel.tsx b/makima/frontend/src/components/contracts/AutopilotPanel.tsx new file mode 100644 index 0000000..a8a8e2e --- /dev/null +++ b/makima/frontend/src/components/contracts/AutopilotPanel.tsx @@ -0,0 +1,208 @@ +import { useState, useCallback } from "react"; +import type { ContractWithRelations } from "../../lib/api"; +import { + getSupervisorStatus, + startSupervisor, + stopSupervisor, + resumeSupervisor, + type SupervisorStatus, +} from "../../lib/api"; + +interface AutopilotPanelProps { + contract: ContractWithRelations; + onUpdate: () => void; +} + +const statusConfig: Record< + SupervisorStatus["status"], + { label: string; color: string; bgColor: string } +> = { + not_configured: { + label: "Not Configured", + color: "text-[#555]", + bgColor: "bg-[#555]/10", + }, + pending: { + label: "Ready", + color: "text-yellow-400", + bgColor: "bg-yellow-400/10", + }, + starting: { + label: "Starting...", + color: "text-blue-400", + bgColor: "bg-blue-400/10", + }, + running: { + label: "Running", + color: "text-green-400", + bgColor: "bg-green-400/10", + }, + paused: { + label: "Paused", + color: "text-orange-400", + bgColor: "bg-orange-400/10", + }, + done: { + label: "Completed", + color: "text-blue-400", + bgColor: "bg-blue-400/10", + }, + failed: { + label: "Failed", + color: "text-red-400", + bgColor: "bg-red-400/10", + }, +}; + +export function AutopilotPanel({ contract, onUpdate }: AutopilotPanelProps) { + const [loading, setLoading] = useState(false); + const [error, setError] = useState<string | null>(null); + + const supervisorStatus = getSupervisorStatus(contract); + const config = statusConfig[supervisorStatus.status]; + + const handleStart = useCallback(async () => { + if (!supervisorStatus.supervisorTaskId) return; + + setLoading(true); + setError(null); + + try { + await startSupervisor(supervisorStatus.supervisorTaskId); + onUpdate(); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to start autopilot"); + } finally { + setLoading(false); + } + }, [supervisorStatus.supervisorTaskId, onUpdate]); + + const handleStop = useCallback(async () => { + if (!supervisorStatus.supervisorTaskId) return; + + setLoading(true); + setError(null); + + try { + await stopSupervisor(supervisorStatus.supervisorTaskId); + onUpdate(); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to stop autopilot"); + } finally { + setLoading(false); + } + }, [supervisorStatus.supervisorTaskId, onUpdate]); + + const handleResume = useCallback(async () => { + setLoading(true); + setError(null); + + try { + await resumeSupervisor(contract.id, { resumeMode: "continue" }); + // After resuming, we need to start the task + if (supervisorStatus.supervisorTaskId) { + await startSupervisor(supervisorStatus.supervisorTaskId); + } + onUpdate(); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to resume autopilot"); + } finally { + setLoading(false); + } + }, [contract.id, supervisorStatus.supervisorTaskId, onUpdate]); + + // Don't show panel for task-type contracts (they don't have supervisors) + if (contract.contractType === "task") { + return null; + } + + return ( + <div className="space-y-3"> + <div className="flex items-center justify-between"> + <h3 className="font-mono text-xs text-[#75aafc] uppercase"> + Autopilot Mode + </h3> + <div + className={`px-2 py-1 rounded font-mono text-xs ${config.color} ${config.bgColor}`} + > + {config.label} + </div> + </div> + + <p className="font-mono text-xs text-[#555]"> + {supervisorStatus.status === "not_configured" ? ( + "This contract does not have an autopilot supervisor configured." + ) : supervisorStatus.status === "running" ? ( + "Autopilot is actively working on this contract, spawning tasks and managing progress." + ) : supervisorStatus.status === "pending" ? ( + "Autopilot is ready to start. Click 'Enable' to begin autonomous work." + ) : supervisorStatus.status === "paused" ? ( + "Autopilot is paused. Click 'Resume' to continue work." + ) : supervisorStatus.status === "failed" ? ( + "Autopilot encountered an error. You can resume to retry." + ) : supervisorStatus.status === "done" ? ( + "Autopilot has completed its work on this contract." + ) : ( + "Autopilot is initializing..." + )} + </p> + + {error && ( + <div className="px-3 py-2 bg-red-500/10 border border-red-400/30 font-mono text-xs text-red-400"> + {error} + </div> + )} + + <div className="flex gap-2"> + {supervisorStatus.canStart && ( + <button + onClick={handleStart} + disabled={loading} + className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-green-600/20 border border-green-400/50 hover:bg-green-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed" + > + {loading ? "Starting..." : "Enable Autopilot"} + </button> + )} + + {supervisorStatus.canResume && ( + <button + onClick={handleResume} + disabled={loading} + className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-blue-600/20 border border-blue-400/50 hover:bg-blue-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed" + > + {loading ? "Resuming..." : "Resume Autopilot"} + </button> + )} + + {supervisorStatus.canStop && ( + <button + onClick={handleStop} + disabled={loading} + className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-orange-600/20 border border-orange-400/50 hover:bg-orange-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed" + > + {loading ? "Stopping..." : "Pause Autopilot"} + </button> + )} + </div> + + {/* Show running indicator when active */} + {supervisorStatus.status === "running" && ( + <div className="flex items-center gap-2 pt-2 border-t border-dashed border-[rgba(117,170,252,0.2)]"> + <div className="w-2 h-2 rounded-full bg-green-400 animate-pulse" /> + <span className="font-mono text-xs text-green-400"> + Autopilot is actively working + </span> + </div> + )} + + {supervisorStatus.status === "starting" && ( + <div className="flex items-center gap-2 pt-2 border-t border-dashed border-[rgba(117,170,252,0.2)]"> + <div className="w-2 h-2 rounded-full bg-blue-400 animate-pulse" /> + <span className="font-mono text-xs text-blue-400"> + Initializing autopilot... + </span> + </div> + )} + </div> + ); +} diff --git a/makima/frontend/src/components/contracts/ContractDetail.tsx b/makima/frontend/src/components/contracts/ContractDetail.tsx index cf5f8f2..f93097a 100644 --- a/makima/frontend/src/components/contracts/ContractDetail.tsx +++ b/makima/frontend/src/components/contracts/ContractDetail.tsx @@ -18,6 +18,7 @@ import { PhaseHint } from "./PhaseHint"; import { RepositoryPanel } from "./RepositoryPanel"; import { ContractCliInput } from "./ContractCliInput"; import { PhaseDeliverablesPanel } from "./PhaseDeliverablesPanel"; +import { AutopilotPanel } from "./AutopilotPanel"; import { TaskTree } from "../mesh/TaskTree"; type Tab = "overview" | "repos" | "files" | "tasks"; @@ -225,6 +226,7 @@ export function ContractDetail({ onStatusChange={onStatusChange} onPhaseChange={onPhaseChange} onCreateFile={onCreateFileFromTemplate} + onRefresh={onRefresh} /> )} @@ -276,14 +278,19 @@ function OverviewTab({ onStatusChange, onPhaseChange, onCreateFile, + onRefresh, }: { contract: ContractWithRelations; onStatusChange: (status: ContractStatus) => void; onPhaseChange: (phase: ContractPhase) => void; onCreateFile?: (templateId: string, suggestedName: string) => void; + onRefresh: () => void; }) { return ( <div className="space-y-6"> + {/* Autopilot controls */} + <AutopilotPanel contract={contract} onUpdate={onRefresh} /> + {/* Phase deliverables checklist */} <PhaseDeliverablesPanel contract={contract} diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index 9c56f6b..ee04935 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -2492,3 +2492,117 @@ export async function resumeFromCheckpoint( } return res.json(); } + +// ============================================================================= +// Supervisor/Autopilot Control Functions +// ============================================================================= + +/** + * Start a contract's supervisor task (enable autopilot mode). + * This is a convenience wrapper around startTask. + */ +export async function startSupervisor(supervisorTaskId: string): Promise<Task> { + return startTask(supervisorTaskId); +} + +/** + * Stop a contract's supervisor task (pause autopilot mode). + * This is a convenience wrapper around stopTask. + */ +export async function stopSupervisor(supervisorTaskId: string): Promise<Task> { + return stopTask(supervisorTaskId); +} + +/** Status of the supervisor/autopilot for a contract */ +export interface SupervisorStatus { + supervisorTaskId: string | null; + status: "not_configured" | "pending" | "starting" | "running" | "paused" | "done" | "failed"; + daemonId: string | null; + canStart: boolean; + canStop: boolean; + canResume: boolean; +} + +/** + * Get the supervisor status for a contract. + */ +export function getSupervisorStatus( + contract: ContractWithRelations +): SupervisorStatus { + const supervisorTaskId = contract.supervisorTaskId; + + if (!supervisorTaskId) { + return { + supervisorTaskId: null, + status: "not_configured", + daemonId: null, + canStart: false, + canStop: false, + canResume: false, + }; + } + + // Find the supervisor task in the contract's tasks + const supervisorTask = contract.tasks.find( + (t) => t.id === supervisorTaskId && t.isSupervisor + ); + + if (!supervisorTask) { + return { + supervisorTaskId, + status: "pending", + daemonId: null, + canStart: true, + canStop: false, + canResume: false, + }; + } + + // Map task status to supervisor status + let status: SupervisorStatus["status"]; + let canStart = false; + let canStop = false; + let canResume = false; + + switch (supervisorTask.status) { + case "pending": + status = "pending"; + canStart = true; + break; + case "initializing": + case "starting": + status = "starting"; + canStop = true; + break; + case "running": + status = "running"; + canStop = true; + break; + case "paused": + case "blocked": + status = "paused"; + canResume = true; + canStop = true; + break; + case "done": + case "merged": + status = "done"; + break; + case "failed": + status = "failed"; + canResume = true; + break; + default: + status = "pending"; + canStart = true; + } + + return { + supervisorTaskId, + status, + daemonId: null, // Task summary doesn't have daemon_id, would need full task + canStart, + canStop, + canResume, + }; +} diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo index 33deafa..a3ef773 100644 --- a/makima/frontend/tsconfig.tsbuildinfo +++ b/makima/frontend/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/history/checkpointcard.tsx","./src/components/history/checkpointlist.tsx","./src/components/history/conversationmessage.tsx","./src/components/history/conversationview.tsx","./src/components/history/historyfilters.tsx","./src/components/history/resumecontrols.tsx","./src/components/history/timelineeventcard.tsx","./src/components/history/timelinelist.tsx","./src/components/history/index.ts","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/history.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"}
\ No newline at end of file +{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/autopilotpanel.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/history/checkpointcard.tsx","./src/components/history/checkpointlist.tsx","./src/components/history/conversationmessage.tsx","./src/components/history/conversationview.tsx","./src/components/history/historyfilters.tsx","./src/components/history/resumecontrols.tsx","./src/components/history/timelineeventcard.tsx","./src/components/history/timelinelist.tsx","./src/components/history/index.ts","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/history.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"}
\ No newline at end of file diff --git a/makima/migrations/20250119000000_add_task_retry_tracking.sql b/makima/migrations/20250119000000_add_task_retry_tracking.sql new file mode 100644 index 0000000..4d8eea6 --- /dev/null +++ b/makima/migrations/20250119000000_add_task_retry_tracking.sql @@ -0,0 +1,22 @@ +-- Add retry tracking columns to tasks table for daemon failover + +-- Number of times this task has been retried after daemon failure +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0; + +-- Maximum retry attempts before marking as permanently failed +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS max_retries INTEGER NOT NULL DEFAULT 3; + +-- Array of daemon IDs that have failed this task (excluded from retry selection) +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS failed_daemon_ids UUID[] DEFAULT '{}'; + +-- When the task was last interrupted due to daemon disconnect +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS interrupted_at TIMESTAMPTZ; + +-- Index for efficient pending task queries with retry consideration +CREATE INDEX IF NOT EXISTS idx_tasks_status_retry ON tasks(status, retry_count) + WHERE status = 'pending'; + +COMMENT ON COLUMN tasks.retry_count IS 'Number of times this task has been retried after daemon failure'; +COMMENT ON COLUMN tasks.max_retries IS 'Maximum retry attempts before marking as permanently failed'; +COMMENT ON COLUMN tasks.failed_daemon_ids IS 'Array of daemon IDs that have failed this task (excluded from retry)'; +COMMENT ON COLUMN tasks.interrupted_at IS 'When the task was last interrupted due to daemon disconnect'; diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 0e1303c..72ba6f2 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -6,6 +6,11 @@ use sqlx::FromRow; use utoipa::ToSchema; use uuid::Uuid; +/// Default max retries for task daemon failover (3 attempts) +fn default_max_retries() -> i32 { + 3 +} + /// Flexible datetime deserialization module. /// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats. pub mod flexible_datetime { @@ -500,6 +505,20 @@ pub struct Task { /// Files to copy from parent task's worktree when starting. #[serde(skip_serializing_if = "Option::is_none")] pub copy_files: Option<serde_json::Value>, + + // Retry tracking for daemon failover + /// Number of times this task has been retried after daemon failure + #[serde(default)] + pub retry_count: i32, + /// Maximum retry attempts before marking as permanently failed + #[serde(default = "default_max_retries")] + pub max_retries: i32, + /// Array of daemon IDs that have failed this task (excluded from retry) + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_daemon_ids: Option<Vec<Uuid>>, + /// When the task was last interrupted due to daemon disconnect + #[serde(skip_serializing_if = "Option::is_none")] + pub interrupted_at: Option<DateTime<Utc>>, } impl Task { diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 2b069d5..43b8e3a 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract( } /// Get pending tasks for a contract (non-supervisor tasks only). +/// Includes tasks that were interrupted (retry candidates). +/// Prioritizes interrupted tasks and excludes those that exceeded max_retries. pub async fn get_pending_tasks_for_contract( pool: &PgPool, contract_id: Uuid, @@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract( WHERE contract_id = $1 AND owner_id = $2 AND status = 'pending' AND is_supervisor = false - ORDER BY priority DESC, created_at ASC + AND retry_count < max_retries + ORDER BY + interrupted_at DESC NULLS LAST, + priority DESC, + created_at ASC "#, ) .bind(contract_id) @@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract( .await } +/// Mark a task as pending for retry after daemon failure. +/// Increments retry count and adds the failed daemon to exclusion list. +pub async fn mark_task_for_retry( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<Option<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + UPDATE tasks + SET status = 'pending', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + interrupted_at = NOW(), + error_message = 'Daemon disconnected, awaiting retry', + updated_at = NOW() + WHERE id = $1 + AND retry_count < max_retries + RETURNING * + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .fetch_optional(pool) + .await +} + +/// Mark a task as permanently failed (exceeded retry limit). +pub async fn mark_task_permanently_failed( + pool: &PgPool, + task_id: Uuid, + failed_daemon_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE tasks + SET status = 'failed', + daemon_id = NULL, + retry_count = retry_count + 1, + failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2), + last_active_daemon_id = $2, + error_message = 'Task failed: exceeded maximum retry attempts', + updated_at = NOW() + WHERE id = $1 + "#, + ) + .bind(task_id) + .bind(failed_daemon_id) + .execute(pool) + .await?; + Ok(()) +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, @@ -3008,6 +3069,35 @@ pub async fn get_available_daemons( .await } +/// Get daemons with capacity info for selection, excluding specified daemon IDs. +/// Used for task retry to avoid reassigning to daemons that have already failed. +pub async fn get_available_daemons_excluding( + pool: &PgPool, + owner_id: Uuid, + exclude_daemon_ids: &[Uuid], +) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> { + sqlx::query_as::<_, DaemonWithCapacity>( + r#" + SELECT id, owner_id, connection_id, hostname, machine_id, + max_concurrent_tasks, current_task_count, + capacity_score, task_queue_length, supports_migration, + status, last_heartbeat_at, connected_at + FROM daemons + WHERE owner_id = $1 + AND status = 'connected' + AND id != ALL($2) + ORDER BY + COALESCE(capacity_score, 100) DESC, + (max_concurrent_tasks - current_task_count) DESC, + COALESCE(task_queue_length, 0) ASC + "#, + ) + .bind(owner_id) + .bind(exclude_daemon_ids) + .fetch_all(pool) + .await +} + /// Create a daemon task assignment. pub async fn create_daemon_task_assignment( pool: &PgPool, diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 4bcb5cd..beb676e 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -20,6 +20,7 @@ use sqlx::Row; use tokio::sync::mpsc; use uuid::Uuid; +use crate::db::models::Task; use crate::db::repository; use crate::server::auth::{hash_api_key, API_KEY_HEADER}; use crate::server::messages::ApiError; @@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } - // Find tasks assigned to this daemon that are still active - if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await { + // Find tasks assigned to this daemon and mark for retry or fail permanently + if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await { tracing::error!( daemon_id = %daemon_uuid, error = %e, - "Failed to clear daemon from tasks on disconnect" + "Failed to handle daemon disconnect for tasks" ); } }); } } -/// Clear daemon_id from tasks when daemon disconnects -async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { - // Update tasks that were running on this daemon to failed state - let result = sqlx::query( +/// Handle tasks when daemon disconnects - mark for retry or fail permanently. +async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> { + // Get all active tasks on this daemon + let active_tasks: Vec<Task> = sqlx::query_as( r#" - UPDATE tasks - SET daemon_id = NULL, - status = 'failed', - error_message = 'Daemon disconnected', - updated_at = NOW() + SELECT * FROM tasks WHERE daemon_id = $1 AND status IN ('starting', 'running', 'initializing') "#, ) .bind(daemon_id) - .execute(pool) + .fetch_all(pool) .await?; - if result.rows_affected() > 0 { - tracing::warn!( - daemon_id = %daemon_id, - tasks_affected = result.rows_affected(), - "Marked tasks as failed due to daemon disconnect" - ); + if active_tasks.is_empty() { + return Ok(()); + } + + tracing::info!( + daemon_id = %daemon_id, + task_count = active_tasks.len(), + "Processing tasks for disconnected daemon" + ); + + for task in active_tasks { + if task.retry_count < task.max_retries { + // Mark for retry + match repository::mark_task_for_retry(pool, task.id, daemon_id).await { + Ok(Some(updated_task)) => { + tracing::info!( + task_id = %task.id, + task_name = %task.name, + retry_count = updated_task.retry_count, + max_retries = updated_task.max_retries, + "Task marked for retry after daemon disconnect" + ); + } + Ok(None) => { + tracing::warn!( + task_id = %task.id, + "Task not found or already at max retries" + ); + } + Err(e) => { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task for retry" + ); + } + } + } else { + // Exceeded retries, mark as permanently failed + if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await { + tracing::error!( + task_id = %task.id, + error = %e, + "Failed to mark task as permanently failed" + ); + } else { + tracing::warn!( + task_id = %task.id, + task_name = %task.name, + retry_count = task.retry_count + 1, + "Task permanently failed: exceeded maximum retries" + ); + } + } } Ok(()) diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 1014fdc..754d086 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -256,6 +256,7 @@ async fn verify_supervisor_auth( /// Try to start a pending task on an available daemon. /// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. +/// For retried tasks, excludes daemons that previously failed the task. async fn try_start_pending_task( state: &SharedState, contract_id: Uuid, @@ -263,7 +264,7 @@ async fn try_start_pending_task( ) -> Result<Option<Task>, String> { let pool = state.db_pool.as_ref().ok_or("Database not configured")?; - // Get pending tasks for this contract + // Get pending tasks for this contract (includes interrupted tasks awaiting retry) let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) .await .map_err(|e| format!("Failed to get pending tasks: {}", e))?; @@ -272,89 +273,95 @@ async fn try_start_pending_task( return Ok(None); } - // Get available daemons with capacity - let daemons = repository::get_available_daemons(pool, owner_id) - .await - .map_err(|e| format!("Failed to get available daemons: {}", e))?; - - // Find a daemon with capacity - let available_daemon = daemons.iter().find(|d| { - d.current_task_count < d.max_concurrent_tasks - && state.daemon_connections.contains_key(&d.connection_id) - }); + // Try each pending task until we find one we can start + for task in &pending_tasks { + // Get excluded daemon IDs for this task (daemons that have already failed it) + let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); - let daemon = match available_daemon { - Some(d) => d, - None => return Ok(None), // No daemon with capacity - }; + // Get available daemons excluding failed ones for this task + let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids) + .await + .map_err(|e| format!("Failed to get available daemons: {}", e))?; - // Try to start the first pending task - let task = &pending_tasks[0]; + // Find a daemon with capacity + let available_daemon = daemons.iter().find(|d| { + d.current_task_count < d.max_concurrent_tasks + && state.daemon_connections.contains_key(&d.connection_id) + }); - // Get repo URL from task or contract - let repo_url = if let Some(url) = &task.repository_url { - Some(url.clone()) - } else { - match repository::list_contract_repositories(pool, contract_id).await { - Ok(repos) => repos - .iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), - Err(_) => None, - } - }; + let daemon = match available_daemon { + Some(d) => d, + None => continue, // Try next task + }; - // Update task with daemon assignment - let update_req = UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(daemon.id), - version: Some(task.version), - ..Default::default() - }; + // Get repo URL from task or contract + let repo_url = if let Some(url) = &task.repository_url { + Some(url.clone()) + } else { + match repository::list_contract_repositories(pool, contract_id).await { + Ok(repos) => repos + .iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), + Err(_) => None, + } + }; - let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => return Ok(None), - Err(e) => { - tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); - return Ok(None); - } - }; + // Update task with daemon assignment + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(task.version), + ..Default::default() + }; - // Send spawn command - let cmd = DaemonCommand::SpawnTask { - task_id: updated_task.id, - task_name: updated_task.name.clone(), - plan: updated_task.plan.clone(), - repo_url, - base_branch: updated_task.base_branch.clone(), - target_branch: updated_task.target_branch.clone(), - parent_task_id: updated_task.parent_task_id, - depth: updated_task.depth, - is_orchestrator: false, - target_repo_path: updated_task.target_repo_path.clone(), - completion_action: updated_task.completion_action.clone(), - continue_from_task_id: updated_task.continue_from_task_id, - copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: false, - }; + let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => continue, // Task was modified concurrently, try next + Err(e) => { + tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); + continue; // Try next task + } + }; - if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { - tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); - // Rollback - let rollback_req = UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() + // Send spawn command + let cmd = DaemonCommand::SpawnTask { + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, + is_supervisor: false, }; - let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; - return Ok(None); + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; + continue; // Try next task + } + + tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); + return Ok(Some(updated_task)); } - tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); - Ok(Some(updated_task)) + // No tasks could be started + Ok(None) } // ============================================================================= |
