//! Chat endpoint for LLM-powered task orchestration. //! //! This handler provides an agentic loop for managing tasks, daemons, and //! overlay operations through LLM tool calling. use axum::{ extract::{Path, State}, http::StatusCode, response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; use serde_json::json; use utoipa::ToSchema; use uuid::Uuid; use crate::db::{models::CreateTaskRequest, repository}; use crate::llm::{ claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, groq::{GroqClient, GroqError, Message, ToolCallResponse}, parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion, MESH_TOOLS, }; use crate::server::auth::Authenticated; use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; /// Maximum number of tool-calling rounds to prevent infinite loops const MAX_TOOL_ROUNDS: usize = 30; #[derive(Debug, Clone, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MeshChatHistoryMessage { /// Role: "user" or "assistant" pub role: String, /// Message content pub content: String, } #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MeshChatRequest { /// The user's message/instruction pub message: String, /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" #[serde(default)] pub model: Option, /// Optional conversation history for context continuity (deprecated - now loaded from DB) #[serde(default)] pub history: Option>, /// Context type: "mesh", "task", or "subtask" #[serde(default)] pub context_type: Option, /// Task ID if context is task/subtask #[serde(default)] pub context_task_id: Option, } #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MeshChatResponse { /// The LLM's response message pub response: String, /// Tool calls that were executed pub tool_calls: Vec, /// Questions pending user answers (pauses conversation) #[serde(skip_serializing_if = "Option::is_none")] pub pending_questions: Option>, } #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct MeshToolCallInfo { pub name: String, pub result: ToolResult, } /// Enum to hold LLM clients enum LlmClient { Groq(GroqClient), Claude(ClaudeClient), } /// Unified result from LLM call struct LlmResult { content: Option, tool_calls: Vec, raw_tool_calls: Vec, finish_reason: String, } /// Chat with mesh orchestrator at the top level (no specific task context) #[utoipa::path( post, path = "/api/v1/mesh/chat", request_body = MeshChatRequest, responses( (status = 200, description = "Chat completed successfully", body = MeshChatResponse), (status = 401, description = "Unauthorized"), (status = 500, description = "Internal server error") ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn mesh_toplevel_chat_handler( State(state): State, Authenticated(auth): Authenticated, Json(request): Json, ) -> impl IntoResponse { // Check if database is configured let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "Database not configured" })), ) .into_response(); }; // Parse model selection (default to Claude Sonnet) let model = request .model .as_ref() .and_then(|m| LlmModel::from_str(m)) .unwrap_or(LlmModel::ClaudeSonnet); tracing::info!("Mesh top-level chat using LLM model: {:?}", model); // Initialize the appropriate LLM client let llm_client = match model { LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { Ok(client) => LlmClient::Claude(client), Err(ClaudeError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Claude client error: {}", e) })), ) .into_response(); } }, LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { Ok(client) => LlmClient::Claude(client), Err(ClaudeError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Claude client error: {}", e) })), ) .into_response(); } }, LlmModel::GroqKimi => match GroqClient::from_env() { Ok(client) => LlmClient::Groq(client), Err(GroqError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "GROQ_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Groq client error: {}", e) })), ) .into_response(); } }, }; // Build context about all tasks and daemons let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await; // Build agentic system prompt for top-level mesh orchestration let system_prompt = format!( r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. ## Your Capabilities You have access to tools for: - **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task - **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons - **File Access**: list_files, read_file (read documents from the files system) - **Task Communication**: send_message_to_task, update_task_plan - **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode ## Current Mesh Overview {mesh_context} ## Agentic Behavior Guidelines ### 1. Analyze Before Acting - For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons - Understand the current state before making changes - For simple, direct requests (e.g., "create a new task"), you can act immediately ### 2. Plan Multi-Step Operations - Break complex orchestration into logical steps - For parallel execution: create multiple subtasks, then run them on different daemons - For sequential execution: create subtasks and run them in order ### 3. Create and Manage Tasks - Use create_task to create new top-level tasks or subtasks - Assign appropriate priorities and plans - **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it. - If a working directory is a git repository, use it as the repository_url for code-related tasks ### 4. Coordinate Multiple Tasks - Use list_tasks to see all tasks and their statuses - Use list_daemons to see available compute resources - Balance workload across daemons ### 5. Be Efficient - Don't over-analyze simple requests - Use the minimum number of tool calls needed - Provide clear summaries of actions taken ## Important Notes - Task IDs are UUIDs - ensure you use the correct format - Running a task requires at least one connected daemon - When creating subtasks, specify the parent_task_id - Always confirm destructive operations (discard_task) with the user"#, mesh_context = mesh_context ); // Run the shared agentic loop run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await } /// Chat with task mesh orchestrator using LLM tool calling (scoped by owner) #[utoipa::path( post, path = "/api/v1/mesh/tasks/{id}/chat", request_body = MeshChatRequest, responses( (status = 200, description = "Chat completed successfully", body = MeshChatResponse), (status = 401, description = "Unauthorized"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error") ), params( ("id" = Uuid, Path, description = "Task ID (context for orchestration)") ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn mesh_chat_handler( State(state): State, Authenticated(auth): Authenticated, Path(task_id): Path, Json(request): Json, ) -> impl IntoResponse { // Check if database is configured let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "Database not configured" })), ) .into_response(); }; // Get the context task (scoped by owner) let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(task)) => task, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(json!({ "error": "Task not found" })), ) .into_response(); } Err(e) => { tracing::error!("Database error: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Database error: {}", e) })), ) .into_response(); } }; // Parse model selection (default to Claude Sonnet) let model = request .model .as_ref() .and_then(|m| LlmModel::from_str(m)) .unwrap_or(LlmModel::ClaudeSonnet); tracing::info!("Mesh chat using LLM model: {:?}", model); // Initialize the appropriate LLM client let llm_client = match model { LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { Ok(client) => LlmClient::Claude(client), Err(ClaudeError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Claude client error: {}", e) })), ) .into_response(); } }, LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { Ok(client) => LlmClient::Claude(client), Err(ClaudeError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Claude client error: {}", e) })), ) .into_response(); } }, LlmModel::GroqKimi => match GroqClient::from_env() { Ok(client) => LlmClient::Groq(client), Err(GroqError::MissingApiKey) => { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "GROQ_API_KEY not configured" })), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Groq client error: {}", e) })), ) .into_response(); } }, }; // Build context about the current task and mesh state let task_context = build_task_context(&task); // Build agentic system prompt for task orchestration let system_prompt = format!( r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. ## Your Capabilities You have access to tools for: - **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task - **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons - **File Access**: list_files, read_file (read documents from the files system) - **Task Communication**: send_message_to_task, update_task_plan - **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode ## Current Context {task_context} ## Agentic Behavior Guidelines ### 1. Analyze Before Acting - For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons - Understand the current state before making changes - For simple, direct requests (e.g., "pause this task"), you can act immediately ### 2. Plan Multi-Step Operations - Break complex orchestration into logical steps - For parallel execution: create multiple subtasks, then run them on different daemons - For sequential execution: create subtasks and run them in order ### 3. Monitor Task Progress - Use query_task_status to check on running tasks - Watch for status changes and react accordingly - Handle failures gracefully (retry, escalate, or report) ### 4. Coordinate Sibling Tasks - Use peek_sibling_overlay to see what other tasks have changed - Preview merges before completing to catch conflicts - Coordinate timing when multiple tasks need to merge ### 5. Be Efficient - Don't over-analyze simple requests - Use the minimum number of tool calls needed - Provide clear summaries of actions taken ## Important Notes - Task IDs are UUIDs - ensure you use the correct format - Running a task requires at least one connected daemon - Overlay operations require the task to have been run at least once - Always confirm destructive operations (discard_task) with the user - When creating subtasks for this task, use parent_task_id: {task_id}"#, task_context = task_context, task_id = task_id ); // Run the shared agentic loop run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await } fn build_task_context(task: &crate::db::models::Task) -> String { let mut context = format!( "Current Task: {} (ID: {})\n", task.name, task.id ); context.push_str(&format!("Status: {}\n", task.status)); context.push_str(&format!("Priority: {}\n", task.priority)); if let Some(ref desc) = task.description { context.push_str(&format!("Description: {}\n", desc)); } // Truncate plan preview if too long let plan_preview = if task.plan.len() > 200 { format!("{}...", &task.plan[..200]) } else { task.plan.clone() }; context.push_str(&format!("Plan: {}\n", plan_preview)); if let Some(ref summary) = task.progress_summary { context.push_str(&format!("Progress: {}\n", summary)); } if let Some(ref error) = task.error_message { context.push_str(&format!("Error: {}\n", error)); } // Repository info if let Some(ref url) = task.repository_url { context.push_str(&format!("Repository: {}\n", url)); } if let Some(ref branch) = task.base_branch { context.push_str(&format!("Base branch: {}\n", branch)); } context } /// Build overview context for top-level mesh orchestration async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String { let mut context = String::new(); // Get task counts by status match repository::list_tasks_for_owner(pool, owner_id).await { Ok(tasks) => { let total = tasks.len(); let pending = tasks.iter().filter(|t| t.status == "pending").count(); let running = tasks.iter().filter(|t| t.status == "running").count(); let paused = tasks.iter().filter(|t| t.status == "paused").count(); let done = tasks.iter().filter(|t| t.status == "done").count(); let failed = tasks.iter().filter(|t| t.status == "failed").count(); context.push_str(&format!( "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n", total, pending, running, paused, done, failed )); // List recent/active tasks if !tasks.is_empty() { context.push_str("\nRecent Tasks:\n"); for task in tasks.iter().take(5) { context.push_str(&format!( " - {} (ID: {}, Status: {})\n", task.name, task.id, task.status )); } if tasks.len() > 5 { context.push_str(&format!(" ... and {} more\n", tasks.len() - 5)); } } } Err(e) => { context.push_str(&format!("Error fetching tasks: {}\n", e)); } } // Get connected daemons for this owner let owner_daemons: Vec<_> = state.daemon_connections.iter() .filter(|e| e.value().owner_id == owner_id) .collect(); let daemon_count = owner_daemons.len(); context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count)); for entry in owner_daemons.iter().take(3) { let daemon = entry.value(); let working_dir = daemon.working_directory.as_deref().unwrap_or("not set"); context.push_str(&format!( " - {} (ID: {}, Working Directory: {})\n", daemon.hostname.as_deref().unwrap_or("unknown"), daemon.id, working_dir )); } // Add default repository guidance if there's exactly one daemon with a working directory let daemons_with_working_dir: Vec<_> = owner_daemons.iter() .filter(|e| e.value().working_directory.is_some()) .collect(); if daemons_with_working_dir.len() == 1 { if let Some(dir) = &daemons_with_working_dir[0].value().working_directory { context.push_str(&format!( "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n", dir )); } } context } /// Run the shared agentic loop for mesh chat async fn run_mesh_agentic_loop( pool: &sqlx::PgPool, state: &SharedState, llm_client: &LlmClient, system_prompt: String, request: &MeshChatRequest, owner_id: Uuid, ) -> axum::response::Response { // Get or create conversation for storing messages let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to get/create conversation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })), ) .into_response(); } }; // Build initial messages let mut messages = vec![Message { role: "system".to_string(), content: Some(system_prompt), tool_calls: None, tool_call_id: None, }]; // Load conversation history from database (or use provided for backwards compatibility) if let Some(history) = &request.history { // Legacy: use provided history for hist_msg in history { messages.push(Message { role: hist_msg.role.clone(), content: Some(hist_msg.content.clone()), tool_calls: None, tool_call_id: None, }); } tracing::info!( history_messages = history.len(), "Loaded mesh conversation history from request (legacy)" ); } else { // New: load from database match repository::list_chat_messages(pool, conversation.id, Some(50)).await { Ok(db_messages) => { for msg in db_messages { messages.push(Message { role: msg.role.clone(), content: Some(msg.content.clone()), tool_calls: None, tool_call_id: None, }); } tracing::info!( history_messages = messages.len() - 1, // minus system message "Loaded mesh conversation history from database" ); } Err(e) => { tracing::warn!("Failed to load chat history: {}", e); // Continue without history } } } // Add current user message messages.push(Message { role: "user".to_string(), content: Some(request.message.clone()), tool_calls: None, tool_call_id: None, }); // State for tracking let mut all_tool_call_infos: Vec = Vec::new(); let mut final_response: Option = None; let mut consecutive_failures = 0; const MAX_CONSECUTIVE_FAILURES: usize = 3; let mut pending_questions: Option> = None; // Multi-turn agentic tool calling loop for round in 0..MAX_TOOL_ROUNDS { tracing::info!( round = round, total_tool_calls = all_tool_call_infos.len(), "Mesh agentic loop iteration" ); // Check consecutive failures if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { tracing::warn!( "Breaking mesh loop due to {} consecutive failures", consecutive_failures ); final_response = Some( "I encountered multiple consecutive errors and stopped. \ Please check the task state and try again." .to_string(), ); break; } // Call the appropriate LLM API let result = match llm_client { LlmClient::Groq(groq) => { match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await { Ok(r) => LlmResult { content: r.content, tool_calls: r.tool_calls, raw_tool_calls: r.raw_tool_calls, finish_reason: r.finish_reason, }, Err(e) => { tracing::error!("Groq API error: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("LLM API error: {}", e) })), ) .into_response(); } } } LlmClient::Claude(claude_client) => { let claude_messages = claude::groq_messages_to_claude(&messages); match claude_client .chat_with_tools(claude_messages, &MESH_TOOLS) .await { Ok(r) => { let raw_tool_calls: Vec = r .tool_calls .iter() .map(|tc| ToolCallResponse { id: tc.id.clone(), call_type: "function".to_string(), function: crate::llm::groq::FunctionCall { name: tc.name.clone(), arguments: tc.arguments.to_string(), }, }) .collect(); LlmResult { content: r.content, tool_calls: r.tool_calls, raw_tool_calls, finish_reason: r.stop_reason, } } Err(e) => { tracing::error!("Claude API error: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("LLM API error: {}", e) })), ) .into_response(); } } } }; // Check if there are tool calls to execute if result.tool_calls.is_empty() { final_response = result.content; break; } // Add assistant message with tool calls to conversation messages.push(Message { role: "assistant".to_string(), content: result.content.clone(), tool_calls: Some(result.raw_tool_calls.clone()), tool_call_id: None, }); // Execute each tool call for (i, tool_call) in result.tool_calls.iter().enumerate() { tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call"); // Parse the tool call let mut execution_result = parse_mesh_tool_call(tool_call); // Handle async mesh tool requests if let Some(mesh_request) = execution_result.request.take() { let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await; execution_result.success = async_result.success; execution_result.message = async_result.message; execution_result.data = async_result.data; } // Track consecutive failures if execution_result.success { consecutive_failures = 0; } else { consecutive_failures += 1; tracing::warn!( tool = %tool_call.name, consecutive_failures = consecutive_failures, "Mesh tool call failed" ); } // Check for pending user questions if let Some(questions) = execution_result.pending_questions { tracing::info!( question_count = questions.len(), "Mesh LLM requesting user input" ); pending_questions = Some(questions); all_tool_call_infos.push(MeshToolCallInfo { name: tool_call.name.clone(), result: ToolResult { success: execution_result.success, message: execution_result.message.clone(), }, }); break; } // Build tool result message let result_content = if let Some(data) = &execution_result.data { json!({ "success": execution_result.success, "message": execution_result.message, "data": data }) .to_string() } else { json!({ "success": execution_result.success, "message": execution_result.message }) .to_string() }; // Add tool result message let tool_call_id = match llm_client { LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), LlmClient::Claude(_) => tool_call.id.clone(), }; messages.push(Message { role: "tool".to_string(), content: Some(result_content), tool_calls: None, tool_call_id: Some(tool_call_id), }); // Track for response all_tool_call_infos.push(MeshToolCallInfo { name: tool_call.name.clone(), result: ToolResult { success: execution_result.success, message: execution_result.message, }, }); } // If user questions are pending, pause if pending_questions.is_some() { final_response = result.content; break; } // If finish reason indicates completion, exit loop let finish_lower = result.finish_reason.to_lowercase(); if finish_lower == "stop" || finish_lower == "end_turn" { final_response = result.content; break; } } // Build response let response_text = final_response.unwrap_or_else(|| { if all_tool_call_infos.is_empty() { "I couldn't understand your request. Please try rephrasing.".to_string() } else { format!( "Done! Executed {} tool{}.", all_tool_call_infos.len(), if all_tool_call_infos.len() == 1 { "" } else { "s" } ) } }); // Save messages to database (only if not using legacy history mode) if request.history.is_none() { let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string()); // Validate context_task_id exists before using it (to avoid FK constraint violation) let context_task_id = if let Some(task_id) = request.context_task_id { match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(_)) => Some(task_id), Ok(None) => { tracing::warn!("context_task_id {} not found, ignoring", task_id); None } Err(e) => { tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e); None } } } else { None }; // Save user message if let Err(e) = repository::add_chat_message( pool, conversation.id, "user", &request.message, &context_type, context_task_id, None, None, ) .await { tracing::warn!("Failed to save user message to DB: {}", e); } // Serialize tool calls for storage let tool_calls_json = if all_tool_call_infos.is_empty() { None } else { Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default()) }; // Serialize pending questions for storage let pending_questions_json = pending_questions .as_ref() .map(|q| serde_json::to_value(q).unwrap_or_default()); // Save assistant message if let Err(e) = repository::add_chat_message( pool, conversation.id, "assistant", &response_text, &context_type, context_task_id, tool_calls_json, pending_questions_json, ) .await { tracing::warn!("Failed to save assistant message to DB: {}", e); } tracing::info!( conversation_id = %conversation.id, context_type = %context_type, "Saved mesh chat messages to database" ); } ( StatusCode::OK, Json(MeshChatResponse { response: response_text, tool_calls: all_tool_call_infos, pending_questions, }), ) .into_response() } /// Result from handling an async mesh tool request struct MeshRequestResult { success: bool, message: String, data: Option, } /// Handle async mesh tool requests that require database/daemon access async fn handle_mesh_request( pool: &sqlx::PgPool, state: &SharedState, request: MeshToolRequest, owner_id: Uuid, ) -> MeshRequestResult { match request { MeshToolRequest::CreateTask { name, plan, parent_task_id, repository_url, base_branch, merge_mode, priority, } => { // Subtasks inherit contract_id from parent task let contract_id = if let Some(parent_id) = parent_task_id { match repository::get_task(pool, parent_id).await { Ok(Some(parent_task)) => { match parent_task.contract_id { Some(cid) => cid, None => { return MeshRequestResult { success: false, message: "Parent task has no contract_id".to_string(), data: None, }; } } } Ok(None) => { return MeshRequestResult { success: false, message: format!("Parent task {} not found", parent_id), data: None, }; } Err(e) => { return MeshRequestResult { success: false, message: format!("Failed to look up parent task: {}", e), data: None, }; } } } else { // Root tasks created via LLM chat require a contract_id // TODO: Add contract_id to create_task tool definition return MeshRequestResult { success: false, message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(), data: None, }; }; // Check if repository_url matches a daemon's working directory (for this owner) let is_daemon_working_dir = repository_url.as_ref().map(|url| { state.daemon_connections.iter().any(|entry| { entry.value().owner_id == owner_id && entry.value().working_directory.as_ref() == Some(url) }) }).unwrap_or(false); // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode { // Explicit merge_mode provided - derive from it let action = match mode.as_str() { "pr" => "pr".to_string(), "auto" => "merge".to_string(), "manual" => "branch".to_string(), _ => "none".to_string(), }; // If using daemon working dir and action involves the repo, set target_repo_path let target = if is_daemon_working_dir && action != "none" { repository_url.clone() } else { None }; (Some(action), target) } else if is_daemon_working_dir { // No merge_mode but using daemon working dir - default to "pr" (Some("pr".to_string()), repository_url.clone()) } else { (None, None) }; let create_req = CreateTaskRequest { contract_id: Some(contract_id), name: name.clone(), description: None, plan, parent_task_id, repository_url, base_branch, target_branch: None, merge_mode, priority: priority.unwrap_or(0), target_repo_path, completion_action, continue_from_task_id: None, copy_files: None, is_supervisor: false, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { Ok(task) => MeshRequestResult { success: true, message: format!("Created task '{}' with ID {}", name, task.id), data: Some(json!({ "taskId": task.id, "name": task.name, "status": task.status, })), }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to create task: {}", e), data: None, }, } } MeshToolRequest::RunTask { task_id, daemon_id } => { // Get task to check status let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; if task.status != "pending" && task.status != "paused" { return MeshRequestResult { success: false, message: format!( "Task cannot be run - status is '{}' (must be 'pending' or 'paused')", task.status ), data: None, }; } // Find a daemon to run on (must belong to this owner) let target_daemon_id = if let Some(id) = daemon_id { // Verify the specified daemon belongs to this owner if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) { return MeshRequestResult { success: false, message: "Specified daemon not found or not accessible.".to_string(), data: None, }; } id } else { // Find any connected daemon for this owner let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id); match daemon { Some(d) => d.value().id, None => { return MeshRequestResult { success: false, message: "No daemons connected for your account. Cannot run task.".to_string(), data: None, } } } }; // Check if this is an orchestrator (depth 0 with subtasks) let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { Ok(subtasks) => subtasks.len(), Err(_) => 0, }; let is_orchestrator = task.depth == 0 && subtask_count > 0; // IMPORTANT: Update database FIRST to assign daemon_id before sending command // This prevents race conditions where the task starts but daemon_id is not set let update_req = crate::db::models::UpdateTaskRequest { status: Some("starting".to_string()), daemon_id: Some(target_daemon_id), version: Some(task.version), ..Default::default() }; let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Failed to update task: {}", e), data: None, } } }; // Get local_only and auto_merge_local from contract if task has one let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { match repository::get_contract_for_owner(pool, contract_id, owner_id).await { Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), _ => (false, false), } } else { (false, false) }; // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { task_id, task_name: task.name.clone(), plan: task.plan.clone(), repo_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), target_branch: task.target_branch.clone(), parent_task_id: task.parent_task_id, depth: task.depth, is_orchestrator, target_repo_path: task.target_repo_path.clone(), completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), contract_id: task.contract_id, is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, patch_data: None, patch_base_sha: None, local_only, auto_merge_local, supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; match state.send_daemon_command(target_daemon_id, command).await { Ok(()) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated_task.version, status: "starting".to_string(), updated_fields: vec!["status".to_string(), "daemon_id".to_string()], updated_by: "system".to_string(), }); MeshRequestResult { success: true, message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id), data: Some(json!({ "taskId": task_id, "daemonId": target_daemon_id, "status": "starting", })), } } Err(e) => { // Rollback: clear daemon_id and reset status since command failed let rollback_req = crate::db::models::UpdateTaskRequest { status: Some("pending".to_string()), clear_daemon_id: true, ..Default::default() }; let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; MeshRequestResult { success: false, message: format!("Failed to start task: {}", e), data: None, } } } } MeshToolRequest::PauseTask { task_id } => { // Get task and its daemon let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; if task.status != "running" { return MeshRequestResult { success: false, message: format!("Task is not running (status: {})", task.status), data: None, }; } if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::PauseTask { task_id }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { return MeshRequestResult { success: false, message: format!("Failed to pause task: {}", e), data: None, }; } } // Update status let update_req = crate::db::models::UpdateTaskRequest { status: Some("paused".to_string()), version: Some(task.version), ..Default::default() }; if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: "paused".to_string(), updated_fields: vec!["status".to_string()], updated_by: "system".to_string(), }); } MeshRequestResult { success: true, message: format!("Task {} paused", task_id), data: Some(json!({ "taskId": task_id, "status": "paused" })), } } MeshToolRequest::ResumeTask { task_id } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; if task.status != "paused" { return MeshRequestResult { success: false, message: format!("Task is not paused (status: {})", task.status), data: None, }; } if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::ResumeTask { task_id }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { return MeshRequestResult { success: false, message: format!("Failed to resume task: {}", e), data: None, }; } } // Update status let update_req = crate::db::models::UpdateTaskRequest { status: Some("running".to_string()), version: Some(task.version), ..Default::default() }; if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: "running".to_string(), updated_fields: vec!["status".to_string()], updated_by: "system".to_string(), }); } MeshRequestResult { success: true, message: format!("Task {} resumed", task_id), data: Some(json!({ "taskId": task_id, "status": "running" })), } } MeshToolRequest::InterruptTask { task_id, graceful } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::InterruptTask { task_id, graceful }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { return MeshRequestResult { success: false, message: format!("Failed to interrupt task: {}", e), data: None, }; } } // Update status let update_req = crate::db::models::UpdateTaskRequest { status: Some("paused".to_string()), version: Some(task.version), ..Default::default() }; if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: "paused".to_string(), updated_fields: vec!["status".to_string()], updated_by: "system".to_string(), }); } MeshRequestResult { success: true, message: format!( "Task {} {}interrupted", task_id, if graceful { "gracefully " } else { "" } ), data: Some(json!({ "taskId": task_id, "status": "paused" })), } } MeshToolRequest::DiscardTask { task_id } => { match repository::delete_task_for_owner(pool, task_id, owner_id).await { Ok(true) => MeshRequestResult { success: true, message: format!("Task {} discarded", task_id), data: Some(json!({ "taskId": task_id, "deleted": true })), }, Ok(false) => MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to delete task: {}", e), data: None, }, } } MeshToolRequest::QueryTaskStatus { task_id } => { match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(task)) => MeshRequestResult { success: true, message: format!("Task '{}' is {}", task.name, task.status), data: Some(json!({ "taskId": task.id, "name": task.name, "status": task.status, "priority": task.priority, "description": task.description, "plan": task.plan, "progressSummary": task.progress_summary, "errorMessage": task.error_message, "repositoryUrl": task.repository_url, "baseBranch": task.base_branch, "targetBranch": task.target_branch, "mergeMode": task.merge_mode, "prUrl": task.pr_url, "daemonId": task.daemon_id, "containerId": task.container_id, "createdAt": task.created_at, "startedAt": task.started_at, "completedAt": task.completed_at, })), }, Ok(None) => MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::ListTasks { status_filter, parent_task_id, } => { // TODO: Add filtering support to repository match repository::list_tasks_for_owner(pool, owner_id).await { Ok(mut tasks) => { // Apply filters if let Some(ref status) = status_filter { tasks.retain(|t| &t.status == status); } if let Some(ref parent_id) = parent_task_id { tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id)); } let task_data: Vec = tasks .iter() .map(|t| { json!({ "taskId": t.id, "name": t.name, "status": t.status, "priority": t.priority, "parentTaskId": t.parent_task_id, }) }) .collect(); MeshRequestResult { success: true, message: format!("Found {} tasks", tasks.len()), data: Some(json!({ "tasks": task_data })), } } Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::ListSubtasks { task_id } => { match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { Ok(subtasks) => { let subtask_data: Vec = subtasks .iter() .map(|t| { json!({ "taskId": t.id, "name": t.name, "status": t.status, "priority": t.priority, }) }) .collect(); MeshRequestResult { success: true, message: format!("Found {} subtasks", subtasks.len()), data: Some(json!({ "subtasks": subtask_data })), } } Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::ListSiblings { task_id } => { // Get task to find parent let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; let Some(parent_id) = task.parent_task_id else { return MeshRequestResult { success: true, message: "Task has no parent, so no siblings".to_string(), data: Some(json!({ "siblings": [] })), }; }; // Get all subtasks of parent, excluding current task match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await { Ok(siblings) => { let sibling_data: Vec = siblings .iter() .filter(|t| t.id != task_id) .map(|t| { json!({ "taskId": t.id, "name": t.name, "status": t.status, "priority": t.priority, }) }) .collect(); MeshRequestResult { success: true, message: format!("Found {} sibling tasks", sibling_data.len()), data: Some(json!({ "siblings": sibling_data })), } } Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::ListDaemons => { // Only list daemons belonging to this owner let daemons: Vec = state .daemon_connections .iter() .filter(|entry| entry.value().owner_id == owner_id) .map(|entry| { let d = entry.value(); json!({ "daemonId": d.id, "connectionId": d.connection_id, "hostname": d.hostname, "machineId": d.machine_id, }) }) .collect(); MeshRequestResult { success: true, message: format!("{} daemon(s) connected", daemons.len()), data: Some(json!({ "daemons": daemons })), } } MeshToolRequest::ListDaemonDirectories => { let mut directories: Vec = Vec::new(); // Only list directories from daemons belonging to this owner for entry in state.daemon_connections.iter() { let daemon = entry.value(); // Only include daemons belonging to this owner if daemon.owner_id != owner_id { continue; } // Add working directory if available if let Some(ref working_dir) = daemon.working_directory { directories.push(json!({ "path": working_dir, "label": "Working Directory", "directoryType": "working", "hostname": daemon.hostname, })); } // Add home directory if available if let Some(ref home_dir) = daemon.home_directory { directories.push(json!({ "path": home_dir, "label": "Makima Home", "directoryType": "home", "hostname": daemon.hostname, })); } } MeshRequestResult { success: true, message: format!("Found {} available directories", directories.len()), data: Some(json!({ "directories": directories })), } } MeshToolRequest::ListFiles => { match repository::list_files_for_owner(pool, owner_id).await { Ok(files) => { let file_data: Vec = files .iter() .map(|f| { json!({ "fileId": f.id, "name": f.name, "description": f.description, "createdAt": f.created_at, "updatedAt": f.updated_at, }) }) .collect(); MeshRequestResult { success: true, message: format!("Found {} files", files.len()), data: Some(json!({ "files": file_data })), } } Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::ReadFile { file_id } => { match repository::get_file_for_owner(pool, file_id, owner_id).await { Ok(Some(file)) => { // Convert body elements to readable text let body_content: Vec = file .body .iter() .map(|elem| { match elem { crate::db::models::BodyElement::Heading { level, text } => { json!({ "type": "heading", "level": level, "text": text }) } crate::db::models::BodyElement::Paragraph { text } => { json!({ "type": "paragraph", "text": text }) } crate::db::models::BodyElement::Code { language, content } => { json!({ "type": "code", "language": language, "content": content }) } crate::db::models::BodyElement::List { ordered, items } => { json!({ "type": "list", "ordered": ordered, "items": items }) } crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => { let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0); json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count }) } crate::db::models::BodyElement::Image { src, alt, caption } => { json!({ "type": "image", "src": src, "alt": alt, "caption": caption }) } crate::db::models::BodyElement::Markdown { content } => { json!({ "type": "markdown", "content": content }) } } }) .collect(); // Also build a plain text version for easier reading let plain_text: String = file .body .iter() .filter_map(|elem| { match elem { crate::db::models::BodyElement::Heading { level, text } => { Some(format!("{} {}", "#".repeat(*level as usize), text)) } crate::db::models::BodyElement::Paragraph { text } => { Some(text.clone()) } crate::db::models::BodyElement::Code { language, content } => { let lang = language.as_deref().unwrap_or(""); Some(format!("```{}\n{}\n```", lang, content)) } crate::db::models::BodyElement::List { ordered, items } => { let list_text: Vec = items.iter().enumerate().map(|(i, item)| { if *ordered { format!("{}. {}", i + 1, item) } else { format!("- {}", item) } }).collect(); Some(list_text.join("\n")) } crate::db::models::BodyElement::Markdown { content } => { Some(content.clone()) } _ => None, } }) .collect::>() .join("\n\n"); // Convert transcript entries to JSON let transcript: Vec = file .transcript .iter() .map(|entry| { json!({ "id": entry.id, "speaker": entry.speaker, "start": entry.start, "end": entry.end, "text": entry.text, }) }) .collect(); // Build a plain text transcript for easier reading let transcript_text: String = file .transcript .iter() .map(|entry| { format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text) }) .collect::>() .join("\n"); MeshRequestResult { success: true, message: format!("Read file '{}'", file.name), data: Some(json!({ "fileId": file.id, "name": file.name, "description": file.description, "summary": file.summary, "body": body_content, "plainText": plain_text, "transcript": transcript, "transcriptText": transcript_text, "transcriptCount": file.transcript.len(), "createdAt": file.created_at, "updatedAt": file.updated_at, })), } } Ok(None) => MeshRequestResult { success: false, message: format!("File {} not found", file_id), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, }, } } MeshToolRequest::SendMessageToTask { task_id, message } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; if task.status != "running" { return MeshRequestResult { success: false, message: format!("Task is not running (status: {})", task.status), data: None, }; } if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::SendMessage { task_id, message }; match state.send_daemon_command(daemon_id, command).await { Ok(()) => MeshRequestResult { success: true, message: "Message sent to task".to_string(), data: Some(json!({ "taskId": task_id })), }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to send message: {}", e), data: None, }, } } else { MeshRequestResult { success: false, message: "Task has no daemon assigned".to_string(), data: None, } } } MeshToolRequest::UpdateTaskPlan { task_id, new_plan, interrupt_if_running, } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; // Interrupt if running and requested if task.status == "running" && interrupt_if_running { if let Some(daemon_id) = task.daemon_id { let command = DaemonCommand::InterruptTask { task_id, graceful: true, }; let _ = state.send_daemon_command(daemon_id, command).await; } } let update_req = crate::db::models::UpdateTaskRequest { plan: Some(new_plan), version: Some(task.version), ..Default::default() }; match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { Ok(Some(updated)) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: updated.status.clone(), updated_fields: vec!["plan".to_string()], updated_by: "system".to_string(), }); MeshRequestResult { success: true, message: "Task plan updated".to_string(), data: Some(json!({ "taskId": task_id })), } } Ok(None) => MeshRequestResult { success: false, message: "Task not found".to_string(), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to update task: {}", e), data: None, }, } } // Overlay operations - these require daemon communication // For now, return placeholder responses since daemon implementation is separate MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult { success: false, message: format!( "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", sibling_task_id ), data: None, }, MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult { success: false, message: format!( "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", task_id ), data: None, }, MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult { success: false, message: format!( "Merge preview requires a connected daemon. Task {} may not have overlay data yet.", task_id ), data: None, }, MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult { success: false, message: format!( "Merge operations require a connected daemon. Task {}", task_id ), data: None, }, MeshToolRequest::CompleteTask { task_id } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; // Update status to done let update_req = crate::db::models::UpdateTaskRequest { status: Some("done".to_string()), version: Some(task.version), ..Default::default() }; match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { Ok(Some(updated)) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: "done".to_string(), updated_fields: vec!["status".to_string()], updated_by: "system".to_string(), }); let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string()); MeshRequestResult { success: true, message: format!( "Task {} completed. Merge mode: {}", task_id, &merge_mode ), data: Some(json!({ "taskId": task_id, "status": "done", "mergeMode": merge_mode, })), } } Ok(None) => MeshRequestResult { success: false, message: "Task not found".to_string(), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to complete task: {}", e), data: None, }, } } MeshToolRequest::SetMergeMode { task_id, mode } => { let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { Ok(Some(t)) => t, Ok(None) => { return MeshRequestResult { success: false, message: format!("Task {} not found", task_id), data: None, } } Err(e) => { return MeshRequestResult { success: false, message: format!("Database error: {}", e), data: None, } } }; let update_req = crate::db::models::UpdateTaskRequest { merge_mode: Some(mode.clone()), version: Some(task.version), ..Default::default() }; match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { Ok(Some(updated)) => { state.broadcast_task_update(TaskUpdateNotification { task_id, owner_id: Some(task.owner_id), version: updated.version, status: updated.status, updated_fields: vec!["merge_mode".to_string()], updated_by: "system".to_string(), }); MeshRequestResult { success: true, message: format!("Merge mode set to '{}'", mode), data: Some(json!({ "taskId": task_id, "mergeMode": mode })), } } Ok(None) => MeshRequestResult { success: false, message: "Task not found".to_string(), data: None, }, Err(e) => MeshRequestResult { success: false, message: format!("Failed to update merge mode: {}", e), data: None, }, } } // Supervisor-only tools - these should be handled via the supervisor.sh script, // not through the mesh chat. Return an informative error. MeshToolRequest::GetAllContractTasks { contract_id } => { MeshRequestResult { success: false, message: format!( "get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}", contract_id ), data: None, } } MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => { MeshRequestResult { success: false, message: format!( "wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s", task_id, timeout_seconds ), data: None, } } MeshToolRequest::ReadTaskWorktree { task_id, file_path } => { MeshRequestResult { success: false, message: format!( "read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}", task_id, file_path ), data: None, } } MeshToolRequest::SpawnTask { name, .. } => { MeshRequestResult { success: false, message: format!( "spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}", name ), data: None, } } MeshToolRequest::CreateCheckpoint { task_id, message } => { MeshRequestResult { success: false, message: format!( "create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}", task_id, message ), data: None, } } MeshToolRequest::ListTaskCheckpoints { task_id } => { MeshRequestResult { success: false, message: format!( "list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", task_id ), data: None, } } MeshToolRequest::GetTaskTree { task_id } => { MeshRequestResult { success: false, message: format!( "get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", task_id ), data: None, } } } } // ============================================================================= // Chat History Endpoints // ============================================================================= use crate::db::models::MeshChatHistoryResponse; /// Get chat history for the current conversation (requires authentication) #[utoipa::path( get, path = "/api/v1/mesh/chat/history", responses( (status = 200, description = "Chat history", body = MeshChatHistoryResponse), (status = 401, description = "Unauthorized"), (status = 503, description = "Database not configured"), (status = 500, description = "Internal server error") ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn get_chat_history( State(state): State, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "Database not configured" })), ) .into_response(); }; let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await { Ok(c) => c, Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": e.to_string() })), ) .into_response() } }; let messages = match repository::list_chat_messages(pool, conversation.id, None).await { Ok(m) => m, Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": e.to_string() })), ) .into_response() } }; ( StatusCode::OK, Json(MeshChatHistoryResponse { conversation_id: conversation.id, messages, }), ) .into_response() } /// Clear chat history (archives current conversation and starts new, requires authentication) #[utoipa::path( delete, path = "/api/v1/mesh/chat/history", responses( (status = 200, description = "History cleared"), (status = 401, description = "Unauthorized"), (status = 503, description = "Database not configured"), (status = 500, description = "Internal server error") ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "Mesh" )] pub async fn clear_chat_history( State(state): State, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": "Database not configured" })), ) .into_response(); }; match repository::clear_conversation(pool, auth.owner_id).await { Ok(new_conv) => ( StatusCode::OK, Json(json!({ "success": true, "conversationId": new_conv.id })), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": e.to_string() })), ) .into_response(), } }