summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_chat.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_chat.rs')
-rw-r--r--makima/src/server/handlers/mesh_chat.rs2088
1 files changed, 2088 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
new file mode 100644
index 0000000..5d6d2ee
--- /dev/null
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -0,0 +1,2088 @@
+//! Chat endpoint for LLM-powered task orchestration.
+//!
+//! This handler provides an agentic loop for managing tasks, daemons, and
+//! overlay operations through LLM tool calling.
+
+use axum::{
+ extract::{Path, State},
+ http::StatusCode,
+ response::IntoResponse,
+ Json,
+};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use utoipa::ToSchema;
+use uuid::Uuid;
+
+use crate::db::{models::CreateTaskRequest, repository};
+use crate::llm::{
+ claude::{self, ClaudeClient, ClaudeError, ClaudeModel},
+ groq::{GroqClient, GroqError, Message, ToolCallResponse},
+ parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion,
+ MESH_TOOLS,
+};
+use crate::server::auth::Authenticated;
+use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
+
+/// Maximum number of tool-calling rounds to prevent infinite loops
+const MAX_TOOL_ROUNDS: usize = 30;
+
+#[derive(Debug, Clone, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatHistoryMessage {
+ /// Role: "user" or "assistant"
+ pub role: String,
+ /// Message content
+ pub content: String,
+}
+
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatRequest {
+ /// The user's message/instruction
+ pub message: String,
+ /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq"
+ #[serde(default)]
+ pub model: Option<String>,
+ /// Optional conversation history for context continuity (deprecated - now loaded from DB)
+ #[serde(default)]
+ pub history: Option<Vec<MeshChatHistoryMessage>>,
+ /// Context type: "mesh", "task", or "subtask"
+ #[serde(default)]
+ pub context_type: Option<String>,
+ /// Task ID if context is task/subtask
+ #[serde(default)]
+ pub context_task_id: Option<Uuid>,
+}
+
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshChatResponse {
+ /// The LLM's response message
+ pub response: String,
+ /// Tool calls that were executed
+ pub tool_calls: Vec<MeshToolCallInfo>,
+ /// Questions pending user answers (pauses conversation)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub pending_questions: Option<Vec<UserQuestion>>,
+}
+
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct MeshToolCallInfo {
+ pub name: String,
+ pub result: ToolResult,
+}
+
+/// Enum to hold LLM clients
+enum LlmClient {
+ Groq(GroqClient),
+ Claude(ClaudeClient),
+}
+
+/// Unified result from LLM call
+struct LlmResult {
+ content: Option<String>,
+ tool_calls: Vec<ToolCall>,
+ raw_tool_calls: Vec<ToolCallResponse>,
+ finish_reason: String,
+}
+
+/// Chat with mesh orchestrator at the top level (no specific task context)
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/chat",
+ request_body = MeshChatRequest,
+ responses(
+ (status = 200, description = "Chat completed successfully", body = MeshChatResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 500, description = "Internal server error")
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn mesh_toplevel_chat_handler(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Json(request): Json<MeshChatRequest>,
+) -> impl IntoResponse {
+ // Check if database is configured
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "Database not configured" })),
+ )
+ .into_response();
+ };
+
+ // Parse model selection (default to Claude Sonnet)
+ let model = request
+ .model
+ .as_ref()
+ .and_then(|m| LlmModel::from_str(m))
+ .unwrap_or(LlmModel::ClaudeSonnet);
+
+ tracing::info!("Mesh top-level chat using LLM model: {:?}", model);
+
+ // Initialize the appropriate LLM client
+ let llm_client = match model {
+ LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
+ Ok(client) => LlmClient::Claude(client),
+ Err(ClaudeError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Claude client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
+ Ok(client) => LlmClient::Claude(client),
+ Err(ClaudeError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Claude client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ LlmModel::GroqKimi => match GroqClient::from_env() {
+ Ok(client) => LlmClient::Groq(client),
+ Err(GroqError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "GROQ_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Groq client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ };
+
+ // Build context about all tasks and daemons
+ let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await;
+
+ // Build agentic system prompt for top-level mesh orchestration
+ let system_prompt = format!(
+ r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
+
+## Your Capabilities
+You have access to tools for:
+- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
+- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
+- **File Access**: list_files, read_file (read documents from the files system)
+- **Task Communication**: send_message_to_task, update_task_plan
+- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
+
+## Current Mesh Overview
+{mesh_context}
+
+## Agentic Behavior Guidelines
+
+### 1. Analyze Before Acting
+- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
+- Understand the current state before making changes
+- For simple, direct requests (e.g., "create a new task"), you can act immediately
+
+### 2. Plan Multi-Step Operations
+- Break complex orchestration into logical steps
+- For parallel execution: create multiple subtasks, then run them on different daemons
+- For sequential execution: create subtasks and run them in order
+
+### 3. Create and Manage Tasks
+- Use create_task to create new top-level tasks or subtasks
+- Assign appropriate priorities and plans
+- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it.
+- If a working directory is a git repository, use it as the repository_url for code-related tasks
+
+### 4. Coordinate Multiple Tasks
+- Use list_tasks to see all tasks and their statuses
+- Use list_daemons to see available compute resources
+- Balance workload across daemons
+
+### 5. Be Efficient
+- Don't over-analyze simple requests
+- Use the minimum number of tool calls needed
+- Provide clear summaries of actions taken
+
+## Important Notes
+- Task IDs are UUIDs - ensure you use the correct format
+- Running a task requires at least one connected daemon
+- When creating subtasks, specify the parent_task_id
+- Always confirm destructive operations (discard_task) with the user"#,
+ mesh_context = mesh_context
+ );
+
+ // Run the shared agentic loop
+ run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
+}
+
+/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner)
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/chat",
+ request_body = MeshChatRequest,
+ responses(
+ (status = 200, description = "Chat completed successfully", body = MeshChatResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 404, description = "Task not found"),
+ (status = 500, description = "Internal server error")
+ ),
+ params(
+ ("id" = Uuid, Path, description = "Task ID (context for orchestration)")
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn mesh_chat_handler(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(task_id): Path<Uuid>,
+ Json(request): Json<MeshChatRequest>,
+) -> impl IntoResponse {
+ // Check if database is configured
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "Database not configured" })),
+ )
+ .into_response();
+ };
+
+ // Get the context task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(task)) => task,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(json!({ "error": "Task not found" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Database error: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Database error: {}", e) })),
+ )
+ .into_response();
+ }
+ };
+
+ // Parse model selection (default to Claude Sonnet)
+ let model = request
+ .model
+ .as_ref()
+ .and_then(|m| LlmModel::from_str(m))
+ .unwrap_or(LlmModel::ClaudeSonnet);
+
+ tracing::info!("Mesh chat using LLM model: {:?}", model);
+
+ // Initialize the appropriate LLM client
+ let llm_client = match model {
+ LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
+ Ok(client) => LlmClient::Claude(client),
+ Err(ClaudeError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Claude client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
+ Ok(client) => LlmClient::Claude(client),
+ Err(ClaudeError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Claude client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ LlmModel::GroqKimi => match GroqClient::from_env() {
+ Ok(client) => LlmClient::Groq(client),
+ Err(GroqError::MissingApiKey) => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "GROQ_API_KEY not configured" })),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Groq client error: {}", e) })),
+ )
+ .into_response();
+ }
+ },
+ };
+
+ // Build context about the current task and mesh state
+ let task_context = build_task_context(&task);
+
+ // Build agentic system prompt for task orchestration
+ let system_prompt = format!(
+ r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
+
+## Your Capabilities
+You have access to tools for:
+- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
+- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
+- **File Access**: list_files, read_file (read documents from the files system)
+- **Task Communication**: send_message_to_task, update_task_plan
+- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
+
+## Current Context
+{task_context}
+
+## Agentic Behavior Guidelines
+
+### 1. Analyze Before Acting
+- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
+- Understand the current state before making changes
+- For simple, direct requests (e.g., "pause this task"), you can act immediately
+
+### 2. Plan Multi-Step Operations
+- Break complex orchestration into logical steps
+- For parallel execution: create multiple subtasks, then run them on different daemons
+- For sequential execution: create subtasks and run them in order
+
+### 3. Monitor Task Progress
+- Use query_task_status to check on running tasks
+- Watch for status changes and react accordingly
+- Handle failures gracefully (retry, escalate, or report)
+
+### 4. Coordinate Sibling Tasks
+- Use peek_sibling_overlay to see what other tasks have changed
+- Preview merges before completing to catch conflicts
+- Coordinate timing when multiple tasks need to merge
+
+### 5. Be Efficient
+- Don't over-analyze simple requests
+- Use the minimum number of tool calls needed
+- Provide clear summaries of actions taken
+
+## Important Notes
+- Task IDs are UUIDs - ensure you use the correct format
+- Running a task requires at least one connected daemon
+- Overlay operations require the task to have been run at least once
+- Always confirm destructive operations (discard_task) with the user
+- When creating subtasks for this task, use parent_task_id: {task_id}"#,
+ task_context = task_context,
+ task_id = task_id
+ );
+
+ // Run the shared agentic loop
+ run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
+}
+
+fn build_task_context(task: &crate::db::models::Task) -> String {
+ let mut context = format!(
+ "Current Task: {} (ID: {})\n",
+ task.name, task.id
+ );
+ context.push_str(&format!("Status: {}\n", task.status));
+ context.push_str(&format!("Priority: {}\n", task.priority));
+
+ if let Some(ref desc) = task.description {
+ context.push_str(&format!("Description: {}\n", desc));
+ }
+
+ // Truncate plan preview if too long
+ let plan_preview = if task.plan.len() > 200 {
+ format!("{}...", &task.plan[..200])
+ } else {
+ task.plan.clone()
+ };
+ context.push_str(&format!("Plan: {}\n", plan_preview));
+
+ if let Some(ref summary) = task.progress_summary {
+ context.push_str(&format!("Progress: {}\n", summary));
+ }
+
+ if let Some(ref error) = task.error_message {
+ context.push_str(&format!("Error: {}\n", error));
+ }
+
+ // Repository info
+ if let Some(ref url) = task.repository_url {
+ context.push_str(&format!("Repository: {}\n", url));
+ }
+ if let Some(ref branch) = task.base_branch {
+ context.push_str(&format!("Base branch: {}\n", branch));
+ }
+
+ context
+}
+
+/// Build overview context for top-level mesh orchestration
+async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String {
+ let mut context = String::new();
+
+ // Get task counts by status
+ match repository::list_tasks_for_owner(pool, owner_id).await {
+ Ok(tasks) => {
+ let total = tasks.len();
+ let pending = tasks.iter().filter(|t| t.status == "pending").count();
+ let running = tasks.iter().filter(|t| t.status == "running").count();
+ let paused = tasks.iter().filter(|t| t.status == "paused").count();
+ let done = tasks.iter().filter(|t| t.status == "done").count();
+ let failed = tasks.iter().filter(|t| t.status == "failed").count();
+
+ context.push_str(&format!(
+ "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n",
+ total, pending, running, paused, done, failed
+ ));
+
+ // List recent/active tasks
+ if !tasks.is_empty() {
+ context.push_str("\nRecent Tasks:\n");
+ for task in tasks.iter().take(5) {
+ context.push_str(&format!(
+ " - {} (ID: {}, Status: {})\n",
+ task.name, task.id, task.status
+ ));
+ }
+ if tasks.len() > 5 {
+ context.push_str(&format!(" ... and {} more\n", tasks.len() - 5));
+ }
+ }
+ }
+ Err(e) => {
+ context.push_str(&format!("Error fetching tasks: {}\n", e));
+ }
+ }
+
+ // Get connected daemons for this owner
+ let owner_daemons: Vec<_> = state.daemon_connections.iter()
+ .filter(|e| e.value().owner_id == owner_id)
+ .collect();
+ let daemon_count = owner_daemons.len();
+ context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count));
+
+ for entry in owner_daemons.iter().take(3) {
+ let daemon = entry.value();
+ let working_dir = daemon.working_directory.as_deref().unwrap_or("not set");
+ context.push_str(&format!(
+ " - {} (ID: {}, Working Directory: {})\n",
+ daemon.hostname.as_deref().unwrap_or("unknown"),
+ daemon.id,
+ working_dir
+ ));
+ }
+
+ // Add default repository guidance if there's exactly one daemon with a working directory
+ let daemons_with_working_dir: Vec<_> = owner_daemons.iter()
+ .filter(|e| e.value().working_directory.is_some())
+ .collect();
+
+ if daemons_with_working_dir.len() == 1 {
+ if let Some(dir) = &daemons_with_working_dir[0].value().working_directory {
+ context.push_str(&format!(
+ "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n",
+ dir
+ ));
+ }
+ }
+
+ context
+}
+
+/// Run the shared agentic loop for mesh chat
+async fn run_mesh_agentic_loop(
+ pool: &sqlx::PgPool,
+ state: &SharedState,
+ llm_client: &LlmClient,
+ system_prompt: String,
+ request: &MeshChatRequest,
+ owner_id: Uuid,
+) -> axum::response::Response {
+ // Get or create conversation for storing messages
+ let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::error!("Failed to get/create conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })),
+ )
+ .into_response();
+ }
+ };
+
+ // Build initial messages
+ let mut messages = vec![Message {
+ role: "system".to_string(),
+ content: Some(system_prompt),
+ tool_calls: None,
+ tool_call_id: None,
+ }];
+
+ // Load conversation history from database (or use provided for backwards compatibility)
+ if let Some(history) = &request.history {
+ // Legacy: use provided history
+ for hist_msg in history {
+ messages.push(Message {
+ role: hist_msg.role.clone(),
+ content: Some(hist_msg.content.clone()),
+ tool_calls: None,
+ tool_call_id: None,
+ });
+ }
+ tracing::info!(
+ history_messages = history.len(),
+ "Loaded mesh conversation history from request (legacy)"
+ );
+ } else {
+ // New: load from database
+ match repository::list_chat_messages(pool, conversation.id, Some(50)).await {
+ Ok(db_messages) => {
+ for msg in db_messages {
+ messages.push(Message {
+ role: msg.role.clone(),
+ content: Some(msg.content.clone()),
+ tool_calls: None,
+ tool_call_id: None,
+ });
+ }
+ tracing::info!(
+ history_messages = messages.len() - 1, // minus system message
+ "Loaded mesh conversation history from database"
+ );
+ }
+ Err(e) => {
+ tracing::warn!("Failed to load chat history: {}", e);
+ // Continue without history
+ }
+ }
+ }
+
+ // Add current user message
+ messages.push(Message {
+ role: "user".to_string(),
+ content: Some(request.message.clone()),
+ tool_calls: None,
+ tool_call_id: None,
+ });
+
+ // State for tracking
+ let mut all_tool_call_infos: Vec<MeshToolCallInfo> = Vec::new();
+ let mut final_response: Option<String> = None;
+ let mut consecutive_failures = 0;
+ const MAX_CONSECUTIVE_FAILURES: usize = 3;
+ let mut pending_questions: Option<Vec<UserQuestion>> = None;
+
+ // Multi-turn agentic tool calling loop
+ for round in 0..MAX_TOOL_ROUNDS {
+ tracing::info!(
+ round = round,
+ total_tool_calls = all_tool_call_infos.len(),
+ "Mesh agentic loop iteration"
+ );
+
+ // Check consecutive failures
+ if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
+ tracing::warn!(
+ "Breaking mesh loop due to {} consecutive failures",
+ consecutive_failures
+ );
+ final_response = Some(
+ "I encountered multiple consecutive errors and stopped. \
+ Please check the task state and try again."
+ .to_string(),
+ );
+ break;
+ }
+
+ // Call the appropriate LLM API
+ let result = match llm_client {
+ LlmClient::Groq(groq) => {
+ match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await {
+ Ok(r) => LlmResult {
+ content: r.content,
+ tool_calls: r.tool_calls,
+ raw_tool_calls: r.raw_tool_calls,
+ finish_reason: r.finish_reason,
+ },
+ Err(e) => {
+ tracing::error!("Groq API error: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("LLM API error: {}", e) })),
+ )
+ .into_response();
+ }
+ }
+ }
+ LlmClient::Claude(claude_client) => {
+ let claude_messages = claude::groq_messages_to_claude(&messages);
+ match claude_client
+ .chat_with_tools(claude_messages, &MESH_TOOLS)
+ .await
+ {
+ Ok(r) => {
+ let raw_tool_calls: Vec<ToolCallResponse> = r
+ .tool_calls
+ .iter()
+ .map(|tc| ToolCallResponse {
+ id: tc.id.clone(),
+ call_type: "function".to_string(),
+ function: crate::llm::groq::FunctionCall {
+ name: tc.name.clone(),
+ arguments: tc.arguments.to_string(),
+ },
+ })
+ .collect();
+
+ LlmResult {
+ content: r.content,
+ tool_calls: r.tool_calls,
+ raw_tool_calls,
+ finish_reason: r.stop_reason,
+ }
+ }
+ Err(e) => {
+ tracing::error!("Claude API error: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": format!("LLM API error: {}", e) })),
+ )
+ .into_response();
+ }
+ }
+ }
+ };
+
+ // Check if there are tool calls to execute
+ if result.tool_calls.is_empty() {
+ final_response = result.content;
+ break;
+ }
+
+ // Add assistant message with tool calls to conversation
+ messages.push(Message {
+ role: "assistant".to_string(),
+ content: result.content.clone(),
+ tool_calls: Some(result.raw_tool_calls.clone()),
+ tool_call_id: None,
+ });
+
+ // Execute each tool call
+ for (i, tool_call) in result.tool_calls.iter().enumerate() {
+ tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call");
+
+ // Parse the tool call
+ let mut execution_result = parse_mesh_tool_call(tool_call);
+
+ // Handle async mesh tool requests
+ if let Some(mesh_request) = execution_result.request.take() {
+ let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await;
+ execution_result.success = async_result.success;
+ execution_result.message = async_result.message;
+ execution_result.data = async_result.data;
+ }
+
+ // Track consecutive failures
+ if execution_result.success {
+ consecutive_failures = 0;
+ } else {
+ consecutive_failures += 1;
+ tracing::warn!(
+ tool = %tool_call.name,
+ consecutive_failures = consecutive_failures,
+ "Mesh tool call failed"
+ );
+ }
+
+ // Check for pending user questions
+ if let Some(questions) = execution_result.pending_questions {
+ tracing::info!(
+ question_count = questions.len(),
+ "Mesh LLM requesting user input"
+ );
+ pending_questions = Some(questions);
+ all_tool_call_infos.push(MeshToolCallInfo {
+ name: tool_call.name.clone(),
+ result: ToolResult {
+ success: execution_result.success,
+ message: execution_result.message.clone(),
+ },
+ });
+ break;
+ }
+
+ // Build tool result message
+ let result_content = if let Some(data) = &execution_result.data {
+ json!({
+ "success": execution_result.success,
+ "message": execution_result.message,
+ "data": data
+ })
+ .to_string()
+ } else {
+ json!({
+ "success": execution_result.success,
+ "message": execution_result.message
+ })
+ .to_string()
+ };
+
+ // Add tool result message
+ let tool_call_id = match llm_client {
+ LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(),
+ LlmClient::Claude(_) => tool_call.id.clone(),
+ };
+
+ messages.push(Message {
+ role: "tool".to_string(),
+ content: Some(result_content),
+ tool_calls: None,
+ tool_call_id: Some(tool_call_id),
+ });
+
+ // Track for response
+ all_tool_call_infos.push(MeshToolCallInfo {
+ name: tool_call.name.clone(),
+ result: ToolResult {
+ success: execution_result.success,
+ message: execution_result.message,
+ },
+ });
+ }
+
+ // If user questions are pending, pause
+ if pending_questions.is_some() {
+ final_response = result.content;
+ break;
+ }
+
+ // If finish reason indicates completion, exit loop
+ let finish_lower = result.finish_reason.to_lowercase();
+ if finish_lower == "stop" || finish_lower == "end_turn" {
+ final_response = result.content;
+ break;
+ }
+ }
+
+ // Build response
+ let response_text = final_response.unwrap_or_else(|| {
+ if all_tool_call_infos.is_empty() {
+ "I couldn't understand your request. Please try rephrasing.".to_string()
+ } else {
+ format!(
+ "Done! Executed {} tool{}.",
+ all_tool_call_infos.len(),
+ if all_tool_call_infos.len() == 1 {
+ ""
+ } else {
+ "s"
+ }
+ )
+ }
+ });
+
+ // Save messages to database (only if not using legacy history mode)
+ if request.history.is_none() {
+ let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string());
+
+ // Validate context_task_id exists before using it (to avoid FK constraint violation)
+ let context_task_id = if let Some(task_id) = request.context_task_id {
+ match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(_)) => Some(task_id),
+ Ok(None) => {
+ tracing::warn!("context_task_id {} not found, ignoring", task_id);
+ None
+ }
+ Err(e) => {
+ tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e);
+ None
+ }
+ }
+ } else {
+ None
+ };
+
+ // Save user message
+ if let Err(e) = repository::add_chat_message(
+ pool,
+ conversation.id,
+ "user",
+ &request.message,
+ &context_type,
+ context_task_id,
+ None,
+ None,
+ )
+ .await
+ {
+ tracing::warn!("Failed to save user message to DB: {}", e);
+ }
+
+ // Serialize tool calls for storage
+ let tool_calls_json = if all_tool_call_infos.is_empty() {
+ None
+ } else {
+ Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default())
+ };
+
+ // Serialize pending questions for storage
+ let pending_questions_json = pending_questions
+ .as_ref()
+ .map(|q| serde_json::to_value(q).unwrap_or_default());
+
+ // Save assistant message
+ if let Err(e) = repository::add_chat_message(
+ pool,
+ conversation.id,
+ "assistant",
+ &response_text,
+ &context_type,
+ context_task_id,
+ tool_calls_json,
+ pending_questions_json,
+ )
+ .await
+ {
+ tracing::warn!("Failed to save assistant message to DB: {}", e);
+ }
+
+ tracing::info!(
+ conversation_id = %conversation.id,
+ context_type = %context_type,
+ "Saved mesh chat messages to database"
+ );
+ }
+
+ (
+ StatusCode::OK,
+ Json(MeshChatResponse {
+ response: response_text,
+ tool_calls: all_tool_call_infos,
+ pending_questions,
+ }),
+ )
+ .into_response()
+}
+
+/// Result from handling an async mesh tool request
+struct MeshRequestResult {
+ success: bool,
+ message: String,
+ data: Option<serde_json::Value>,
+}
+
+/// Handle async mesh tool requests that require database/daemon access
+async fn handle_mesh_request(
+ pool: &sqlx::PgPool,
+ state: &SharedState,
+ request: MeshToolRequest,
+ owner_id: Uuid,
+) -> MeshRequestResult {
+ match request {
+ MeshToolRequest::CreateTask {
+ name,
+ plan,
+ parent_task_id,
+ repository_url,
+ base_branch,
+ merge_mode,
+ priority,
+ } => {
+ // Check if repository_url matches a daemon's working directory (for this owner)
+ let is_daemon_working_dir = repository_url.as_ref().map(|url| {
+ state.daemon_connections.iter().any(|entry| {
+ entry.value().owner_id == owner_id &&
+ entry.value().working_directory.as_ref() == Some(url)
+ })
+ }).unwrap_or(false);
+
+ // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir
+ let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode {
+ // Explicit merge_mode provided - derive from it
+ let action = match mode.as_str() {
+ "pr" => "pr".to_string(),
+ "auto" => "merge".to_string(),
+ "manual" => "branch".to_string(),
+ _ => "none".to_string(),
+ };
+ // If using daemon working dir and action involves the repo, set target_repo_path
+ let target = if is_daemon_working_dir && action != "none" {
+ repository_url.clone()
+ } else {
+ None
+ };
+ (Some(action), target)
+ } else if is_daemon_working_dir {
+ // No merge_mode but using daemon working dir - default to "branch"
+ (Some("branch".to_string()), repository_url.clone())
+ } else {
+ (None, None)
+ };
+
+ let create_req = CreateTaskRequest {
+ name: name.clone(),
+ description: None,
+ plan,
+ parent_task_id,
+ repository_url,
+ base_branch,
+ target_branch: None,
+ merge_mode,
+ priority: priority.unwrap_or(0),
+ target_repo_path,
+ completion_action,
+ continue_from_task_id: None,
+ copy_files: None,
+ };
+
+ match repository::create_task_for_owner(pool, owner_id, create_req).await {
+ Ok(task) => MeshRequestResult {
+ success: true,
+ message: format!("Created task '{}' with ID {}", name, task.id),
+ data: Some(json!({
+ "taskId": task.id,
+ "name": task.name,
+ "status": task.status,
+ })),
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to create task: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::RunTask { task_id, daemon_id } => {
+ // Get task to check status
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ if task.status != "pending" && task.status != "paused" {
+ return MeshRequestResult {
+ success: false,
+ message: format!(
+ "Task cannot be run - status is '{}' (must be 'pending' or 'paused')",
+ task.status
+ ),
+ data: None,
+ };
+ }
+
+ // Find a daemon to run on (must belong to this owner)
+ let target_daemon_id = if let Some(id) = daemon_id {
+ // Verify the specified daemon belongs to this owner
+ if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) {
+ return MeshRequestResult {
+ success: false,
+ message: "Specified daemon not found or not accessible.".to_string(),
+ data: None,
+ };
+ }
+ id
+ } else {
+ // Find any connected daemon for this owner
+ let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id);
+ match daemon {
+ Some(d) => d.value().id,
+ None => {
+ return MeshRequestResult {
+ success: false,
+ message: "No daemons connected for your account. Cannot run task.".to_string(),
+ data: None,
+ }
+ }
+ }
+ };
+
+ // Check if this is an orchestrator (depth 0 with subtasks)
+ let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
+ Ok(subtasks) => subtasks.len(),
+ Err(_) => 0,
+ };
+ let is_orchestrator = task.depth == 0 && subtask_count > 0;
+
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id,
+ task_name: task.name.clone(),
+ plan: task.plan.clone(),
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ };
+
+ match state.send_daemon_command(target_daemon_id, command).await {
+ Ok(()) => {
+ // Update task status to running
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("running".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: "running".to_string(),
+ updated_fields: vec!["status".to_string()],
+ updated_by: "system".to_string(),
+ });
+ }
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id),
+ data: Some(json!({
+ "taskId": task_id,
+ "daemonId": target_daemon_id,
+ "status": "running",
+ })),
+ }
+ }
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to start task: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::PauseTask { task_id } => {
+ // Get task and its daemon
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ if task.status != "running" {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task is not running (status: {})", task.status),
+ data: None,
+ };
+ }
+
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::PauseTask { task_id };
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Failed to pause task: {}", e),
+ data: None,
+ };
+ }
+ }
+
+ // Update status
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("paused".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: "paused".to_string(),
+ updated_fields: vec!["status".to_string()],
+ updated_by: "system".to_string(),
+ });
+ }
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Task {} paused", task_id),
+ data: Some(json!({ "taskId": task_id, "status": "paused" })),
+ }
+ }
+
+ MeshToolRequest::ResumeTask { task_id } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ if task.status != "paused" {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task is not paused (status: {})", task.status),
+ data: None,
+ };
+ }
+
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::ResumeTask { task_id };
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Failed to resume task: {}", e),
+ data: None,
+ };
+ }
+ }
+
+ // Update status
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("running".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: "running".to_string(),
+ updated_fields: vec!["status".to_string()],
+ updated_by: "system".to_string(),
+ });
+ }
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Task {} resumed", task_id),
+ data: Some(json!({ "taskId": task_id, "status": "running" })),
+ }
+ }
+
+ MeshToolRequest::InterruptTask { task_id, graceful } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::InterruptTask { task_id, graceful };
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Failed to interrupt task: {}", e),
+ data: None,
+ };
+ }
+ }
+
+ // Update status
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("paused".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: "paused".to_string(),
+ updated_fields: vec!["status".to_string()],
+ updated_by: "system".to_string(),
+ });
+ }
+
+ MeshRequestResult {
+ success: true,
+ message: format!(
+ "Task {} {}interrupted",
+ task_id,
+ if graceful { "gracefully " } else { "" }
+ ),
+ data: Some(json!({ "taskId": task_id, "status": "paused" })),
+ }
+ }
+
+ MeshToolRequest::DiscardTask { task_id } => {
+ match repository::delete_task_for_owner(pool, task_id, owner_id).await {
+ Ok(true) => MeshRequestResult {
+ success: true,
+ message: format!("Task {} discarded", task_id),
+ data: Some(json!({ "taskId": task_id, "deleted": true })),
+ },
+ Ok(false) => MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to delete task: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::QueryTaskStatus { task_id } => {
+ match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(task)) => MeshRequestResult {
+ success: true,
+ message: format!("Task '{}' is {}", task.name, task.status),
+ data: Some(json!({
+ "taskId": task.id,
+ "name": task.name,
+ "status": task.status,
+ "priority": task.priority,
+ "description": task.description,
+ "plan": task.plan,
+ "progressSummary": task.progress_summary,
+ "errorMessage": task.error_message,
+ "repositoryUrl": task.repository_url,
+ "baseBranch": task.base_branch,
+ "targetBranch": task.target_branch,
+ "mergeMode": task.merge_mode,
+ "prUrl": task.pr_url,
+ "daemonId": task.daemon_id,
+ "containerId": task.container_id,
+ "createdAt": task.created_at,
+ "startedAt": task.started_at,
+ "completedAt": task.completed_at,
+ })),
+ },
+ Ok(None) => MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::ListTasks {
+ status_filter,
+ parent_task_id,
+ } => {
+ // TODO: Add filtering support to repository
+ match repository::list_tasks_for_owner(pool, owner_id).await {
+ Ok(mut tasks) => {
+ // Apply filters
+ if let Some(ref status) = status_filter {
+ tasks.retain(|t| &t.status == status);
+ }
+ if let Some(ref parent_id) = parent_task_id {
+ tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id));
+ }
+
+ let task_data: Vec<serde_json::Value> = tasks
+ .iter()
+ .map(|t| {
+ json!({
+ "taskId": t.id,
+ "name": t.name,
+ "status": t.status,
+ "priority": t.priority,
+ "parentTaskId": t.parent_task_id,
+ })
+ })
+ .collect();
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Found {} tasks", tasks.len()),
+ data: Some(json!({ "tasks": task_data })),
+ }
+ }
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::ListSubtasks { task_id } => {
+ match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
+ Ok(subtasks) => {
+ let subtask_data: Vec<serde_json::Value> = subtasks
+ .iter()
+ .map(|t| {
+ json!({
+ "taskId": t.id,
+ "name": t.name,
+ "status": t.status,
+ "priority": t.priority,
+ })
+ })
+ .collect();
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Found {} subtasks", subtasks.len()),
+ data: Some(json!({ "subtasks": subtask_data })),
+ }
+ }
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::ListSiblings { task_id } => {
+ // Get task to find parent
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ let Some(parent_id) = task.parent_task_id else {
+ return MeshRequestResult {
+ success: true,
+ message: "Task has no parent, so no siblings".to_string(),
+ data: Some(json!({ "siblings": [] })),
+ };
+ };
+
+ // Get all subtasks of parent, excluding current task
+ match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await {
+ Ok(siblings) => {
+ let sibling_data: Vec<serde_json::Value> = siblings
+ .iter()
+ .filter(|t| t.id != task_id)
+ .map(|t| {
+ json!({
+ "taskId": t.id,
+ "name": t.name,
+ "status": t.status,
+ "priority": t.priority,
+ })
+ })
+ .collect();
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Found {} sibling tasks", sibling_data.len()),
+ data: Some(json!({ "siblings": sibling_data })),
+ }
+ }
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::ListDaemons => {
+ // Only list daemons belonging to this owner
+ let daemons: Vec<serde_json::Value> = state
+ .daemon_connections
+ .iter()
+ .filter(|entry| entry.value().owner_id == owner_id)
+ .map(|entry| {
+ let d = entry.value();
+ json!({
+ "daemonId": d.id,
+ "connectionId": d.connection_id,
+ "hostname": d.hostname,
+ "machineId": d.machine_id,
+ })
+ })
+ .collect();
+
+ MeshRequestResult {
+ success: true,
+ message: format!("{} daemon(s) connected", daemons.len()),
+ data: Some(json!({ "daemons": daemons })),
+ }
+ }
+
+ MeshToolRequest::ListDaemonDirectories => {
+ let mut directories: Vec<serde_json::Value> = Vec::new();
+
+ // Only list directories from daemons belonging to this owner
+ for entry in state.daemon_connections.iter() {
+ let daemon = entry.value();
+
+ // Only include daemons belonging to this owner
+ if daemon.owner_id != owner_id {
+ continue;
+ }
+
+ // Add working directory if available
+ if let Some(ref working_dir) = daemon.working_directory {
+ directories.push(json!({
+ "path": working_dir,
+ "label": "Working Directory",
+ "directoryType": "working",
+ "hostname": daemon.hostname,
+ }));
+ }
+
+ // Add home directory if available
+ if let Some(ref home_dir) = daemon.home_directory {
+ directories.push(json!({
+ "path": home_dir,
+ "label": "Makima Home",
+ "directoryType": "home",
+ "hostname": daemon.hostname,
+ }));
+ }
+ }
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Found {} available directories", directories.len()),
+ data: Some(json!({ "directories": directories })),
+ }
+ }
+
+ MeshToolRequest::ListFiles => {
+ match repository::list_files_for_owner(pool, owner_id).await {
+ Ok(files) => {
+ let file_data: Vec<serde_json::Value> = files
+ .iter()
+ .map(|f| {
+ json!({
+ "fileId": f.id,
+ "name": f.name,
+ "description": f.description,
+ "createdAt": f.created_at,
+ "updatedAt": f.updated_at,
+ })
+ })
+ .collect();
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Found {} files", files.len()),
+ data: Some(json!({ "files": file_data })),
+ }
+ }
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::ReadFile { file_id } => {
+ match repository::get_file_for_owner(pool, file_id, owner_id).await {
+ Ok(Some(file)) => {
+ // Convert body elements to readable text
+ let body_content: Vec<serde_json::Value> = file
+ .body
+ .iter()
+ .map(|elem| {
+ match elem {
+ crate::db::models::BodyElement::Heading { level, text } => {
+ json!({ "type": "heading", "level": level, "text": text })
+ }
+ crate::db::models::BodyElement::Paragraph { text } => {
+ json!({ "type": "paragraph", "text": text })
+ }
+ crate::db::models::BodyElement::Code { language, content } => {
+ json!({ "type": "code", "language": language, "content": content })
+ }
+ crate::db::models::BodyElement::List { ordered, items } => {
+ json!({ "type": "list", "ordered": ordered, "items": items })
+ }
+ crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => {
+ let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0);
+ json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count })
+ }
+ crate::db::models::BodyElement::Image { src, alt, caption } => {
+ json!({ "type": "image", "src": src, "alt": alt, "caption": caption })
+ }
+ }
+ })
+ .collect();
+
+ // Also build a plain text version for easier reading
+ let plain_text: String = file
+ .body
+ .iter()
+ .filter_map(|elem| {
+ match elem {
+ crate::db::models::BodyElement::Heading { level, text } => {
+ Some(format!("{} {}", "#".repeat(*level as usize), text))
+ }
+ crate::db::models::BodyElement::Paragraph { text } => {
+ Some(text.clone())
+ }
+ crate::db::models::BodyElement::Code { language, content } => {
+ let lang = language.as_deref().unwrap_or("");
+ Some(format!("```{}\n{}\n```", lang, content))
+ }
+ crate::db::models::BodyElement::List { ordered, items } => {
+ let list_text: Vec<String> = items.iter().enumerate().map(|(i, item)| {
+ if *ordered {
+ format!("{}. {}", i + 1, item)
+ } else {
+ format!("- {}", item)
+ }
+ }).collect();
+ Some(list_text.join("\n"))
+ }
+ _ => None,
+ }
+ })
+ .collect::<Vec<_>>()
+ .join("\n\n");
+
+ // Convert transcript entries to JSON
+ let transcript: Vec<serde_json::Value> = file
+ .transcript
+ .iter()
+ .map(|entry| {
+ json!({
+ "id": entry.id,
+ "speaker": entry.speaker,
+ "start": entry.start,
+ "end": entry.end,
+ "text": entry.text,
+ })
+ })
+ .collect();
+
+ // Build a plain text transcript for easier reading
+ let transcript_text: String = file
+ .transcript
+ .iter()
+ .map(|entry| {
+ format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text)
+ })
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ MeshRequestResult {
+ success: true,
+ message: format!("Read file '{}'", file.name),
+ data: Some(json!({
+ "fileId": file.id,
+ "name": file.name,
+ "description": file.description,
+ "summary": file.summary,
+ "body": body_content,
+ "plainText": plain_text,
+ "transcript": transcript,
+ "transcriptText": transcript_text,
+ "transcriptCount": file.transcript.len(),
+ "createdAt": file.created_at,
+ "updatedAt": file.updated_at,
+ })),
+ }
+ }
+ Ok(None) => MeshRequestResult {
+ success: false,
+ message: format!("File {} not found", file_id),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::SendMessageToTask { task_id, message } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ if task.status != "running" {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task is not running (status: {})", task.status),
+ data: None,
+ };
+ }
+
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::SendMessage { task_id, message };
+ match state.send_daemon_command(daemon_id, command).await {
+ Ok(()) => MeshRequestResult {
+ success: true,
+ message: "Message sent to task".to_string(),
+ data: Some(json!({ "taskId": task_id })),
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to send message: {}", e),
+ data: None,
+ },
+ }
+ } else {
+ MeshRequestResult {
+ success: false,
+ message: "Task has no daemon assigned".to_string(),
+ data: None,
+ }
+ }
+ }
+
+ MeshToolRequest::UpdateTaskPlan {
+ task_id,
+ new_plan,
+ interrupt_if_running,
+ } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ // Interrupt if running and requested
+ if task.status == "running" && interrupt_if_running {
+ if let Some(daemon_id) = task.daemon_id {
+ let command = DaemonCommand::InterruptTask {
+ task_id,
+ graceful: true,
+ };
+ let _ = state.send_daemon_command(daemon_id, command).await;
+ }
+ }
+
+ let update_req = crate::db::models::UpdateTaskRequest {
+ plan: Some(new_plan),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ Ok(Some(updated)) => {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: updated.status.clone(),
+ updated_fields: vec!["plan".to_string()],
+ updated_by: "system".to_string(),
+ });
+ MeshRequestResult {
+ success: true,
+ message: "Task plan updated".to_string(),
+ data: Some(json!({ "taskId": task_id })),
+ }
+ }
+ Ok(None) => MeshRequestResult {
+ success: false,
+ message: "Task not found".to_string(),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to update task: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ // Overlay operations - these require daemon communication
+ // For now, return placeholder responses since daemon implementation is separate
+ MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult {
+ success: false,
+ message: format!(
+ "Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
+ sibling_task_id
+ ),
+ data: None,
+ },
+
+ MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult {
+ success: false,
+ message: format!(
+ "Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
+ task_id
+ ),
+ data: None,
+ },
+
+ MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult {
+ success: false,
+ message: format!(
+ "Merge preview requires a connected daemon. Task {} may not have overlay data yet.",
+ task_id
+ ),
+ data: None,
+ },
+
+ MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult {
+ success: false,
+ message: format!(
+ "Merge operations require a connected daemon. Task {}",
+ task_id
+ ),
+ data: None,
+ },
+
+ MeshToolRequest::CompleteTask { task_id } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ // Update status to done
+ let update_req = crate::db::models::UpdateTaskRequest {
+ status: Some("done".to_string()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ Ok(Some(updated)) => {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: "done".to_string(),
+ updated_fields: vec!["status".to_string()],
+ updated_by: "system".to_string(),
+ });
+ let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string());
+ MeshRequestResult {
+ success: true,
+ message: format!(
+ "Task {} completed. Merge mode: {}",
+ task_id,
+ &merge_mode
+ ),
+ data: Some(json!({
+ "taskId": task_id,
+ "status": "done",
+ "mergeMode": merge_mode,
+ })),
+ }
+ }
+ Ok(None) => MeshRequestResult {
+ success: false,
+ message: "Task not found".to_string(),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to complete task: {}", e),
+ data: None,
+ },
+ }
+ }
+
+ MeshToolRequest::SetMergeMode { task_id, mode } => {
+ let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Task {} not found", task_id),
+ data: None,
+ }
+ }
+ Err(e) => {
+ return MeshRequestResult {
+ success: false,
+ message: format!("Database error: {}", e),
+ data: None,
+ }
+ }
+ };
+
+ let update_req = crate::db::models::UpdateTaskRequest {
+ merge_mode: Some(mode.clone()),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
+ Ok(Some(updated)) => {
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(task.owner_id),
+ version: updated.version,
+ status: updated.status,
+ updated_fields: vec!["merge_mode".to_string()],
+ updated_by: "system".to_string(),
+ });
+ MeshRequestResult {
+ success: true,
+ message: format!("Merge mode set to '{}'", mode),
+ data: Some(json!({ "taskId": task_id, "mergeMode": mode })),
+ }
+ }
+ Ok(None) => MeshRequestResult {
+ success: false,
+ message: "Task not found".to_string(),
+ data: None,
+ },
+ Err(e) => MeshRequestResult {
+ success: false,
+ message: format!("Failed to update merge mode: {}", e),
+ data: None,
+ },
+ }
+ }
+ }
+}
+
+// =============================================================================
+// Chat History Endpoints
+// =============================================================================
+
+use crate::db::models::MeshChatHistoryResponse;
+
+/// Get chat history for the current conversation (requires authentication)
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/chat/history",
+ responses(
+ (status = 200, description = "Chat history", body = MeshChatHistoryResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 503, description = "Database not configured"),
+ (status = 500, description = "Internal server error")
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_chat_history(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "Database not configured" })),
+ )
+ .into_response();
+ };
+
+ let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await {
+ Ok(c) => c,
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": e.to_string() })),
+ )
+ .into_response()
+ }
+ };
+
+ let messages = match repository::list_chat_messages(pool, conversation.id, None).await {
+ Ok(m) => m,
+ Err(e) => {
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": e.to_string() })),
+ )
+ .into_response()
+ }
+ };
+
+ (
+ StatusCode::OK,
+ Json(MeshChatHistoryResponse {
+ conversation_id: conversation.id,
+ messages,
+ }),
+ )
+ .into_response()
+}
+
+/// Clear chat history (archives current conversation and starts new, requires authentication)
+#[utoipa::path(
+ delete,
+ path = "/api/v1/mesh/chat/history",
+ responses(
+ (status = 200, description = "History cleared"),
+ (status = 401, description = "Unauthorized"),
+ (status = 503, description = "Database not configured"),
+ (status = 500, description = "Internal server error")
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn clear_chat_history(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(json!({ "error": "Database not configured" })),
+ )
+ .into_response();
+ };
+
+ match repository::clear_conversation(pool, auth.owner_id).await {
+ Ok(new_conv) => (
+ StatusCode::OK,
+ Json(json!({ "success": true, "conversationId": new_conv.id })),
+ )
+ .into_response(),
+ Err(e) => (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(json!({ "error": e.to_string() })),
+ )
+ .into_response(),
+ }
+}