summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_chat.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_chat.rs')
-rw-r--r--makima/src/server/handlers/mesh_chat.rs2264
1 files changed, 0 insertions, 2264 deletions
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
deleted file mode 100644
index 638a4d3..0000000
--- a/makima/src/server/handlers/mesh_chat.rs
+++ /dev/null
@@ -1,2264 +0,0 @@
-//! Chat endpoint for LLM-powered task orchestration.
-//!
-//! This handler provides an agentic loop for managing tasks, daemons, and
-//! overlay operations through LLM tool calling.
-
-use axum::{
- extract::{Path, State},
- http::StatusCode,
- response::IntoResponse,
- Json,
-};
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-use utoipa::ToSchema;
-use uuid::Uuid;
-
-use crate::db::{models::CreateTaskRequest, repository};
-use crate::llm::{
- claude::{self, ClaudeClient, ClaudeError, ClaudeModel},
- groq::{GroqClient, GroqError, Message, ToolCallResponse},
- parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion,
- MESH_TOOLS,
-};
-use crate::server::auth::Authenticated;
-use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
-
-/// Maximum number of tool-calling rounds to prevent infinite loops
-const MAX_TOOL_ROUNDS: usize = 30;
-
-#[derive(Debug, Clone, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatHistoryMessage {
- /// Role: "user" or "assistant"
- pub role: String,
- /// Message content
- pub content: String,
-}
-
-#[derive(Debug, Deserialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatRequest {
- /// The user's message/instruction
- pub message: String,
- /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq"
- #[serde(default)]
- pub model: Option<String>,
- /// Optional conversation history for context continuity (deprecated - now loaded from DB)
- #[serde(default)]
- pub history: Option<Vec<MeshChatHistoryMessage>>,
- /// Context type: "mesh", "task", or "subtask"
- #[serde(default)]
- pub context_type: Option<String>,
- /// Task ID if context is task/subtask
- #[serde(default)]
- pub context_task_id: Option<Uuid>,
-}
-
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshChatResponse {
- /// The LLM's response message
- pub response: String,
- /// Tool calls that were executed
- pub tool_calls: Vec<MeshToolCallInfo>,
- /// Questions pending user answers (pauses conversation)
- #[serde(skip_serializing_if = "Option::is_none")]
- pub pending_questions: Option<Vec<UserQuestion>>,
-}
-
-#[derive(Debug, Serialize, ToSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct MeshToolCallInfo {
- pub name: String,
- pub result: ToolResult,
-}
-
-/// Enum to hold LLM clients
-enum LlmClient {
- Groq(GroqClient),
- Claude(ClaudeClient),
-}
-
-/// Unified result from LLM call
-struct LlmResult {
- content: Option<String>,
- tool_calls: Vec<ToolCall>,
- raw_tool_calls: Vec<ToolCallResponse>,
- finish_reason: String,
-}
-
-/// Chat with mesh orchestrator at the top level (no specific task context)
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/chat",
- request_body = MeshChatRequest,
- responses(
- (status = 200, description = "Chat completed successfully", body = MeshChatResponse),
- (status = 401, description = "Unauthorized"),
- (status = 500, description = "Internal server error")
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh"
-)]
-pub async fn mesh_toplevel_chat_handler(
- State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
- Json(request): Json<MeshChatRequest>,
-) -> impl IntoResponse {
- // Check if database is configured
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "Database not configured" })),
- )
- .into_response();
- };
-
- // Parse model selection (default to Claude Sonnet)
- let model = request
- .model
- .as_ref()
- .and_then(|m| LlmModel::from_str(m))
- .unwrap_or(LlmModel::ClaudeSonnet);
-
- tracing::info!("Mesh top-level chat using LLM model: {:?}", model);
-
- // Initialize the appropriate LLM client
- let llm_client = match model {
- LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
- Ok(client) => LlmClient::Claude(client),
- Err(ClaudeError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Claude client error: {}", e) })),
- )
- .into_response();
- }
- },
- LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
- Ok(client) => LlmClient::Claude(client),
- Err(ClaudeError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Claude client error: {}", e) })),
- )
- .into_response();
- }
- },
- LlmModel::GroqKimi => match GroqClient::from_env() {
- Ok(client) => LlmClient::Groq(client),
- Err(GroqError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "GROQ_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Groq client error: {}", e) })),
- )
- .into_response();
- }
- },
- };
-
- // Build context about all tasks and daemons
- let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await;
-
- // Build agentic system prompt for top-level mesh orchestration
- let system_prompt = format!(
- r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
-
-## Your Capabilities
-You have access to tools for:
-- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
-- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
-- **File Access**: list_files, read_file (read documents from the files system)
-- **Task Communication**: send_message_to_task, update_task_plan
-- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
-
-## Current Mesh Overview
-{mesh_context}
-
-## Agentic Behavior Guidelines
-
-### 1. Analyze Before Acting
-- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
-- Understand the current state before making changes
-- For simple, direct requests (e.g., "create a new task"), you can act immediately
-
-### 2. Plan Multi-Step Operations
-- Break complex orchestration into logical steps
-- For parallel execution: create multiple subtasks, then run them on different daemons
-- For sequential execution: create subtasks and run them in order
-
-### 3. Create and Manage Tasks
-- Use create_task to create new top-level tasks or subtasks
-- Assign appropriate priorities and plans
-- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it.
-- If a working directory is a git repository, use it as the repository_url for code-related tasks
-
-### 4. Coordinate Multiple Tasks
-- Use list_tasks to see all tasks and their statuses
-- Use list_daemons to see available compute resources
-- Balance workload across daemons
-
-### 5. Be Efficient
-- Don't over-analyze simple requests
-- Use the minimum number of tool calls needed
-- Provide clear summaries of actions taken
-
-## Important Notes
-- Task IDs are UUIDs - ensure you use the correct format
-- Running a task requires at least one connected daemon
-- When creating subtasks, specify the parent_task_id
-- Always confirm destructive operations (discard_task) with the user"#,
- mesh_context = mesh_context
- );
-
- // Run the shared agentic loop
- run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
-}
-
-/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner)
-#[utoipa::path(
- post,
- path = "/api/v1/mesh/tasks/{id}/chat",
- request_body = MeshChatRequest,
- responses(
- (status = 200, description = "Chat completed successfully", body = MeshChatResponse),
- (status = 401, description = "Unauthorized"),
- (status = 404, description = "Task not found"),
- (status = 500, description = "Internal server error")
- ),
- params(
- ("id" = Uuid, Path, description = "Task ID (context for orchestration)")
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh"
-)]
-pub async fn mesh_chat_handler(
- State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
- Path(task_id): Path<Uuid>,
- Json(request): Json<MeshChatRequest>,
-) -> impl IntoResponse {
- // Check if database is configured
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "Database not configured" })),
- )
- .into_response();
- };
-
- // Get the context task (scoped by owner)
- let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
- Ok(Some(task)) => task,
- Ok(None) => {
- return (
- StatusCode::NOT_FOUND,
- Json(json!({ "error": "Task not found" })),
- )
- .into_response();
- }
- Err(e) => {
- tracing::error!("Database error: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Database error: {}", e) })),
- )
- .into_response();
- }
- };
-
- // Parse model selection (default to Claude Sonnet)
- let model = request
- .model
- .as_ref()
- .and_then(|m| LlmModel::from_str(m))
- .unwrap_or(LlmModel::ClaudeSonnet);
-
- tracing::info!("Mesh chat using LLM model: {:?}", model);
-
- // Initialize the appropriate LLM client
- let llm_client = match model {
- LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
- Ok(client) => LlmClient::Claude(client),
- Err(ClaudeError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Claude client error: {}", e) })),
- )
- .into_response();
- }
- },
- LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
- Ok(client) => LlmClient::Claude(client),
- Err(ClaudeError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Claude client error: {}", e) })),
- )
- .into_response();
- }
- },
- LlmModel::GroqKimi => match GroqClient::from_env() {
- Ok(client) => LlmClient::Groq(client),
- Err(GroqError::MissingApiKey) => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "GROQ_API_KEY not configured" })),
- )
- .into_response();
- }
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Groq client error: {}", e) })),
- )
- .into_response();
- }
- },
- };
-
- // Build context about the current task and mesh state
- let task_context = build_task_context(&task);
-
- // Build agentic system prompt for task orchestration
- let system_prompt = format!(
- r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
-
-## Your Capabilities
-You have access to tools for:
-- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
-- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
-- **File Access**: list_files, read_file (read documents from the files system)
-- **Task Communication**: send_message_to_task, update_task_plan
-- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
-
-## Current Context
-{task_context}
-
-## Agentic Behavior Guidelines
-
-### 1. Analyze Before Acting
-- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
-- Understand the current state before making changes
-- For simple, direct requests (e.g., "pause this task"), you can act immediately
-
-### 2. Plan Multi-Step Operations
-- Break complex orchestration into logical steps
-- For parallel execution: create multiple subtasks, then run them on different daemons
-- For sequential execution: create subtasks and run them in order
-
-### 3. Monitor Task Progress
-- Use query_task_status to check on running tasks
-- Watch for status changes and react accordingly
-- Handle failures gracefully (retry, escalate, or report)
-
-### 4. Coordinate Sibling Tasks
-- Use peek_sibling_overlay to see what other tasks have changed
-- Preview merges before completing to catch conflicts
-- Coordinate timing when multiple tasks need to merge
-
-### 5. Be Efficient
-- Don't over-analyze simple requests
-- Use the minimum number of tool calls needed
-- Provide clear summaries of actions taken
-
-## Important Notes
-- Task IDs are UUIDs - ensure you use the correct format
-- Running a task requires at least one connected daemon
-- Overlay operations require the task to have been run at least once
-- Always confirm destructive operations (discard_task) with the user
-- When creating subtasks for this task, use parent_task_id: {task_id}"#,
- task_context = task_context,
- task_id = task_id
- );
-
- // Run the shared agentic loop
- run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
-}
-
-fn build_task_context(task: &crate::db::models::Task) -> String {
- let mut context = format!(
- "Current Task: {} (ID: {})\n",
- task.name, task.id
- );
- context.push_str(&format!("Status: {}\n", task.status));
- context.push_str(&format!("Priority: {}\n", task.priority));
-
- if let Some(ref desc) = task.description {
- context.push_str(&format!("Description: {}\n", desc));
- }
-
- // Truncate plan preview if too long
- let plan_preview = if task.plan.len() > 200 {
- format!("{}...", &task.plan[..200])
- } else {
- task.plan.clone()
- };
- context.push_str(&format!("Plan: {}\n", plan_preview));
-
- if let Some(ref summary) = task.progress_summary {
- context.push_str(&format!("Progress: {}\n", summary));
- }
-
- if let Some(ref error) = task.error_message {
- context.push_str(&format!("Error: {}\n", error));
- }
-
- // Repository info
- if let Some(ref url) = task.repository_url {
- context.push_str(&format!("Repository: {}\n", url));
- }
- if let Some(ref branch) = task.base_branch {
- context.push_str(&format!("Base branch: {}\n", branch));
- }
-
- context
-}
-
-/// Build overview context for top-level mesh orchestration
-async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String {
- let mut context = String::new();
-
- // Get task counts by status
- match repository::list_tasks_for_owner(pool, owner_id).await {
- Ok(tasks) => {
- let total = tasks.len();
- let pending = tasks.iter().filter(|t| t.status == "pending").count();
- let running = tasks.iter().filter(|t| t.status == "running").count();
- let paused = tasks.iter().filter(|t| t.status == "paused").count();
- let done = tasks.iter().filter(|t| t.status == "done").count();
- let failed = tasks.iter().filter(|t| t.status == "failed").count();
-
- context.push_str(&format!(
- "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n",
- total, pending, running, paused, done, failed
- ));
-
- // List recent/active tasks
- if !tasks.is_empty() {
- context.push_str("\nRecent Tasks:\n");
- for task in tasks.iter().take(5) {
- context.push_str(&format!(
- " - {} (ID: {}, Status: {})\n",
- task.name, task.id, task.status
- ));
- }
- if tasks.len() > 5 {
- context.push_str(&format!(" ... and {} more\n", tasks.len() - 5));
- }
- }
- }
- Err(e) => {
- context.push_str(&format!("Error fetching tasks: {}\n", e));
- }
- }
-
- // Get connected daemons for this owner
- let owner_daemons: Vec<_> = state.daemon_connections.iter()
- .filter(|e| e.value().owner_id == owner_id)
- .collect();
- let daemon_count = owner_daemons.len();
- context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count));
-
- for entry in owner_daemons.iter().take(3) {
- let daemon = entry.value();
- let working_dir = daemon.working_directory.as_deref().unwrap_or("not set");
- context.push_str(&format!(
- " - {} (ID: {}, Working Directory: {})\n",
- daemon.hostname.as_deref().unwrap_or("unknown"),
- daemon.id,
- working_dir
- ));
- }
-
- // Add default repository guidance if there's exactly one daemon with a working directory
- let daemons_with_working_dir: Vec<_> = owner_daemons.iter()
- .filter(|e| e.value().working_directory.is_some())
- .collect();
-
- if daemons_with_working_dir.len() == 1 {
- if let Some(dir) = &daemons_with_working_dir[0].value().working_directory {
- context.push_str(&format!(
- "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n",
- dir
- ));
- }
- }
-
- context
-}
-
-/// Run the shared agentic loop for mesh chat
-async fn run_mesh_agentic_loop(
- pool: &sqlx::PgPool,
- state: &SharedState,
- llm_client: &LlmClient,
- system_prompt: String,
- request: &MeshChatRequest,
- owner_id: Uuid,
-) -> axum::response::Response {
- // Get or create conversation for storing messages
- let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await {
- Ok(c) => c,
- Err(e) => {
- tracing::error!("Failed to get/create conversation: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })),
- )
- .into_response();
- }
- };
-
- // Build initial messages
- let mut messages = vec![Message {
- role: "system".to_string(),
- content: Some(system_prompt),
- tool_calls: None,
- tool_call_id: None,
- }];
-
- // Load conversation history from database (or use provided for backwards compatibility)
- if let Some(history) = &request.history {
- // Legacy: use provided history
- for hist_msg in history {
- messages.push(Message {
- role: hist_msg.role.clone(),
- content: Some(hist_msg.content.clone()),
- tool_calls: None,
- tool_call_id: None,
- });
- }
- tracing::info!(
- history_messages = history.len(),
- "Loaded mesh conversation history from request (legacy)"
- );
- } else {
- // New: load from database
- match repository::list_chat_messages(pool, conversation.id, Some(50)).await {
- Ok(db_messages) => {
- for msg in db_messages {
- messages.push(Message {
- role: msg.role.clone(),
- content: Some(msg.content.clone()),
- tool_calls: None,
- tool_call_id: None,
- });
- }
- tracing::info!(
- history_messages = messages.len() - 1, // minus system message
- "Loaded mesh conversation history from database"
- );
- }
- Err(e) => {
- tracing::warn!("Failed to load chat history: {}", e);
- // Continue without history
- }
- }
- }
-
- // Add current user message
- messages.push(Message {
- role: "user".to_string(),
- content: Some(request.message.clone()),
- tool_calls: None,
- tool_call_id: None,
- });
-
- // State for tracking
- let mut all_tool_call_infos: Vec<MeshToolCallInfo> = Vec::new();
- let mut final_response: Option<String> = None;
- let mut consecutive_failures = 0;
- const MAX_CONSECUTIVE_FAILURES: usize = 3;
- let mut pending_questions: Option<Vec<UserQuestion>> = None;
-
- // Multi-turn agentic tool calling loop
- for round in 0..MAX_TOOL_ROUNDS {
- tracing::info!(
- round = round,
- total_tool_calls = all_tool_call_infos.len(),
- "Mesh agentic loop iteration"
- );
-
- // Check consecutive failures
- if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
- tracing::warn!(
- "Breaking mesh loop due to {} consecutive failures",
- consecutive_failures
- );
- final_response = Some(
- "I encountered multiple consecutive errors and stopped. \
- Please check the task state and try again."
- .to_string(),
- );
- break;
- }
-
- // Call the appropriate LLM API
- let result = match llm_client {
- LlmClient::Groq(groq) => {
- match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await {
- Ok(r) => LlmResult {
- content: r.content,
- tool_calls: r.tool_calls,
- raw_tool_calls: r.raw_tool_calls,
- finish_reason: r.finish_reason,
- },
- Err(e) => {
- tracing::error!("Groq API error: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("LLM API error: {}", e) })),
- )
- .into_response();
- }
- }
- }
- LlmClient::Claude(claude_client) => {
- let claude_messages = claude::groq_messages_to_claude(&messages);
- match claude_client
- .chat_with_tools(claude_messages, &MESH_TOOLS)
- .await
- {
- Ok(r) => {
- let raw_tool_calls: Vec<ToolCallResponse> = r
- .tool_calls
- .iter()
- .map(|tc| ToolCallResponse {
- id: tc.id.clone(),
- call_type: "function".to_string(),
- function: crate::llm::groq::FunctionCall {
- name: tc.name.clone(),
- arguments: tc.arguments.to_string(),
- },
- })
- .collect();
-
- LlmResult {
- content: r.content,
- tool_calls: r.tool_calls,
- raw_tool_calls,
- finish_reason: r.stop_reason,
- }
- }
- Err(e) => {
- tracing::error!("Claude API error: {}", e);
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": format!("LLM API error: {}", e) })),
- )
- .into_response();
- }
- }
- }
- };
-
- // Check if there are tool calls to execute
- if result.tool_calls.is_empty() {
- final_response = result.content;
- break;
- }
-
- // Add assistant message with tool calls to conversation
- messages.push(Message {
- role: "assistant".to_string(),
- content: result.content.clone(),
- tool_calls: Some(result.raw_tool_calls.clone()),
- tool_call_id: None,
- });
-
- // Execute each tool call
- for (i, tool_call) in result.tool_calls.iter().enumerate() {
- tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call");
-
- // Parse the tool call
- let mut execution_result = parse_mesh_tool_call(tool_call);
-
- // Handle async mesh tool requests
- if let Some(mesh_request) = execution_result.request.take() {
- let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await;
- execution_result.success = async_result.success;
- execution_result.message = async_result.message;
- execution_result.data = async_result.data;
- }
-
- // Track consecutive failures
- if execution_result.success {
- consecutive_failures = 0;
- } else {
- consecutive_failures += 1;
- tracing::warn!(
- tool = %tool_call.name,
- consecutive_failures = consecutive_failures,
- "Mesh tool call failed"
- );
- }
-
- // Check for pending user questions
- if let Some(questions) = execution_result.pending_questions {
- tracing::info!(
- question_count = questions.len(),
- "Mesh LLM requesting user input"
- );
- pending_questions = Some(questions);
- all_tool_call_infos.push(MeshToolCallInfo {
- name: tool_call.name.clone(),
- result: ToolResult {
- success: execution_result.success,
- message: execution_result.message.clone(),
- },
- });
- break;
- }
-
- // Build tool result message
- let result_content = if let Some(data) = &execution_result.data {
- json!({
- "success": execution_result.success,
- "message": execution_result.message,
- "data": data
- })
- .to_string()
- } else {
- json!({
- "success": execution_result.success,
- "message": execution_result.message
- })
- .to_string()
- };
-
- // Add tool result message
- let tool_call_id = match llm_client {
- LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(),
- LlmClient::Claude(_) => tool_call.id.clone(),
- };
-
- messages.push(Message {
- role: "tool".to_string(),
- content: Some(result_content),
- tool_calls: None,
- tool_call_id: Some(tool_call_id),
- });
-
- // Track for response
- all_tool_call_infos.push(MeshToolCallInfo {
- name: tool_call.name.clone(),
- result: ToolResult {
- success: execution_result.success,
- message: execution_result.message,
- },
- });
- }
-
- // If user questions are pending, pause
- if pending_questions.is_some() {
- final_response = result.content;
- break;
- }
-
- // If finish reason indicates completion, exit loop
- let finish_lower = result.finish_reason.to_lowercase();
- if finish_lower == "stop" || finish_lower == "end_turn" {
- final_response = result.content;
- break;
- }
- }
-
- // Build response
- let response_text = final_response.unwrap_or_else(|| {
- if all_tool_call_infos.is_empty() {
- "I couldn't understand your request. Please try rephrasing.".to_string()
- } else {
- format!(
- "Done! Executed {} tool{}.",
- all_tool_call_infos.len(),
- if all_tool_call_infos.len() == 1 {
- ""
- } else {
- "s"
- }
- )
- }
- });
-
- // Save messages to database (only if not using legacy history mode)
- if request.history.is_none() {
- let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string());
-
- // Validate context_task_id exists before using it (to avoid FK constraint violation)
- let context_task_id = if let Some(task_id) = request.context_task_id {
- match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(_)) => Some(task_id),
- Ok(None) => {
- tracing::warn!("context_task_id {} not found, ignoring", task_id);
- None
- }
- Err(e) => {
- tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e);
- None
- }
- }
- } else {
- None
- };
-
- // Save user message
- if let Err(e) = repository::add_chat_message(
- pool,
- conversation.id,
- "user",
- &request.message,
- &context_type,
- context_task_id,
- None,
- None,
- )
- .await
- {
- tracing::warn!("Failed to save user message to DB: {}", e);
- }
-
- // Serialize tool calls for storage
- let tool_calls_json = if all_tool_call_infos.is_empty() {
- None
- } else {
- Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default())
- };
-
- // Serialize pending questions for storage
- let pending_questions_json = pending_questions
- .as_ref()
- .map(|q| serde_json::to_value(q).unwrap_or_default());
-
- // Save assistant message
- if let Err(e) = repository::add_chat_message(
- pool,
- conversation.id,
- "assistant",
- &response_text,
- &context_type,
- context_task_id,
- tool_calls_json,
- pending_questions_json,
- )
- .await
- {
- tracing::warn!("Failed to save assistant message to DB: {}", e);
- }
-
- tracing::info!(
- conversation_id = %conversation.id,
- context_type = %context_type,
- "Saved mesh chat messages to database"
- );
- }
-
- (
- StatusCode::OK,
- Json(MeshChatResponse {
- response: response_text,
- tool_calls: all_tool_call_infos,
- pending_questions,
- }),
- )
- .into_response()
-}
-
-/// Result from handling an async mesh tool request
-struct MeshRequestResult {
- success: bool,
- message: String,
- data: Option<serde_json::Value>,
-}
-
-/// Handle async mesh tool requests that require database/daemon access
-async fn handle_mesh_request(
- pool: &sqlx::PgPool,
- state: &SharedState,
- request: MeshToolRequest,
- owner_id: Uuid,
-) -> MeshRequestResult {
- match request {
- MeshToolRequest::CreateTask {
- name,
- plan,
- parent_task_id,
- repository_url,
- base_branch,
- merge_mode,
- priority,
- } => {
- // Subtasks inherit contract_id from parent task
- let contract_id = if let Some(parent_id) = parent_task_id {
- match repository::get_task(pool, parent_id).await {
- Ok(Some(parent_task)) => {
- match parent_task.contract_id {
- Some(cid) => cid,
- None => {
- return MeshRequestResult {
- success: false,
- message: "Parent task has no contract_id".to_string(),
- data: None,
- };
- }
- }
- }
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Parent task {} not found", parent_id),
- data: None,
- };
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Failed to look up parent task: {}", e),
- data: None,
- };
- }
- }
- } else {
- // Root tasks created via LLM chat require a contract_id
- // TODO: Add contract_id to create_task tool definition
- return MeshRequestResult {
- success: false,
- message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(),
- data: None,
- };
- };
-
- // Check if repository_url matches a daemon's working directory (for this owner)
- let is_daemon_working_dir = repository_url.as_ref().map(|url| {
- state.daemon_connections.iter().any(|entry| {
- entry.value().owner_id == owner_id &&
- entry.value().working_directory.as_ref() == Some(url)
- })
- }).unwrap_or(false);
-
- // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir
- let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode {
- // Explicit merge_mode provided - derive from it
- let action = match mode.as_str() {
- "pr" => "pr".to_string(),
- "auto" => "merge".to_string(),
- "manual" => "branch".to_string(),
- _ => "none".to_string(),
- };
- // If using daemon working dir and action involves the repo, set target_repo_path
- let target = if is_daemon_working_dir && action != "none" {
- repository_url.clone()
- } else {
- None
- };
- (Some(action), target)
- } else if is_daemon_working_dir {
- // No merge_mode but using daemon working dir - default to "pr"
- (Some("pr".to_string()), repository_url.clone())
- } else {
- (None, None)
- };
-
- let create_req = CreateTaskRequest {
- contract_id: Some(contract_id),
- name: name.clone(),
- description: None,
- plan,
- parent_task_id,
- repository_url,
- base_branch,
- target_branch: None,
- merge_mode,
- priority: priority.unwrap_or(0),
- target_repo_path,
- completion_action,
- continue_from_task_id: None,
- copy_files: None,
- is_supervisor: false,
- checkpoint_sha: None,
- branched_from_task_id: None,
- conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
- directive_id: None,
- directive_step_id: None,
- };
-
- match repository::create_task_for_owner(pool, owner_id, create_req).await {
- Ok(task) => MeshRequestResult {
- success: true,
- message: format!("Created task '{}' with ID {}", name, task.id),
- data: Some(json!({
- "taskId": task.id,
- "name": task.name,
- "status": task.status,
- })),
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to create task: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::RunTask { task_id, daemon_id } => {
- // Get task to check status
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- if task.status != "pending" && task.status != "paused" {
- return MeshRequestResult {
- success: false,
- message: format!(
- "Task cannot be run - status is '{}' (must be 'pending' or 'paused')",
- task.status
- ),
- data: None,
- };
- }
-
- // Find a daemon to run on (must belong to this owner)
- let target_daemon_id = if let Some(id) = daemon_id {
- // Verify the specified daemon belongs to this owner
- if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) {
- return MeshRequestResult {
- success: false,
- message: "Specified daemon not found or not accessible.".to_string(),
- data: None,
- };
- }
- id
- } else {
- // Find any connected daemon for this owner
- let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id);
- match daemon {
- Some(d) => d.value().id,
- None => {
- return MeshRequestResult {
- success: false,
- message: "No daemons connected for your account. Cannot run task.".to_string(),
- data: None,
- }
- }
- }
- };
-
- // Check if this is an orchestrator (depth 0 with subtasks)
- let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
- Ok(subtasks) => subtasks.len(),
- Err(_) => 0,
- };
- let is_orchestrator = task.depth == 0 && subtask_count > 0;
-
- // IMPORTANT: Update database FIRST to assign daemon_id before sending command
- // This prevents race conditions where the task starts but daemon_id is not set
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(target_daemon_id),
- version: Some(task.version),
- ..Default::default()
- };
-
- let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Failed to update task: {}", e),
- data: None,
- }
- }
- };
-
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
-
- // Send SpawnTask command to daemon
- let command = DaemonCommand::SpawnTask {
- task_id,
- task_name: task.name.clone(),
- plan: task.plan.clone(),
- repo_url: task.repository_url.clone(),
- base_branch: task.base_branch.clone(),
- target_branch: task.target_branch.clone(),
- parent_task_id: task.parent_task_id,
- depth: task.depth,
- is_orchestrator,
- target_repo_path: task.target_repo_path.clone(),
- completion_action: task.completion_action.clone(),
- continue_from_task_id: task.continue_from_task_id,
- copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
- autonomous_loop: false,
- resume_session: false,
- conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
- local_only,
- auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
- directive_id: task.directive_id,
- };
-
- match state.send_daemon_command(target_daemon_id, command).await {
- Ok(()) => {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated_task.version,
- status: "starting".to_string(),
- updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
- updated_by: "system".to_string(),
- });
-
- MeshRequestResult {
- success: true,
- message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id),
- data: Some(json!({
- "taskId": task_id,
- "daemonId": target_daemon_id,
- "status": "starting",
- })),
- }
- }
- Err(e) => {
- // Rollback: clear daemon_id and reset status since command failed
- let rollback_req = crate::db::models::UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
- };
- let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await;
-
- MeshRequestResult {
- success: false,
- message: format!("Failed to start task: {}", e),
- data: None,
- }
- }
- }
- }
-
- MeshToolRequest::PauseTask { task_id } => {
- // Get task and its daemon
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- if task.status != "running" {
- return MeshRequestResult {
- success: false,
- message: format!("Task is not running (status: {})", task.status),
- data: None,
- };
- }
-
- if let Some(daemon_id) = task.daemon_id {
- let command = DaemonCommand::PauseTask { task_id };
- if let Err(e) = state.send_daemon_command(daemon_id, command).await {
- return MeshRequestResult {
- success: false,
- message: format!("Failed to pause task: {}", e),
- data: None,
- };
- }
- }
-
- // Update status
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("paused".to_string()),
- version: Some(task.version),
- ..Default::default()
- };
-
- if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: "paused".to_string(),
- updated_fields: vec!["status".to_string()],
- updated_by: "system".to_string(),
- });
- }
-
- MeshRequestResult {
- success: true,
- message: format!("Task {} paused", task_id),
- data: Some(json!({ "taskId": task_id, "status": "paused" })),
- }
- }
-
- MeshToolRequest::ResumeTask { task_id } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- if task.status != "paused" {
- return MeshRequestResult {
- success: false,
- message: format!("Task is not paused (status: {})", task.status),
- data: None,
- };
- }
-
- if let Some(daemon_id) = task.daemon_id {
- let command = DaemonCommand::ResumeTask { task_id };
- if let Err(e) = state.send_daemon_command(daemon_id, command).await {
- return MeshRequestResult {
- success: false,
- message: format!("Failed to resume task: {}", e),
- data: None,
- };
- }
- }
-
- // Update status
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("running".to_string()),
- version: Some(task.version),
- ..Default::default()
- };
-
- if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: "running".to_string(),
- updated_fields: vec!["status".to_string()],
- updated_by: "system".to_string(),
- });
- }
-
- MeshRequestResult {
- success: true,
- message: format!("Task {} resumed", task_id),
- data: Some(json!({ "taskId": task_id, "status": "running" })),
- }
- }
-
- MeshToolRequest::InterruptTask { task_id, graceful } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- if let Some(daemon_id) = task.daemon_id {
- let command = DaemonCommand::InterruptTask { task_id, graceful };
- if let Err(e) = state.send_daemon_command(daemon_id, command).await {
- return MeshRequestResult {
- success: false,
- message: format!("Failed to interrupt task: {}", e),
- data: None,
- };
- }
- }
-
- // Update status
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("paused".to_string()),
- version: Some(task.version),
- ..Default::default()
- };
-
- if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: "paused".to_string(),
- updated_fields: vec!["status".to_string()],
- updated_by: "system".to_string(),
- });
- }
-
- MeshRequestResult {
- success: true,
- message: format!(
- "Task {} {}interrupted",
- task_id,
- if graceful { "gracefully " } else { "" }
- ),
- data: Some(json!({ "taskId": task_id, "status": "paused" })),
- }
- }
-
- MeshToolRequest::DiscardTask { task_id } => {
- match repository::delete_task_for_owner(pool, task_id, owner_id).await {
- Ok(true) => MeshRequestResult {
- success: true,
- message: format!("Task {} discarded", task_id),
- data: Some(json!({ "taskId": task_id, "deleted": true })),
- },
- Ok(false) => MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to delete task: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::QueryTaskStatus { task_id } => {
- match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(task)) => MeshRequestResult {
- success: true,
- message: format!("Task '{}' is {}", task.name, task.status),
- data: Some(json!({
- "taskId": task.id,
- "name": task.name,
- "status": task.status,
- "priority": task.priority,
- "description": task.description,
- "plan": task.plan,
- "progressSummary": task.progress_summary,
- "errorMessage": task.error_message,
- "repositoryUrl": task.repository_url,
- "baseBranch": task.base_branch,
- "targetBranch": task.target_branch,
- "mergeMode": task.merge_mode,
- "prUrl": task.pr_url,
- "daemonId": task.daemon_id,
- "containerId": task.container_id,
- "createdAt": task.created_at,
- "startedAt": task.started_at,
- "completedAt": task.completed_at,
- })),
- },
- Ok(None) => MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::ListTasks {
- status_filter,
- parent_task_id,
- } => {
- // TODO: Add filtering support to repository
- match repository::list_tasks_for_owner(pool, owner_id).await {
- Ok(mut tasks) => {
- // Apply filters
- if let Some(ref status) = status_filter {
- tasks.retain(|t| &t.status == status);
- }
- if let Some(ref parent_id) = parent_task_id {
- tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id));
- }
-
- let task_data: Vec<serde_json::Value> = tasks
- .iter()
- .map(|t| {
- json!({
- "taskId": t.id,
- "name": t.name,
- "status": t.status,
- "priority": t.priority,
- "parentTaskId": t.parent_task_id,
- })
- })
- .collect();
-
- MeshRequestResult {
- success: true,
- message: format!("Found {} tasks", tasks.len()),
- data: Some(json!({ "tasks": task_data })),
- }
- }
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::ListSubtasks { task_id } => {
- match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
- Ok(subtasks) => {
- let subtask_data: Vec<serde_json::Value> = subtasks
- .iter()
- .map(|t| {
- json!({
- "taskId": t.id,
- "name": t.name,
- "status": t.status,
- "priority": t.priority,
- })
- })
- .collect();
-
- MeshRequestResult {
- success: true,
- message: format!("Found {} subtasks", subtasks.len()),
- data: Some(json!({ "subtasks": subtask_data })),
- }
- }
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::ListSiblings { task_id } => {
- // Get task to find parent
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- let Some(parent_id) = task.parent_task_id else {
- return MeshRequestResult {
- success: true,
- message: "Task has no parent, so no siblings".to_string(),
- data: Some(json!({ "siblings": [] })),
- };
- };
-
- // Get all subtasks of parent, excluding current task
- match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await {
- Ok(siblings) => {
- let sibling_data: Vec<serde_json::Value> = siblings
- .iter()
- .filter(|t| t.id != task_id)
- .map(|t| {
- json!({
- "taskId": t.id,
- "name": t.name,
- "status": t.status,
- "priority": t.priority,
- })
- })
- .collect();
-
- MeshRequestResult {
- success: true,
- message: format!("Found {} sibling tasks", sibling_data.len()),
- data: Some(json!({ "siblings": sibling_data })),
- }
- }
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::ListDaemons => {
- // Only list daemons belonging to this owner
- let daemons: Vec<serde_json::Value> = state
- .daemon_connections
- .iter()
- .filter(|entry| entry.value().owner_id == owner_id)
- .map(|entry| {
- let d = entry.value();
- json!({
- "daemonId": d.id,
- "connectionId": d.connection_id,
- "hostname": d.hostname,
- "machineId": d.machine_id,
- })
- })
- .collect();
-
- MeshRequestResult {
- success: true,
- message: format!("{} daemon(s) connected", daemons.len()),
- data: Some(json!({ "daemons": daemons })),
- }
- }
-
- MeshToolRequest::ListDaemonDirectories => {
- let mut directories: Vec<serde_json::Value> = Vec::new();
-
- // Only list directories from daemons belonging to this owner
- for entry in state.daemon_connections.iter() {
- let daemon = entry.value();
-
- // Only include daemons belonging to this owner
- if daemon.owner_id != owner_id {
- continue;
- }
-
- // Add working directory if available
- if let Some(ref working_dir) = daemon.working_directory {
- directories.push(json!({
- "path": working_dir,
- "label": "Working Directory",
- "directoryType": "working",
- "hostname": daemon.hostname,
- }));
- }
-
- // Add home directory if available
- if let Some(ref home_dir) = daemon.home_directory {
- directories.push(json!({
- "path": home_dir,
- "label": "Makima Home",
- "directoryType": "home",
- "hostname": daemon.hostname,
- }));
- }
- }
-
- MeshRequestResult {
- success: true,
- message: format!("Found {} available directories", directories.len()),
- data: Some(json!({ "directories": directories })),
- }
- }
-
- MeshToolRequest::ListFiles => {
- match repository::list_files_for_owner(pool, owner_id).await {
- Ok(files) => {
- let file_data: Vec<serde_json::Value> = files
- .iter()
- .map(|f| {
- json!({
- "fileId": f.id,
- "name": f.name,
- "description": f.description,
- "createdAt": f.created_at,
- "updatedAt": f.updated_at,
- })
- })
- .collect();
-
- MeshRequestResult {
- success: true,
- message: format!("Found {} files", files.len()),
- data: Some(json!({ "files": file_data })),
- }
- }
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::ReadFile { file_id } => {
- match repository::get_file_for_owner(pool, file_id, owner_id).await {
- Ok(Some(file)) => {
- // Convert body elements to readable text
- let body_content: Vec<serde_json::Value> = file
- .body
- .iter()
- .map(|elem| {
- match elem {
- crate::db::models::BodyElement::Heading { level, text } => {
- json!({ "type": "heading", "level": level, "text": text })
- }
- crate::db::models::BodyElement::Paragraph { text } => {
- json!({ "type": "paragraph", "text": text })
- }
- crate::db::models::BodyElement::Code { language, content } => {
- json!({ "type": "code", "language": language, "content": content })
- }
- crate::db::models::BodyElement::List { ordered, items } => {
- json!({ "type": "list", "ordered": ordered, "items": items })
- }
- crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => {
- let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0);
- json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count })
- }
- crate::db::models::BodyElement::Image { src, alt, caption } => {
- json!({ "type": "image", "src": src, "alt": alt, "caption": caption })
- }
- crate::db::models::BodyElement::Markdown { content } => {
- json!({ "type": "markdown", "content": content })
- }
- }
- })
- .collect();
-
- // Also build a plain text version for easier reading
- let plain_text: String = file
- .body
- .iter()
- .filter_map(|elem| {
- match elem {
- crate::db::models::BodyElement::Heading { level, text } => {
- Some(format!("{} {}", "#".repeat(*level as usize), text))
- }
- crate::db::models::BodyElement::Paragraph { text } => {
- Some(text.clone())
- }
- crate::db::models::BodyElement::Code { language, content } => {
- let lang = language.as_deref().unwrap_or("");
- Some(format!("```{}\n{}\n```", lang, content))
- }
- crate::db::models::BodyElement::List { ordered, items } => {
- let list_text: Vec<String> = items.iter().enumerate().map(|(i, item)| {
- if *ordered {
- format!("{}. {}", i + 1, item)
- } else {
- format!("- {}", item)
- }
- }).collect();
- Some(list_text.join("\n"))
- }
- crate::db::models::BodyElement::Markdown { content } => {
- Some(content.clone())
- }
- _ => None,
- }
- })
- .collect::<Vec<_>>()
- .join("\n\n");
-
- // Convert transcript entries to JSON
- let transcript: Vec<serde_json::Value> = file
- .transcript
- .iter()
- .map(|entry| {
- json!({
- "id": entry.id,
- "speaker": entry.speaker,
- "start": entry.start,
- "end": entry.end,
- "text": entry.text,
- })
- })
- .collect();
-
- // Build a plain text transcript for easier reading
- let transcript_text: String = file
- .transcript
- .iter()
- .map(|entry| {
- format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text)
- })
- .collect::<Vec<_>>()
- .join("\n");
-
- MeshRequestResult {
- success: true,
- message: format!("Read file '{}'", file.name),
- data: Some(json!({
- "fileId": file.id,
- "name": file.name,
- "description": file.description,
- "summary": file.summary,
- "body": body_content,
- "plainText": plain_text,
- "transcript": transcript,
- "transcriptText": transcript_text,
- "transcriptCount": file.transcript.len(),
- "createdAt": file.created_at,
- "updatedAt": file.updated_at,
- })),
- }
- }
- Ok(None) => MeshRequestResult {
- success: false,
- message: format!("File {} not found", file_id),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::SendMessageToTask { task_id, message } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- if task.status != "running" {
- return MeshRequestResult {
- success: false,
- message: format!("Task is not running (status: {})", task.status),
- data: None,
- };
- }
-
- if let Some(daemon_id) = task.daemon_id {
- let command = DaemonCommand::SendMessage { task_id, message };
- match state.send_daemon_command(daemon_id, command).await {
- Ok(()) => MeshRequestResult {
- success: true,
- message: "Message sent to task".to_string(),
- data: Some(json!({ "taskId": task_id })),
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to send message: {}", e),
- data: None,
- },
- }
- } else {
- MeshRequestResult {
- success: false,
- message: "Task has no daemon assigned".to_string(),
- data: None,
- }
- }
- }
-
- MeshToolRequest::UpdateTaskPlan {
- task_id,
- new_plan,
- interrupt_if_running,
- } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- // Interrupt if running and requested
- if task.status == "running" && interrupt_if_running {
- if let Some(daemon_id) = task.daemon_id {
- let command = DaemonCommand::InterruptTask {
- task_id,
- graceful: true,
- };
- let _ = state.send_daemon_command(daemon_id, command).await;
- }
- }
-
- let update_req = crate::db::models::UpdateTaskRequest {
- plan: Some(new_plan),
- version: Some(task.version),
- ..Default::default()
- };
-
- match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- Ok(Some(updated)) => {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: updated.status.clone(),
- updated_fields: vec!["plan".to_string()],
- updated_by: "system".to_string(),
- });
- MeshRequestResult {
- success: true,
- message: "Task plan updated".to_string(),
- data: Some(json!({ "taskId": task_id })),
- }
- }
- Ok(None) => MeshRequestResult {
- success: false,
- message: "Task not found".to_string(),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to update task: {}", e),
- data: None,
- },
- }
- }
-
- // Overlay operations - these require daemon communication
- // For now, return placeholder responses since daemon implementation is separate
- MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult {
- success: false,
- message: format!(
- "Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
- sibling_task_id
- ),
- data: None,
- },
-
- MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult {
- success: false,
- message: format!(
- "Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
- task_id
- ),
- data: None,
- },
-
- MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult {
- success: false,
- message: format!(
- "Merge preview requires a connected daemon. Task {} may not have overlay data yet.",
- task_id
- ),
- data: None,
- },
-
- MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult {
- success: false,
- message: format!(
- "Merge operations require a connected daemon. Task {}",
- task_id
- ),
- data: None,
- },
-
- MeshToolRequest::CompleteTask { task_id } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- // Update status to done
- let update_req = crate::db::models::UpdateTaskRequest {
- status: Some("done".to_string()),
- version: Some(task.version),
- ..Default::default()
- };
-
- match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- Ok(Some(updated)) => {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: "done".to_string(),
- updated_fields: vec!["status".to_string()],
- updated_by: "system".to_string(),
- });
- let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string());
- MeshRequestResult {
- success: true,
- message: format!(
- "Task {} completed. Merge mode: {}",
- task_id,
- &merge_mode
- ),
- data: Some(json!({
- "taskId": task_id,
- "status": "done",
- "mergeMode": merge_mode,
- })),
- }
- }
- Ok(None) => MeshRequestResult {
- success: false,
- message: "Task not found".to_string(),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to complete task: {}", e),
- data: None,
- },
- }
- }
-
- MeshToolRequest::SetMergeMode { task_id, mode } => {
- let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
- Ok(Some(t)) => t,
- Ok(None) => {
- return MeshRequestResult {
- success: false,
- message: format!("Task {} not found", task_id),
- data: None,
- }
- }
- Err(e) => {
- return MeshRequestResult {
- success: false,
- message: format!("Database error: {}", e),
- data: None,
- }
- }
- };
-
- let update_req = crate::db::models::UpdateTaskRequest {
- merge_mode: Some(mode.clone()),
- version: Some(task.version),
- ..Default::default()
- };
-
- match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
- Ok(Some(updated)) => {
- state.broadcast_task_update(TaskUpdateNotification {
- task_id,
- owner_id: Some(task.owner_id),
- version: updated.version,
- status: updated.status,
- updated_fields: vec!["merge_mode".to_string()],
- updated_by: "system".to_string(),
- });
- MeshRequestResult {
- success: true,
- message: format!("Merge mode set to '{}'", mode),
- data: Some(json!({ "taskId": task_id, "mergeMode": mode })),
- }
- }
- Ok(None) => MeshRequestResult {
- success: false,
- message: "Task not found".to_string(),
- data: None,
- },
- Err(e) => MeshRequestResult {
- success: false,
- message: format!("Failed to update merge mode: {}", e),
- data: None,
- },
- }
- }
-
- // Supervisor-only tools - these should be handled via the supervisor.sh script,
- // not through the mesh chat. Return an informative error.
- MeshToolRequest::GetAllContractTasks { contract_id } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}",
- contract_id
- ),
- data: None,
- }
- }
- MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s",
- task_id, timeout_seconds
- ),
- data: None,
- }
- }
- MeshToolRequest::ReadTaskWorktree { task_id, file_path } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}",
- task_id, file_path
- ),
- data: None,
- }
- }
- MeshToolRequest::SpawnTask { name, .. } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}",
- name
- ),
- data: None,
- }
- }
- MeshToolRequest::CreateCheckpoint { task_id, message } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}",
- task_id, message
- ),
- data: None,
- }
- }
- MeshToolRequest::ListTaskCheckpoints { task_id } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}",
- task_id
- ),
- data: None,
- }
- }
- MeshToolRequest::GetTaskTree { task_id } => {
- MeshRequestResult {
- success: false,
- message: format!(
- "get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}",
- task_id
- ),
- data: None,
- }
- }
- }
-}
-
-// =============================================================================
-// Chat History Endpoints
-// =============================================================================
-
-use crate::db::models::MeshChatHistoryResponse;
-
-/// Get chat history for the current conversation (requires authentication)
-#[utoipa::path(
- get,
- path = "/api/v1/mesh/chat/history",
- responses(
- (status = 200, description = "Chat history", body = MeshChatHistoryResponse),
- (status = 401, description = "Unauthorized"),
- (status = 503, description = "Database not configured"),
- (status = 500, description = "Internal server error")
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh"
-)]
-pub async fn get_chat_history(
- State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
-) -> impl IntoResponse {
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "Database not configured" })),
- )
- .into_response();
- };
-
- let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await {
- Ok(c) => c,
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": e.to_string() })),
- )
- .into_response()
- }
- };
-
- let messages = match repository::list_chat_messages(pool, conversation.id, None).await {
- Ok(m) => m,
- Err(e) => {
- return (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": e.to_string() })),
- )
- .into_response()
- }
- };
-
- (
- StatusCode::OK,
- Json(MeshChatHistoryResponse {
- conversation_id: conversation.id,
- messages,
- }),
- )
- .into_response()
-}
-
-/// Clear chat history (archives current conversation and starts new, requires authentication)
-#[utoipa::path(
- delete,
- path = "/api/v1/mesh/chat/history",
- responses(
- (status = 200, description = "History cleared"),
- (status = 401, description = "Unauthorized"),
- (status = 503, description = "Database not configured"),
- (status = 500, description = "Internal server error")
- ),
- security(
- ("bearer_auth" = []),
- ("api_key" = [])
- ),
- tag = "Mesh"
-)]
-pub async fn clear_chat_history(
- State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
-) -> impl IntoResponse {
- let Some(ref pool) = state.db_pool else {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(json!({ "error": "Database not configured" })),
- )
- .into_response();
- };
-
- match repository::clear_conversation(pool, auth.owner_id).await {
- Ok(new_conv) => (
- StatusCode::OK,
- Json(json!({ "success": true, "conversationId": new_conv.id })),
- )
- .into_response(),
- Err(e) => (
- StatusCode::INTERNAL_SERVER_ERROR,
- Json(json!({ "error": e.to_string() })),
- )
- .into_response(),
- }
-}