diff options
Diffstat (limited to 'makima/src/server/handlers/mesh_chat.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 2264 |
1 files changed, 0 insertions, 2264 deletions
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs deleted file mode 100644 index 638a4d3..0000000 --- a/makima/src/server/handlers/mesh_chat.rs +++ /dev/null @@ -1,2264 +0,0 @@ -//! Chat endpoint for LLM-powered task orchestration. -//! -//! This handler provides an agentic loop for managing tasks, daemons, and -//! overlay operations through LLM tool calling. - -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use utoipa::ToSchema; -use uuid::Uuid; - -use crate::db::{models::CreateTaskRequest, repository}; -use crate::llm::{ - claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, - groq::{GroqClient, GroqError, Message, ToolCallResponse}, - parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion, - MESH_TOOLS, -}; -use crate::server::auth::Authenticated; -use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; - -/// Maximum number of tool-calling rounds to prevent infinite loops -const MAX_TOOL_ROUNDS: usize = 30; - -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatHistoryMessage { - /// Role: "user" or "assistant" - pub role: String, - /// Message content - pub content: String, -} - -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatRequest { - /// The user's message/instruction - pub message: String, - /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" - #[serde(default)] - pub model: Option<String>, - /// Optional conversation history for context continuity (deprecated - now loaded from DB) - #[serde(default)] - pub history: Option<Vec<MeshChatHistoryMessage>>, - /// Context type: "mesh", "task", or "subtask" - #[serde(default)] - pub context_type: Option<String>, - /// Task ID if context is task/subtask - #[serde(default)] - pub context_task_id: Option<Uuid>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatResponse { - /// The LLM's response message - pub response: String, - /// Tool calls that were executed - pub tool_calls: Vec<MeshToolCallInfo>, - /// Questions pending user answers (pauses conversation) - #[serde(skip_serializing_if = "Option::is_none")] - pub pending_questions: Option<Vec<UserQuestion>>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshToolCallInfo { - pub name: String, - pub result: ToolResult, -} - -/// Enum to hold LLM clients -enum LlmClient { - Groq(GroqClient), - Claude(ClaudeClient), -} - -/// Unified result from LLM call -struct LlmResult { - content: Option<String>, - tool_calls: Vec<ToolCall>, - raw_tool_calls: Vec<ToolCallResponse>, - finish_reason: String, -} - -/// Chat with mesh orchestrator at the top level (no specific task context) -#[utoipa::path( - post, - path = "/api/v1/mesh/chat", - request_body = MeshChatRequest, - responses( - (status = 200, description = "Chat completed successfully", body = MeshChatResponse), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn mesh_toplevel_chat_handler( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, - Json(request): Json<MeshChatRequest>, -) -> impl IntoResponse { - // Check if database is configured - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - // Parse model selection (default to Claude Sonnet) - let model = request - .model - .as_ref() - .and_then(|m| LlmModel::from_str(m)) - .unwrap_or(LlmModel::ClaudeSonnet); - - tracing::info!("Mesh top-level chat using LLM model: {:?}", model); - - // Initialize the appropriate LLM client - let llm_client = match model { - LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::GroqKimi => match GroqClient::from_env() { - Ok(client) => LlmClient::Groq(client), - Err(GroqError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "GROQ_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Groq client error: {}", e) })), - ) - .into_response(); - } - }, - }; - - // Build context about all tasks and daemons - let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await; - - // Build agentic system prompt for top-level mesh orchestration - let system_prompt = format!( - r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. - -## Your Capabilities -You have access to tools for: -- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task -- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons -- **File Access**: list_files, read_file (read documents from the files system) -- **Task Communication**: send_message_to_task, update_task_plan -- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode - -## Current Mesh Overview -{mesh_context} - -## Agentic Behavior Guidelines - -### 1. Analyze Before Acting -- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons -- Understand the current state before making changes -- For simple, direct requests (e.g., "create a new task"), you can act immediately - -### 2. Plan Multi-Step Operations -- Break complex orchestration into logical steps -- For parallel execution: create multiple subtasks, then run them on different daemons -- For sequential execution: create subtasks and run them in order - -### 3. Create and Manage Tasks -- Use create_task to create new top-level tasks or subtasks -- Assign appropriate priorities and plans -- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it. -- If a working directory is a git repository, use it as the repository_url for code-related tasks - -### 4. Coordinate Multiple Tasks -- Use list_tasks to see all tasks and their statuses -- Use list_daemons to see available compute resources -- Balance workload across daemons - -### 5. Be Efficient -- Don't over-analyze simple requests -- Use the minimum number of tool calls needed -- Provide clear summaries of actions taken - -## Important Notes -- Task IDs are UUIDs - ensure you use the correct format -- Running a task requires at least one connected daemon -- When creating subtasks, specify the parent_task_id -- Always confirm destructive operations (discard_task) with the user"#, - mesh_context = mesh_context - ); - - // Run the shared agentic loop - run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await -} - -/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner) -#[utoipa::path( - post, - path = "/api/v1/mesh/tasks/{id}/chat", - request_body = MeshChatRequest, - responses( - (status = 200, description = "Chat completed successfully", body = MeshChatResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error") - ), - params( - ("id" = Uuid, Path, description = "Task ID (context for orchestration)") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn mesh_chat_handler( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, - Path(task_id): Path<Uuid>, - Json(request): Json<MeshChatRequest>, -) -> impl IntoResponse { - // Check if database is configured - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - // Get the context task (scoped by owner) - let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { - Ok(Some(task)) => task, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "Task not found" })), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Database error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Database error: {}", e) })), - ) - .into_response(); - } - }; - - // Parse model selection (default to Claude Sonnet) - let model = request - .model - .as_ref() - .and_then(|m| LlmModel::from_str(m)) - .unwrap_or(LlmModel::ClaudeSonnet); - - tracing::info!("Mesh chat using LLM model: {:?}", model); - - // Initialize the appropriate LLM client - let llm_client = match model { - LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::GroqKimi => match GroqClient::from_env() { - Ok(client) => LlmClient::Groq(client), - Err(GroqError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "GROQ_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Groq client error: {}", e) })), - ) - .into_response(); - } - }, - }; - - // Build context about the current task and mesh state - let task_context = build_task_context(&task); - - // Build agentic system prompt for task orchestration - let system_prompt = format!( - r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. - -## Your Capabilities -You have access to tools for: -- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task -- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons -- **File Access**: list_files, read_file (read documents from the files system) -- **Task Communication**: send_message_to_task, update_task_plan -- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode - -## Current Context -{task_context} - -## Agentic Behavior Guidelines - -### 1. Analyze Before Acting -- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons -- Understand the current state before making changes -- For simple, direct requests (e.g., "pause this task"), you can act immediately - -### 2. Plan Multi-Step Operations -- Break complex orchestration into logical steps -- For parallel execution: create multiple subtasks, then run them on different daemons -- For sequential execution: create subtasks and run them in order - -### 3. Monitor Task Progress -- Use query_task_status to check on running tasks -- Watch for status changes and react accordingly -- Handle failures gracefully (retry, escalate, or report) - -### 4. Coordinate Sibling Tasks -- Use peek_sibling_overlay to see what other tasks have changed -- Preview merges before completing to catch conflicts -- Coordinate timing when multiple tasks need to merge - -### 5. Be Efficient -- Don't over-analyze simple requests -- Use the minimum number of tool calls needed -- Provide clear summaries of actions taken - -## Important Notes -- Task IDs are UUIDs - ensure you use the correct format -- Running a task requires at least one connected daemon -- Overlay operations require the task to have been run at least once -- Always confirm destructive operations (discard_task) with the user -- When creating subtasks for this task, use parent_task_id: {task_id}"#, - task_context = task_context, - task_id = task_id - ); - - // Run the shared agentic loop - run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await -} - -fn build_task_context(task: &crate::db::models::Task) -> String { - let mut context = format!( - "Current Task: {} (ID: {})\n", - task.name, task.id - ); - context.push_str(&format!("Status: {}\n", task.status)); - context.push_str(&format!("Priority: {}\n", task.priority)); - - if let Some(ref desc) = task.description { - context.push_str(&format!("Description: {}\n", desc)); - } - - // Truncate plan preview if too long - let plan_preview = if task.plan.len() > 200 { - format!("{}...", &task.plan[..200]) - } else { - task.plan.clone() - }; - context.push_str(&format!("Plan: {}\n", plan_preview)); - - if let Some(ref summary) = task.progress_summary { - context.push_str(&format!("Progress: {}\n", summary)); - } - - if let Some(ref error) = task.error_message { - context.push_str(&format!("Error: {}\n", error)); - } - - // Repository info - if let Some(ref url) = task.repository_url { - context.push_str(&format!("Repository: {}\n", url)); - } - if let Some(ref branch) = task.base_branch { - context.push_str(&format!("Base branch: {}\n", branch)); - } - - context -} - -/// Build overview context for top-level mesh orchestration -async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String { - let mut context = String::new(); - - // Get task counts by status - match repository::list_tasks_for_owner(pool, owner_id).await { - Ok(tasks) => { - let total = tasks.len(); - let pending = tasks.iter().filter(|t| t.status == "pending").count(); - let running = tasks.iter().filter(|t| t.status == "running").count(); - let paused = tasks.iter().filter(|t| t.status == "paused").count(); - let done = tasks.iter().filter(|t| t.status == "done").count(); - let failed = tasks.iter().filter(|t| t.status == "failed").count(); - - context.push_str(&format!( - "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n", - total, pending, running, paused, done, failed - )); - - // List recent/active tasks - if !tasks.is_empty() { - context.push_str("\nRecent Tasks:\n"); - for task in tasks.iter().take(5) { - context.push_str(&format!( - " - {} (ID: {}, Status: {})\n", - task.name, task.id, task.status - )); - } - if tasks.len() > 5 { - context.push_str(&format!(" ... and {} more\n", tasks.len() - 5)); - } - } - } - Err(e) => { - context.push_str(&format!("Error fetching tasks: {}\n", e)); - } - } - - // Get connected daemons for this owner - let owner_daemons: Vec<_> = state.daemon_connections.iter() - .filter(|e| e.value().owner_id == owner_id) - .collect(); - let daemon_count = owner_daemons.len(); - context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count)); - - for entry in owner_daemons.iter().take(3) { - let daemon = entry.value(); - let working_dir = daemon.working_directory.as_deref().unwrap_or("not set"); - context.push_str(&format!( - " - {} (ID: {}, Working Directory: {})\n", - daemon.hostname.as_deref().unwrap_or("unknown"), - daemon.id, - working_dir - )); - } - - // Add default repository guidance if there's exactly one daemon with a working directory - let daemons_with_working_dir: Vec<_> = owner_daemons.iter() - .filter(|e| e.value().working_directory.is_some()) - .collect(); - - if daemons_with_working_dir.len() == 1 { - if let Some(dir) = &daemons_with_working_dir[0].value().working_directory { - context.push_str(&format!( - "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n", - dir - )); - } - } - - context -} - -/// Run the shared agentic loop for mesh chat -async fn run_mesh_agentic_loop( - pool: &sqlx::PgPool, - state: &SharedState, - llm_client: &LlmClient, - system_prompt: String, - request: &MeshChatRequest, - owner_id: Uuid, -) -> axum::response::Response { - // Get or create conversation for storing messages - let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await { - Ok(c) => c, - Err(e) => { - tracing::error!("Failed to get/create conversation: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })), - ) - .into_response(); - } - }; - - // Build initial messages - let mut messages = vec![Message { - role: "system".to_string(), - content: Some(system_prompt), - tool_calls: None, - tool_call_id: None, - }]; - - // Load conversation history from database (or use provided for backwards compatibility) - if let Some(history) = &request.history { - // Legacy: use provided history - for hist_msg in history { - messages.push(Message { - role: hist_msg.role.clone(), - content: Some(hist_msg.content.clone()), - tool_calls: None, - tool_call_id: None, - }); - } - tracing::info!( - history_messages = history.len(), - "Loaded mesh conversation history from request (legacy)" - ); - } else { - // New: load from database - match repository::list_chat_messages(pool, conversation.id, Some(50)).await { - Ok(db_messages) => { - for msg in db_messages { - messages.push(Message { - role: msg.role.clone(), - content: Some(msg.content.clone()), - tool_calls: None, - tool_call_id: None, - }); - } - tracing::info!( - history_messages = messages.len() - 1, // minus system message - "Loaded mesh conversation history from database" - ); - } - Err(e) => { - tracing::warn!("Failed to load chat history: {}", e); - // Continue without history - } - } - } - - // Add current user message - messages.push(Message { - role: "user".to_string(), - content: Some(request.message.clone()), - tool_calls: None, - tool_call_id: None, - }); - - // State for tracking - let mut all_tool_call_infos: Vec<MeshToolCallInfo> = Vec::new(); - let mut final_response: Option<String> = None; - let mut consecutive_failures = 0; - const MAX_CONSECUTIVE_FAILURES: usize = 3; - let mut pending_questions: Option<Vec<UserQuestion>> = None; - - // Multi-turn agentic tool calling loop - for round in 0..MAX_TOOL_ROUNDS { - tracing::info!( - round = round, - total_tool_calls = all_tool_call_infos.len(), - "Mesh agentic loop iteration" - ); - - // Check consecutive failures - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - tracing::warn!( - "Breaking mesh loop due to {} consecutive failures", - consecutive_failures - ); - final_response = Some( - "I encountered multiple consecutive errors and stopped. \ - Please check the task state and try again." - .to_string(), - ); - break; - } - - // Call the appropriate LLM API - let result = match llm_client { - LlmClient::Groq(groq) => { - match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await { - Ok(r) => LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls: r.raw_tool_calls, - finish_reason: r.finish_reason, - }, - Err(e) => { - tracing::error!("Groq API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("LLM API error: {}", e) })), - ) - .into_response(); - } - } - } - LlmClient::Claude(claude_client) => { - let claude_messages = claude::groq_messages_to_claude(&messages); - match claude_client - .chat_with_tools(claude_messages, &MESH_TOOLS) - .await - { - Ok(r) => { - let raw_tool_calls: Vec<ToolCallResponse> = r - .tool_calls - .iter() - .map(|tc| ToolCallResponse { - id: tc.id.clone(), - call_type: "function".to_string(), - function: crate::llm::groq::FunctionCall { - name: tc.name.clone(), - arguments: tc.arguments.to_string(), - }, - }) - .collect(); - - LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls, - finish_reason: r.stop_reason, - } - } - Err(e) => { - tracing::error!("Claude API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("LLM API error: {}", e) })), - ) - .into_response(); - } - } - } - }; - - // Check if there are tool calls to execute - if result.tool_calls.is_empty() { - final_response = result.content; - break; - } - - // Add assistant message with tool calls to conversation - messages.push(Message { - role: "assistant".to_string(), - content: result.content.clone(), - tool_calls: Some(result.raw_tool_calls.clone()), - tool_call_id: None, - }); - - // Execute each tool call - for (i, tool_call) in result.tool_calls.iter().enumerate() { - tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call"); - - // Parse the tool call - let mut execution_result = parse_mesh_tool_call(tool_call); - - // Handle async mesh tool requests - if let Some(mesh_request) = execution_result.request.take() { - let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await; - execution_result.success = async_result.success; - execution_result.message = async_result.message; - execution_result.data = async_result.data; - } - - // Track consecutive failures - if execution_result.success { - consecutive_failures = 0; - } else { - consecutive_failures += 1; - tracing::warn!( - tool = %tool_call.name, - consecutive_failures = consecutive_failures, - "Mesh tool call failed" - ); - } - - // Check for pending user questions - if let Some(questions) = execution_result.pending_questions { - tracing::info!( - question_count = questions.len(), - "Mesh LLM requesting user input" - ); - pending_questions = Some(questions); - all_tool_call_infos.push(MeshToolCallInfo { - name: tool_call.name.clone(), - result: ToolResult { - success: execution_result.success, - message: execution_result.message.clone(), - }, - }); - break; - } - - // Build tool result message - let result_content = if let Some(data) = &execution_result.data { - json!({ - "success": execution_result.success, - "message": execution_result.message, - "data": data - }) - .to_string() - } else { - json!({ - "success": execution_result.success, - "message": execution_result.message - }) - .to_string() - }; - - // Add tool result message - let tool_call_id = match llm_client { - LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), - LlmClient::Claude(_) => tool_call.id.clone(), - }; - - messages.push(Message { - role: "tool".to_string(), - content: Some(result_content), - tool_calls: None, - tool_call_id: Some(tool_call_id), - }); - - // Track for response - all_tool_call_infos.push(MeshToolCallInfo { - name: tool_call.name.clone(), - result: ToolResult { - success: execution_result.success, - message: execution_result.message, - }, - }); - } - - // If user questions are pending, pause - if pending_questions.is_some() { - final_response = result.content; - break; - } - - // If finish reason indicates completion, exit loop - let finish_lower = result.finish_reason.to_lowercase(); - if finish_lower == "stop" || finish_lower == "end_turn" { - final_response = result.content; - break; - } - } - - // Build response - let response_text = final_response.unwrap_or_else(|| { - if all_tool_call_infos.is_empty() { - "I couldn't understand your request. Please try rephrasing.".to_string() - } else { - format!( - "Done! Executed {} tool{}.", - all_tool_call_infos.len(), - if all_tool_call_infos.len() == 1 { - "" - } else { - "s" - } - ) - } - }); - - // Save messages to database (only if not using legacy history mode) - if request.history.is_none() { - let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string()); - - // Validate context_task_id exists before using it (to avoid FK constraint violation) - let context_task_id = if let Some(task_id) = request.context_task_id { - match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(_)) => Some(task_id), - Ok(None) => { - tracing::warn!("context_task_id {} not found, ignoring", task_id); - None - } - Err(e) => { - tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e); - None - } - } - } else { - None - }; - - // Save user message - if let Err(e) = repository::add_chat_message( - pool, - conversation.id, - "user", - &request.message, - &context_type, - context_task_id, - None, - None, - ) - .await - { - tracing::warn!("Failed to save user message to DB: {}", e); - } - - // Serialize tool calls for storage - let tool_calls_json = if all_tool_call_infos.is_empty() { - None - } else { - Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default()) - }; - - // Serialize pending questions for storage - let pending_questions_json = pending_questions - .as_ref() - .map(|q| serde_json::to_value(q).unwrap_or_default()); - - // Save assistant message - if let Err(e) = repository::add_chat_message( - pool, - conversation.id, - "assistant", - &response_text, - &context_type, - context_task_id, - tool_calls_json, - pending_questions_json, - ) - .await - { - tracing::warn!("Failed to save assistant message to DB: {}", e); - } - - tracing::info!( - conversation_id = %conversation.id, - context_type = %context_type, - "Saved mesh chat messages to database" - ); - } - - ( - StatusCode::OK, - Json(MeshChatResponse { - response: response_text, - tool_calls: all_tool_call_infos, - pending_questions, - }), - ) - .into_response() -} - -/// Result from handling an async mesh tool request -struct MeshRequestResult { - success: bool, - message: String, - data: Option<serde_json::Value>, -} - -/// Handle async mesh tool requests that require database/daemon access -async fn handle_mesh_request( - pool: &sqlx::PgPool, - state: &SharedState, - request: MeshToolRequest, - owner_id: Uuid, -) -> MeshRequestResult { - match request { - MeshToolRequest::CreateTask { - name, - plan, - parent_task_id, - repository_url, - base_branch, - merge_mode, - priority, - } => { - // Subtasks inherit contract_id from parent task - let contract_id = if let Some(parent_id) = parent_task_id { - match repository::get_task(pool, parent_id).await { - Ok(Some(parent_task)) => { - match parent_task.contract_id { - Some(cid) => cid, - None => { - return MeshRequestResult { - success: false, - message: "Parent task has no contract_id".to_string(), - data: None, - }; - } - } - } - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Parent task {} not found", parent_id), - data: None, - }; - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Failed to look up parent task: {}", e), - data: None, - }; - } - } - } else { - // Root tasks created via LLM chat require a contract_id - // TODO: Add contract_id to create_task tool definition - return MeshRequestResult { - success: false, - message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(), - data: None, - }; - }; - - // Check if repository_url matches a daemon's working directory (for this owner) - let is_daemon_working_dir = repository_url.as_ref().map(|url| { - state.daemon_connections.iter().any(|entry| { - entry.value().owner_id == owner_id && - entry.value().working_directory.as_ref() == Some(url) - }) - }).unwrap_or(false); - - // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir - let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode { - // Explicit merge_mode provided - derive from it - let action = match mode.as_str() { - "pr" => "pr".to_string(), - "auto" => "merge".to_string(), - "manual" => "branch".to_string(), - _ => "none".to_string(), - }; - // If using daemon working dir and action involves the repo, set target_repo_path - let target = if is_daemon_working_dir && action != "none" { - repository_url.clone() - } else { - None - }; - (Some(action), target) - } else if is_daemon_working_dir { - // No merge_mode but using daemon working dir - default to "pr" - (Some("pr".to_string()), repository_url.clone()) - } else { - (None, None) - }; - - let create_req = CreateTaskRequest { - contract_id: Some(contract_id), - name: name.clone(), - description: None, - plan, - parent_task_id, - repository_url, - base_branch, - target_branch: None, - merge_mode, - priority: priority.unwrap_or(0), - target_repo_path, - completion_action, - continue_from_task_id: None, - copy_files: None, - is_supervisor: false, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor - directive_id: None, - directive_step_id: None, - }; - - match repository::create_task_for_owner(pool, owner_id, create_req).await { - Ok(task) => MeshRequestResult { - success: true, - message: format!("Created task '{}' with ID {}", name, task.id), - data: Some(json!({ - "taskId": task.id, - "name": task.name, - "status": task.status, - })), - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to create task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::RunTask { task_id, daemon_id } => { - // Get task to check status - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "pending" && task.status != "paused" { - return MeshRequestResult { - success: false, - message: format!( - "Task cannot be run - status is '{}' (must be 'pending' or 'paused')", - task.status - ), - data: None, - }; - } - - // Find a daemon to run on (must belong to this owner) - let target_daemon_id = if let Some(id) = daemon_id { - // Verify the specified daemon belongs to this owner - if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) { - return MeshRequestResult { - success: false, - message: "Specified daemon not found or not accessible.".to_string(), - data: None, - }; - } - id - } else { - // Find any connected daemon for this owner - let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id); - match daemon { - Some(d) => d.value().id, - None => { - return MeshRequestResult { - success: false, - message: "No daemons connected for your account. Cannot run task.".to_string(), - data: None, - } - } - } - }; - - // Check if this is an orchestrator (depth 0 with subtasks) - let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { - Ok(subtasks) => subtasks.len(), - Err(_) => 0, - }; - let is_orchestrator = task.depth == 0 && subtask_count > 0; - - // IMPORTANT: Update database FIRST to assign daemon_id before sending command - // This prevents race conditions where the task starts but daemon_id is not set - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(target_daemon_id), - version: Some(task.version), - ..Default::default() - }; - - let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Failed to update task: {}", e), - data: None, - } - } - }; - - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; - - // Send SpawnTask command to daemon - let command = DaemonCommand::SpawnTask { - task_id, - task_name: task.name.clone(), - plan: task.plan.clone(), - repo_url: task.repository_url.clone(), - base_branch: task.base_branch.clone(), - target_branch: task.target_branch.clone(), - parent_task_id: task.parent_task_id, - depth: task.depth, - is_orchestrator, - target_repo_path: task.target_repo_path.clone(), - completion_action: task.completion_action.clone(), - continue_from_task_id: task.continue_from_task_id, - copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, - autonomous_loop: false, - resume_session: false, - conversation_history: None, - patch_data: None, - patch_base_sha: None, - local_only, - auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor - directive_id: task.directive_id, - }; - - match state.send_daemon_command(target_daemon_id, command).await { - Ok(()) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated_task.version, - status: "starting".to_string(), - updated_fields: vec!["status".to_string(), "daemon_id".to_string()], - updated_by: "system".to_string(), - }); - - MeshRequestResult { - success: true, - message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id), - data: Some(json!({ - "taskId": task_id, - "daemonId": target_daemon_id, - "status": "starting", - })), - } - } - Err(e) => { - // Rollback: clear daemon_id and reset status since command failed - let rollback_req = crate::db::models::UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; - - MeshRequestResult { - success: false, - message: format!("Failed to start task: {}", e), - data: None, - } - } - } - } - - MeshToolRequest::PauseTask { task_id } => { - // Get task and its daemon - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "running" { - return MeshRequestResult { - success: false, - message: format!("Task is not running (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::PauseTask { task_id }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to pause task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("paused".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "paused".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!("Task {} paused", task_id), - data: Some(json!({ "taskId": task_id, "status": "paused" })), - } - } - - MeshToolRequest::ResumeTask { task_id } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "paused" { - return MeshRequestResult { - success: false, - message: format!("Task is not paused (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::ResumeTask { task_id }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to resume task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("running".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "running".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!("Task {} resumed", task_id), - data: Some(json!({ "taskId": task_id, "status": "running" })), - } - } - - MeshToolRequest::InterruptTask { task_id, graceful } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::InterruptTask { task_id, graceful }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to interrupt task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("paused".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "paused".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!( - "Task {} {}interrupted", - task_id, - if graceful { "gracefully " } else { "" } - ), - data: Some(json!({ "taskId": task_id, "status": "paused" })), - } - } - - MeshToolRequest::DiscardTask { task_id } => { - match repository::delete_task_for_owner(pool, task_id, owner_id).await { - Ok(true) => MeshRequestResult { - success: true, - message: format!("Task {} discarded", task_id), - data: Some(json!({ "taskId": task_id, "deleted": true })), - }, - Ok(false) => MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to delete task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::QueryTaskStatus { task_id } => { - match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(task)) => MeshRequestResult { - success: true, - message: format!("Task '{}' is {}", task.name, task.status), - data: Some(json!({ - "taskId": task.id, - "name": task.name, - "status": task.status, - "priority": task.priority, - "description": task.description, - "plan": task.plan, - "progressSummary": task.progress_summary, - "errorMessage": task.error_message, - "repositoryUrl": task.repository_url, - "baseBranch": task.base_branch, - "targetBranch": task.target_branch, - "mergeMode": task.merge_mode, - "prUrl": task.pr_url, - "daemonId": task.daemon_id, - "containerId": task.container_id, - "createdAt": task.created_at, - "startedAt": task.started_at, - "completedAt": task.completed_at, - })), - }, - Ok(None) => MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListTasks { - status_filter, - parent_task_id, - } => { - // TODO: Add filtering support to repository - match repository::list_tasks_for_owner(pool, owner_id).await { - Ok(mut tasks) => { - // Apply filters - if let Some(ref status) = status_filter { - tasks.retain(|t| &t.status == status); - } - if let Some(ref parent_id) = parent_task_id { - tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id)); - } - - let task_data: Vec<serde_json::Value> = tasks - .iter() - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - "parentTaskId": t.parent_task_id, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} tasks", tasks.len()), - data: Some(json!({ "tasks": task_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListSubtasks { task_id } => { - match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { - Ok(subtasks) => { - let subtask_data: Vec<serde_json::Value> = subtasks - .iter() - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} subtasks", subtasks.len()), - data: Some(json!({ "subtasks": subtask_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListSiblings { task_id } => { - // Get task to find parent - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - let Some(parent_id) = task.parent_task_id else { - return MeshRequestResult { - success: true, - message: "Task has no parent, so no siblings".to_string(), - data: Some(json!({ "siblings": [] })), - }; - }; - - // Get all subtasks of parent, excluding current task - match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await { - Ok(siblings) => { - let sibling_data: Vec<serde_json::Value> = siblings - .iter() - .filter(|t| t.id != task_id) - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} sibling tasks", sibling_data.len()), - data: Some(json!({ "siblings": sibling_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListDaemons => { - // Only list daemons belonging to this owner - let daemons: Vec<serde_json::Value> = state - .daemon_connections - .iter() - .filter(|entry| entry.value().owner_id == owner_id) - .map(|entry| { - let d = entry.value(); - json!({ - "daemonId": d.id, - "connectionId": d.connection_id, - "hostname": d.hostname, - "machineId": d.machine_id, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("{} daemon(s) connected", daemons.len()), - data: Some(json!({ "daemons": daemons })), - } - } - - MeshToolRequest::ListDaemonDirectories => { - let mut directories: Vec<serde_json::Value> = Vec::new(); - - // Only list directories from daemons belonging to this owner - for entry in state.daemon_connections.iter() { - let daemon = entry.value(); - - // Only include daemons belonging to this owner - if daemon.owner_id != owner_id { - continue; - } - - // Add working directory if available - if let Some(ref working_dir) = daemon.working_directory { - directories.push(json!({ - "path": working_dir, - "label": "Working Directory", - "directoryType": "working", - "hostname": daemon.hostname, - })); - } - - // Add home directory if available - if let Some(ref home_dir) = daemon.home_directory { - directories.push(json!({ - "path": home_dir, - "label": "Makima Home", - "directoryType": "home", - "hostname": daemon.hostname, - })); - } - } - - MeshRequestResult { - success: true, - message: format!("Found {} available directories", directories.len()), - data: Some(json!({ "directories": directories })), - } - } - - MeshToolRequest::ListFiles => { - match repository::list_files_for_owner(pool, owner_id).await { - Ok(files) => { - let file_data: Vec<serde_json::Value> = files - .iter() - .map(|f| { - json!({ - "fileId": f.id, - "name": f.name, - "description": f.description, - "createdAt": f.created_at, - "updatedAt": f.updated_at, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} files", files.len()), - data: Some(json!({ "files": file_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ReadFile { file_id } => { - match repository::get_file_for_owner(pool, file_id, owner_id).await { - Ok(Some(file)) => { - // Convert body elements to readable text - let body_content: Vec<serde_json::Value> = file - .body - .iter() - .map(|elem| { - match elem { - crate::db::models::BodyElement::Heading { level, text } => { - json!({ "type": "heading", "level": level, "text": text }) - } - crate::db::models::BodyElement::Paragraph { text } => { - json!({ "type": "paragraph", "text": text }) - } - crate::db::models::BodyElement::Code { language, content } => { - json!({ "type": "code", "language": language, "content": content }) - } - crate::db::models::BodyElement::List { ordered, items } => { - json!({ "type": "list", "ordered": ordered, "items": items }) - } - crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => { - let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0); - json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count }) - } - crate::db::models::BodyElement::Image { src, alt, caption } => { - json!({ "type": "image", "src": src, "alt": alt, "caption": caption }) - } - crate::db::models::BodyElement::Markdown { content } => { - json!({ "type": "markdown", "content": content }) - } - } - }) - .collect(); - - // Also build a plain text version for easier reading - let plain_text: String = file - .body - .iter() - .filter_map(|elem| { - match elem { - crate::db::models::BodyElement::Heading { level, text } => { - Some(format!("{} {}", "#".repeat(*level as usize), text)) - } - crate::db::models::BodyElement::Paragraph { text } => { - Some(text.clone()) - } - crate::db::models::BodyElement::Code { language, content } => { - let lang = language.as_deref().unwrap_or(""); - Some(format!("```{}\n{}\n```", lang, content)) - } - crate::db::models::BodyElement::List { ordered, items } => { - let list_text: Vec<String> = items.iter().enumerate().map(|(i, item)| { - if *ordered { - format!("{}. {}", i + 1, item) - } else { - format!("- {}", item) - } - }).collect(); - Some(list_text.join("\n")) - } - crate::db::models::BodyElement::Markdown { content } => { - Some(content.clone()) - } - _ => None, - } - }) - .collect::<Vec<_>>() - .join("\n\n"); - - // Convert transcript entries to JSON - let transcript: Vec<serde_json::Value> = file - .transcript - .iter() - .map(|entry| { - json!({ - "id": entry.id, - "speaker": entry.speaker, - "start": entry.start, - "end": entry.end, - "text": entry.text, - }) - }) - .collect(); - - // Build a plain text transcript for easier reading - let transcript_text: String = file - .transcript - .iter() - .map(|entry| { - format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text) - }) - .collect::<Vec<_>>() - .join("\n"); - - MeshRequestResult { - success: true, - message: format!("Read file '{}'", file.name), - data: Some(json!({ - "fileId": file.id, - "name": file.name, - "description": file.description, - "summary": file.summary, - "body": body_content, - "plainText": plain_text, - "transcript": transcript, - "transcriptText": transcript_text, - "transcriptCount": file.transcript.len(), - "createdAt": file.created_at, - "updatedAt": file.updated_at, - })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: format!("File {} not found", file_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::SendMessageToTask { task_id, message } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "running" { - return MeshRequestResult { - success: false, - message: format!("Task is not running (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::SendMessage { task_id, message }; - match state.send_daemon_command(daemon_id, command).await { - Ok(()) => MeshRequestResult { - success: true, - message: "Message sent to task".to_string(), - data: Some(json!({ "taskId": task_id })), - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to send message: {}", e), - data: None, - }, - } - } else { - MeshRequestResult { - success: false, - message: "Task has no daemon assigned".to_string(), - data: None, - } - } - } - - MeshToolRequest::UpdateTaskPlan { - task_id, - new_plan, - interrupt_if_running, - } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - // Interrupt if running and requested - if task.status == "running" && interrupt_if_running { - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::InterruptTask { - task_id, - graceful: true, - }; - let _ = state.send_daemon_command(daemon_id, command).await; - } - } - - let update_req = crate::db::models::UpdateTaskRequest { - plan: Some(new_plan), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: updated.status.clone(), - updated_fields: vec!["plan".to_string()], - updated_by: "system".to_string(), - }); - MeshRequestResult { - success: true, - message: "Task plan updated".to_string(), - data: Some(json!({ "taskId": task_id })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to update task: {}", e), - data: None, - }, - } - } - - // Overlay operations - these require daemon communication - // For now, return placeholder responses since daemon implementation is separate - MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult { - success: false, - message: format!( - "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", - sibling_task_id - ), - data: None, - }, - - MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult { - success: false, - message: format!( - "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", - task_id - ), - data: None, - }, - - MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult { - success: false, - message: format!( - "Merge preview requires a connected daemon. Task {} may not have overlay data yet.", - task_id - ), - data: None, - }, - - MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult { - success: false, - message: format!( - "Merge operations require a connected daemon. Task {}", - task_id - ), - data: None, - }, - - MeshToolRequest::CompleteTask { task_id } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - // Update status to done - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("done".to_string()), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "done".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string()); - MeshRequestResult { - success: true, - message: format!( - "Task {} completed. Merge mode: {}", - task_id, - &merge_mode - ), - data: Some(json!({ - "taskId": task_id, - "status": "done", - "mergeMode": merge_mode, - })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to complete task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::SetMergeMode { task_id, mode } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - let update_req = crate::db::models::UpdateTaskRequest { - merge_mode: Some(mode.clone()), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: updated.status, - updated_fields: vec!["merge_mode".to_string()], - updated_by: "system".to_string(), - }); - MeshRequestResult { - success: true, - message: format!("Merge mode set to '{}'", mode), - data: Some(json!({ "taskId": task_id, "mergeMode": mode })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to update merge mode: {}", e), - data: None, - }, - } - } - - // Supervisor-only tools - these should be handled via the supervisor.sh script, - // not through the mesh chat. Return an informative error. - MeshToolRequest::GetAllContractTasks { contract_id } => { - MeshRequestResult { - success: false, - message: format!( - "get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}", - contract_id - ), - data: None, - } - } - MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => { - MeshRequestResult { - success: false, - message: format!( - "wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s", - task_id, timeout_seconds - ), - data: None, - } - } - MeshToolRequest::ReadTaskWorktree { task_id, file_path } => { - MeshRequestResult { - success: false, - message: format!( - "read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}", - task_id, file_path - ), - data: None, - } - } - MeshToolRequest::SpawnTask { name, .. } => { - MeshRequestResult { - success: false, - message: format!( - "spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}", - name - ), - data: None, - } - } - MeshToolRequest::CreateCheckpoint { task_id, message } => { - MeshRequestResult { - success: false, - message: format!( - "create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}", - task_id, message - ), - data: None, - } - } - MeshToolRequest::ListTaskCheckpoints { task_id } => { - MeshRequestResult { - success: false, - message: format!( - "list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", - task_id - ), - data: None, - } - } - MeshToolRequest::GetTaskTree { task_id } => { - MeshRequestResult { - success: false, - message: format!( - "get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", - task_id - ), - data: None, - } - } - } -} - -// ============================================================================= -// Chat History Endpoints -// ============================================================================= - -use crate::db::models::MeshChatHistoryResponse; - -/// Get chat history for the current conversation (requires authentication) -#[utoipa::path( - get, - path = "/api/v1/mesh/chat/history", - responses( - (status = 200, description = "Chat history", body = MeshChatHistoryResponse), - (status = 401, description = "Unauthorized"), - (status = 503, description = "Database not configured"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn get_chat_history( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await { - Ok(c) => c, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response() - } - }; - - let messages = match repository::list_chat_messages(pool, conversation.id, None).await { - Ok(m) => m, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response() - } - }; - - ( - StatusCode::OK, - Json(MeshChatHistoryResponse { - conversation_id: conversation.id, - messages, - }), - ) - .into_response() -} - -/// Clear chat history (archives current conversation and starts new, requires authentication) -#[utoipa::path( - delete, - path = "/api/v1/mesh/chat/history", - responses( - (status = 200, description = "History cleared"), - (status = 401, description = "Unauthorized"), - (status = 503, description = "Database not configured"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn clear_chat_history( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - match repository::clear_conversation(pool, auth.owner_id).await { - Ok(new_conv) => ( - StatusCode::OK, - Json(json!({ "success": true, "conversationId": new_conv.id })), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response(), - } -} |
