diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 18:46:22 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-17 05:38:07 +0000 |
| commit | 6b94b5895ed27e3aef052a1843fb3f334397d1b4 (patch) | |
| tree | a55ba3ba6b806efc60cf580e6202cf10666b5992 | |
| parent | 4de5b1857c7ac637b8826ce785e1db97cf0e02e3 (diff) | |
| download | soryu-6b94b5895ed27e3aef052a1843fb3f334397d1b4.tar.gz soryu-6b94b5895ed27e3aef052a1843fb3f334397d1b4.zip | |
Update continue task system and daemon IDs
| -rw-r--r-- | makima/frontend/src/routes/mesh.tsx | 25 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 67 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 48 |
3 files changed, 89 insertions, 51 deletions
diff --git a/makima/frontend/src/routes/mesh.tsx b/makima/frontend/src/routes/mesh.tsx index 050381a..cc09bca 100644 --- a/makima/frontend/src/routes/mesh.tsx +++ b/makima/frontend/src/routes/mesh.tsx @@ -8,7 +8,7 @@ import { UnifiedMeshChatInput } from "../components/mesh/UnifiedMeshChatInput"; import { useTasks } from "../hooks/useTasks"; import { useTaskSubscription, type TaskUpdateEvent, type TaskOutputEvent } from "../hooks/useTaskSubscription"; import type { TaskWithSubtasks, MeshChatContext, ContractSummary, ContractWithRelations, DaemonDirectory } from "../lib/api"; -import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi } from "../lib/api"; +import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi, resumeSupervisor } from "../lib/api"; import { DirectoryInput } from "../components/mesh/DirectoryInput"; import { useAuth } from "../contexts/AuthContext"; import { useSupervisorQuestions } from "../contexts/SupervisorQuestionsContext"; @@ -387,16 +387,29 @@ export default function MeshPage() { const handleContinue = useCallback( async (taskId: string) => { try { - // Continue the task with conversation context from previous run - const result = await continueTaskApi(taskId); - console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`); - setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev); + // Check if this is a supervisor task - use resumeSupervisor API instead + if (taskDetail?.isSupervisor && taskDetail?.contractId) { + const result = await resumeSupervisor(taskDetail.contractId, { + resumeMode: "continue", + }); + console.log(`[Mesh] Supervisor resumed, daemon: ${result.daemonId}`); + // Refresh task detail to get updated state + const updated = await fetchTask(taskId); + if (updated) { + setTaskDetail(updated); + } + } else { + // Continue regular task with conversation context from previous run + const result = await continueTaskApi(taskId); + console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`); + setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev); + } } catch (e) { console.error("Failed to continue task:", e); alert(e instanceof Error ? e.message : "Failed to continue task"); } }, - [] + [taskDetail?.isSupervisor, taskDetail?.contractId, fetchTask] ); const handleSave = useCallback( diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 3cd38b5..cdda3fd 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -1847,49 +1847,66 @@ pub struct ReassignTaskResponse { /// Build a conversation context summary from task output entries. /// Returns a formatted string that can be prepended to the task plan. +/// Only includes user and assistant messages - tool use is excluded for cleaner context. +/// Limited to ~4000 characters to avoid bloating the plan. fn build_conversation_context(entries: &[TaskOutputEntry]) -> String { + const MAX_CONTEXT_LEN: usize = 4000; + const MAX_MESSAGE_LEN: usize = 500; + if entries.is_empty() { return String::new(); } - let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); - context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n"); - + // Collect only user and assistant messages + let mut messages: Vec<String> = Vec::new(); for entry in entries.iter() { match entry.message_type.as_str() { "assistant" => { - context.push_str("Assistant: "); // Truncate long messages - let content = if entry.content.len() > 500 { - format!("{}... [truncated]", &entry.content[..500]) + let content = if entry.content.len() > MAX_MESSAGE_LEN { + format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN]) } else { entry.content.clone() }; - context.push_str(&content); - context.push_str("\n\n"); - } - "tool_use" => { - if let Some(ref tool_name) = entry.tool_name { - context.push_str(&format!("[Used tool: {}]\n", tool_name)); - } + messages.push(format!("Assistant: {}\n", content)); } - "tool_result" => { - // Summarize tool results briefly - if entry.content.len() > 200 { - context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200])); - } else if !entry.content.is_empty() { - context.push_str(&format!("[Tool result: {}]\n", entry.content)); - } - } - "user" => { - context.push_str("User: "); - context.push_str(&entry.content); - context.push_str("\n\n"); + "user" | "user_input" => { + let content = if entry.content.len() > MAX_MESSAGE_LEN { + format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN]) + } else { + entry.content.clone() + }; + messages.push(format!("User: {}\n", content)); } + // Skip tool_use, tool_result, and other message types for cleaner context _ => {} } } + if messages.is_empty() { + return String::new(); + } + + // Build context from the end (most recent messages) up to the max length + let mut context_body = String::new(); + let mut included_count = 0; + for msg in messages.iter().rev() { + if context_body.len() + msg.len() > MAX_CONTEXT_LEN { + break; + } + context_body = format!("{}{}\n", msg, context_body); + included_count += 1; + } + + // If we couldn't include all messages, note how many were skipped + let skipped = messages.len() - included_count; + let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); + context.push_str("The daemon running this task disconnected. Here is what happened so far:\n"); + if skipped > 0 { + context.push_str(&format!("(Showing last {} of {} messages)\n", included_count, messages.len())); + } + context.push('\n'); + context.push_str(&context_body); context.push_str("=== END PREVIOUS CONTEXT ===\n\n"); context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n"); diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 9833d51..4bcb5cd 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -556,9 +556,9 @@ pub async fn daemon_handler( async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_result: DaemonAuthResult) { let (mut sender, mut receiver) = socket.split(); - // Generate a unique connection ID and daemon ID + // Generate a unique connection ID (daemon ID will come from database) let connection_id = Uuid::new_v4().to_string(); - let daemon_id = Uuid::new_v4(); + let mut daemon_id: Option<Uuid> = None; let owner_id = auth_result.owner_id; // Create command channel for sending commands to this daemon @@ -580,17 +580,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // API key was already validated via headers, but we use this message // for backward compatibility to get the machine_id and hostname - tracing::info!( - daemon_id = %daemon_id, - owner_id = %owner_id, - hostname = %hostname, - machine_id = %machine_id, - max_concurrent_tasks = max_concurrent_tasks, - "Daemon registered" - ); - - // Register daemon in database - if let Some(ref pool) = state.db_pool { + // Register daemon in database first to get the persistent ID + let db_daemon_id = if let Some(ref pool) = state.db_pool { match repository::register_daemon( pool, owner_id, @@ -600,24 +591,40 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re max_concurrent_tasks as i32, ).await { Ok(db_daemon) => { - tracing::debug!( + tracing::info!( daemon_id = %db_daemon.id, + owner_id = %owner_id, + hostname = %hostname, + machine_id = %machine_id, + max_concurrent_tasks = max_concurrent_tasks, "Daemon registered in database" ); + Some(db_daemon.id) } Err(e) => { tracing::error!( error = %e, "Failed to register daemon in database" ); + None } } - } + } else { + None + }; + + // If database registration failed, we can't proceed + let Some(actual_daemon_id) = db_daemon_id else { + tracing::error!("Cannot proceed without database daemon ID"); + break; + }; + + daemon_id = Some(actual_daemon_id); - // Register daemon in state with owner_id + // Register daemon in state with owner_id using database ID state.register_daemon( connection_id.clone(), - daemon_id, + actual_daemon_id, owner_id, Some(hostname), Some(machine_id), @@ -626,8 +633,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re registered = true; - // Send authentication confirmation - let response = DaemonCommand::Authenticated { daemon_id }; + // Send authentication confirmation with database ID + let response = DaemonCommand::Authenticated { daemon_id: actual_daemon_id }; let json = serde_json::to_string(&response).unwrap(); if sender.send(Message::Text(json.into())).await.is_err() { break; @@ -672,7 +679,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re return; } - let daemon_uuid = daemon_id; + // daemon_id is guaranteed to be Some here since registered is true + let daemon_uuid = daemon_id.expect("daemon_id should be set when registered is true"); // Main message loop after authentication loop { |
