diff options
| author | soryu <soryu@soryu.co> | 2026-01-21 17:31:46 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-21 17:31:46 +0000 |
| commit | 94e5604e770d6589f786ea71e51738e21492f301 (patch) | |
| tree | 6c9b0f32a8d77464bc1a5131ba0828d252851abc | |
| parent | da246c4c4e23c9ad976705f9a3fa80e0d75b4425 (diff) | |
| download | soryu-94e5604e770d6589f786ea71e51738e21492f301.tar.gz soryu-94e5604e770d6589f786ea71e51738e21492f301.zip | |
Add task branching feature (#15)
| -rw-r--r-- | makima/frontend/src/components/mesh/BranchTaskModal.tsx | 132 | ||||
| -rw-r--r-- | makima/frontend/src/components/mesh/TaskDetail.tsx | 26 | ||||
| -rw-r--r-- | makima/frontend/src/lib/api.ts | 47 | ||||
| -rw-r--r-- | makima/frontend/src/routes/mesh.tsx | 22 | ||||
| -rw-r--r-- | makima/migrations/20250121000000_add_branched_from.sql | 10 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 45 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 54 | ||||
| -rw-r--r-- | makima/src/server/handlers/contract_chat.rs | 16 | ||||
| -rw-r--r-- | makima/src/server/handlers/contracts.rs | 4 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 269 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 4 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 4 | ||||
| -rw-r--r-- | makima/src/server/handlers/transcript_analysis.rs | 8 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 61 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 25 |
15 files changed, 683 insertions, 44 deletions
diff --git a/makima/frontend/src/components/mesh/BranchTaskModal.tsx b/makima/frontend/src/components/mesh/BranchTaskModal.tsx new file mode 100644 index 0000000..ade4c7d --- /dev/null +++ b/makima/frontend/src/components/mesh/BranchTaskModal.tsx @@ -0,0 +1,132 @@ +import { useState } from "react"; +import type { TaskWithSubtasks } from "../../lib/api"; + +interface BranchTaskModalProps { + task: TaskWithSubtasks; + onBranch: (taskId: string, message: string, name?: string) => Promise<void>; + onClose: () => void; +} + +export function BranchTaskModal({ + task, + onBranch, + onClose, +}: BranchTaskModalProps) { + const [name, setName] = useState(`Branch of ${task.name}`); + const [message, setMessage] = useState(""); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState<string | null>(null); + + const handleSubmit = async () => { + if (!message.trim()) { + setError("Message is required"); + return; + } + + setSubmitting(true); + setError(null); + + try { + await onBranch(task.id, message.trim(), name.trim() || undefined); + onClose(); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to create branch"); + } finally { + setSubmitting(false); + } + }; + + return ( + <div className="fixed inset-0 z-50 flex items-center justify-center bg-black/70 backdrop-blur-sm"> + <div className="w-full max-w-lg mx-4 bg-[#0a1628] border border-[rgba(117,170,252,0.3)] shadow-2xl"> + {/* Header */} + <div className="flex items-center justify-between p-4 border-b border-[rgba(117,170,252,0.2)]"> + <div className="flex items-center gap-2"> + <span className="text-purple-400 text-lg">*</span> + <h2 className="font-mono text-sm text-[#75aafc] uppercase tracking-wide"> + Branch Task + </h2> + </div> + <button + onClick={onClose} + className="text-[#555] hover:text-[#9bc3ff] transition-colors" + aria-label="Close" + > + <span className="text-xl">×</span> + </button> + </div> + + {/* Content */} + <div className="p-4 space-y-4"> + {/* Source task info */} + <div className="bg-[#0d1b2d] border border-[rgba(117,170,252,0.15)] p-3"> + <p className="font-mono text-xs text-[#9bc3ff] mb-1 uppercase"> + Branching From + </p> + <p className="font-mono text-sm text-[#dbe7ff]">{task.name}</p> + <p className="font-mono text-[10px] text-[#555] mt-1"> + Status: {task.status} + </p> + </div> + + {/* Name input */} + <div className="space-y-2"> + <label className="block font-mono text-xs text-[#9bc3ff] uppercase"> + Branch Name + </label> + <input + type="text" + value={name} + onChange={(e) => setName(e.target.value)} + placeholder="Enter branch task name..." + className="w-full px-3 py-2 bg-[#0d1b2d] border border-[#3f6fb3] text-[#dbe7ff] font-mono text-sm focus:outline-none focus:border-[#75aafc]" + /> + </div> + + {/* Message input */} + <div className="space-y-2"> + <label className="block font-mono text-xs text-[#9bc3ff] uppercase"> + Message <span className="text-red-400">*</span> + </label> + <textarea + value={message} + onChange={(e) => setMessage(e.target.value)} + placeholder="Enter a message for the new task to continue with..." + rows={4} + className="w-full px-3 py-2 bg-[#0d1b2d] border border-[#3f6fb3] text-[#dbe7ff] font-mono text-sm focus:outline-none focus:border-[#75aafc] resize-none" + autoFocus + /> + <p className="font-mono text-[10px] text-[#555]"> + This message will be sent to the branched task when it starts. + </p> + </div> + + {/* Error message */} + {error && ( + <div className="bg-red-900/20 border border-red-500/30 p-3 text-red-400 font-mono text-sm"> + {error} + </div> + )} + + {/* Actions */} + <div className="flex gap-3 pt-4 border-t border-[rgba(117,170,252,0.2)]"> + <button + onClick={onClose} + disabled={submitting} + className="flex-1 px-4 py-2.5 font-mono text-sm text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] disabled:opacity-50 disabled:cursor-not-allowed transition-colors uppercase" + > + Cancel + </button> + <button + onClick={handleSubmit} + disabled={submitting || !message.trim()} + className="flex-1 px-4 py-2.5 font-mono text-sm text-[#dbe7ff] bg-purple-700/30 border border-purple-400/50 hover:bg-purple-700/40 hover:border-purple-400/70 disabled:opacity-50 disabled:cursor-not-allowed transition-colors uppercase" + > + {submitting ? "Creating..." : "Create Branch"} + </button> + </div> + </div> + </div> + </div> + ); +} diff --git a/makima/frontend/src/components/mesh/TaskDetail.tsx b/makima/frontend/src/components/mesh/TaskDetail.tsx index efe26a8..a74f394 100644 --- a/makima/frontend/src/components/mesh/TaskDetail.tsx +++ b/makima/frontend/src/components/mesh/TaskDetail.tsx @@ -6,6 +6,7 @@ import { OverlayDiffViewer } from "./OverlayDiffViewer"; import { PRPreview } from "./PRPreview"; import { InlineSubtaskEditor } from "./InlineSubtaskEditor"; import { DirectoryInput } from "./DirectoryInput"; +import { BranchTaskModal } from "./BranchTaskModal"; interface TaskDetailProps { task: TaskWithSubtasks; @@ -25,6 +26,8 @@ interface TaskDetailProps { viewingSubtaskId?: string | null; /** Navigate to view the contract */ onViewContract?: (contractId: string) => void; + /** Branch the task to create a new task with same state */ + onBranch?: (taskId: string, message: string, name?: string) => Promise<void>; // Optional advanced features overlayDiff?: string; changedFiles?: string[]; @@ -110,6 +113,7 @@ export function TaskDetail({ onToggleSubtaskOutput, viewingSubtaskId, onViewContract, + onBranch, overlayDiff, changedFiles, onRequestDiff, @@ -142,6 +146,8 @@ export function TaskDetail({ const [isCloning, setIsCloning] = useState(false); const [cloneError, setCloneError] = useState<string | null>(null); const [cloneTargetDir, setCloneTargetDir] = useState(""); + // Track branch modal state + const [showBranchModal, setShowBranchModal] = useState(false); // Check if task is running const isTaskRunning = task.status === "running" || task.status === "initializing" || task.status === "starting"; @@ -151,6 +157,8 @@ export function TaskDetail({ const isSupervisor = task.isSupervisor === true; // Show continue for supervisors (always) or terminal states for other tasks const canContinue = isSupervisor || isTaskTerminal; + // Show branch button when task has run at least once (not pending) + const canBranch = onBranch && task.status !== "pending"; // Determine which tasks to show: for supervisors, show contractTasks; for regular tasks, show subtasks const displayTasks = useMemo(() => { @@ -380,6 +388,15 @@ export function TaskDetail({ Continue </button> )} + {canBranch && ( + <button + onClick={() => setShowBranchModal(true)} + className="px-3 py-1 font-mono text-xs text-purple-400 border border-purple-400/30 hover:border-purple-400/50 hover:bg-purple-400/10 transition-colors uppercase flex items-center gap-1" + > + <span className="w-1.5 h-1.5 bg-purple-400 rounded-full" /> + Branch + </button> + )} <button onClick={() => setIsEditing(true)} className="px-3 py-1 font-mono text-xs text-[#9bc3ff] border border-[rgba(117,170,252,0.25)] hover:border-[#3f6fb3] transition-colors uppercase" @@ -908,6 +925,15 @@ export function TaskDetail({ </div> </div> )} + + {/* Branch Task Modal */} + {showBranchModal && onBranch && ( + <BranchTaskModal + task={task} + onBranch={onBranch} + onClose={() => setShowBranchModal(false)} + /> + )} </div> ); } diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index 14ec9f2..86ff06c 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -607,8 +607,8 @@ export interface TaskListResponse { } export interface CreateTaskRequest { - /** Contract this task belongs to (required) */ - contractId: string; + /** Contract this task belongs to (optional - can be standalone) */ + contractId?: string; name: string; description?: string; plan: string; @@ -974,6 +974,49 @@ export async function listSubtasks(taskId: string): Promise<TaskListResponse> { return res.json(); } +// ============================================================================= +// Task Branching +// ============================================================================= + +/** Request to branch a task */ +export interface BranchTaskRequest { + /** Message to send to the new branched task */ + message: string; + /** Optional name for the new task (defaults to "Branch of {original task name}") */ + name?: string; + /** Whether to include conversation history from the source task */ + includeConversation?: boolean; +} + +/** Response from branching a task */ +export interface BranchTaskResponse { + /** The newly created task */ + task: Task; + /** Number of conversation messages copied to the new task */ + messageCount: number; + /** ID of the daemon assigned to the new task (null if not yet assigned) */ + daemonId: string | null; +} + +/** + * Branch a task to create a new task with the same state. + * Copies the worktree and optionally the conversation history. + */ +export async function branchTask( + taskId: string, + request: BranchTaskRequest +): Promise<BranchTaskResponse> { + const res = await authFetch(`${API_BASE}/api/v1/mesh/tasks/${taskId}/branch`, { + method: "POST", + body: JSON.stringify(request), + }); + if (!res.ok) { + const errorText = await res.text(); + throw new Error(`Failed to branch task: ${errorText || res.statusText}`); + } + return res.json(); +} + export async function listTaskEvents( taskId: string ): Promise<TaskEventListResponse> { diff --git a/makima/frontend/src/routes/mesh.tsx b/makima/frontend/src/routes/mesh.tsx index 142cc54..453bdff 100644 --- a/makima/frontend/src/routes/mesh.tsx +++ b/makima/frontend/src/routes/mesh.tsx @@ -9,7 +9,7 @@ import { ContractCompleteQuestion } from "../components/mesh/ContractCompleteQue import { useTasks } from "../hooks/useTasks"; import { useTaskSubscription, type TaskUpdateEvent, type TaskOutputEvent } from "../hooks/useTaskSubscription"; import type { TaskWithSubtasks, MeshChatContext, ContractSummary, ContractWithRelations, DaemonDirectory, TaskSummary } from "../lib/api"; -import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi, resumeSupervisor } from "../lib/api"; +import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi, resumeSupervisor, branchTask } from "../lib/api"; import { DirectoryInput } from "../components/mesh/DirectoryInput"; import { useAuth } from "../contexts/AuthContext"; import { useSupervisorQuestions } from "../contexts/SupervisorQuestionsContext"; @@ -461,6 +461,25 @@ export default function MeshPage() { [editTask, taskDetail] ); + const handleBranch = useCallback( + async (taskId: string, message: string, name?: string) => { + try { + const result = await branchTask(taskId, { + message, + name, + includeConversation: true, + }); + console.log(`[Mesh] Task branched, new task ID: ${result.task.id}`); + // Navigate to the new branched task + navigate(`/mesh/${result.task.id}`); + } catch (e) { + console.error("Failed to branch task:", e); + throw e; // Re-throw so the modal can display the error + } + }, + [navigate] + ); + // Open contract selection modal const handleCreate = useCallback(async () => { if (creating || contractsLoading) return; @@ -742,6 +761,7 @@ export default function MeshPage() { onToggleSubtaskOutput={handleToggleSubtaskOutput} viewingSubtaskId={viewingSubtaskId} onViewContract={(contractId) => navigate(`/contracts/${contractId}`)} + onBranch={handleBranch} contractTasks={taskDetail.isSupervisor ? contractTasks : undefined} /> </div> diff --git a/makima/migrations/20250121000000_add_branched_from.sql b/makima/migrations/20250121000000_add_branched_from.sql new file mode 100644 index 0000000..bbb395b --- /dev/null +++ b/makima/migrations/20250121000000_add_branched_from.sql @@ -0,0 +1,10 @@ +-- Add branched_from_task_id column to tasks table for task branching support. +-- This allows creating new tasks that branch from an existing task's conversation, +-- enabling "what if" exploration of different approaches from the same starting point. + +ALTER TABLE tasks +ADD COLUMN IF NOT EXISTS branched_from_task_id UUID REFERENCES tasks(id) ON DELETE SET NULL; + +CREATE INDEX IF NOT EXISTS idx_tasks_branched_from ON tasks(branched_from_task_id); + +COMMENT ON COLUMN tasks.branched_from_task_id IS 'Source task ID when this task was branched from another task conversation'; diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 291fad7..bf95a3a 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -519,6 +519,12 @@ pub struct Task { /// When the task was last interrupted due to daemon disconnect #[serde(skip_serializing_if = "Option::is_none")] pub interrupted_at: Option<DateTime<Utc>>, + + // Task branching + /// Source task ID when this task was branched from another task's conversation. + /// Used to track the origin of "what if" explorations. + #[serde(skip_serializing_if = "Option::is_none")] + pub branched_from_task_id: Option<Uuid>, } impl Task { @@ -598,8 +604,8 @@ pub struct TaskListResponse { #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct CreateTaskRequest { - /// Contract this task belongs to (required) - pub contract_id: Uuid, + /// Contract this task belongs to (optional for branched/anonymous tasks) + pub contract_id: Option<Uuid>, /// Name of the task pub name: String, /// Optional description @@ -633,6 +639,10 @@ pub struct CreateTaskRequest { pub copy_files: Option<Vec<String>>, /// Checkpoint SHA to branch from (optional) pub checkpoint_sha: Option<String>, + /// Source task ID when branching from another task's conversation + pub branched_from_task_id: Option<Uuid>, + /// Conversation history to initialize the task with (JSON array of messages) + pub conversation_history: Option<serde_json::Value>, } /// Request payload for updating a task @@ -681,6 +691,37 @@ pub struct SendMessageRequest { pub message: String, } +/// Default for include_conversation field in BranchTaskRequest +fn default_include_conversation() -> bool { + true +} + +/// Request to branch a task from an existing task's conversation. +/// Creates a new anonymous task that continues from the source task's state. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchTaskRequest { + /// The initial message/instructions for the branched task + pub message: String, + /// Optional name for the branched task (auto-generated if not provided) + pub name: Option<String>, + /// Whether to include conversation history from the source task (default: true) + #[serde(default = "default_include_conversation")] + pub include_conversation: bool, +} + +/// Response from branching a task. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct BranchTaskResponse { + /// The newly created branched task + pub task: Task, + /// Number of conversation messages included from source task + pub message_count: usize, + /// Daemon ID if the task was started (None if no daemon available) + pub daemon_id: Option<Uuid>, +} + // ============================================================================= // Daemon Types // ============================================================================= diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 536bc9b..7387735 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -654,8 +654,8 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, let new_depth = parent.depth + 1; - // Subtasks inherit contract_id from parent - let contract_id = parent.contract_id.unwrap_or(req.contract_id); + // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) + let contract_id = parent.contract_id.or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); @@ -669,7 +669,7 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request + // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, req.contract_id, @@ -689,9 +689,10 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, INSERT INTO tasks ( contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, - target_repo_path, completion_action, continue_from_task_id, copy_files + target_repo_path, completion_action, continue_from_task_id, copy_files, + branched_from_task_id, conversation_state ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) RETURNING * "#, ) @@ -711,6 +712,8 @@ pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) + .bind(&req.branched_from_task_id) + .bind(&req.conversation_history) .fetch_one(pool) .await } @@ -1041,8 +1044,8 @@ pub async fn create_task_for_owner( ))); } - // Subtasks inherit contract_id from parent - let contract_id = parent.contract_id.unwrap_or(req.contract_id); + // Subtasks inherit contract_id from parent (or use request contract_id if parent has none) + let contract_id = parent.contract_id.or(req.contract_id); // Inherit repo settings if not provided let repo_url = req.repository_url.clone().or(parent.repository_url); @@ -1056,7 +1059,7 @@ pub async fn create_task_for_owner( (new_depth, contract_id, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) } else { - // Top-level task: depth 0, use contract_id from request + // Top-level task: depth 0, use contract_id from request (may be None for branched tasks) ( 0, req.contract_id, @@ -1076,9 +1079,10 @@ pub async fn create_task_for_owner( INSERT INTO tasks ( owner_id, contract_id, parent_task_id, depth, name, description, plan, priority, is_supervisor, repository_url, base_branch, target_branch, merge_mode, - target_repo_path, completion_action, continue_from_task_id, copy_files + target_repo_path, completion_action, continue_from_task_id, copy_files, + branched_from_task_id, conversation_state ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) RETURNING * "#, ) @@ -1099,6 +1103,8 @@ pub async fn create_task_for_owner( .bind(&completion_action) .bind(&req.continue_from_task_id) .bind(©_files_json) + .bind(&req.branched_from_task_id) + .bind(&req.conversation_history) .fetch_one(pool) .await } @@ -3678,3 +3684,31 @@ pub async fn get_supervisor_conversation_full( ) -> Result<Option<SupervisorState>, sqlx::Error> { get_supervisor_state(pool, contract_id).await } + +// ============================================================================= +// Anonymous Task Cleanup Functions +// ============================================================================= + +/// Delete stale anonymous tasks (tasks with contract_id = NULL) that: +/// - Are in a terminal state (done, failed, merged) +/// - Are older than the specified number of days +/// +/// Returns the number of deleted tasks. +pub async fn cleanup_stale_anonymous_tasks( + pool: &PgPool, + max_age_days: i32, +) -> Result<i64, sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM tasks + WHERE contract_id IS NULL + AND status IN ('done', 'failed', 'merged') + AND created_at < NOW() - INTERVAL '1 day' * $1 + "#, + ) + .bind(max_age_days) + .execute(pool) + .await?; + + Ok(result.rows_affected() as i64) +} diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs index 0f794c1..c94538d 100644 --- a/makima/src/server/handlers/contract_chat.rs +++ b/makima/src/server/handlers/contract_chat.rs @@ -1356,7 +1356,7 @@ async fn handle_contract_request( }; let create_req = CreateTaskRequest { - contract_id, + contract_id: Some(contract_id), name: name.clone(), description: None, plan, @@ -1372,6 +1372,8 @@ async fn handle_contract_request( copy_files: None, is_supervisor: false, checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -1450,7 +1452,7 @@ async fn handle_contract_request( ); let create_req = CreateTaskRequest { - contract_id, + contract_id: Some(contract_id), name: task_name.clone(), description: Some(instruction.clone()), plan, @@ -1466,6 +1468,8 @@ async fn handle_contract_request( copy_files: None, is_supervisor: false, checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -2054,7 +2058,7 @@ async fn handle_contract_request( for task_def in &tasks { let create_req = CreateTaskRequest { - contract_id, + contract_id: Some(contract_id), name: task_def.name.clone(), description: None, plan: task_def.plan.clone(), @@ -2070,6 +2074,8 @@ async fn handle_contract_request( copy_files: None, is_supervisor: false, checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -2564,7 +2570,7 @@ async fn handle_contract_request( if include_action_items && !analysis.action_items.is_empty() { for item in &analysis.action_items { let task_req = CreateTaskRequest { - contract_id: contract.id, + contract_id: Some(contract.id), name: item.text.chars().take(100).collect(), description: Some(format!("Action item from: {}", item.speaker)), plan: item.text.clone(), @@ -2584,6 +2590,8 @@ async fn handle_contract_request( copy_files: None, is_supervisor: false, checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, }; if repository::create_task_for_owner(pool, owner_id, task_req).await.is_ok() { diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index 11337f2..462b385 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -287,7 +287,7 @@ pub async fn create_contract( base_branch: None, target_branch: None, parent_task_id: None, - contract_id: contract.id, + contract_id: Some(contract.id), target_repo_path: None, completion_action: None, continue_from_task_id: None, @@ -296,6 +296,8 @@ pub async fn create_contract( checkpoint_sha: None, priority: 0, merge_mode: None, + branched_from_task_id: None, + conversation_history: None, }; match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await { diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 275dc3c..99c3d9d 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -9,9 +9,10 @@ use axum::{ use uuid::Uuid; use crate::db::models::{ - CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse, - SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry, - TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest, + BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory, + DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task, + TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse, + TaskWithSubtasks, UpdateTaskRequest, }; use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; @@ -2196,7 +2197,7 @@ pub async fn reassign_task( // Create a NEW task with the conversation context let create_req = CreateTaskRequest { - contract_id: task.contract_id.unwrap_or(Uuid::nil()), + contract_id: task.contract_id, name: format!("{} (resumed)", task.name), description: task.description.clone(), plan: updated_plan.clone(), @@ -2212,6 +2213,8 @@ pub async fn reassign_task( continue_from_task_id: Some(id), // Continue from the old task's worktree if possible copy_files: None, checkpoint_sha: task.last_checkpoint_sha.clone(), + branched_from_task_id: None, + conversation_history: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -2913,7 +2916,7 @@ pub async fn fork_task( // Create the new forked task let create_req = CreateTaskRequest { - contract_id: task.contract_id.unwrap_or(Uuid::nil()), + contract_id: task.contract_id, name: req.new_task_name.clone(), description: task.description.clone(), plan: req.new_task_plan.clone(), @@ -2929,6 +2932,8 @@ pub async fn fork_task( continue_from_task_id: None, copy_files: None, checkpoint_sha: Some(checkpoint.commit_sha.clone()), + branched_from_task_id: None, + conversation_history: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -3068,7 +3073,7 @@ pub async fn resume_from_checkpoint( }); let create_req = CreateTaskRequest { - contract_id: task.contract_id.unwrap_or(Uuid::nil()), + contract_id: task.contract_id, name: task_name, description: task.description.clone(), plan: req.plan, @@ -3084,6 +3089,8 @@ pub async fn resume_from_checkpoint( continue_from_task_id: Some(task_id), // Copy worktree from original task copy_files: None, checkpoint_sha: Some(checkpoint.commit_sha.clone()), + branched_from_task_id: None, + conversation_history: None, }; let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { @@ -3304,3 +3311,253 @@ pub async fn branch_from_checkpoint( ) .into_response() } + +// ============================================================================= +// Task Branching +// ============================================================================= + +/// Branch a task, creating a new anonymous task from an existing task's conversation. +/// +/// Creates a new task that: +/// - Has no contract_id (anonymous task) +/// - Has branched_from_task_id pointing to the source task +/// - Optionally includes conversation history from the source task +/// - Can be started on an available daemon +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/branch", + params( + ("id" = Uuid, Path, description = "Source task ID to branch from") + ), + request_body = BranchTaskRequest, + responses( + (status = 201, description = "Task branched successfully", body = BranchTaskResponse), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Source task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn branch_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(source_task_id): Path<Uuid>, + Json(req): Json<BranchTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the source task (must belong to the same owner) + let source_task = match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await { + Ok(Some(task)) => task, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Source task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get source task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Build conversation history if requested + let (conversation_history, message_count) = if req.include_conversation { + match repository::get_task_output(pool, source_task_id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let count = entries.len(); + + // Convert entries to a JSON array for conversation_history + let history_json = serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null); + (Some(history_json), count) + } + Err(e) => { + tracing::warn!("Failed to get task output for branching: {}", e); + (None, 0) + } + } + } else { + (None, 0) + }; + + // Generate task name if not provided + let task_name = req.name.unwrap_or_else(|| { + format!("{} (branch)", source_task.name) + }); + + // Create the branched task (anonymous - no contract_id) + let create_req = CreateTaskRequest { + contract_id: None, // Anonymous task + name: task_name, + description: Some(format!("Branched from task: {}", source_task.name)), + plan: req.message, + parent_task_id: None, + is_supervisor: false, + priority: source_task.priority, + repository_url: source_task.repository_url.clone(), + base_branch: source_task.base_branch.clone(), + target_branch: None, // Branched tasks don't auto-merge + merge_mode: None, + target_repo_path: source_task.target_repo_path.clone(), + completion_action: Some("none".to_string()), // Don't auto-complete + continue_from_task_id: Some(source_task_id), // Continue from source task's worktree + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: Some(source_task_id), + conversation_history, + }; + + let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(task) => task, + Err(e) => { + tracing::error!("Failed to create branched task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Record history event for task branching + let _ = repository::record_history_event( + pool, + auth.owner_id, + None, // No contract for anonymous tasks + Some(task.id), + "task", + Some("branched"), + None, + serde_json::json!({ + "name": &task.name, + "sourceTaskId": source_task_id, + "sourceTaskName": &source_task.name, + "messageCount": message_count, + }), + ).await; + + // Try to find an available daemon to start the task + let daemon_id = state.daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + .map(|d| d.value().id); + + // If a daemon is available, start the task + if let Some(target_daemon_id) = daemon_id { + // Update task with daemon assignment + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + ..Default::default() + }; + + if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await { + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id: task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url: updated_task.repository_url.clone(), + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: None, + depth: 0, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: None, + contract_id: None, + is_supervisor: false, + resume_session: message_count > 0, // Resume if we have conversation history + conversation_history: updated_task.conversation_state.clone(), + }; + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::warn!( + task_id = %task.id, + daemon_id = %target_daemon_id, + error = %e, + "Failed to send SpawnTask command for branched task, task created but not started" + ); + // Task was created but not started - return without daemon_id + return ( + StatusCode::CREATED, + Json(BranchTaskResponse { + task, + message_count, + daemon_id: None, + }), + ) + .into_response(); + } + + tracing::info!( + task_id = %task.id, + source_task_id = %source_task_id, + daemon_id = %target_daemon_id, + message_count = message_count, + "Branched task created and started" + ); + + // Broadcast task update notification + state.broadcast_task_update(TaskUpdateNotification { + task_id: task.id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "system".to_string(), + }); + + return ( + StatusCode::CREATED, + Json(BranchTaskResponse { + task: updated_task, + message_count, + daemon_id: Some(target_daemon_id), + }), + ) + .into_response(); + } + } + + // No daemon available or failed to start - return task without daemon_id + tracing::info!( + task_id = %task.id, + source_task_id = %source_task_id, + message_count = message_count, + "Branched task created (no daemon available to start)" + ); + + ( + StatusCode::CREATED, + Json(BranchTaskResponse { + task, + message_count, + daemon_id: None, + }), + ) + .into_response() +} diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index c468446..0fc5513 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -1002,7 +1002,7 @@ async fn handle_mesh_request( }; let create_req = CreateTaskRequest { - contract_id, + contract_id: Some(contract_id), name: name.clone(), description: None, plan, @@ -1018,6 +1018,8 @@ async fn handle_mesh_request( copy_files: None, is_supervisor: false, checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index e5d33c7..6cdbba6 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -567,7 +567,7 @@ pub async fn spawn_task( description: None, plan: request.plan.clone(), repository_url: repo_url.clone(), - contract_id: request.contract_id, + contract_id: Some(request.contract_id), parent_task_id: request.parent_task_id, is_supervisor: false, checkpoint_sha: request.checkpoint_sha.clone(), @@ -579,6 +579,8 @@ pub async fn spawn_task( completion_action: None, continue_from_task_id: None, copy_files: None, + branched_from_task_id: None, + conversation_history: None, }; // Create task in DB diff --git a/makima/src/server/handlers/transcript_analysis.rs b/makima/src/server/handlers/transcript_analysis.rs index 99f9ea7..3b71eca 100644 --- a/makima/src/server/handlers/transcript_analysis.rs +++ b/makima/src/server/handlers/transcript_analysis.rs @@ -344,7 +344,7 @@ pub async fn create_contract_from_analysis( if request.include_action_items && !analysis.action_items.is_empty() { for item in &analysis.action_items { let task_req = models::CreateTaskRequest { - contract_id: contract.id, + contract_id: Some(contract.id), name: truncate_for_name(&item.text, 100), description: Some(format!("Action item from transcript (Speaker: {})", item.speaker)), plan: item.text.clone(), @@ -364,6 +364,8 @@ pub async fn create_contract_from_analysis( _ => 0, }, merge_mode: None, + branched_from_task_id: None, + conversation_history: None, }; if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await { @@ -515,7 +517,7 @@ pub async fn update_contract_from_analysis( if request.create_tasks && !analysis.action_items.is_empty() { for item in &analysis.action_items { let task_req = models::CreateTaskRequest { - contract_id: request.contract_id, + contract_id: Some(request.contract_id), name: truncate_for_name(&item.text, 100), description: Some(format!("Action item from {} (Speaker: {})", file.name, item.speaker)), plan: item.text.clone(), @@ -531,6 +533,8 @@ pub async fn update_contract_from_analysis( checkpoint_sha: None, priority: 0, merge_mode: None, + branched_from_task_id: None, + conversation_history: None, }; if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await { diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 7e31285..d575997 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -111,6 +111,8 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/fork", post(mesh::fork_task)) .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint)) .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint)) + // Task branching endpoint + .route("/mesh/tasks/{id}/branch", post(mesh::branch_task)) // Supervisor endpoints (for supervisor.sh) .route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks)) .route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree)) @@ -241,14 +243,23 @@ const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60; /// Daemon heartbeat timeout in seconds (delete daemons older than this) const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; +/// Anonymous task cleanup interval in seconds (24 hours) +const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60; +/// Maximum age in days for anonymous tasks before cleanup +const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; + /// Run the HTTP server with graceful shutdown support. /// /// # Arguments /// * `state` - Shared application state containing ML models /// * `addr` - Address to bind to (e.g., "0.0.0.0:8080") pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { - // Start background daemon cleanup task if database is available + // Start background cleanup tasks if database is available if let Some(pool) = state.db_pool.clone() { + // Clone pool for each background task that needs it + let daemon_cleanup_pool = pool.clone(); + let anonymous_task_cleanup_pool = pool.clone(); + // Initial cleanup of any stale daemons from previous server run match crate::db::repository::delete_stale_daemons(&pool, 0).await { Ok(deleted) if deleted > 0 => { @@ -263,7 +274,25 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { _ => {} } - // Spawn periodic cleanup task + // Initial cleanup of any stale anonymous tasks + match crate::db::repository::cleanup_stale_anonymous_tasks( + &pool, + ANONYMOUS_TASK_MAX_AGE_DAYS, + ).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, + "Cleaned up stale anonymous tasks on startup" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks on startup"); + } + _ => {} + } + + // Spawn periodic daemon cleanup task tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(DAEMON_CLEANUP_INTERVAL_SECS) @@ -271,7 +300,7 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { loop { interval.tick().await; match crate::db::repository::delete_stale_daemons( - &pool, + &daemon_cleanup_pool, DAEMON_HEARTBEAT_TIMEOUT_SECS, ).await { Ok(deleted) if deleted > 0 => { @@ -288,6 +317,32 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { } } }); + + // Spawn periodic anonymous task cleanup task (runs daily) + tokio::spawn(async move { + let mut interval = tokio::time::interval( + std::time::Duration::from_secs(ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS) + ); + loop { + interval.tick().await; + match crate::db::repository::cleanup_stale_anonymous_tasks( + &anonymous_task_cleanup_pool, + ANONYMOUS_TASK_MAX_AGE_DAYS, + ).await { + Ok(deleted) if deleted > 0 => { + tracing::info!( + deleted = deleted, + max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, + "Cleaned up stale anonymous tasks" + ); + } + Err(e) => { + tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks"); + } + _ => {} + } + } + }); } let app = make_router(state); diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 4daae3b..f8c5474 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -4,17 +4,17 @@ use utoipa::OpenApi; use crate::db::models::{ AddLocalRepositoryRequest, AddRemoteRepositoryRequest, BranchInfo, BranchListResponse, - ChangePhaseRequest, Contract, ContractChatHistoryResponse, - ContractChatMessageRecord, ContractEvent, ContractListResponse, ContractRepository, - ContractSummary, ContractWithRelations, CreateContractRequest, CreateFileRequest, - CreateManagedRepositoryRequest, CreateTaskRequest, Daemon, DaemonDirectoriesResponse, - DaemonDirectory, DaemonListResponse, File, FileListResponse, FileSummary, MergeCommitRequest, - MergeCompleteCheckResponse, MergeResolveRequest, MergeResultResponse, MergeSkipRequest, - MergeStartRequest, MergeStatusResponse, MeshChatConversation, MeshChatHistoryResponse, - MeshChatMessageRecord, RepositoryHistoryEntry, RepositoryHistoryListResponse, - RepositorySuggestionsQuery, SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, - TaskSummary, TaskWithSubtasks, TranscriptEntry, UpdateContractRequest, UpdateFileRequest, - UpdateTaskRequest, + BranchTaskRequest, BranchTaskResponse, ChangePhaseRequest, Contract, + ContractChatHistoryResponse, ContractChatMessageRecord, ContractEvent, ContractListResponse, + ContractRepository, ContractSummary, ContractWithRelations, CreateContractRequest, + CreateFileRequest, CreateManagedRepositoryRequest, CreateTaskRequest, Daemon, + DaemonDirectoriesResponse, DaemonDirectory, DaemonListResponse, File, FileListResponse, + FileSummary, MergeCommitRequest, MergeCompleteCheckResponse, MergeResolveRequest, + MergeResultResponse, MergeSkipRequest, MergeStartRequest, MergeStatusResponse, + MeshChatConversation, MeshChatHistoryResponse, MeshChatMessageRecord, RepositoryHistoryEntry, + RepositoryHistoryListResponse, RepositorySuggestionsQuery, SendMessageRequest, Task, + TaskEventListResponse, TaskListResponse, TaskSummary, TaskWithSubtasks, TranscriptEntry, + UpdateContractRequest, UpdateFileRequest, UpdateTaskRequest, }; use crate::server::auth::{ ApiKey, ApiKeyInfoResponse, CreateApiKeyRequest, CreateApiKeyResponse, @@ -57,6 +57,7 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage mesh::get_daemon_directories, mesh::clone_worktree, mesh::check_target_exists, + mesh::branch_task, mesh_chat::get_chat_history, mesh_chat::clear_chat_history, // Merge endpoints @@ -123,6 +124,8 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage CreateTaskRequest, UpdateTaskRequest, SendMessageRequest, + BranchTaskRequest, + BranchTaskResponse, TaskEventListResponse, Daemon, DaemonListResponse, |
