From 8b17a175c3e7e27b789812eba4e3cd760beadb10 Mon Sep 17 00:00:00 2001 From: soryu Date: Tue, 6 Jan 2026 04:08:11 +0000 Subject: Initial Control system --- makima/src/server/handlers/mesh_chat.rs | 2088 +++++++++++++++++++++++++++++++ 1 file changed, 2088 insertions(+) create mode 100644 makima/src/server/handlers/mesh_chat.rs (limited to 'makima/src/server/handlers/mesh_chat.rs') diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs new file mode 100644 index 0000000..5d6d2ee --- /dev/null +++ b/makima/src/server/handlers/mesh_chat.rs @@ -0,0 +1,2088 @@ +//! Chat endpoint for LLM-powered task orchestration. +//! +//! This handler provides an agentic loop for managing tasks, daemons, and +//! overlay operations through LLM tool calling. + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::db::{models::CreateTaskRequest, repository}; +use crate::llm::{ + claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, + groq::{GroqClient, GroqError, Message, ToolCallResponse}, + parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion, + MESH_TOOLS, +}; +use crate::server::auth::Authenticated; +use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; + +/// Maximum number of tool-calling rounds to prevent infinite loops +const MAX_TOOL_ROUNDS: usize = 30; + +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatHistoryMessage { + /// Role: "user" or "assistant" + pub role: String, + /// Message content + pub content: String, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatRequest { + /// The user's message/instruction + pub message: String, + /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" + #[serde(default)] + pub model: Option, + /// Optional conversation history for context continuity (deprecated - now loaded from DB) + #[serde(default)] + pub history: Option>, + /// Context type: "mesh", "task", or "subtask" + #[serde(default)] + pub context_type: Option, + /// Task ID if context is task/subtask + #[serde(default)] + pub context_task_id: Option, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshChatResponse { + /// The LLM's response message + pub response: String, + /// Tool calls that were executed + pub tool_calls: Vec, + /// Questions pending user answers (pauses conversation) + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_questions: Option>, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MeshToolCallInfo { + pub name: String, + pub result: ToolResult, +} + +/// Enum to hold LLM clients +enum LlmClient { + Groq(GroqClient), + Claude(ClaudeClient), +} + +/// Unified result from LLM call +struct LlmResult { + content: Option, + tool_calls: Vec, + raw_tool_calls: Vec, + finish_reason: String, +} + +/// Chat with mesh orchestrator at the top level (no specific task context) +#[utoipa::path( + post, + path = "/api/v1/mesh/chat", + request_body = MeshChatRequest, + responses( + (status = 200, description = "Chat completed successfully", body = MeshChatResponse), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn mesh_toplevel_chat_handler( + State(state): State, + Authenticated(auth): Authenticated, + Json(request): Json, +) -> impl IntoResponse { + // Check if database is configured + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + // Parse model selection (default to Claude Sonnet) + let model = request + .model + .as_ref() + .and_then(|m| LlmModel::from_str(m)) + .unwrap_or(LlmModel::ClaudeSonnet); + + tracing::info!("Mesh top-level chat using LLM model: {:?}", model); + + // Initialize the appropriate LLM client + let llm_client = match model { + LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::GroqKimi => match GroqClient::from_env() { + Ok(client) => LlmClient::Groq(client), + Err(GroqError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "GROQ_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Groq client error: {}", e) })), + ) + .into_response(); + } + }, + }; + + // Build context about all tasks and daemons + let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await; + + // Build agentic system prompt for top-level mesh orchestration + let system_prompt = format!( + r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. + +## Your Capabilities +You have access to tools for: +- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task +- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons +- **File Access**: list_files, read_file (read documents from the files system) +- **Task Communication**: send_message_to_task, update_task_plan +- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode + +## Current Mesh Overview +{mesh_context} + +## Agentic Behavior Guidelines + +### 1. Analyze Before Acting +- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons +- Understand the current state before making changes +- For simple, direct requests (e.g., "create a new task"), you can act immediately + +### 2. Plan Multi-Step Operations +- Break complex orchestration into logical steps +- For parallel execution: create multiple subtasks, then run them on different daemons +- For sequential execution: create subtasks and run them in order + +### 3. Create and Manage Tasks +- Use create_task to create new top-level tasks or subtasks +- Assign appropriate priorities and plans +- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it. +- If a working directory is a git repository, use it as the repository_url for code-related tasks + +### 4. Coordinate Multiple Tasks +- Use list_tasks to see all tasks and their statuses +- Use list_daemons to see available compute resources +- Balance workload across daemons + +### 5. Be Efficient +- Don't over-analyze simple requests +- Use the minimum number of tool calls needed +- Provide clear summaries of actions taken + +## Important Notes +- Task IDs are UUIDs - ensure you use the correct format +- Running a task requires at least one connected daemon +- When creating subtasks, specify the parent_task_id +- Always confirm destructive operations (discard_task) with the user"#, + mesh_context = mesh_context + ); + + // Run the shared agentic loop + run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await +} + +/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner) +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/chat", + request_body = MeshChatRequest, + responses( + (status = 200, description = "Chat completed successfully", body = MeshChatResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error") + ), + params( + ("id" = Uuid, Path, description = "Task ID (context for orchestration)") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn mesh_chat_handler( + State(state): State, + Authenticated(auth): Authenticated, + Path(task_id): Path, + Json(request): Json, +) -> impl IntoResponse { + // Check if database is configured + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + // Get the context task (scoped by owner) + let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(task)) => task, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "Task not found" })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Database error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Database error: {}", e) })), + ) + .into_response(); + } + }; + + // Parse model selection (default to Claude Sonnet) + let model = request + .model + .as_ref() + .and_then(|m| LlmModel::from_str(m)) + .unwrap_or(LlmModel::ClaudeSonnet); + + tracing::info!("Mesh chat using LLM model: {:?}", model); + + // Initialize the appropriate LLM client + let llm_client = match model { + LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::GroqKimi => match GroqClient::from_env() { + Ok(client) => LlmClient::Groq(client), + Err(GroqError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "GROQ_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Groq client error: {}", e) })), + ) + .into_response(); + } + }, + }; + + // Build context about the current task and mesh state + let task_context = build_task_context(&task); + + // Build agentic system prompt for task orchestration + let system_prompt = format!( + r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. + +## Your Capabilities +You have access to tools for: +- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task +- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons +- **File Access**: list_files, read_file (read documents from the files system) +- **Task Communication**: send_message_to_task, update_task_plan +- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode + +## Current Context +{task_context} + +## Agentic Behavior Guidelines + +### 1. Analyze Before Acting +- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons +- Understand the current state before making changes +- For simple, direct requests (e.g., "pause this task"), you can act immediately + +### 2. Plan Multi-Step Operations +- Break complex orchestration into logical steps +- For parallel execution: create multiple subtasks, then run them on different daemons +- For sequential execution: create subtasks and run them in order + +### 3. Monitor Task Progress +- Use query_task_status to check on running tasks +- Watch for status changes and react accordingly +- Handle failures gracefully (retry, escalate, or report) + +### 4. Coordinate Sibling Tasks +- Use peek_sibling_overlay to see what other tasks have changed +- Preview merges before completing to catch conflicts +- Coordinate timing when multiple tasks need to merge + +### 5. Be Efficient +- Don't over-analyze simple requests +- Use the minimum number of tool calls needed +- Provide clear summaries of actions taken + +## Important Notes +- Task IDs are UUIDs - ensure you use the correct format +- Running a task requires at least one connected daemon +- Overlay operations require the task to have been run at least once +- Always confirm destructive operations (discard_task) with the user +- When creating subtasks for this task, use parent_task_id: {task_id}"#, + task_context = task_context, + task_id = task_id + ); + + // Run the shared agentic loop + run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await +} + +fn build_task_context(task: &crate::db::models::Task) -> String { + let mut context = format!( + "Current Task: {} (ID: {})\n", + task.name, task.id + ); + context.push_str(&format!("Status: {}\n", task.status)); + context.push_str(&format!("Priority: {}\n", task.priority)); + + if let Some(ref desc) = task.description { + context.push_str(&format!("Description: {}\n", desc)); + } + + // Truncate plan preview if too long + let plan_preview = if task.plan.len() > 200 { + format!("{}...", &task.plan[..200]) + } else { + task.plan.clone() + }; + context.push_str(&format!("Plan: {}\n", plan_preview)); + + if let Some(ref summary) = task.progress_summary { + context.push_str(&format!("Progress: {}\n", summary)); + } + + if let Some(ref error) = task.error_message { + context.push_str(&format!("Error: {}\n", error)); + } + + // Repository info + if let Some(ref url) = task.repository_url { + context.push_str(&format!("Repository: {}\n", url)); + } + if let Some(ref branch) = task.base_branch { + context.push_str(&format!("Base branch: {}\n", branch)); + } + + context +} + +/// Build overview context for top-level mesh orchestration +async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String { + let mut context = String::new(); + + // Get task counts by status + match repository::list_tasks_for_owner(pool, owner_id).await { + Ok(tasks) => { + let total = tasks.len(); + let pending = tasks.iter().filter(|t| t.status == "pending").count(); + let running = tasks.iter().filter(|t| t.status == "running").count(); + let paused = tasks.iter().filter(|t| t.status == "paused").count(); + let done = tasks.iter().filter(|t| t.status == "done").count(); + let failed = tasks.iter().filter(|t| t.status == "failed").count(); + + context.push_str(&format!( + "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n", + total, pending, running, paused, done, failed + )); + + // List recent/active tasks + if !tasks.is_empty() { + context.push_str("\nRecent Tasks:\n"); + for task in tasks.iter().take(5) { + context.push_str(&format!( + " - {} (ID: {}, Status: {})\n", + task.name, task.id, task.status + )); + } + if tasks.len() > 5 { + context.push_str(&format!(" ... and {} more\n", tasks.len() - 5)); + } + } + } + Err(e) => { + context.push_str(&format!("Error fetching tasks: {}\n", e)); + } + } + + // Get connected daemons for this owner + let owner_daemons: Vec<_> = state.daemon_connections.iter() + .filter(|e| e.value().owner_id == owner_id) + .collect(); + let daemon_count = owner_daemons.len(); + context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count)); + + for entry in owner_daemons.iter().take(3) { + let daemon = entry.value(); + let working_dir = daemon.working_directory.as_deref().unwrap_or("not set"); + context.push_str(&format!( + " - {} (ID: {}, Working Directory: {})\n", + daemon.hostname.as_deref().unwrap_or("unknown"), + daemon.id, + working_dir + )); + } + + // Add default repository guidance if there's exactly one daemon with a working directory + let daemons_with_working_dir: Vec<_> = owner_daemons.iter() + .filter(|e| e.value().working_directory.is_some()) + .collect(); + + if daemons_with_working_dir.len() == 1 { + if let Some(dir) = &daemons_with_working_dir[0].value().working_directory { + context.push_str(&format!( + "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n", + dir + )); + } + } + + context +} + +/// Run the shared agentic loop for mesh chat +async fn run_mesh_agentic_loop( + pool: &sqlx::PgPool, + state: &SharedState, + llm_client: &LlmClient, + system_prompt: String, + request: &MeshChatRequest, + owner_id: Uuid, +) -> axum::response::Response { + // Get or create conversation for storing messages + let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await { + Ok(c) => c, + Err(e) => { + tracing::error!("Failed to get/create conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })), + ) + .into_response(); + } + }; + + // Build initial messages + let mut messages = vec![Message { + role: "system".to_string(), + content: Some(system_prompt), + tool_calls: None, + tool_call_id: None, + }]; + + // Load conversation history from database (or use provided for backwards compatibility) + if let Some(history) = &request.history { + // Legacy: use provided history + for hist_msg in history { + messages.push(Message { + role: hist_msg.role.clone(), + content: Some(hist_msg.content.clone()), + tool_calls: None, + tool_call_id: None, + }); + } + tracing::info!( + history_messages = history.len(), + "Loaded mesh conversation history from request (legacy)" + ); + } else { + // New: load from database + match repository::list_chat_messages(pool, conversation.id, Some(50)).await { + Ok(db_messages) => { + for msg in db_messages { + messages.push(Message { + role: msg.role.clone(), + content: Some(msg.content.clone()), + tool_calls: None, + tool_call_id: None, + }); + } + tracing::info!( + history_messages = messages.len() - 1, // minus system message + "Loaded mesh conversation history from database" + ); + } + Err(e) => { + tracing::warn!("Failed to load chat history: {}", e); + // Continue without history + } + } + } + + // Add current user message + messages.push(Message { + role: "user".to_string(), + content: Some(request.message.clone()), + tool_calls: None, + tool_call_id: None, + }); + + // State for tracking + let mut all_tool_call_infos: Vec = Vec::new(); + let mut final_response: Option = None; + let mut consecutive_failures = 0; + const MAX_CONSECUTIVE_FAILURES: usize = 3; + let mut pending_questions: Option> = None; + + // Multi-turn agentic tool calling loop + for round in 0..MAX_TOOL_ROUNDS { + tracing::info!( + round = round, + total_tool_calls = all_tool_call_infos.len(), + "Mesh agentic loop iteration" + ); + + // Check consecutive failures + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + tracing::warn!( + "Breaking mesh loop due to {} consecutive failures", + consecutive_failures + ); + final_response = Some( + "I encountered multiple consecutive errors and stopped. \ + Please check the task state and try again." + .to_string(), + ); + break; + } + + // Call the appropriate LLM API + let result = match llm_client { + LlmClient::Groq(groq) => { + match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await { + Ok(r) => LlmResult { + content: r.content, + tool_calls: r.tool_calls, + raw_tool_calls: r.raw_tool_calls, + finish_reason: r.finish_reason, + }, + Err(e) => { + tracing::error!("Groq API error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("LLM API error: {}", e) })), + ) + .into_response(); + } + } + } + LlmClient::Claude(claude_client) => { + let claude_messages = claude::groq_messages_to_claude(&messages); + match claude_client + .chat_with_tools(claude_messages, &MESH_TOOLS) + .await + { + Ok(r) => { + let raw_tool_calls: Vec = r + .tool_calls + .iter() + .map(|tc| ToolCallResponse { + id: tc.id.clone(), + call_type: "function".to_string(), + function: crate::llm::groq::FunctionCall { + name: tc.name.clone(), + arguments: tc.arguments.to_string(), + }, + }) + .collect(); + + LlmResult { + content: r.content, + tool_calls: r.tool_calls, + raw_tool_calls, + finish_reason: r.stop_reason, + } + } + Err(e) => { + tracing::error!("Claude API error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("LLM API error: {}", e) })), + ) + .into_response(); + } + } + } + }; + + // Check if there are tool calls to execute + if result.tool_calls.is_empty() { + final_response = result.content; + break; + } + + // Add assistant message with tool calls to conversation + messages.push(Message { + role: "assistant".to_string(), + content: result.content.clone(), + tool_calls: Some(result.raw_tool_calls.clone()), + tool_call_id: None, + }); + + // Execute each tool call + for (i, tool_call) in result.tool_calls.iter().enumerate() { + tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call"); + + // Parse the tool call + let mut execution_result = parse_mesh_tool_call(tool_call); + + // Handle async mesh tool requests + if let Some(mesh_request) = execution_result.request.take() { + let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await; + execution_result.success = async_result.success; + execution_result.message = async_result.message; + execution_result.data = async_result.data; + } + + // Track consecutive failures + if execution_result.success { + consecutive_failures = 0; + } else { + consecutive_failures += 1; + tracing::warn!( + tool = %tool_call.name, + consecutive_failures = consecutive_failures, + "Mesh tool call failed" + ); + } + + // Check for pending user questions + if let Some(questions) = execution_result.pending_questions { + tracing::info!( + question_count = questions.len(), + "Mesh LLM requesting user input" + ); + pending_questions = Some(questions); + all_tool_call_infos.push(MeshToolCallInfo { + name: tool_call.name.clone(), + result: ToolResult { + success: execution_result.success, + message: execution_result.message.clone(), + }, + }); + break; + } + + // Build tool result message + let result_content = if let Some(data) = &execution_result.data { + json!({ + "success": execution_result.success, + "message": execution_result.message, + "data": data + }) + .to_string() + } else { + json!({ + "success": execution_result.success, + "message": execution_result.message + }) + .to_string() + }; + + // Add tool result message + let tool_call_id = match llm_client { + LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), + LlmClient::Claude(_) => tool_call.id.clone(), + }; + + messages.push(Message { + role: "tool".to_string(), + content: Some(result_content), + tool_calls: None, + tool_call_id: Some(tool_call_id), + }); + + // Track for response + all_tool_call_infos.push(MeshToolCallInfo { + name: tool_call.name.clone(), + result: ToolResult { + success: execution_result.success, + message: execution_result.message, + }, + }); + } + + // If user questions are pending, pause + if pending_questions.is_some() { + final_response = result.content; + break; + } + + // If finish reason indicates completion, exit loop + let finish_lower = result.finish_reason.to_lowercase(); + if finish_lower == "stop" || finish_lower == "end_turn" { + final_response = result.content; + break; + } + } + + // Build response + let response_text = final_response.unwrap_or_else(|| { + if all_tool_call_infos.is_empty() { + "I couldn't understand your request. Please try rephrasing.".to_string() + } else { + format!( + "Done! Executed {} tool{}.", + all_tool_call_infos.len(), + if all_tool_call_infos.len() == 1 { + "" + } else { + "s" + } + ) + } + }); + + // Save messages to database (only if not using legacy history mode) + if request.history.is_none() { + let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string()); + + // Validate context_task_id exists before using it (to avoid FK constraint violation) + let context_task_id = if let Some(task_id) = request.context_task_id { + match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(_)) => Some(task_id), + Ok(None) => { + tracing::warn!("context_task_id {} not found, ignoring", task_id); + None + } + Err(e) => { + tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e); + None + } + } + } else { + None + }; + + // Save user message + if let Err(e) = repository::add_chat_message( + pool, + conversation.id, + "user", + &request.message, + &context_type, + context_task_id, + None, + None, + ) + .await + { + tracing::warn!("Failed to save user message to DB: {}", e); + } + + // Serialize tool calls for storage + let tool_calls_json = if all_tool_call_infos.is_empty() { + None + } else { + Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default()) + }; + + // Serialize pending questions for storage + let pending_questions_json = pending_questions + .as_ref() + .map(|q| serde_json::to_value(q).unwrap_or_default()); + + // Save assistant message + if let Err(e) = repository::add_chat_message( + pool, + conversation.id, + "assistant", + &response_text, + &context_type, + context_task_id, + tool_calls_json, + pending_questions_json, + ) + .await + { + tracing::warn!("Failed to save assistant message to DB: {}", e); + } + + tracing::info!( + conversation_id = %conversation.id, + context_type = %context_type, + "Saved mesh chat messages to database" + ); + } + + ( + StatusCode::OK, + Json(MeshChatResponse { + response: response_text, + tool_calls: all_tool_call_infos, + pending_questions, + }), + ) + .into_response() +} + +/// Result from handling an async mesh tool request +struct MeshRequestResult { + success: bool, + message: String, + data: Option, +} + +/// Handle async mesh tool requests that require database/daemon access +async fn handle_mesh_request( + pool: &sqlx::PgPool, + state: &SharedState, + request: MeshToolRequest, + owner_id: Uuid, +) -> MeshRequestResult { + match request { + MeshToolRequest::CreateTask { + name, + plan, + parent_task_id, + repository_url, + base_branch, + merge_mode, + priority, + } => { + // Check if repository_url matches a daemon's working directory (for this owner) + let is_daemon_working_dir = repository_url.as_ref().map(|url| { + state.daemon_connections.iter().any(|entry| { + entry.value().owner_id == owner_id && + entry.value().working_directory.as_ref() == Some(url) + }) + }).unwrap_or(false); + + // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir + let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode { + // Explicit merge_mode provided - derive from it + let action = match mode.as_str() { + "pr" => "pr".to_string(), + "auto" => "merge".to_string(), + "manual" => "branch".to_string(), + _ => "none".to_string(), + }; + // If using daemon working dir and action involves the repo, set target_repo_path + let target = if is_daemon_working_dir && action != "none" { + repository_url.clone() + } else { + None + }; + (Some(action), target) + } else if is_daemon_working_dir { + // No merge_mode but using daemon working dir - default to "branch" + (Some("branch".to_string()), repository_url.clone()) + } else { + (None, None) + }; + + let create_req = CreateTaskRequest { + name: name.clone(), + description: None, + plan, + parent_task_id, + repository_url, + base_branch, + target_branch: None, + merge_mode, + priority: priority.unwrap_or(0), + target_repo_path, + completion_action, + continue_from_task_id: None, + copy_files: None, + }; + + match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(task) => MeshRequestResult { + success: true, + message: format!("Created task '{}' with ID {}", name, task.id), + data: Some(json!({ + "taskId": task.id, + "name": task.name, + "status": task.status, + })), + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to create task: {}", e), + data: None, + }, + } + } + + MeshToolRequest::RunTask { task_id, daemon_id } => { + // Get task to check status + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + if task.status != "pending" && task.status != "paused" { + return MeshRequestResult { + success: false, + message: format!( + "Task cannot be run - status is '{}' (must be 'pending' or 'paused')", + task.status + ), + data: None, + }; + } + + // Find a daemon to run on (must belong to this owner) + let target_daemon_id = if let Some(id) = daemon_id { + // Verify the specified daemon belongs to this owner + if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) { + return MeshRequestResult { + success: false, + message: "Specified daemon not found or not accessible.".to_string(), + data: None, + }; + } + id + } else { + // Find any connected daemon for this owner + let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id); + match daemon { + Some(d) => d.value().id, + None => { + return MeshRequestResult { + success: false, + message: "No daemons connected for your account. Cannot run task.".to_string(), + data: None, + } + } + } + }; + + // Check if this is an orchestrator (depth 0 with subtasks) + let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { + Ok(subtasks) => subtasks.len(), + Err(_) => 0, + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + }; + + match state.send_daemon_command(target_daemon_id, command).await { + Ok(()) => { + // Update task status to running + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("running".to_string()), + version: Some(task.version), + ..Default::default() + }; + + if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: "running".to_string(), + updated_fields: vec!["status".to_string()], + updated_by: "system".to_string(), + }); + } + + MeshRequestResult { + success: true, + message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id), + data: Some(json!({ + "taskId": task_id, + "daemonId": target_daemon_id, + "status": "running", + })), + } + } + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to start task: {}", e), + data: None, + }, + } + } + + MeshToolRequest::PauseTask { task_id } => { + // Get task and its daemon + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + if task.status != "running" { + return MeshRequestResult { + success: false, + message: format!("Task is not running (status: {})", task.status), + data: None, + }; + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::PauseTask { task_id }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + return MeshRequestResult { + success: false, + message: format!("Failed to pause task: {}", e), + data: None, + }; + } + } + + // Update status + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("paused".to_string()), + version: Some(task.version), + ..Default::default() + }; + + if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: "paused".to_string(), + updated_fields: vec!["status".to_string()], + updated_by: "system".to_string(), + }); + } + + MeshRequestResult { + success: true, + message: format!("Task {} paused", task_id), + data: Some(json!({ "taskId": task_id, "status": "paused" })), + } + } + + MeshToolRequest::ResumeTask { task_id } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + if task.status != "paused" { + return MeshRequestResult { + success: false, + message: format!("Task is not paused (status: {})", task.status), + data: None, + }; + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::ResumeTask { task_id }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + return MeshRequestResult { + success: false, + message: format!("Failed to resume task: {}", e), + data: None, + }; + } + } + + // Update status + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("running".to_string()), + version: Some(task.version), + ..Default::default() + }; + + if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: "running".to_string(), + updated_fields: vec!["status".to_string()], + updated_by: "system".to_string(), + }); + } + + MeshRequestResult { + success: true, + message: format!("Task {} resumed", task_id), + data: Some(json!({ "taskId": task_id, "status": "running" })), + } + } + + MeshToolRequest::InterruptTask { task_id, graceful } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { task_id, graceful }; + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + return MeshRequestResult { + success: false, + message: format!("Failed to interrupt task: {}", e), + data: None, + }; + } + } + + // Update status + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("paused".to_string()), + version: Some(task.version), + ..Default::default() + }; + + if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: "paused".to_string(), + updated_fields: vec!["status".to_string()], + updated_by: "system".to_string(), + }); + } + + MeshRequestResult { + success: true, + message: format!( + "Task {} {}interrupted", + task_id, + if graceful { "gracefully " } else { "" } + ), + data: Some(json!({ "taskId": task_id, "status": "paused" })), + } + } + + MeshToolRequest::DiscardTask { task_id } => { + match repository::delete_task_for_owner(pool, task_id, owner_id).await { + Ok(true) => MeshRequestResult { + success: true, + message: format!("Task {} discarded", task_id), + data: Some(json!({ "taskId": task_id, "deleted": true })), + }, + Ok(false) => MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to delete task: {}", e), + data: None, + }, + } + } + + MeshToolRequest::QueryTaskStatus { task_id } => { + match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(task)) => MeshRequestResult { + success: true, + message: format!("Task '{}' is {}", task.name, task.status), + data: Some(json!({ + "taskId": task.id, + "name": task.name, + "status": task.status, + "priority": task.priority, + "description": task.description, + "plan": task.plan, + "progressSummary": task.progress_summary, + "errorMessage": task.error_message, + "repositoryUrl": task.repository_url, + "baseBranch": task.base_branch, + "targetBranch": task.target_branch, + "mergeMode": task.merge_mode, + "prUrl": task.pr_url, + "daemonId": task.daemon_id, + "containerId": task.container_id, + "createdAt": task.created_at, + "startedAt": task.started_at, + "completedAt": task.completed_at, + })), + }, + Ok(None) => MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::ListTasks { + status_filter, + parent_task_id, + } => { + // TODO: Add filtering support to repository + match repository::list_tasks_for_owner(pool, owner_id).await { + Ok(mut tasks) => { + // Apply filters + if let Some(ref status) = status_filter { + tasks.retain(|t| &t.status == status); + } + if let Some(ref parent_id) = parent_task_id { + tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id)); + } + + let task_data: Vec = tasks + .iter() + .map(|t| { + json!({ + "taskId": t.id, + "name": t.name, + "status": t.status, + "priority": t.priority, + "parentTaskId": t.parent_task_id, + }) + }) + .collect(); + + MeshRequestResult { + success: true, + message: format!("Found {} tasks", tasks.len()), + data: Some(json!({ "tasks": task_data })), + } + } + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::ListSubtasks { task_id } => { + match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { + Ok(subtasks) => { + let subtask_data: Vec = subtasks + .iter() + .map(|t| { + json!({ + "taskId": t.id, + "name": t.name, + "status": t.status, + "priority": t.priority, + }) + }) + .collect(); + + MeshRequestResult { + success: true, + message: format!("Found {} subtasks", subtasks.len()), + data: Some(json!({ "subtasks": subtask_data })), + } + } + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::ListSiblings { task_id } => { + // Get task to find parent + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + let Some(parent_id) = task.parent_task_id else { + return MeshRequestResult { + success: true, + message: "Task has no parent, so no siblings".to_string(), + data: Some(json!({ "siblings": [] })), + }; + }; + + // Get all subtasks of parent, excluding current task + match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await { + Ok(siblings) => { + let sibling_data: Vec = siblings + .iter() + .filter(|t| t.id != task_id) + .map(|t| { + json!({ + "taskId": t.id, + "name": t.name, + "status": t.status, + "priority": t.priority, + }) + }) + .collect(); + + MeshRequestResult { + success: true, + message: format!("Found {} sibling tasks", sibling_data.len()), + data: Some(json!({ "siblings": sibling_data })), + } + } + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::ListDaemons => { + // Only list daemons belonging to this owner + let daemons: Vec = state + .daemon_connections + .iter() + .filter(|entry| entry.value().owner_id == owner_id) + .map(|entry| { + let d = entry.value(); + json!({ + "daemonId": d.id, + "connectionId": d.connection_id, + "hostname": d.hostname, + "machineId": d.machine_id, + }) + }) + .collect(); + + MeshRequestResult { + success: true, + message: format!("{} daemon(s) connected", daemons.len()), + data: Some(json!({ "daemons": daemons })), + } + } + + MeshToolRequest::ListDaemonDirectories => { + let mut directories: Vec = Vec::new(); + + // Only list directories from daemons belonging to this owner + for entry in state.daemon_connections.iter() { + let daemon = entry.value(); + + // Only include daemons belonging to this owner + if daemon.owner_id != owner_id { + continue; + } + + // Add working directory if available + if let Some(ref working_dir) = daemon.working_directory { + directories.push(json!({ + "path": working_dir, + "label": "Working Directory", + "directoryType": "working", + "hostname": daemon.hostname, + })); + } + + // Add home directory if available + if let Some(ref home_dir) = daemon.home_directory { + directories.push(json!({ + "path": home_dir, + "label": "Makima Home", + "directoryType": "home", + "hostname": daemon.hostname, + })); + } + } + + MeshRequestResult { + success: true, + message: format!("Found {} available directories", directories.len()), + data: Some(json!({ "directories": directories })), + } + } + + MeshToolRequest::ListFiles => { + match repository::list_files_for_owner(pool, owner_id).await { + Ok(files) => { + let file_data: Vec = files + .iter() + .map(|f| { + json!({ + "fileId": f.id, + "name": f.name, + "description": f.description, + "createdAt": f.created_at, + "updatedAt": f.updated_at, + }) + }) + .collect(); + + MeshRequestResult { + success: true, + message: format!("Found {} files", files.len()), + data: Some(json!({ "files": file_data })), + } + } + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::ReadFile { file_id } => { + match repository::get_file_for_owner(pool, file_id, owner_id).await { + Ok(Some(file)) => { + // Convert body elements to readable text + let body_content: Vec = file + .body + .iter() + .map(|elem| { + match elem { + crate::db::models::BodyElement::Heading { level, text } => { + json!({ "type": "heading", "level": level, "text": text }) + } + crate::db::models::BodyElement::Paragraph { text } => { + json!({ "type": "paragraph", "text": text }) + } + crate::db::models::BodyElement::Code { language, content } => { + json!({ "type": "code", "language": language, "content": content }) + } + crate::db::models::BodyElement::List { ordered, items } => { + json!({ "type": "list", "ordered": ordered, "items": items }) + } + crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => { + let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0); + json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count }) + } + crate::db::models::BodyElement::Image { src, alt, caption } => { + json!({ "type": "image", "src": src, "alt": alt, "caption": caption }) + } + } + }) + .collect(); + + // Also build a plain text version for easier reading + let plain_text: String = file + .body + .iter() + .filter_map(|elem| { + match elem { + crate::db::models::BodyElement::Heading { level, text } => { + Some(format!("{} {}", "#".repeat(*level as usize), text)) + } + crate::db::models::BodyElement::Paragraph { text } => { + Some(text.clone()) + } + crate::db::models::BodyElement::Code { language, content } => { + let lang = language.as_deref().unwrap_or(""); + Some(format!("```{}\n{}\n```", lang, content)) + } + crate::db::models::BodyElement::List { ordered, items } => { + let list_text: Vec = items.iter().enumerate().map(|(i, item)| { + if *ordered { + format!("{}. {}", i + 1, item) + } else { + format!("- {}", item) + } + }).collect(); + Some(list_text.join("\n")) + } + _ => None, + } + }) + .collect::>() + .join("\n\n"); + + // Convert transcript entries to JSON + let transcript: Vec = file + .transcript + .iter() + .map(|entry| { + json!({ + "id": entry.id, + "speaker": entry.speaker, + "start": entry.start, + "end": entry.end, + "text": entry.text, + }) + }) + .collect(); + + // Build a plain text transcript for easier reading + let transcript_text: String = file + .transcript + .iter() + .map(|entry| { + format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text) + }) + .collect::>() + .join("\n"); + + MeshRequestResult { + success: true, + message: format!("Read file '{}'", file.name), + data: Some(json!({ + "fileId": file.id, + "name": file.name, + "description": file.description, + "summary": file.summary, + "body": body_content, + "plainText": plain_text, + "transcript": transcript, + "transcriptText": transcript_text, + "transcriptCount": file.transcript.len(), + "createdAt": file.created_at, + "updatedAt": file.updated_at, + })), + } + } + Ok(None) => MeshRequestResult { + success: false, + message: format!("File {} not found", file_id), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + MeshToolRequest::SendMessageToTask { task_id, message } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + if task.status != "running" { + return MeshRequestResult { + success: false, + message: format!("Task is not running (status: {})", task.status), + data: None, + }; + } + + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::SendMessage { task_id, message }; + match state.send_daemon_command(daemon_id, command).await { + Ok(()) => MeshRequestResult { + success: true, + message: "Message sent to task".to_string(), + data: Some(json!({ "taskId": task_id })), + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to send message: {}", e), + data: None, + }, + } + } else { + MeshRequestResult { + success: false, + message: "Task has no daemon assigned".to_string(), + data: None, + } + } + } + + MeshToolRequest::UpdateTaskPlan { + task_id, + new_plan, + interrupt_if_running, + } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + // Interrupt if running and requested + if task.status == "running" && interrupt_if_running { + if let Some(daemon_id) = task.daemon_id { + let command = DaemonCommand::InterruptTask { + task_id, + graceful: true, + }; + let _ = state.send_daemon_command(daemon_id, command).await; + } + } + + let update_req = crate::db::models::UpdateTaskRequest { + plan: Some(new_plan), + version: Some(task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(updated)) => { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: updated.status.clone(), + updated_fields: vec!["plan".to_string()], + updated_by: "system".to_string(), + }); + MeshRequestResult { + success: true, + message: "Task plan updated".to_string(), + data: Some(json!({ "taskId": task_id })), + } + } + Ok(None) => MeshRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to update task: {}", e), + data: None, + }, + } + } + + // Overlay operations - these require daemon communication + // For now, return placeholder responses since daemon implementation is separate + MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult { + success: false, + message: format!( + "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", + sibling_task_id + ), + data: None, + }, + + MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult { + success: false, + message: format!( + "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", + task_id + ), + data: None, + }, + + MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult { + success: false, + message: format!( + "Merge preview requires a connected daemon. Task {} may not have overlay data yet.", + task_id + ), + data: None, + }, + + MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult { + success: false, + message: format!( + "Merge operations require a connected daemon. Task {}", + task_id + ), + data: None, + }, + + MeshToolRequest::CompleteTask { task_id } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + // Update status to done + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("done".to_string()), + version: Some(task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(updated)) => { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: "done".to_string(), + updated_fields: vec!["status".to_string()], + updated_by: "system".to_string(), + }); + let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string()); + MeshRequestResult { + success: true, + message: format!( + "Task {} completed. Merge mode: {}", + task_id, + &merge_mode + ), + data: Some(json!({ + "taskId": task_id, + "status": "done", + "mergeMode": merge_mode, + })), + } + } + Ok(None) => MeshRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to complete task: {}", e), + data: None, + }, + } + } + + MeshToolRequest::SetMergeMode { task_id, mode } => { + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Task {} not found", task_id), + data: None, + } + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + let update_req = crate::db::models::UpdateTaskRequest { + merge_mode: Some(mode.clone()), + version: Some(task.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(updated)) => { + state.broadcast_task_update(TaskUpdateNotification { + task_id, + owner_id: Some(task.owner_id), + version: updated.version, + status: updated.status, + updated_fields: vec!["merge_mode".to_string()], + updated_by: "system".to_string(), + }); + MeshRequestResult { + success: true, + message: format!("Merge mode set to '{}'", mode), + data: Some(json!({ "taskId": task_id, "mergeMode": mode })), + } + } + Ok(None) => MeshRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }, + Err(e) => MeshRequestResult { + success: false, + message: format!("Failed to update merge mode: {}", e), + data: None, + }, + } + } + } +} + +// ============================================================================= +// Chat History Endpoints +// ============================================================================= + +use crate::db::models::MeshChatHistoryResponse; + +/// Get chat history for the current conversation (requires authentication) +#[utoipa::path( + get, + path = "/api/v1/mesh/chat/history", + responses( + (status = 200, description = "Chat history", body = MeshChatHistoryResponse), + (status = 401, description = "Unauthorized"), + (status = 503, description = "Database not configured"), + (status = 500, description = "Internal server error") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_chat_history( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await { + Ok(c) => c, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": e.to_string() })), + ) + .into_response() + } + }; + + let messages = match repository::list_chat_messages(pool, conversation.id, None).await { + Ok(m) => m, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": e.to_string() })), + ) + .into_response() + } + }; + + ( + StatusCode::OK, + Json(MeshChatHistoryResponse { + conversation_id: conversation.id, + messages, + }), + ) + .into_response() +} + +/// Clear chat history (archives current conversation and starts new, requires authentication) +#[utoipa::path( + delete, + path = "/api/v1/mesh/chat/history", + responses( + (status = 200, description = "History cleared"), + (status = 401, description = "Unauthorized"), + (status = 503, description = "Database not configured"), + (status = 500, description = "Internal server error") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn clear_chat_history( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + match repository::clear_conversation(pool, auth.owner_id).await { + Ok(new_conv) => ( + StatusCode::OK, + Json(json!({ "success": true, "conversationId": new_conv.id })), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": e.to_string() })), + ) + .into_response(), + } +} -- cgit v1.2.3