summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 19:50:27 +0000
committersoryu <soryu@soryu.co>2026-01-17 05:38:07 +0000
commit75d9644d44ba998a32ed14c072e883a75145ab72 (patch)
treeb82dee94632fd40764a92a9b11da24ef21600ed5
parent6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff)
downloadsoryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz
soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip
Add autopilot panel and retry system
-rw-r--r--makima/frontend/src/components/contracts/AutopilotPanel.tsx208
-rw-r--r--makima/frontend/src/components/contracts/ContractDetail.tsx7
-rw-r--r--makima/frontend/src/lib/api.ts114
-rw-r--r--makima/frontend/tsconfig.tsbuildinfo2
-rw-r--r--makima/migrations/20250119000000_add_task_retry_tracking.sql22
-rw-r--r--makima/src/db/models.rs19
-rw-r--r--makima/src/db/repository.rs92
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs83
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs155
9 files changed, 607 insertions, 95 deletions
diff --git a/makima/frontend/src/components/contracts/AutopilotPanel.tsx b/makima/frontend/src/components/contracts/AutopilotPanel.tsx
new file mode 100644
index 0000000..a8a8e2e
--- /dev/null
+++ b/makima/frontend/src/components/contracts/AutopilotPanel.tsx
@@ -0,0 +1,208 @@
+import { useState, useCallback } from "react";
+import type { ContractWithRelations } from "../../lib/api";
+import {
+ getSupervisorStatus,
+ startSupervisor,
+ stopSupervisor,
+ resumeSupervisor,
+ type SupervisorStatus,
+} from "../../lib/api";
+
+interface AutopilotPanelProps {
+ contract: ContractWithRelations;
+ onUpdate: () => void;
+}
+
+const statusConfig: Record<
+ SupervisorStatus["status"],
+ { label: string; color: string; bgColor: string }
+> = {
+ not_configured: {
+ label: "Not Configured",
+ color: "text-[#555]",
+ bgColor: "bg-[#555]/10",
+ },
+ pending: {
+ label: "Ready",
+ color: "text-yellow-400",
+ bgColor: "bg-yellow-400/10",
+ },
+ starting: {
+ label: "Starting...",
+ color: "text-blue-400",
+ bgColor: "bg-blue-400/10",
+ },
+ running: {
+ label: "Running",
+ color: "text-green-400",
+ bgColor: "bg-green-400/10",
+ },
+ paused: {
+ label: "Paused",
+ color: "text-orange-400",
+ bgColor: "bg-orange-400/10",
+ },
+ done: {
+ label: "Completed",
+ color: "text-blue-400",
+ bgColor: "bg-blue-400/10",
+ },
+ failed: {
+ label: "Failed",
+ color: "text-red-400",
+ bgColor: "bg-red-400/10",
+ },
+};
+
+export function AutopilotPanel({ contract, onUpdate }: AutopilotPanelProps) {
+ const [loading, setLoading] = useState(false);
+ const [error, setError] = useState<string | null>(null);
+
+ const supervisorStatus = getSupervisorStatus(contract);
+ const config = statusConfig[supervisorStatus.status];
+
+ const handleStart = useCallback(async () => {
+ if (!supervisorStatus.supervisorTaskId) return;
+
+ setLoading(true);
+ setError(null);
+
+ try {
+ await startSupervisor(supervisorStatus.supervisorTaskId);
+ onUpdate();
+ } catch (e) {
+ setError(e instanceof Error ? e.message : "Failed to start autopilot");
+ } finally {
+ setLoading(false);
+ }
+ }, [supervisorStatus.supervisorTaskId, onUpdate]);
+
+ const handleStop = useCallback(async () => {
+ if (!supervisorStatus.supervisorTaskId) return;
+
+ setLoading(true);
+ setError(null);
+
+ try {
+ await stopSupervisor(supervisorStatus.supervisorTaskId);
+ onUpdate();
+ } catch (e) {
+ setError(e instanceof Error ? e.message : "Failed to stop autopilot");
+ } finally {
+ setLoading(false);
+ }
+ }, [supervisorStatus.supervisorTaskId, onUpdate]);
+
+ const handleResume = useCallback(async () => {
+ setLoading(true);
+ setError(null);
+
+ try {
+ await resumeSupervisor(contract.id, { resumeMode: "continue" });
+ // After resuming, we need to start the task
+ if (supervisorStatus.supervisorTaskId) {
+ await startSupervisor(supervisorStatus.supervisorTaskId);
+ }
+ onUpdate();
+ } catch (e) {
+ setError(e instanceof Error ? e.message : "Failed to resume autopilot");
+ } finally {
+ setLoading(false);
+ }
+ }, [contract.id, supervisorStatus.supervisorTaskId, onUpdate]);
+
+ // Don't show panel for task-type contracts (they don't have supervisors)
+ if (contract.contractType === "task") {
+ return null;
+ }
+
+ return (
+ <div className="space-y-3">
+ <div className="flex items-center justify-between">
+ <h3 className="font-mono text-xs text-[#75aafc] uppercase">
+ Autopilot Mode
+ </h3>
+ <div
+ className={`px-2 py-1 rounded font-mono text-xs ${config.color} ${config.bgColor}`}
+ >
+ {config.label}
+ </div>
+ </div>
+
+ <p className="font-mono text-xs text-[#555]">
+ {supervisorStatus.status === "not_configured" ? (
+ "This contract does not have an autopilot supervisor configured."
+ ) : supervisorStatus.status === "running" ? (
+ "Autopilot is actively working on this contract, spawning tasks and managing progress."
+ ) : supervisorStatus.status === "pending" ? (
+ "Autopilot is ready to start. Click 'Enable' to begin autonomous work."
+ ) : supervisorStatus.status === "paused" ? (
+ "Autopilot is paused. Click 'Resume' to continue work."
+ ) : supervisorStatus.status === "failed" ? (
+ "Autopilot encountered an error. You can resume to retry."
+ ) : supervisorStatus.status === "done" ? (
+ "Autopilot has completed its work on this contract."
+ ) : (
+ "Autopilot is initializing..."
+ )}
+ </p>
+
+ {error && (
+ <div className="px-3 py-2 bg-red-500/10 border border-red-400/30 font-mono text-xs text-red-400">
+ {error}
+ </div>
+ )}
+
+ <div className="flex gap-2">
+ {supervisorStatus.canStart && (
+ <button
+ onClick={handleStart}
+ disabled={loading}
+ className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-green-600/20 border border-green-400/50 hover:bg-green-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
+ >
+ {loading ? "Starting..." : "Enable Autopilot"}
+ </button>
+ )}
+
+ {supervisorStatus.canResume && (
+ <button
+ onClick={handleResume}
+ disabled={loading}
+ className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-blue-600/20 border border-blue-400/50 hover:bg-blue-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
+ >
+ {loading ? "Resuming..." : "Resume Autopilot"}
+ </button>
+ )}
+
+ {supervisorStatus.canStop && (
+ <button
+ onClick={handleStop}
+ disabled={loading}
+ className="px-4 py-2 font-mono text-xs text-[#dbe7ff] bg-orange-600/20 border border-orange-400/50 hover:bg-orange-600/30 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
+ >
+ {loading ? "Stopping..." : "Pause Autopilot"}
+ </button>
+ )}
+ </div>
+
+ {/* Show running indicator when active */}
+ {supervisorStatus.status === "running" && (
+ <div className="flex items-center gap-2 pt-2 border-t border-dashed border-[rgba(117,170,252,0.2)]">
+ <div className="w-2 h-2 rounded-full bg-green-400 animate-pulse" />
+ <span className="font-mono text-xs text-green-400">
+ Autopilot is actively working
+ </span>
+ </div>
+ )}
+
+ {supervisorStatus.status === "starting" && (
+ <div className="flex items-center gap-2 pt-2 border-t border-dashed border-[rgba(117,170,252,0.2)]">
+ <div className="w-2 h-2 rounded-full bg-blue-400 animate-pulse" />
+ <span className="font-mono text-xs text-blue-400">
+ Initializing autopilot...
+ </span>
+ </div>
+ )}
+ </div>
+ );
+}
diff --git a/makima/frontend/src/components/contracts/ContractDetail.tsx b/makima/frontend/src/components/contracts/ContractDetail.tsx
index cf5f8f2..f93097a 100644
--- a/makima/frontend/src/components/contracts/ContractDetail.tsx
+++ b/makima/frontend/src/components/contracts/ContractDetail.tsx
@@ -18,6 +18,7 @@ import { PhaseHint } from "./PhaseHint";
import { RepositoryPanel } from "./RepositoryPanel";
import { ContractCliInput } from "./ContractCliInput";
import { PhaseDeliverablesPanel } from "./PhaseDeliverablesPanel";
+import { AutopilotPanel } from "./AutopilotPanel";
import { TaskTree } from "../mesh/TaskTree";
type Tab = "overview" | "repos" | "files" | "tasks";
@@ -225,6 +226,7 @@ export function ContractDetail({
onStatusChange={onStatusChange}
onPhaseChange={onPhaseChange}
onCreateFile={onCreateFileFromTemplate}
+ onRefresh={onRefresh}
/>
)}
@@ -276,14 +278,19 @@ function OverviewTab({
onStatusChange,
onPhaseChange,
onCreateFile,
+ onRefresh,
}: {
contract: ContractWithRelations;
onStatusChange: (status: ContractStatus) => void;
onPhaseChange: (phase: ContractPhase) => void;
onCreateFile?: (templateId: string, suggestedName: string) => void;
+ onRefresh: () => void;
}) {
return (
<div className="space-y-6">
+ {/* Autopilot controls */}
+ <AutopilotPanel contract={contract} onUpdate={onRefresh} />
+
{/* Phase deliverables checklist */}
<PhaseDeliverablesPanel
contract={contract}
diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts
index 9c56f6b..ee04935 100644
--- a/makima/frontend/src/lib/api.ts
+++ b/makima/frontend/src/lib/api.ts
@@ -2492,3 +2492,117 @@ export async function resumeFromCheckpoint(
}
return res.json();
}
+
+// =============================================================================
+// Supervisor/Autopilot Control Functions
+// =============================================================================
+
+/**
+ * Start a contract's supervisor task (enable autopilot mode).
+ * This is a convenience wrapper around startTask.
+ */
+export async function startSupervisor(supervisorTaskId: string): Promise<Task> {
+ return startTask(supervisorTaskId);
+}
+
+/**
+ * Stop a contract's supervisor task (pause autopilot mode).
+ * This is a convenience wrapper around stopTask.
+ */
+export async function stopSupervisor(supervisorTaskId: string): Promise<Task> {
+ return stopTask(supervisorTaskId);
+}
+
+/** Status of the supervisor/autopilot for a contract */
+export interface SupervisorStatus {
+ supervisorTaskId: string | null;
+ status: "not_configured" | "pending" | "starting" | "running" | "paused" | "done" | "failed";
+ daemonId: string | null;
+ canStart: boolean;
+ canStop: boolean;
+ canResume: boolean;
+}
+
+/**
+ * Get the supervisor status for a contract.
+ */
+export function getSupervisorStatus(
+ contract: ContractWithRelations
+): SupervisorStatus {
+ const supervisorTaskId = contract.supervisorTaskId;
+
+ if (!supervisorTaskId) {
+ return {
+ supervisorTaskId: null,
+ status: "not_configured",
+ daemonId: null,
+ canStart: false,
+ canStop: false,
+ canResume: false,
+ };
+ }
+
+ // Find the supervisor task in the contract's tasks
+ const supervisorTask = contract.tasks.find(
+ (t) => t.id === supervisorTaskId && t.isSupervisor
+ );
+
+ if (!supervisorTask) {
+ return {
+ supervisorTaskId,
+ status: "pending",
+ daemonId: null,
+ canStart: true,
+ canStop: false,
+ canResume: false,
+ };
+ }
+
+ // Map task status to supervisor status
+ let status: SupervisorStatus["status"];
+ let canStart = false;
+ let canStop = false;
+ let canResume = false;
+
+ switch (supervisorTask.status) {
+ case "pending":
+ status = "pending";
+ canStart = true;
+ break;
+ case "initializing":
+ case "starting":
+ status = "starting";
+ canStop = true;
+ break;
+ case "running":
+ status = "running";
+ canStop = true;
+ break;
+ case "paused":
+ case "blocked":
+ status = "paused";
+ canResume = true;
+ canStop = true;
+ break;
+ case "done":
+ case "merged":
+ status = "done";
+ break;
+ case "failed":
+ status = "failed";
+ canResume = true;
+ break;
+ default:
+ status = "pending";
+ canStart = true;
+ }
+
+ return {
+ supervisorTaskId,
+ status,
+ daemonId: null, // Task summary doesn't have daemon_id, would need full task
+ canStart,
+ canStop,
+ canResume,
+ };
+}
diff --git a/makima/frontend/tsconfig.tsbuildinfo b/makima/frontend/tsconfig.tsbuildinfo
index 33deafa..a3ef773 100644
--- a/makima/frontend/tsconfig.tsbuildinfo
+++ b/makima/frontend/tsconfig.tsbuildinfo
@@ -1 +1 @@
-{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/history/checkpointcard.tsx","./src/components/history/checkpointlist.tsx","./src/components/history/conversationmessage.tsx","./src/components/history/conversationview.tsx","./src/components/history/historyfilters.tsx","./src/components/history/resumecontrols.tsx","./src/components/history/timelineeventcard.tsx","./src/components/history/timelinelist.tsx","./src/components/history/index.ts","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/history.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"} \ No newline at end of file
+{"root":["./src/main.tsx","./src/vite-env.d.ts","./src/components/gridoverlay.tsx","./src/components/japanesehovertext.tsx","./src/components/logo.tsx","./src/components/masthead.tsx","./src/components/navstrip.tsx","./src/components/protectedroute.tsx","./src/components/rewritelink.tsx","./src/components/simplemarkdown.tsx","./src/components/supervisorquestionnotification.tsx","./src/components/charts/chartrenderer.tsx","./src/components/contracts/autopilotpanel.tsx","./src/components/contracts/contractcliinput.tsx","./src/components/contracts/contractdetail.tsx","./src/components/contracts/contractlist.tsx","./src/components/contracts/phasebadge.tsx","./src/components/contracts/phasedeliverablespanel.tsx","./src/components/contracts/phasehint.tsx","./src/components/contracts/phaseprogressbar.tsx","./src/components/contracts/quickactionbuttons.tsx","./src/components/contracts/repositorypanel.tsx","./src/components/contracts/taskderivationpreview.tsx","./src/components/files/bodyrenderer.tsx","./src/components/files/cliinput.tsx","./src/components/files/conflictnotification.tsx","./src/components/files/elementcontextmenu.tsx","./src/components/files/filedetail.tsx","./src/components/files/filelist.tsx","./src/components/files/reposyncindicator.tsx","./src/components/files/updatenotification.tsx","./src/components/files/versionhistorydropdown.tsx","./src/components/history/checkpointcard.tsx","./src/components/history/checkpointlist.tsx","./src/components/history/conversationmessage.tsx","./src/components/history/conversationview.tsx","./src/components/history/historyfilters.tsx","./src/components/history/resumecontrols.tsx","./src/components/history/timelineeventcard.tsx","./src/components/history/timelinelist.tsx","./src/components/history/index.ts","./src/components/listen/contractpickermodal.tsx","./src/components/listen/controlpanel.tsx","./src/components/listen/speakerpanel.tsx","./src/components/listen/transcriptanalysispanel.tsx","./src/components/listen/transcriptpanel.tsx","./src/components/mesh/directoryinput.tsx","./src/components/mesh/inlinesubtaskeditor.tsx","./src/components/mesh/mergeconflictresolver.tsx","./src/components/mesh/overlaydiffviewer.tsx","./src/components/mesh/prpreview.tsx","./src/components/mesh/subtasktree.tsx","./src/components/mesh/taskdetail.tsx","./src/components/mesh/tasklist.tsx","./src/components/mesh/taskoutput.tsx","./src/components/mesh/tasktree.tsx","./src/components/mesh/unifiedmeshchatinput.tsx","./src/components/workflow/phasecolumn.tsx","./src/components/workflow/workflowboard.tsx","./src/components/workflow/workflowcontractcard.tsx","./src/contexts/authcontext.tsx","./src/contexts/supervisorquestionscontext.tsx","./src/hooks/usecontracts.ts","./src/hooks/usefilesubscription.ts","./src/hooks/usefiles.ts","./src/hooks/usemeshchathistory.ts","./src/hooks/usemicrophone.ts","./src/hooks/usetasksubscription.ts","./src/hooks/usetasks.ts","./src/hooks/usetextscramble.ts","./src/hooks/useversionhistory.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/listenapi.ts","./src/lib/markdown.ts","./src/lib/supabase.ts","./src/routes/_index.tsx","./src/routes/contracts.tsx","./src/routes/files.tsx","./src/routes/history.tsx","./src/routes/listen.tsx","./src/routes/login.tsx","./src/routes/mesh.tsx","./src/routes/settings.tsx","./src/routes/workflow.tsx","./src/types/messages.ts"],"version":"5.9.3"} \ No newline at end of file
diff --git a/makima/migrations/20250119000000_add_task_retry_tracking.sql b/makima/migrations/20250119000000_add_task_retry_tracking.sql
new file mode 100644
index 0000000..4d8eea6
--- /dev/null
+++ b/makima/migrations/20250119000000_add_task_retry_tracking.sql
@@ -0,0 +1,22 @@
+-- Add retry tracking columns to tasks table for daemon failover
+
+-- Number of times this task has been retried after daemon failure
+ALTER TABLE tasks ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0;
+
+-- Maximum retry attempts before marking as permanently failed
+ALTER TABLE tasks ADD COLUMN IF NOT EXISTS max_retries INTEGER NOT NULL DEFAULT 3;
+
+-- Array of daemon IDs that have failed this task (excluded from retry selection)
+ALTER TABLE tasks ADD COLUMN IF NOT EXISTS failed_daemon_ids UUID[] DEFAULT '{}';
+
+-- When the task was last interrupted due to daemon disconnect
+ALTER TABLE tasks ADD COLUMN IF NOT EXISTS interrupted_at TIMESTAMPTZ;
+
+-- Index for efficient pending task queries with retry consideration
+CREATE INDEX IF NOT EXISTS idx_tasks_status_retry ON tasks(status, retry_count)
+ WHERE status = 'pending';
+
+COMMENT ON COLUMN tasks.retry_count IS 'Number of times this task has been retried after daemon failure';
+COMMENT ON COLUMN tasks.max_retries IS 'Maximum retry attempts before marking as permanently failed';
+COMMENT ON COLUMN tasks.failed_daemon_ids IS 'Array of daemon IDs that have failed this task (excluded from retry)';
+COMMENT ON COLUMN tasks.interrupted_at IS 'When the task was last interrupted due to daemon disconnect';
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 0e1303c..72ba6f2 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -6,6 +6,11 @@ use sqlx::FromRow;
use utoipa::ToSchema;
use uuid::Uuid;
+/// Default max retries for task daemon failover (3 attempts)
+fn default_max_retries() -> i32 {
+ 3
+}
+
/// Flexible datetime deserialization module.
/// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats.
pub mod flexible_datetime {
@@ -500,6 +505,20 @@ pub struct Task {
/// Files to copy from parent task's worktree when starting.
#[serde(skip_serializing_if = "Option::is_none")]
pub copy_files: Option<serde_json::Value>,
+
+ // Retry tracking for daemon failover
+ /// Number of times this task has been retried after daemon failure
+ #[serde(default)]
+ pub retry_count: i32,
+ /// Maximum retry attempts before marking as permanently failed
+ #[serde(default = "default_max_retries")]
+ pub max_retries: i32,
+ /// Array of daemon IDs that have failed this task (excluded from retry)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub failed_daemon_ids: Option<Vec<Uuid>>,
+ /// When the task was last interrupted due to daemon disconnect
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub interrupted_at: Option<DateTime<Utc>>,
}
impl Task {
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index 2b069d5..43b8e3a 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -790,6 +790,8 @@ pub async fn list_tasks_by_contract(
}
/// Get pending tasks for a contract (non-supervisor tasks only).
+/// Includes tasks that were interrupted (retry candidates).
+/// Prioritizes interrupted tasks and excludes those that exceeded max_retries.
pub async fn get_pending_tasks_for_contract(
pool: &PgPool,
contract_id: Uuid,
@@ -801,7 +803,11 @@ pub async fn get_pending_tasks_for_contract(
WHERE contract_id = $1 AND owner_id = $2
AND status = 'pending'
AND is_supervisor = false
- ORDER BY priority DESC, created_at ASC
+ AND retry_count < max_retries
+ ORDER BY
+ interrupted_at DESC NULLS LAST,
+ priority DESC,
+ created_at ASC
"#,
)
.bind(contract_id)
@@ -810,6 +816,61 @@ pub async fn get_pending_tasks_for_contract(
.await
}
+/// Mark a task as pending for retry after daemon failure.
+/// Increments retry count and adds the failed daemon to exclusion list.
+pub async fn mark_task_for_retry(
+ pool: &PgPool,
+ task_id: Uuid,
+ failed_daemon_id: Uuid,
+) -> Result<Option<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ UPDATE tasks
+ SET status = 'pending',
+ daemon_id = NULL,
+ retry_count = retry_count + 1,
+ failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
+ last_active_daemon_id = $2,
+ interrupted_at = NOW(),
+ error_message = 'Daemon disconnected, awaiting retry',
+ updated_at = NOW()
+ WHERE id = $1
+ AND retry_count < max_retries
+ RETURNING *
+ "#,
+ )
+ .bind(task_id)
+ .bind(failed_daemon_id)
+ .fetch_optional(pool)
+ .await
+}
+
+/// Mark a task as permanently failed (exceeded retry limit).
+pub async fn mark_task_permanently_failed(
+ pool: &PgPool,
+ task_id: Uuid,
+ failed_daemon_id: Uuid,
+) -> Result<(), sqlx::Error> {
+ sqlx::query(
+ r#"
+ UPDATE tasks
+ SET status = 'failed',
+ daemon_id = NULL,
+ retry_count = retry_count + 1,
+ failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
+ last_active_daemon_id = $2,
+ error_message = 'Task failed: exceeded maximum retry attempts',
+ updated_at = NOW()
+ WHERE id = $1
+ "#,
+ )
+ .bind(task_id)
+ .bind(failed_daemon_id)
+ .execute(pool)
+ .await?;
+ Ok(())
+}
+
/// Update a task by ID with optimistic locking.
pub async fn update_task(
pool: &PgPool,
@@ -3008,6 +3069,35 @@ pub async fn get_available_daemons(
.await
}
+/// Get daemons with capacity info for selection, excluding specified daemon IDs.
+/// Used for task retry to avoid reassigning to daemons that have already failed.
+pub async fn get_available_daemons_excluding(
+ pool: &PgPool,
+ owner_id: Uuid,
+ exclude_daemon_ids: &[Uuid],
+) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
+ sqlx::query_as::<_, DaemonWithCapacity>(
+ r#"
+ SELECT id, owner_id, connection_id, hostname, machine_id,
+ max_concurrent_tasks, current_task_count,
+ capacity_score, task_queue_length, supports_migration,
+ status, last_heartbeat_at, connected_at
+ FROM daemons
+ WHERE owner_id = $1
+ AND status = 'connected'
+ AND id != ALL($2)
+ ORDER BY
+ COALESCE(capacity_score, 100) DESC,
+ (max_concurrent_tasks - current_task_count) DESC,
+ COALESCE(task_queue_length, 0) ASC
+ "#,
+ )
+ .bind(owner_id)
+ .bind(exclude_daemon_ids)
+ .fetch_all(pool)
+ .await
+}
+
/// Create a daemon task assignment.
pub async fn create_daemon_task_assignment(
pool: &PgPool,
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 4bcb5cd..beb676e 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -20,6 +20,7 @@ use sqlx::Row;
use tokio::sync::mpsc;
use uuid::Uuid;
+use crate::db::models::Task;
use crate::db::repository;
use crate::server::auth::{hash_api_key, API_KEY_HEADER};
use crate::server::messages::ApiError;
@@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
- // Find tasks assigned to this daemon that are still active
- if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await {
+ // Find tasks assigned to this daemon and mark for retry or fail permanently
+ if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await {
tracing::error!(
daemon_id = %daemon_uuid,
error = %e,
- "Failed to clear daemon from tasks on disconnect"
+ "Failed to handle daemon disconnect for tasks"
);
}
});
}
}
-/// Clear daemon_id from tasks when daemon disconnects
-async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
- // Update tasks that were running on this daemon to failed state
- let result = sqlx::query(
+/// Handle tasks when daemon disconnects - mark for retry or fail permanently.
+async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
+ // Get all active tasks on this daemon
+ let active_tasks: Vec<Task> = sqlx::query_as(
r#"
- UPDATE tasks
- SET daemon_id = NULL,
- status = 'failed',
- error_message = 'Daemon disconnected',
- updated_at = NOW()
+ SELECT * FROM tasks
WHERE daemon_id = $1
AND status IN ('starting', 'running', 'initializing')
"#,
)
.bind(daemon_id)
- .execute(pool)
+ .fetch_all(pool)
.await?;
- if result.rows_affected() > 0 {
- tracing::warn!(
- daemon_id = %daemon_id,
- tasks_affected = result.rows_affected(),
- "Marked tasks as failed due to daemon disconnect"
- );
+ if active_tasks.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!(
+ daemon_id = %daemon_id,
+ task_count = active_tasks.len(),
+ "Processing tasks for disconnected daemon"
+ );
+
+ for task in active_tasks {
+ if task.retry_count < task.max_retries {
+ // Mark for retry
+ match repository::mark_task_for_retry(pool, task.id, daemon_id).await {
+ Ok(Some(updated_task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = updated_task.retry_count,
+ max_retries = updated_task.max_retries,
+ "Task marked for retry after daemon disconnect"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task.id,
+ "Task not found or already at max retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ } else {
+ // Exceeded retries, mark as permanently failed
+ if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task as permanently failed"
+ );
+ } else {
+ tracing::warn!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = task.retry_count + 1,
+ "Task permanently failed: exceeded maximum retries"
+ );
+ }
+ }
}
Ok(())
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 1014fdc..754d086 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -256,6 +256,7 @@ async fn verify_supervisor_auth(
/// Try to start a pending task on an available daemon.
/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
+/// For retried tasks, excludes daemons that previously failed the task.
async fn try_start_pending_task(
state: &SharedState,
contract_id: Uuid,
@@ -263,7 +264,7 @@ async fn try_start_pending_task(
) -> Result<Option<Task>, String> {
let pool = state.db_pool.as_ref().ok_or("Database not configured")?;
- // Get pending tasks for this contract
+ // Get pending tasks for this contract (includes interrupted tasks awaiting retry)
let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to get pending tasks: {}", e))?;
@@ -272,89 +273,95 @@ async fn try_start_pending_task(
return Ok(None);
}
- // Get available daemons with capacity
- let daemons = repository::get_available_daemons(pool, owner_id)
- .await
- .map_err(|e| format!("Failed to get available daemons: {}", e))?;
-
- // Find a daemon with capacity
- let available_daemon = daemons.iter().find(|d| {
- d.current_task_count < d.max_concurrent_tasks
- && state.daemon_connections.contains_key(&d.connection_id)
- });
+ // Try each pending task until we find one we can start
+ for task in &pending_tasks {
+ // Get excluded daemon IDs for this task (daemons that have already failed it)
+ let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
- let daemon = match available_daemon {
- Some(d) => d,
- None => return Ok(None), // No daemon with capacity
- };
+ // Get available daemons excluding failed ones for this task
+ let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids)
+ .await
+ .map_err(|e| format!("Failed to get available daemons: {}", e))?;
- // Try to start the first pending task
- let task = &pending_tasks[0];
+ // Find a daemon with capacity
+ let available_daemon = daemons.iter().find(|d| {
+ d.current_task_count < d.max_concurrent_tasks
+ && state.daemon_connections.contains_key(&d.connection_id)
+ });
- // Get repo URL from task or contract
- let repo_url = if let Some(url) = &task.repository_url {
- Some(url.clone())
- } else {
- match repository::list_contract_repositories(pool, contract_id).await {
- Ok(repos) => repos
- .iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
- Err(_) => None,
- }
- };
+ let daemon = match available_daemon {
+ Some(d) => d,
+ None => continue, // Try next task
+ };
- // Update task with daemon assignment
- let update_req = UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(daemon.id),
- version: Some(task.version),
- ..Default::default()
- };
+ // Get repo URL from task or contract
+ let repo_url = if let Some(url) = &task.repository_url {
+ Some(url.clone())
+ } else {
+ match repository::list_contract_repositories(pool, contract_id).await {
+ Ok(repos) => repos
+ .iter()
+ .find(|r| r.is_primary)
+ .or(repos.first())
+ .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
+ Err(_) => None,
+ }
+ };
- let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
- Ok(Some(t)) => t,
- Ok(None) => return Ok(None),
- Err(e) => {
- tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
- return Ok(None);
- }
- };
+ // Update task with daemon assignment
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(task.version),
+ ..Default::default()
+ };
- // Send spawn command
- let cmd = DaemonCommand::SpawnTask {
- task_id: updated_task.id,
- task_name: updated_task.name.clone(),
- plan: updated_task.plan.clone(),
- repo_url,
- base_branch: updated_task.base_branch.clone(),
- target_branch: updated_task.target_branch.clone(),
- parent_task_id: updated_task.parent_task_id,
- depth: updated_task.depth,
- is_orchestrator: false,
- target_repo_path: updated_task.target_repo_path.clone(),
- completion_action: updated_task.completion_action.clone(),
- continue_from_task_id: updated_task.continue_from_task_id,
- copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: false,
- };
+ let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => continue, // Task was modified concurrently, try next
+ Err(e) => {
+ tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
+ continue; // Try next task
+ }
+ };
- if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
- tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
- // Rollback
- let rollback_req = UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
+ // Send spawn command
+ let cmd = DaemonCommand::SpawnTask {
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
+ is_supervisor: false,
};
- let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
- return Ok(None);
+
+ if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+ tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
+ continue; // Try next task
+ }
+
+ tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
+ return Ok(Some(updated_task));
}
- tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
- Ok(Some(updated_task))
+ // No tasks could be started
+ Ok(None)
}
// =============================================================================