summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 18:46:22 +0000
committersoryu <soryu@soryu.co>2026-01-17 05:38:07 +0000
commit6b94b5895ed27e3aef052a1843fb3f334397d1b4 (patch)
treea55ba3ba6b806efc60cf580e6202cf10666b5992
parent4de5b1857c7ac637b8826ce785e1db97cf0e02e3 (diff)
downloadsoryu-6b94b5895ed27e3aef052a1843fb3f334397d1b4.tar.gz
soryu-6b94b5895ed27e3aef052a1843fb3f334397d1b4.zip
Update continue task system and daemon IDs
-rw-r--r--makima/frontend/src/routes/mesh.tsx25
-rw-r--r--makima/src/server/handlers/mesh.rs67
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs48
3 files changed, 89 insertions, 51 deletions
diff --git a/makima/frontend/src/routes/mesh.tsx b/makima/frontend/src/routes/mesh.tsx
index 050381a..cc09bca 100644
--- a/makima/frontend/src/routes/mesh.tsx
+++ b/makima/frontend/src/routes/mesh.tsx
@@ -8,7 +8,7 @@ import { UnifiedMeshChatInput } from "../components/mesh/UnifiedMeshChatInput";
import { useTasks } from "../hooks/useTasks";
import { useTaskSubscription, type TaskUpdateEvent, type TaskOutputEvent } from "../hooks/useTaskSubscription";
import type { TaskWithSubtasks, MeshChatContext, ContractSummary, ContractWithRelations, DaemonDirectory } from "../lib/api";
-import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi } from "../lib/api";
+import { startTask as startTaskApi, stopTask as stopTaskApi, getTaskOutput, listContracts, getContract, getDaemonDirectories, continueTask as continueTaskApi, resumeSupervisor } from "../lib/api";
import { DirectoryInput } from "../components/mesh/DirectoryInput";
import { useAuth } from "../contexts/AuthContext";
import { useSupervisorQuestions } from "../contexts/SupervisorQuestionsContext";
@@ -387,16 +387,29 @@ export default function MeshPage() {
const handleContinue = useCallback(
async (taskId: string) => {
try {
- // Continue the task with conversation context from previous run
- const result = await continueTaskApi(taskId);
- console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`);
- setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev);
+ // Check if this is a supervisor task - use resumeSupervisor API instead
+ if (taskDetail?.isSupervisor && taskDetail?.contractId) {
+ const result = await resumeSupervisor(taskDetail.contractId, {
+ resumeMode: "continue",
+ });
+ console.log(`[Mesh] Supervisor resumed, daemon: ${result.daemonId}`);
+ // Refresh task detail to get updated state
+ const updated = await fetchTask(taskId);
+ if (updated) {
+ setTaskDetail(updated);
+ }
+ } else {
+ // Continue regular task with conversation context from previous run
+ const result = await continueTaskApi(taskId);
+ console.log(`[Mesh] Task continued with ${result.contextEntries} context entries`);
+ setTaskDetail((prev) => prev ? { ...prev, ...result.task } : prev);
+ }
} catch (e) {
console.error("Failed to continue task:", e);
alert(e instanceof Error ? e.message : "Failed to continue task");
}
},
- []
+ [taskDetail?.isSupervisor, taskDetail?.contractId, fetchTask]
);
const handleSave = useCallback(
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 3cd38b5..cdda3fd 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -1847,49 +1847,66 @@ pub struct ReassignTaskResponse {
/// Build a conversation context summary from task output entries.
/// Returns a formatted string that can be prepended to the task plan.
+/// Only includes user and assistant messages - tool use is excluded for cleaner context.
+/// Limited to ~4000 characters to avoid bloating the plan.
fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
+ const MAX_CONTEXT_LEN: usize = 4000;
+ const MAX_MESSAGE_LEN: usize = 500;
+
if entries.is_empty() {
return String::new();
}
- let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
- context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n");
-
+ // Collect only user and assistant messages
+ let mut messages: Vec<String> = Vec::new();
for entry in entries.iter() {
match entry.message_type.as_str() {
"assistant" => {
- context.push_str("Assistant: ");
// Truncate long messages
- let content = if entry.content.len() > 500 {
- format!("{}... [truncated]", &entry.content[..500])
+ let content = if entry.content.len() > MAX_MESSAGE_LEN {
+ format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN])
} else {
entry.content.clone()
};
- context.push_str(&content);
- context.push_str("\n\n");
- }
- "tool_use" => {
- if let Some(ref tool_name) = entry.tool_name {
- context.push_str(&format!("[Used tool: {}]\n", tool_name));
- }
+ messages.push(format!("Assistant: {}\n", content));
}
- "tool_result" => {
- // Summarize tool results briefly
- if entry.content.len() > 200 {
- context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200]));
- } else if !entry.content.is_empty() {
- context.push_str(&format!("[Tool result: {}]\n", entry.content));
- }
- }
- "user" => {
- context.push_str("User: ");
- context.push_str(&entry.content);
- context.push_str("\n\n");
+ "user" | "user_input" => {
+ let content = if entry.content.len() > MAX_MESSAGE_LEN {
+ format!("{}... [truncated]", &entry.content[..MAX_MESSAGE_LEN])
+ } else {
+ entry.content.clone()
+ };
+ messages.push(format!("User: {}\n", content));
}
+ // Skip tool_use, tool_result, and other message types for cleaner context
_ => {}
}
}
+ if messages.is_empty() {
+ return String::new();
+ }
+
+ // Build context from the end (most recent messages) up to the max length
+ let mut context_body = String::new();
+ let mut included_count = 0;
+ for msg in messages.iter().rev() {
+ if context_body.len() + msg.len() > MAX_CONTEXT_LEN {
+ break;
+ }
+ context_body = format!("{}{}\n", msg, context_body);
+ included_count += 1;
+ }
+
+ // If we couldn't include all messages, note how many were skipped
+ let skipped = messages.len() - included_count;
+ let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
+ context.push_str("The daemon running this task disconnected. Here is what happened so far:\n");
+ if skipped > 0 {
+ context.push_str(&format!("(Showing last {} of {} messages)\n", included_count, messages.len()));
+ }
+ context.push('\n');
+ context.push_str(&context_body);
context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 9833d51..4bcb5cd 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -556,9 +556,9 @@ pub async fn daemon_handler(
async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_result: DaemonAuthResult) {
let (mut sender, mut receiver) = socket.split();
- // Generate a unique connection ID and daemon ID
+ // Generate a unique connection ID (daemon ID will come from database)
let connection_id = Uuid::new_v4().to_string();
- let daemon_id = Uuid::new_v4();
+ let mut daemon_id: Option<Uuid> = None;
let owner_id = auth_result.owner_id;
// Create command channel for sending commands to this daemon
@@ -580,17 +580,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
// API key was already validated via headers, but we use this message
// for backward compatibility to get the machine_id and hostname
- tracing::info!(
- daemon_id = %daemon_id,
- owner_id = %owner_id,
- hostname = %hostname,
- machine_id = %machine_id,
- max_concurrent_tasks = max_concurrent_tasks,
- "Daemon registered"
- );
-
- // Register daemon in database
- if let Some(ref pool) = state.db_pool {
+ // Register daemon in database first to get the persistent ID
+ let db_daemon_id = if let Some(ref pool) = state.db_pool {
match repository::register_daemon(
pool,
owner_id,
@@ -600,24 +591,40 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
max_concurrent_tasks as i32,
).await {
Ok(db_daemon) => {
- tracing::debug!(
+ tracing::info!(
daemon_id = %db_daemon.id,
+ owner_id = %owner_id,
+ hostname = %hostname,
+ machine_id = %machine_id,
+ max_concurrent_tasks = max_concurrent_tasks,
"Daemon registered in database"
);
+ Some(db_daemon.id)
}
Err(e) => {
tracing::error!(
error = %e,
"Failed to register daemon in database"
);
+ None
}
}
- }
+ } else {
+ None
+ };
+
+ // If database registration failed, we can't proceed
+ let Some(actual_daemon_id) = db_daemon_id else {
+ tracing::error!("Cannot proceed without database daemon ID");
+ break;
+ };
+
+ daemon_id = Some(actual_daemon_id);
- // Register daemon in state with owner_id
+ // Register daemon in state with owner_id using database ID
state.register_daemon(
connection_id.clone(),
- daemon_id,
+ actual_daemon_id,
owner_id,
Some(hostname),
Some(machine_id),
@@ -626,8 +633,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
registered = true;
- // Send authentication confirmation
- let response = DaemonCommand::Authenticated { daemon_id };
+ // Send authentication confirmation with database ID
+ let response = DaemonCommand::Authenticated { daemon_id: actual_daemon_id };
let json = serde_json::to_string(&response).unwrap();
if sender.send(Message::Text(json.into())).await.is_err() {
break;
@@ -672,7 +679,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
return;
}
- let daemon_uuid = daemon_id;
+ // daemon_id is guaranteed to be Some here since registered is true
+ let daemon_uuid = daemon_id.expect("daemon_id should be set when registered is true");
// Main message loop after authentication
loop {