diff options
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/chat.rs | 1210 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_chat.rs | 2264 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 96 | ||||
| -rw-r--r-- | makima/src/server/handlers/mod.rs | 7 | ||||
| -rw-r--r-- | makima/src/server/handlers/templates.rs | 43 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 11 | ||||
| -rw-r--r-- | makima/src/server/openapi.rs | 6 |
7 files changed, 27 insertions, 3610 deletions
diff --git a/makima/src/server/handlers/chat.rs b/makima/src/server/handlers/chat.rs deleted file mode 100644 index 9d8cd19..0000000 --- a/makima/src/server/handlers/chat.rs +++ /dev/null @@ -1,1210 +0,0 @@ -//! Chat endpoint for LLM-powered file editing. - -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::{Deserialize, Serialize}; -use utoipa::ToSchema; -use uuid::Uuid; - -use crate::db::{models::BodyElement, repository::{self, RepositoryError}}; -use crate::llm::{ - claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, - execute_tool_call, - groq::{GroqClient, GroqError, Message, ToolCallResponse}, - LlmModel, ToolCall, ToolResult, UserQuestion, VersionToolRequest, AVAILABLE_TOOLS, -}; -use crate::server::state::{FileUpdateNotification, SharedState}; - -/// Maximum number of tool-calling rounds to prevent infinite loops -const MAX_TOOL_ROUNDS: usize = 20; - -/// Context limits for different models (in tokens) -/// Claude models have 200K context, Groq models vary -const CLAUDE_CONTEXT_LIMIT: usize = 200_000; -const GROQ_CONTEXT_LIMIT: usize = 32_000; - -/// Threshold for triggering context compaction (90% of limit) -const CONTEXT_COMPACTION_THRESHOLD: f32 = 0.90; - -/// Approximate characters per token (rough estimate for English text) -const CHARS_PER_TOKEN: usize = 4; - -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChatHistoryMessage { - /// Role: "user" or "assistant" - pub role: String, - /// Message content - pub content: String, -} - -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChatRequest { - /// The user's message/instruction - pub message: String, - /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" - #[serde(default)] - pub model: Option<String>, - /// Optional conversation history for context continuity - #[serde(default)] - pub history: Option<Vec<ChatHistoryMessage>>, - /// Optional focused element index (for targeted editing) - #[serde(default)] - pub focused_element_index: Option<usize>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ChatResponse { - /// The LLM's response message - pub response: String, - /// Tool calls that were executed - pub tool_calls: Vec<ToolCallInfo>, - /// Updated file body after tool execution - pub updated_body: Vec<BodyElement>, - /// Updated summary (if changed) - pub updated_summary: Option<String>, - /// Questions pending user answers (pauses conversation) - #[serde(skip_serializing_if = "Option::is_none")] - pub pending_questions: Option<Vec<UserQuestion>>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ToolCallInfo { - pub name: String, - pub result: ToolResult, -} - -/// Enum to hold LLM clients -enum LlmClient { - Groq(GroqClient), - Claude(ClaudeClient), -} - -/// Unified result from LLM call -struct LlmResult { - content: Option<String>, - tool_calls: Vec<ToolCall>, - raw_tool_calls: Vec<ToolCallResponse>, - finish_reason: String, -} - -/// Chat with a file using LLM tool calling -#[utoipa::path( - post, - path = "/api/v1/files/{id}/chat", - request_body = ChatRequest, - responses( - (status = 200, description = "Chat completed successfully", body = ChatResponse), - (status = 404, description = "File not found"), - (status = 500, description = "Internal server error") - ), - params( - ("id" = Uuid, Path, description = "File ID") - ), - tag = "chat" -)] -pub async fn chat_handler( - State(state): State<SharedState>, - Path(id): Path<Uuid>, - Json(request): Json<ChatRequest>, -) -> impl IntoResponse { - // Check if database is configured - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ - "error": "Database not configured" - })), - ) - .into_response(); - }; - - // Get the file - let file = match repository::get_file(pool, id).await { - Ok(Some(file)) => file, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({ - "error": "File not found" - })), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Database error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Database error: {}", e) - })), - ) - .into_response(); - } - }; - - // Parse model selection (default to Claude Sonnet) - let model = request - .model - .as_ref() - .and_then(|m| LlmModel::from_str(m)) - .unwrap_or_default(); - - tracing::info!("Using LLM model: {:?}", model); - - // Initialize the appropriate LLM client - let llm_client = match model { - LlmModel::ClaudeSonnet => { - match ClaudeClient::from_env(ClaudeModel::Sonnet) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ - "error": "ANTHROPIC_API_KEY not configured" - })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Claude client error: {}", e) - })), - ) - .into_response(); - } - } - } - LlmModel::ClaudeOpus => { - match ClaudeClient::from_env(ClaudeModel::Opus) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ - "error": "ANTHROPIC_API_KEY not configured" - })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Claude client error: {}", e) - })), - ) - .into_response(); - } - } - } - LlmModel::GroqKimi => { - match GroqClient::from_env() { - Ok(client) => LlmClient::Groq(client), - Err(GroqError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ - "error": "GROQ_API_KEY not configured" - })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Groq client error: {}", e) - })), - ) - .into_response(); - } - } - } - }; - - // Build context about the file - let file_context = build_file_context(&file); - - // Build focused element context if specified - let focused_context = build_focused_element_context(&file.body, request.focused_element_index); - - // Build agentic system prompt - let system_prompt = format!( - r#"You are an intelligent document editing agent. You help users view, analyze, and modify document files. - -## Your Capabilities -You have access to tools for: -- **Viewing content**: view_body (see all elements), read_element (inspect specific element), view_transcript (read full transcript) -- **Adding content**: add_heading, add_paragraph, add_code, add_list, add_chart -- **Modifying content**: update_element, remove_element, reorder_elements, clear_body -- **Document metadata**: set_summary -- **Data processing**: parse_csv (convert CSV to JSON), jq (transform JSON data) -- **Version history**: list_versions, read_version, restore_version -- **Templates**: suggest_templates (get phase-appropriate templates), apply_template (apply a template structure) - -## Agentic Behavior Guidelines - -### 1. Analyze Before Acting -- For complex requests, first gather information using view_body, view_transcript, or read_element -- Understand the current state of the document before making changes -- For simple, direct requests (e.g., "add a heading called X"), you can act immediately without prior inspection - -### 2. Plan Multi-Step Operations -- Break complex tasks into logical steps -- For data visualization: parse_csv → (optionally jq to transform) → add_chart -- For restructuring: view_body → understand structure → make targeted changes - -### 3. Handle Errors Gracefully -- If a tool call fails, analyze the error message -- Try an alternative approach or different parameters -- Don't repeat the exact same failing call - -### 4. Know When to Stop -- Stop when you've completed the user's request -- Stop when you've provided the requested information -- Provide a clear summary of what you did in your final response - -### 5. Be Efficient -- Don't over-analyze simple requests -- Use the minimum number of tool calls needed -- Combine operations when possible - -## Current Document Context -{file_context} -{focused_context} -## Important Notes -- Body element indices are 0-based -- When updating elements, provide ALL required fields for that element type -- The transcript is read-only (you cannot modify it, only read it) -- Changes are saved automatically after tool execution"#, - file_context = file_context, - focused_context = focused_context - ); - - // Build initial messages (Groq/OpenAI format - will be converted for Claude) - let mut messages = vec![ - Message { - role: "system".to_string(), - content: Some(system_prompt), - tool_calls: None, - tool_call_id: None, - }, - ]; - - // Add conversation history if provided (for context continuity) - if let Some(history) = &request.history { - for hist_msg in history { - messages.push(Message { - role: hist_msg.role.clone(), - content: Some(hist_msg.content.clone()), - tool_calls: None, - tool_call_id: None, - }); - } - tracing::info!( - history_messages = history.len(), - "Loaded conversation history" - ); - } - - // Add current user message - messages.push(Message { - role: "user".to_string(), - content: Some(request.message.clone()), - tool_calls: None, - tool_call_id: None, - }); - - // State for tracking changes - let mut current_body = file.body.clone(); - let mut current_summary = file.summary.clone(); - let mut all_tool_call_infos: Vec<ToolCallInfo> = Vec::new(); - let mut final_response: Option<String> = None; - // Track if a version restore already happened (to avoid double-saving) - let mut version_restored = false; - // Track if there were modifications after a restore - let mut has_changes_after_restore = false; - // Track consecutive failures for agentic retry logic - let mut consecutive_failures = 0; - const MAX_CONSECUTIVE_FAILURES: usize = 3; - // Track pending user questions (pauses the conversation) - let mut pending_questions: Option<Vec<UserQuestion>> = None; - - // Multi-turn agentic tool calling loop - for round in 0..MAX_TOOL_ROUNDS { - tracing::info!( - round = round, - body_elements = current_body.len(), - total_tool_calls = all_tool_call_infos.len(), - "Agentic loop iteration" - ); - - // Check if we've hit too many consecutive failures - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - tracing::warn!("Breaking loop due to {} consecutive failures", consecutive_failures); - final_response = Some(format!( - "I encountered multiple consecutive errors and stopped to avoid an infinite loop. \ - Please try rephrasing your request or check if the document state is as expected." - )); - break; - } - - // Check context usage and compact if nearing limit - if is_context_near_limit(&messages, &model) { - let estimated_tokens = estimate_total_tokens(&messages); - tracing::warn!( - estimated_tokens = estimated_tokens, - round = round, - "Context nearing limit, compacting conversation" - ); - compact_conversation(&mut messages, &all_tool_call_infos); - - // Log the new token count - let new_tokens = estimate_total_tokens(&messages); - tracing::info!( - tokens_before = estimated_tokens, - tokens_after = new_tokens, - tokens_saved = estimated_tokens - new_tokens, - "Conversation compacted" - ); - } - - // Call the appropriate LLM API - let result = match &llm_client { - LlmClient::Groq(groq) => { - match groq.chat_with_tools(messages.clone(), &AVAILABLE_TOOLS).await { - Ok(r) => LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls: r.raw_tool_calls, - finish_reason: r.finish_reason, - }, - Err(e) => { - tracing::error!("Groq API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("LLM API error: {}", e) - })), - ) - .into_response(); - } - } - } - LlmClient::Claude(claude_client) => { - // Convert messages to Claude format - let claude_messages = claude::groq_messages_to_claude(&messages); - match claude_client.chat_with_tools(claude_messages, &AVAILABLE_TOOLS).await { - Ok(r) => { - // Convert Claude tool uses to Groq-style ToolCallResponse for consistency - let raw_tool_calls: Vec<ToolCallResponse> = r - .tool_calls - .iter() - .map(|tc| ToolCallResponse { - id: tc.id.clone(), - call_type: "function".to_string(), - function: crate::llm::groq::FunctionCall { - name: tc.name.clone(), - arguments: tc.arguments.to_string(), - }, - }) - .collect(); - - LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls, - finish_reason: r.stop_reason, - } - } - Err(e) => { - tracing::error!("Claude API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("LLM API error: {}", e) - })), - ) - .into_response(); - } - } - } - }; - - // Check if there are tool calls to execute - if result.tool_calls.is_empty() { - // No more tool calls - capture the final response and exit loop - final_response = result.content; - break; - } - - // Add assistant message with tool calls to conversation - messages.push(Message { - role: "assistant".to_string(), - content: result.content.clone(), - tool_calls: Some(result.raw_tool_calls.clone()), - tool_call_id: None, - }); - - // Execute each tool call and add results to conversation - for (i, tool_call) in result.tool_calls.iter().enumerate() { - tracing::info!( - tool = %tool_call.name, - round = round, - "Executing tool call" - ); - - let mut execution_result = - execute_tool_call(tool_call, ¤t_body, current_summary.as_deref(), &file.transcript); - - // Handle version tool requests that need async database access - if let Some(version_request) = &execution_result.version_request { - let version_result = handle_version_request( - pool, - id, - version_request, - ¤t_body, - current_summary.as_deref(), - file.version, - ) - .await; - - // Update execution result with actual version operation result - execution_result.result = version_result.result; - execution_result.parsed_data = version_result.data; - - // Apply state changes from restore operation - if let Some(new_body) = version_result.new_body { - current_body = new_body; - // Mark that a restore happened - file was already saved - version_restored = true; - } - if let Some(new_summary) = version_result.new_summary { - current_summary = Some(new_summary); - } - } - - // Apply state changes from regular tools - if let Some(new_body) = execution_result.new_body { - current_body = new_body; - // If this is a regular tool (not a version operation), track it - if execution_result.version_request.is_none() && version_restored { - has_changes_after_restore = true; - } - } - if let Some(new_summary) = execution_result.new_summary { - current_summary = Some(new_summary); - if execution_result.version_request.is_none() && version_restored { - has_changes_after_restore = true; - } - } - - // Track consecutive failures for agentic behavior - if execution_result.result.success { - consecutive_failures = 0; - } else { - consecutive_failures += 1; - tracing::warn!( - tool = %tool_call.name, - consecutive_failures = consecutive_failures, - "Tool call failed" - ); - } - - // Check for pending user questions (pauses the conversation) - if let Some(questions) = execution_result.pending_questions { - tracing::info!( - question_count = questions.len(), - "LLM requesting user input, pausing conversation" - ); - pending_questions = Some(questions); - // Track this tool call before breaking - all_tool_call_infos.push(ToolCallInfo { - name: tool_call.name.clone(), - result: execution_result.result, - }); - break; // Exit inner loop - } - - // Build tool result message content with enhanced context for agentic reasoning - let result_content = if let Some(parsed_data) = &execution_result.parsed_data { - // Include parsed data in the result for the LLM to use - serde_json::json!({ - "success": execution_result.result.success, - "message": execution_result.result.message, - "data": parsed_data - }) - .to_string() - } else if !execution_result.result.success { - // On failure, include hints for the LLM - let hint = if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - " [HINT: Multiple consecutive failures detected. Consider a different approach or verify your parameters.]" - } else { - "" - }; - serde_json::json!({ - "success": false, - "message": format!("{}{}", execution_result.result.message, hint), - "currentBodyElementCount": current_body.len() - }) - .to_string() - } else { - serde_json::json!({ - "success": execution_result.result.success, - "message": execution_result.result.message - }) - .to_string() - }; - - // Add tool result message - // Use the appropriate ID format for each provider - let tool_call_id = match &llm_client { - LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), - LlmClient::Claude(_) => tool_call.id.clone(), - }; - - messages.push(Message { - role: "tool".to_string(), - content: Some(result_content), - tool_calls: None, - tool_call_id: Some(tool_call_id), - }); - - // Track for response - all_tool_call_infos.push(ToolCallInfo { - name: tool_call.name.clone(), - result: execution_result.result, - }); - } - - // If user questions are pending, pause the conversation - if pending_questions.is_some() { - final_response = result.content; - break; - } - - // If finish reason indicates completion, exit loop - let finish_lower = result.finish_reason.to_lowercase(); - if finish_lower == "stop" || finish_lower == "end_turn" { - final_response = result.content; - break; - } - } - - // Save changes to database if any tools were executed - // Skip if a version restore already happened (file was already saved during restore) - // UNLESS there were additional modifications after the restore - if !all_tool_call_infos.is_empty() && (!version_restored || has_changes_after_restore) { - let update_req = crate::db::models::UpdateFileRequest { - name: None, - description: None, - transcript: None, - summary: current_summary.clone(), - body: Some(current_body.clone()), - version: None, // Internal update, skip version check - repo_file_path: None, - }; - - match repository::update_file(pool, id, update_req).await { - Ok(Some(updated_file)) => { - // Broadcast update notification for LLM changes - let mut updated_fields = vec!["body".to_string()]; - if current_summary.is_some() { - updated_fields.push("summary".to_string()); - } - state.broadcast_file_update(FileUpdateNotification { - file_id: id, - version: updated_file.version, - updated_fields, - updated_by: "llm".to_string(), - }); - } - Ok(None) => { - // File was deleted during processing - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({ - "error": "File not found" - })), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Failed to save file changes: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": format!("Failed to save changes: {}", e) - })), - ) - .into_response(); - } - } - } - - // Build response - let response_text = final_response.unwrap_or_else(|| { - if all_tool_call_infos.is_empty() { - "I couldn't understand your request. Please try rephrasing.".to_string() - } else { - format!( - "Done! Executed {} tool{}.", - all_tool_call_infos.len(), - if all_tool_call_infos.len() == 1 { "" } else { "s" } - ) - } - }); - - ( - StatusCode::OK, - Json(ChatResponse { - response: response_text, - tool_calls: all_tool_call_infos, - updated_body: current_body, - updated_summary: current_summary, - pending_questions, - }), - ) - .into_response() -} - -fn build_file_context(file: &crate::db::models::File) -> String { - let mut context = format!("File: {}\n", file.name); - - if let Some(ref desc) = file.description { - context.push_str(&format!("Description: {}\n", desc)); - } - - if let Some(ref summary) = file.summary { - context.push_str(&format!("Summary: {}\n", summary)); - } - - // Include contract phase context if file belongs to a contract - if let Some(ref phase) = file.contract_phase { - context.push_str(&format!("\n## Contract Context\n")); - context.push_str(&format!("This file belongs to a contract in the '{}' phase.\n", phase)); - context.push_str("You can use 'suggest_templates' to get phase-appropriate templates, "); - context.push_str("or 'apply_template' to apply a template structure.\n"); - context.push_str(&format!( - "Templates for '{}' phase include: {}\n", - phase, - match phase.as_str() { - "research" => "research-notes, competitor-analysis, user-research", - "specify" => "requirements, user-stories, acceptance-criteria", - "plan" => "architecture, technical-design, task-breakdown", - "execute" => "dev-notes, test-plan, implementation-log", - "review" => "review-checklist, release-notes, retrospective", - _ => "(use suggest_templates to see available)", - } - )); - } - - context.push_str(&format!("\nTranscript entries: {}\n", file.transcript.len())); - context.push_str(&format!("Body elements: {}\n", file.body.len())); - - // Add body overview - if !file.body.is_empty() { - context.push_str("\nCurrent body elements:\n"); - for (i, element) in file.body.iter().enumerate() { - let desc = match element { - BodyElement::Heading { level, text } => format!("H{}: {}", level, text), - BodyElement::Paragraph { text } => { - let preview: String = text.chars().take(50).collect(); - if text.chars().count() > 50 { - format!("Paragraph: {}...", preview) - } else { - format!("Paragraph: {}", preview) - } - } - BodyElement::Code { language, content } => { - let lang = language.as_deref().unwrap_or("plain"); - let preview: String = content.chars().take(50).collect(); - if content.chars().count() > 50 { - format!("Code ({}): {}...", lang, preview) - } else { - format!("Code ({}): {}", lang, preview) - } - } - BodyElement::List { ordered, items } => { - let list_type = if *ordered { "ordered" } else { "unordered" }; - format!("List ({}): {} items", list_type, items.len()) - } - BodyElement::Chart { chart_type, title, .. } => { - format!( - "Chart ({:?}){}", - chart_type, - title.as_ref().map(|t| format!(": {}", t)).unwrap_or_default() - ) - } - BodyElement::Image { alt, .. } => { - format!("Image{}", alt.as_ref().map(|a| format!(": {}", a)).unwrap_or_default()) - } - BodyElement::Markdown { content } => { - let preview: String = content.chars().take(50).collect(); - if content.chars().count() > 50 { - format!("Markdown: {}...", preview) - } else { - format!("Markdown: {}", preview) - } - } - }; - context.push_str(&format!(" [{}] {}\n", i, desc)); - } - } - - // Add transcript preview if available - if !file.transcript.is_empty() { - context.push_str("\nTranscript preview (first 5 entries):\n"); - for entry in file.transcript.iter().take(5) { - context.push_str(&format!(" - {}: {}\n", entry.speaker, entry.text)); - } - if file.transcript.len() > 5 { - context.push_str(&format!(" ... and {} more entries\n", file.transcript.len() - 5)); - } - } - - context -} - -/// Build context for a focused element -fn build_focused_element_context(body: &[BodyElement], focused_index: Option<usize>) -> String { - let Some(index) = focused_index else { - return String::new(); - }; - - let Some(element) = body.get(index) else { - return format!( - "\n## Focused Element\nNote: User focused on element [{}] but it doesn't exist (document has {} elements).\n", - index, - body.len() - ); - }; - - let (element_type, full_content) = match element { - BodyElement::Heading { level, text } => { - (format!("Heading (level {})", level), text.clone()) - } - BodyElement::Paragraph { text } => { - ("Paragraph".to_string(), text.clone()) - } - BodyElement::Code { language, content } => { - let lang = language.as_deref().unwrap_or("plain"); - (format!("Code ({})", lang), content.clone()) - } - BodyElement::List { ordered, items } => { - let list_type = if *ordered { "Ordered list" } else { "Unordered list" }; - let content = items.iter() - .enumerate() - .map(|(i, item)| format!("{}. {}", i + 1, item)) - .collect::<Vec<_>>() - .join("\n"); - (list_type.to_string(), content) - } - BodyElement::Chart { chart_type, title, .. } => { - let title_str = title.as_deref().unwrap_or("untitled"); - (format!("Chart ({:?})", chart_type), title_str.to_string()) - } - BodyElement::Image { alt, caption, .. } => { - let desc = alt.as_deref().or(caption.as_deref()).unwrap_or("no description"); - ("Image".to_string(), desc.to_string()) - } - BodyElement::Markdown { content } => { - ("Markdown".to_string(), content.clone()) - } - }; - - format!( - r#" -## Focused Element -The user is focusing on element [{}]: {} -Full content of focused element: ---- -{} ---- -When the user's request is ambiguous about which element to modify, prioritize this focused element. -"#, - index, element_type, full_content - ) -} - -/// Result of handling a version tool request -struct VersionRequestResult { - result: ToolResult, - data: Option<serde_json::Value>, - new_body: Option<Vec<BodyElement>>, - new_summary: Option<String>, -} - -/// Handle version tool requests that require async database access -async fn handle_version_request( - pool: &sqlx::PgPool, - file_id: Uuid, - request: &VersionToolRequest, - _current_body: &[BodyElement], - _current_summary: Option<&str>, - current_version: i32, -) -> VersionRequestResult { - match request { - VersionToolRequest::ListVersions => { - match repository::list_file_versions(pool, file_id).await { - Ok(versions) => { - let version_data: Vec<serde_json::Value> = versions - .iter() - .map(|v| { - serde_json::json!({ - "version": v.version, - "source": v.source, - "createdAt": v.created_at.to_rfc3339(), - "changeDescription": v.change_description, - }) - }) - .collect(); - - VersionRequestResult { - result: ToolResult { - success: true, - message: format!("Found {} versions. Current version is {}.", versions.len(), current_version), - }, - data: Some(serde_json::json!({ - "currentVersion": current_version, - "versions": version_data, - })), - new_body: None, - new_summary: None, - } - } - Err(e) => VersionRequestResult { - result: ToolResult { - success: false, - message: format!("Failed to list versions: {}", e), - }, - data: None, - new_body: None, - new_summary: None, - }, - } - } - VersionToolRequest::ReadVersion { version } => { - match repository::get_file_version(pool, file_id, *version).await { - Ok(Some(ver)) => { - // Convert body elements to a readable format - let body_preview: Vec<String> = ver - .body - .iter() - .enumerate() - .map(|(i, element)| { - let desc = match element { - BodyElement::Heading { level, text } => format!("H{}: {}", level, text), - BodyElement::Paragraph { text } => { - let preview: String = text.chars().take(100).collect(); - if text.chars().count() > 100 { - format!("Paragraph: {}...", preview) - } else { - format!("Paragraph: {}", preview) - } - } - BodyElement::Code { language, content } => { - let lang = language.as_deref().unwrap_or("plain"); - let preview: String = content.chars().take(100).collect(); - if content.chars().count() > 100 { - format!("Code ({}): {}...", lang, preview) - } else { - format!("Code ({}): {}", lang, preview) - } - } - BodyElement::List { ordered, items } => { - let list_type = if *ordered { "ordered" } else { "unordered" }; - format!("List ({}): {} items", list_type, items.len()) - } - BodyElement::Chart { chart_type, title, .. } => { - format!( - "Chart ({:?}){}", - chart_type, - title.as_ref().map(|t| format!(": {}", t)).unwrap_or_default() - ) - } - BodyElement::Image { alt, .. } => { - format!("Image{}", alt.as_ref().map(|a| format!(": {}", a)).unwrap_or_default()) - } - BodyElement::Markdown { content } => { - let preview: String = content.chars().take(100).collect(); - if content.chars().count() > 100 { - format!("Markdown: {}...", preview) - } else { - format!("Markdown: {}", preview) - } - } - }; - format!("[{}] {}", i, desc) - }) - .collect(); - - VersionRequestResult { - result: ToolResult { - success: true, - message: format!( - "Version {} from {} (source: {}). {} body elements.", - ver.version, - ver.created_at.format("%Y-%m-%d %H:%M"), - ver.source, - ver.body.len() - ), - }, - data: Some(serde_json::json!({ - "version": ver.version, - "source": ver.source, - "createdAt": ver.created_at.to_rfc3339(), - "summary": ver.summary, - "bodyPreview": body_preview, - "changeDescription": ver.change_description, - })), - new_body: None, - new_summary: None, - } - } - Ok(None) => VersionRequestResult { - result: ToolResult { - success: false, - message: format!("Version {} not found", version), - }, - data: None, - new_body: None, - new_summary: None, - }, - Err(e) => VersionRequestResult { - result: ToolResult { - success: false, - message: format!("Failed to read version: {}", e), - }, - data: None, - new_body: None, - new_summary: None, - }, - } - } - VersionToolRequest::RestoreVersion { target_version, reason } => { - // Set change description if provided - if let Some(reason) = reason { - let _ = repository::set_change_description(pool, reason).await; - } - - match repository::restore_file_version(pool, file_id, *target_version, current_version).await { - Ok(Some(restored_file)) => { - VersionRequestResult { - result: ToolResult { - success: true, - message: format!( - "Restored to version {}. New version is {}.", - target_version, restored_file.version - ), - }, - data: Some(serde_json::json!({ - "previousVersion": current_version, - "restoredFromVersion": target_version, - "newVersion": restored_file.version, - })), - new_body: Some(restored_file.body), - new_summary: restored_file.summary, - } - } - Ok(None) => VersionRequestResult { - result: ToolResult { - success: false, - message: format!("Version {} not found", target_version), - }, - data: None, - new_body: None, - new_summary: None, - }, - Err(RepositoryError::VersionConflict { expected, actual }) => { - VersionRequestResult { - result: ToolResult { - success: false, - message: format!( - "Version conflict: expected {}, actual {}. Document was modified.", - expected, actual - ), - }, - data: None, - new_body: None, - new_summary: None, - } - } - Err(e) => VersionRequestResult { - result: ToolResult { - success: false, - message: format!("Failed to restore version: {}", e), - }, - data: None, - new_body: None, - new_summary: None, - }, - } - } - } -} - -/// Estimate the token count of a message -fn estimate_message_tokens(message: &Message) -> usize { - let mut chars = 0; - - // Count content characters - if let Some(ref content) = message.content { - chars += content.len(); - } - - // Count tool call characters (rough estimate) - if let Some(ref tool_calls) = message.tool_calls { - for tc in tool_calls { - chars += tc.function.name.len(); - chars += tc.function.arguments.len(); - } - } - - // Count tool call ID - if let Some(ref id) = message.tool_call_id { - chars += id.len(); - } - - // Add overhead for role and structure - chars += message.role.len() + 20; - - // Convert to tokens - chars / CHARS_PER_TOKEN -} - -/// Estimate total token count of all messages -fn estimate_total_tokens(messages: &[Message]) -> usize { - messages.iter().map(estimate_message_tokens).sum() -} - -/// Check if context is nearing the limit -fn is_context_near_limit(messages: &[Message], model: &LlmModel) -> bool { - let estimated_tokens = estimate_total_tokens(messages); - let limit = match model { - LlmModel::ClaudeSonnet | LlmModel::ClaudeOpus => CLAUDE_CONTEXT_LIMIT, - LlmModel::GroqKimi => GROQ_CONTEXT_LIMIT, - }; - let threshold = (limit as f32 * CONTEXT_COMPACTION_THRESHOLD) as usize; - - estimated_tokens >= threshold -} - -/// Compact the conversation by summarizing older messages -/// Keeps: system message, last N user/assistant exchanges, and a summary of older content -fn compact_conversation(messages: &mut Vec<Message>, tool_call_history: &[ToolCallInfo]) { - // Keep at least system message + 4 recent messages (2 exchanges) - const MIN_MESSAGES_TO_KEEP: usize = 5; - - if messages.len() <= MIN_MESSAGES_TO_KEEP { - return; - } - - // Extract system message (always first) - let system_message = messages.remove(0); - - // Calculate how many messages to summarize - // Keep the last ~1/3 of messages for recent context - let messages_to_keep = std::cmp::max(4, messages.len() / 3); - let messages_to_summarize = messages.len() - messages_to_keep; - - if messages_to_summarize < 2 { - // Not enough to summarize, just put system message back - messages.insert(0, system_message); - return; - } - - // Extract messages to summarize - let old_messages: Vec<Message> = messages.drain(..messages_to_summarize).collect(); - - // Build summary of old messages - let mut summary_parts: Vec<String> = Vec::new(); - - // Summarize user requests - let user_requests: Vec<&str> = old_messages - .iter() - .filter(|m| m.role == "user") - .filter_map(|m| m.content.as_deref()) - .collect(); - - if !user_requests.is_empty() { - summary_parts.push(format!( - "Previous user requests: {}", - user_requests.join("; ") - )); - } - - // Summarize tool calls executed so far - if !tool_call_history.is_empty() { - let tool_summary: Vec<String> = tool_call_history - .iter() - .map(|tc| { - if tc.result.success { - format!("{}(ok)", tc.name) - } else { - format!("{}(failed: {})", tc.name, tc.result.message) - } - }) - .collect(); - - summary_parts.push(format!( - "Tools executed: {}", - tool_summary.join(", ") - )); - } - - // Count assistant responses that were summarized - let assistant_responses = old_messages - .iter() - .filter(|m| m.role == "assistant" && m.content.is_some()) - .count(); - - if assistant_responses > 0 { - summary_parts.push(format!( - "({} previous assistant responses omitted for brevity)", - assistant_responses - )); - } - - // Create compacted context message - let compacted_content = format!( - "[CONTEXT SUMMARY - Earlier conversation compacted to save tokens]\n{}", - summary_parts.join("\n") - ); - - // Rebuild messages: system + summary + remaining recent messages - let mut new_messages = vec![ - system_message, - Message { - role: "user".to_string(), - content: Some(compacted_content), - tool_calls: None, - tool_call_id: None, - }, - Message { - role: "assistant".to_string(), - content: Some("Understood. I have context from the previous conversation and will continue from here.".to_string()), - tool_calls: None, - tool_call_id: None, - }, - ]; - - new_messages.append(messages); - *messages = new_messages; - - tracing::info!( - summarized_messages = messages_to_summarize, - remaining_messages = messages.len(), - "Compacted conversation to save context" - ); -} diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs deleted file mode 100644 index 638a4d3..0000000 --- a/makima/src/server/handlers/mesh_chat.rs +++ /dev/null @@ -1,2264 +0,0 @@ -//! Chat endpoint for LLM-powered task orchestration. -//! -//! This handler provides an agentic loop for managing tasks, daemons, and -//! overlay operations through LLM tool calling. - -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use utoipa::ToSchema; -use uuid::Uuid; - -use crate::db::{models::CreateTaskRequest, repository}; -use crate::llm::{ - claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, - groq::{GroqClient, GroqError, Message, ToolCallResponse}, - parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion, - MESH_TOOLS, -}; -use crate::server::auth::Authenticated; -use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; - -/// Maximum number of tool-calling rounds to prevent infinite loops -const MAX_TOOL_ROUNDS: usize = 30; - -#[derive(Debug, Clone, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatHistoryMessage { - /// Role: "user" or "assistant" - pub role: String, - /// Message content - pub content: String, -} - -#[derive(Debug, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatRequest { - /// The user's message/instruction - pub message: String, - /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" - #[serde(default)] - pub model: Option<String>, - /// Optional conversation history for context continuity (deprecated - now loaded from DB) - #[serde(default)] - pub history: Option<Vec<MeshChatHistoryMessage>>, - /// Context type: "mesh", "task", or "subtask" - #[serde(default)] - pub context_type: Option<String>, - /// Task ID if context is task/subtask - #[serde(default)] - pub context_task_id: Option<Uuid>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshChatResponse { - /// The LLM's response message - pub response: String, - /// Tool calls that were executed - pub tool_calls: Vec<MeshToolCallInfo>, - /// Questions pending user answers (pauses conversation) - #[serde(skip_serializing_if = "Option::is_none")] - pub pending_questions: Option<Vec<UserQuestion>>, -} - -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct MeshToolCallInfo { - pub name: String, - pub result: ToolResult, -} - -/// Enum to hold LLM clients -enum LlmClient { - Groq(GroqClient), - Claude(ClaudeClient), -} - -/// Unified result from LLM call -struct LlmResult { - content: Option<String>, - tool_calls: Vec<ToolCall>, - raw_tool_calls: Vec<ToolCallResponse>, - finish_reason: String, -} - -/// Chat with mesh orchestrator at the top level (no specific task context) -#[utoipa::path( - post, - path = "/api/v1/mesh/chat", - request_body = MeshChatRequest, - responses( - (status = 200, description = "Chat completed successfully", body = MeshChatResponse), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn mesh_toplevel_chat_handler( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, - Json(request): Json<MeshChatRequest>, -) -> impl IntoResponse { - // Check if database is configured - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - // Parse model selection (default to Claude Sonnet) - let model = request - .model - .as_ref() - .and_then(|m| LlmModel::from_str(m)) - .unwrap_or(LlmModel::ClaudeSonnet); - - tracing::info!("Mesh top-level chat using LLM model: {:?}", model); - - // Initialize the appropriate LLM client - let llm_client = match model { - LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::GroqKimi => match GroqClient::from_env() { - Ok(client) => LlmClient::Groq(client), - Err(GroqError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "GROQ_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Groq client error: {}", e) })), - ) - .into_response(); - } - }, - }; - - // Build context about all tasks and daemons - let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await; - - // Build agentic system prompt for top-level mesh orchestration - let system_prompt = format!( - r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. - -## Your Capabilities -You have access to tools for: -- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task -- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons -- **File Access**: list_files, read_file (read documents from the files system) -- **Task Communication**: send_message_to_task, update_task_plan -- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode - -## Current Mesh Overview -{mesh_context} - -## Agentic Behavior Guidelines - -### 1. Analyze Before Acting -- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons -- Understand the current state before making changes -- For simple, direct requests (e.g., "create a new task"), you can act immediately - -### 2. Plan Multi-Step Operations -- Break complex orchestration into logical steps -- For parallel execution: create multiple subtasks, then run them on different daemons -- For sequential execution: create subtasks and run them in order - -### 3. Create and Manage Tasks -- Use create_task to create new top-level tasks or subtasks -- Assign appropriate priorities and plans -- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it. -- If a working directory is a git repository, use it as the repository_url for code-related tasks - -### 4. Coordinate Multiple Tasks -- Use list_tasks to see all tasks and their statuses -- Use list_daemons to see available compute resources -- Balance workload across daemons - -### 5. Be Efficient -- Don't over-analyze simple requests -- Use the minimum number of tool calls needed -- Provide clear summaries of actions taken - -## Important Notes -- Task IDs are UUIDs - ensure you use the correct format -- Running a task requires at least one connected daemon -- When creating subtasks, specify the parent_task_id -- Always confirm destructive operations (discard_task) with the user"#, - mesh_context = mesh_context - ); - - // Run the shared agentic loop - run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await -} - -/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner) -#[utoipa::path( - post, - path = "/api/v1/mesh/tasks/{id}/chat", - request_body = MeshChatRequest, - responses( - (status = 200, description = "Chat completed successfully", body = MeshChatResponse), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Task not found"), - (status = 500, description = "Internal server error") - ), - params( - ("id" = Uuid, Path, description = "Task ID (context for orchestration)") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn mesh_chat_handler( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, - Path(task_id): Path<Uuid>, - Json(request): Json<MeshChatRequest>, -) -> impl IntoResponse { - // Check if database is configured - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - // Get the context task (scoped by owner) - let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { - Ok(Some(task)) => task, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "Task not found" })), - ) - .into_response(); - } - Err(e) => { - tracing::error!("Database error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Database error: {}", e) })), - ) - .into_response(); - } - }; - - // Parse model selection (default to Claude Sonnet) - let model = request - .model - .as_ref() - .and_then(|m| LlmModel::from_str(m)) - .unwrap_or(LlmModel::ClaudeSonnet); - - tracing::info!("Mesh chat using LLM model: {:?}", model); - - // Initialize the appropriate LLM client - let llm_client = match model { - LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { - Ok(client) => LlmClient::Claude(client), - Err(ClaudeError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Claude client error: {}", e) })), - ) - .into_response(); - } - }, - LlmModel::GroqKimi => match GroqClient::from_env() { - Ok(client) => LlmClient::Groq(client), - Err(GroqError::MissingApiKey) => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "GROQ_API_KEY not configured" })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Groq client error: {}", e) })), - ) - .into_response(); - } - }, - }; - - // Build context about the current task and mesh state - let task_context = build_task_context(&task); - - // Build agentic system prompt for task orchestration - let system_prompt = format!( - r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers. - -## Your Capabilities -You have access to tools for: -- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task -- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons -- **File Access**: list_files, read_file (read documents from the files system) -- **Task Communication**: send_message_to_task, update_task_plan -- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode - -## Current Context -{task_context} - -## Agentic Behavior Guidelines - -### 1. Analyze Before Acting -- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons -- Understand the current state before making changes -- For simple, direct requests (e.g., "pause this task"), you can act immediately - -### 2. Plan Multi-Step Operations -- Break complex orchestration into logical steps -- For parallel execution: create multiple subtasks, then run them on different daemons -- For sequential execution: create subtasks and run them in order - -### 3. Monitor Task Progress -- Use query_task_status to check on running tasks -- Watch for status changes and react accordingly -- Handle failures gracefully (retry, escalate, or report) - -### 4. Coordinate Sibling Tasks -- Use peek_sibling_overlay to see what other tasks have changed -- Preview merges before completing to catch conflicts -- Coordinate timing when multiple tasks need to merge - -### 5. Be Efficient -- Don't over-analyze simple requests -- Use the minimum number of tool calls needed -- Provide clear summaries of actions taken - -## Important Notes -- Task IDs are UUIDs - ensure you use the correct format -- Running a task requires at least one connected daemon -- Overlay operations require the task to have been run at least once -- Always confirm destructive operations (discard_task) with the user -- When creating subtasks for this task, use parent_task_id: {task_id}"#, - task_context = task_context, - task_id = task_id - ); - - // Run the shared agentic loop - run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await -} - -fn build_task_context(task: &crate::db::models::Task) -> String { - let mut context = format!( - "Current Task: {} (ID: {})\n", - task.name, task.id - ); - context.push_str(&format!("Status: {}\n", task.status)); - context.push_str(&format!("Priority: {}\n", task.priority)); - - if let Some(ref desc) = task.description { - context.push_str(&format!("Description: {}\n", desc)); - } - - // Truncate plan preview if too long - let plan_preview = if task.plan.len() > 200 { - format!("{}...", &task.plan[..200]) - } else { - task.plan.clone() - }; - context.push_str(&format!("Plan: {}\n", plan_preview)); - - if let Some(ref summary) = task.progress_summary { - context.push_str(&format!("Progress: {}\n", summary)); - } - - if let Some(ref error) = task.error_message { - context.push_str(&format!("Error: {}\n", error)); - } - - // Repository info - if let Some(ref url) = task.repository_url { - context.push_str(&format!("Repository: {}\n", url)); - } - if let Some(ref branch) = task.base_branch { - context.push_str(&format!("Base branch: {}\n", branch)); - } - - context -} - -/// Build overview context for top-level mesh orchestration -async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String { - let mut context = String::new(); - - // Get task counts by status - match repository::list_tasks_for_owner(pool, owner_id).await { - Ok(tasks) => { - let total = tasks.len(); - let pending = tasks.iter().filter(|t| t.status == "pending").count(); - let running = tasks.iter().filter(|t| t.status == "running").count(); - let paused = tasks.iter().filter(|t| t.status == "paused").count(); - let done = tasks.iter().filter(|t| t.status == "done").count(); - let failed = tasks.iter().filter(|t| t.status == "failed").count(); - - context.push_str(&format!( - "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n", - total, pending, running, paused, done, failed - )); - - // List recent/active tasks - if !tasks.is_empty() { - context.push_str("\nRecent Tasks:\n"); - for task in tasks.iter().take(5) { - context.push_str(&format!( - " - {} (ID: {}, Status: {})\n", - task.name, task.id, task.status - )); - } - if tasks.len() > 5 { - context.push_str(&format!(" ... and {} more\n", tasks.len() - 5)); - } - } - } - Err(e) => { - context.push_str(&format!("Error fetching tasks: {}\n", e)); - } - } - - // Get connected daemons for this owner - let owner_daemons: Vec<_> = state.daemon_connections.iter() - .filter(|e| e.value().owner_id == owner_id) - .collect(); - let daemon_count = owner_daemons.len(); - context.push_str(&format!("\nConnected Daemons: {}\n", daemon_count)); - - for entry in owner_daemons.iter().take(3) { - let daemon = entry.value(); - let working_dir = daemon.working_directory.as_deref().unwrap_or("not set"); - context.push_str(&format!( - " - {} (ID: {}, Working Directory: {})\n", - daemon.hostname.as_deref().unwrap_or("unknown"), - daemon.id, - working_dir - )); - } - - // Add default repository guidance if there's exactly one daemon with a working directory - let daemons_with_working_dir: Vec<_> = owner_daemons.iter() - .filter(|e| e.value().working_directory.is_some()) - .collect(); - - if daemons_with_working_dir.len() == 1 { - if let Some(dir) = &daemons_with_working_dir[0].value().working_directory { - context.push_str(&format!( - "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n", - dir - )); - } - } - - context -} - -/// Run the shared agentic loop for mesh chat -async fn run_mesh_agentic_loop( - pool: &sqlx::PgPool, - state: &SharedState, - llm_client: &LlmClient, - system_prompt: String, - request: &MeshChatRequest, - owner_id: Uuid, -) -> axum::response::Response { - // Get or create conversation for storing messages - let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await { - Ok(c) => c, - Err(e) => { - tracing::error!("Failed to get/create conversation: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })), - ) - .into_response(); - } - }; - - // Build initial messages - let mut messages = vec![Message { - role: "system".to_string(), - content: Some(system_prompt), - tool_calls: None, - tool_call_id: None, - }]; - - // Load conversation history from database (or use provided for backwards compatibility) - if let Some(history) = &request.history { - // Legacy: use provided history - for hist_msg in history { - messages.push(Message { - role: hist_msg.role.clone(), - content: Some(hist_msg.content.clone()), - tool_calls: None, - tool_call_id: None, - }); - } - tracing::info!( - history_messages = history.len(), - "Loaded mesh conversation history from request (legacy)" - ); - } else { - // New: load from database - match repository::list_chat_messages(pool, conversation.id, Some(50)).await { - Ok(db_messages) => { - for msg in db_messages { - messages.push(Message { - role: msg.role.clone(), - content: Some(msg.content.clone()), - tool_calls: None, - tool_call_id: None, - }); - } - tracing::info!( - history_messages = messages.len() - 1, // minus system message - "Loaded mesh conversation history from database" - ); - } - Err(e) => { - tracing::warn!("Failed to load chat history: {}", e); - // Continue without history - } - } - } - - // Add current user message - messages.push(Message { - role: "user".to_string(), - content: Some(request.message.clone()), - tool_calls: None, - tool_call_id: None, - }); - - // State for tracking - let mut all_tool_call_infos: Vec<MeshToolCallInfo> = Vec::new(); - let mut final_response: Option<String> = None; - let mut consecutive_failures = 0; - const MAX_CONSECUTIVE_FAILURES: usize = 3; - let mut pending_questions: Option<Vec<UserQuestion>> = None; - - // Multi-turn agentic tool calling loop - for round in 0..MAX_TOOL_ROUNDS { - tracing::info!( - round = round, - total_tool_calls = all_tool_call_infos.len(), - "Mesh agentic loop iteration" - ); - - // Check consecutive failures - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - tracing::warn!( - "Breaking mesh loop due to {} consecutive failures", - consecutive_failures - ); - final_response = Some( - "I encountered multiple consecutive errors and stopped. \ - Please check the task state and try again." - .to_string(), - ); - break; - } - - // Call the appropriate LLM API - let result = match llm_client { - LlmClient::Groq(groq) => { - match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await { - Ok(r) => LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls: r.raw_tool_calls, - finish_reason: r.finish_reason, - }, - Err(e) => { - tracing::error!("Groq API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("LLM API error: {}", e) })), - ) - .into_response(); - } - } - } - LlmClient::Claude(claude_client) => { - let claude_messages = claude::groq_messages_to_claude(&messages); - match claude_client - .chat_with_tools(claude_messages, &MESH_TOOLS) - .await - { - Ok(r) => { - let raw_tool_calls: Vec<ToolCallResponse> = r - .tool_calls - .iter() - .map(|tc| ToolCallResponse { - id: tc.id.clone(), - call_type: "function".to_string(), - function: crate::llm::groq::FunctionCall { - name: tc.name.clone(), - arguments: tc.arguments.to_string(), - }, - }) - .collect(); - - LlmResult { - content: r.content, - tool_calls: r.tool_calls, - raw_tool_calls, - finish_reason: r.stop_reason, - } - } - Err(e) => { - tracing::error!("Claude API error: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("LLM API error: {}", e) })), - ) - .into_response(); - } - } - } - }; - - // Check if there are tool calls to execute - if result.tool_calls.is_empty() { - final_response = result.content; - break; - } - - // Add assistant message with tool calls to conversation - messages.push(Message { - role: "assistant".to_string(), - content: result.content.clone(), - tool_calls: Some(result.raw_tool_calls.clone()), - tool_call_id: None, - }); - - // Execute each tool call - for (i, tool_call) in result.tool_calls.iter().enumerate() { - tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call"); - - // Parse the tool call - let mut execution_result = parse_mesh_tool_call(tool_call); - - // Handle async mesh tool requests - if let Some(mesh_request) = execution_result.request.take() { - let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await; - execution_result.success = async_result.success; - execution_result.message = async_result.message; - execution_result.data = async_result.data; - } - - // Track consecutive failures - if execution_result.success { - consecutive_failures = 0; - } else { - consecutive_failures += 1; - tracing::warn!( - tool = %tool_call.name, - consecutive_failures = consecutive_failures, - "Mesh tool call failed" - ); - } - - // Check for pending user questions - if let Some(questions) = execution_result.pending_questions { - tracing::info!( - question_count = questions.len(), - "Mesh LLM requesting user input" - ); - pending_questions = Some(questions); - all_tool_call_infos.push(MeshToolCallInfo { - name: tool_call.name.clone(), - result: ToolResult { - success: execution_result.success, - message: execution_result.message.clone(), - }, - }); - break; - } - - // Build tool result message - let result_content = if let Some(data) = &execution_result.data { - json!({ - "success": execution_result.success, - "message": execution_result.message, - "data": data - }) - .to_string() - } else { - json!({ - "success": execution_result.success, - "message": execution_result.message - }) - .to_string() - }; - - // Add tool result message - let tool_call_id = match llm_client { - LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), - LlmClient::Claude(_) => tool_call.id.clone(), - }; - - messages.push(Message { - role: "tool".to_string(), - content: Some(result_content), - tool_calls: None, - tool_call_id: Some(tool_call_id), - }); - - // Track for response - all_tool_call_infos.push(MeshToolCallInfo { - name: tool_call.name.clone(), - result: ToolResult { - success: execution_result.success, - message: execution_result.message, - }, - }); - } - - // If user questions are pending, pause - if pending_questions.is_some() { - final_response = result.content; - break; - } - - // If finish reason indicates completion, exit loop - let finish_lower = result.finish_reason.to_lowercase(); - if finish_lower == "stop" || finish_lower == "end_turn" { - final_response = result.content; - break; - } - } - - // Build response - let response_text = final_response.unwrap_or_else(|| { - if all_tool_call_infos.is_empty() { - "I couldn't understand your request. Please try rephrasing.".to_string() - } else { - format!( - "Done! Executed {} tool{}.", - all_tool_call_infos.len(), - if all_tool_call_infos.len() == 1 { - "" - } else { - "s" - } - ) - } - }); - - // Save messages to database (only if not using legacy history mode) - if request.history.is_none() { - let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string()); - - // Validate context_task_id exists before using it (to avoid FK constraint violation) - let context_task_id = if let Some(task_id) = request.context_task_id { - match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(_)) => Some(task_id), - Ok(None) => { - tracing::warn!("context_task_id {} not found, ignoring", task_id); - None - } - Err(e) => { - tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e); - None - } - } - } else { - None - }; - - // Save user message - if let Err(e) = repository::add_chat_message( - pool, - conversation.id, - "user", - &request.message, - &context_type, - context_task_id, - None, - None, - ) - .await - { - tracing::warn!("Failed to save user message to DB: {}", e); - } - - // Serialize tool calls for storage - let tool_calls_json = if all_tool_call_infos.is_empty() { - None - } else { - Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default()) - }; - - // Serialize pending questions for storage - let pending_questions_json = pending_questions - .as_ref() - .map(|q| serde_json::to_value(q).unwrap_or_default()); - - // Save assistant message - if let Err(e) = repository::add_chat_message( - pool, - conversation.id, - "assistant", - &response_text, - &context_type, - context_task_id, - tool_calls_json, - pending_questions_json, - ) - .await - { - tracing::warn!("Failed to save assistant message to DB: {}", e); - } - - tracing::info!( - conversation_id = %conversation.id, - context_type = %context_type, - "Saved mesh chat messages to database" - ); - } - - ( - StatusCode::OK, - Json(MeshChatResponse { - response: response_text, - tool_calls: all_tool_call_infos, - pending_questions, - }), - ) - .into_response() -} - -/// Result from handling an async mesh tool request -struct MeshRequestResult { - success: bool, - message: String, - data: Option<serde_json::Value>, -} - -/// Handle async mesh tool requests that require database/daemon access -async fn handle_mesh_request( - pool: &sqlx::PgPool, - state: &SharedState, - request: MeshToolRequest, - owner_id: Uuid, -) -> MeshRequestResult { - match request { - MeshToolRequest::CreateTask { - name, - plan, - parent_task_id, - repository_url, - base_branch, - merge_mode, - priority, - } => { - // Subtasks inherit contract_id from parent task - let contract_id = if let Some(parent_id) = parent_task_id { - match repository::get_task(pool, parent_id).await { - Ok(Some(parent_task)) => { - match parent_task.contract_id { - Some(cid) => cid, - None => { - return MeshRequestResult { - success: false, - message: "Parent task has no contract_id".to_string(), - data: None, - }; - } - } - } - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Parent task {} not found", parent_id), - data: None, - }; - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Failed to look up parent task: {}", e), - data: None, - }; - } - } - } else { - // Root tasks created via LLM chat require a contract_id - // TODO: Add contract_id to create_task tool definition - return MeshRequestResult { - success: false, - message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(), - data: None, - }; - }; - - // Check if repository_url matches a daemon's working directory (for this owner) - let is_daemon_working_dir = repository_url.as_ref().map(|url| { - state.daemon_connections.iter().any(|entry| { - entry.value().owner_id == owner_id && - entry.value().working_directory.as_ref() == Some(url) - }) - }).unwrap_or(false); - - // Derive completion_action from merge_mode, or default to "branch" if using daemon working dir - let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode { - // Explicit merge_mode provided - derive from it - let action = match mode.as_str() { - "pr" => "pr".to_string(), - "auto" => "merge".to_string(), - "manual" => "branch".to_string(), - _ => "none".to_string(), - }; - // If using daemon working dir and action involves the repo, set target_repo_path - let target = if is_daemon_working_dir && action != "none" { - repository_url.clone() - } else { - None - }; - (Some(action), target) - } else if is_daemon_working_dir { - // No merge_mode but using daemon working dir - default to "pr" - (Some("pr".to_string()), repository_url.clone()) - } else { - (None, None) - }; - - let create_req = CreateTaskRequest { - contract_id: Some(contract_id), - name: name.clone(), - description: None, - plan, - parent_task_id, - repository_url, - base_branch, - target_branch: None, - merge_mode, - priority: priority.unwrap_or(0), - target_repo_path, - completion_action, - continue_from_task_id: None, - copy_files: None, - is_supervisor: false, - checkpoint_sha: None, - branched_from_task_id: None, - conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor - directive_id: None, - directive_step_id: None, - }; - - match repository::create_task_for_owner(pool, owner_id, create_req).await { - Ok(task) => MeshRequestResult { - success: true, - message: format!("Created task '{}' with ID {}", name, task.id), - data: Some(json!({ - "taskId": task.id, - "name": task.name, - "status": task.status, - })), - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to create task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::RunTask { task_id, daemon_id } => { - // Get task to check status - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "pending" && task.status != "paused" { - return MeshRequestResult { - success: false, - message: format!( - "Task cannot be run - status is '{}' (must be 'pending' or 'paused')", - task.status - ), - data: None, - }; - } - - // Find a daemon to run on (must belong to this owner) - let target_daemon_id = if let Some(id) = daemon_id { - // Verify the specified daemon belongs to this owner - if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) { - return MeshRequestResult { - success: false, - message: "Specified daemon not found or not accessible.".to_string(), - data: None, - }; - } - id - } else { - // Find any connected daemon for this owner - let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id); - match daemon { - Some(d) => d.value().id, - None => { - return MeshRequestResult { - success: false, - message: "No daemons connected for your account. Cannot run task.".to_string(), - data: None, - } - } - } - }; - - // Check if this is an orchestrator (depth 0 with subtasks) - let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { - Ok(subtasks) => subtasks.len(), - Err(_) => 0, - }; - let is_orchestrator = task.depth == 0 && subtask_count > 0; - - // IMPORTANT: Update database FIRST to assign daemon_id before sending command - // This prevents race conditions where the task starts but daemon_id is not set - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("starting".to_string()), - daemon_id: Some(target_daemon_id), - version: Some(task.version), - ..Default::default() - }; - - let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Failed to update task: {}", e), - data: None, - } - } - }; - - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; - - // Send SpawnTask command to daemon - let command = DaemonCommand::SpawnTask { - task_id, - task_name: task.name.clone(), - plan: task.plan.clone(), - repo_url: task.repository_url.clone(), - base_branch: task.base_branch.clone(), - target_branch: task.target_branch.clone(), - parent_task_id: task.parent_task_id, - depth: task.depth, - is_orchestrator, - target_repo_path: task.target_repo_path.clone(), - completion_action: task.completion_action.clone(), - continue_from_task_id: task.continue_from_task_id, - copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, - autonomous_loop: false, - resume_session: false, - conversation_history: None, - patch_data: None, - patch_base_sha: None, - local_only, - auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor - directive_id: task.directive_id, - }; - - match state.send_daemon_command(target_daemon_id, command).await { - Ok(()) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated_task.version, - status: "starting".to_string(), - updated_fields: vec!["status".to_string(), "daemon_id".to_string()], - updated_by: "system".to_string(), - }); - - MeshRequestResult { - success: true, - message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id), - data: Some(json!({ - "taskId": task_id, - "daemonId": target_daemon_id, - "status": "starting", - })), - } - } - Err(e) => { - // Rollback: clear daemon_id and reset status since command failed - let rollback_req = crate::db::models::UpdateTaskRequest { - status: Some("pending".to_string()), - clear_daemon_id: true, - ..Default::default() - }; - let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; - - MeshRequestResult { - success: false, - message: format!("Failed to start task: {}", e), - data: None, - } - } - } - } - - MeshToolRequest::PauseTask { task_id } => { - // Get task and its daemon - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "running" { - return MeshRequestResult { - success: false, - message: format!("Task is not running (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::PauseTask { task_id }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to pause task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("paused".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "paused".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!("Task {} paused", task_id), - data: Some(json!({ "taskId": task_id, "status": "paused" })), - } - } - - MeshToolRequest::ResumeTask { task_id } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "paused" { - return MeshRequestResult { - success: false, - message: format!("Task is not paused (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::ResumeTask { task_id }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to resume task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("running".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "running".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!("Task {} resumed", task_id), - data: Some(json!({ "taskId": task_id, "status": "running" })), - } - } - - MeshToolRequest::InterruptTask { task_id, graceful } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::InterruptTask { task_id, graceful }; - if let Err(e) = state.send_daemon_command(daemon_id, command).await { - return MeshRequestResult { - success: false, - message: format!("Failed to interrupt task: {}", e), - data: None, - }; - } - } - - // Update status - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("paused".to_string()), - version: Some(task.version), - ..Default::default() - }; - - if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "paused".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - } - - MeshRequestResult { - success: true, - message: format!( - "Task {} {}interrupted", - task_id, - if graceful { "gracefully " } else { "" } - ), - data: Some(json!({ "taskId": task_id, "status": "paused" })), - } - } - - MeshToolRequest::DiscardTask { task_id } => { - match repository::delete_task_for_owner(pool, task_id, owner_id).await { - Ok(true) => MeshRequestResult { - success: true, - message: format!("Task {} discarded", task_id), - data: Some(json!({ "taskId": task_id, "deleted": true })), - }, - Ok(false) => MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to delete task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::QueryTaskStatus { task_id } => { - match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(task)) => MeshRequestResult { - success: true, - message: format!("Task '{}' is {}", task.name, task.status), - data: Some(json!({ - "taskId": task.id, - "name": task.name, - "status": task.status, - "priority": task.priority, - "description": task.description, - "plan": task.plan, - "progressSummary": task.progress_summary, - "errorMessage": task.error_message, - "repositoryUrl": task.repository_url, - "baseBranch": task.base_branch, - "targetBranch": task.target_branch, - "mergeMode": task.merge_mode, - "prUrl": task.pr_url, - "daemonId": task.daemon_id, - "containerId": task.container_id, - "createdAt": task.created_at, - "startedAt": task.started_at, - "completedAt": task.completed_at, - })), - }, - Ok(None) => MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListTasks { - status_filter, - parent_task_id, - } => { - // TODO: Add filtering support to repository - match repository::list_tasks_for_owner(pool, owner_id).await { - Ok(mut tasks) => { - // Apply filters - if let Some(ref status) = status_filter { - tasks.retain(|t| &t.status == status); - } - if let Some(ref parent_id) = parent_task_id { - tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id)); - } - - let task_data: Vec<serde_json::Value> = tasks - .iter() - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - "parentTaskId": t.parent_task_id, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} tasks", tasks.len()), - data: Some(json!({ "tasks": task_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListSubtasks { task_id } => { - match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { - Ok(subtasks) => { - let subtask_data: Vec<serde_json::Value> = subtasks - .iter() - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} subtasks", subtasks.len()), - data: Some(json!({ "subtasks": subtask_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListSiblings { task_id } => { - // Get task to find parent - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - let Some(parent_id) = task.parent_task_id else { - return MeshRequestResult { - success: true, - message: "Task has no parent, so no siblings".to_string(), - data: Some(json!({ "siblings": [] })), - }; - }; - - // Get all subtasks of parent, excluding current task - match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await { - Ok(siblings) => { - let sibling_data: Vec<serde_json::Value> = siblings - .iter() - .filter(|t| t.id != task_id) - .map(|t| { - json!({ - "taskId": t.id, - "name": t.name, - "status": t.status, - "priority": t.priority, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} sibling tasks", sibling_data.len()), - data: Some(json!({ "siblings": sibling_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ListDaemons => { - // Only list daemons belonging to this owner - let daemons: Vec<serde_json::Value> = state - .daemon_connections - .iter() - .filter(|entry| entry.value().owner_id == owner_id) - .map(|entry| { - let d = entry.value(); - json!({ - "daemonId": d.id, - "connectionId": d.connection_id, - "hostname": d.hostname, - "machineId": d.machine_id, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("{} daemon(s) connected", daemons.len()), - data: Some(json!({ "daemons": daemons })), - } - } - - MeshToolRequest::ListDaemonDirectories => { - let mut directories: Vec<serde_json::Value> = Vec::new(); - - // Only list directories from daemons belonging to this owner - for entry in state.daemon_connections.iter() { - let daemon = entry.value(); - - // Only include daemons belonging to this owner - if daemon.owner_id != owner_id { - continue; - } - - // Add working directory if available - if let Some(ref working_dir) = daemon.working_directory { - directories.push(json!({ - "path": working_dir, - "label": "Working Directory", - "directoryType": "working", - "hostname": daemon.hostname, - })); - } - - // Add home directory if available - if let Some(ref home_dir) = daemon.home_directory { - directories.push(json!({ - "path": home_dir, - "label": "Makima Home", - "directoryType": "home", - "hostname": daemon.hostname, - })); - } - } - - MeshRequestResult { - success: true, - message: format!("Found {} available directories", directories.len()), - data: Some(json!({ "directories": directories })), - } - } - - MeshToolRequest::ListFiles => { - match repository::list_files_for_owner(pool, owner_id).await { - Ok(files) => { - let file_data: Vec<serde_json::Value> = files - .iter() - .map(|f| { - json!({ - "fileId": f.id, - "name": f.name, - "description": f.description, - "createdAt": f.created_at, - "updatedAt": f.updated_at, - }) - }) - .collect(); - - MeshRequestResult { - success: true, - message: format!("Found {} files", files.len()), - data: Some(json!({ "files": file_data })), - } - } - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::ReadFile { file_id } => { - match repository::get_file_for_owner(pool, file_id, owner_id).await { - Ok(Some(file)) => { - // Convert body elements to readable text - let body_content: Vec<serde_json::Value> = file - .body - .iter() - .map(|elem| { - match elem { - crate::db::models::BodyElement::Heading { level, text } => { - json!({ "type": "heading", "level": level, "text": text }) - } - crate::db::models::BodyElement::Paragraph { text } => { - json!({ "type": "paragraph", "text": text }) - } - crate::db::models::BodyElement::Code { language, content } => { - json!({ "type": "code", "language": language, "content": content }) - } - crate::db::models::BodyElement::List { ordered, items } => { - json!({ "type": "list", "ordered": ordered, "items": items }) - } - crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => { - let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0); - json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count }) - } - crate::db::models::BodyElement::Image { src, alt, caption } => { - json!({ "type": "image", "src": src, "alt": alt, "caption": caption }) - } - crate::db::models::BodyElement::Markdown { content } => { - json!({ "type": "markdown", "content": content }) - } - } - }) - .collect(); - - // Also build a plain text version for easier reading - let plain_text: String = file - .body - .iter() - .filter_map(|elem| { - match elem { - crate::db::models::BodyElement::Heading { level, text } => { - Some(format!("{} {}", "#".repeat(*level as usize), text)) - } - crate::db::models::BodyElement::Paragraph { text } => { - Some(text.clone()) - } - crate::db::models::BodyElement::Code { language, content } => { - let lang = language.as_deref().unwrap_or(""); - Some(format!("```{}\n{}\n```", lang, content)) - } - crate::db::models::BodyElement::List { ordered, items } => { - let list_text: Vec<String> = items.iter().enumerate().map(|(i, item)| { - if *ordered { - format!("{}. {}", i + 1, item) - } else { - format!("- {}", item) - } - }).collect(); - Some(list_text.join("\n")) - } - crate::db::models::BodyElement::Markdown { content } => { - Some(content.clone()) - } - _ => None, - } - }) - .collect::<Vec<_>>() - .join("\n\n"); - - // Convert transcript entries to JSON - let transcript: Vec<serde_json::Value> = file - .transcript - .iter() - .map(|entry| { - json!({ - "id": entry.id, - "speaker": entry.speaker, - "start": entry.start, - "end": entry.end, - "text": entry.text, - }) - }) - .collect(); - - // Build a plain text transcript for easier reading - let transcript_text: String = file - .transcript - .iter() - .map(|entry| { - format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text) - }) - .collect::<Vec<_>>() - .join("\n"); - - MeshRequestResult { - success: true, - message: format!("Read file '{}'", file.name), - data: Some(json!({ - "fileId": file.id, - "name": file.name, - "description": file.description, - "summary": file.summary, - "body": body_content, - "plainText": plain_text, - "transcript": transcript, - "transcriptText": transcript_text, - "transcriptCount": file.transcript.len(), - "createdAt": file.created_at, - "updatedAt": file.updated_at, - })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: format!("File {} not found", file_id), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - }, - } - } - - MeshToolRequest::SendMessageToTask { task_id, message } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - if task.status != "running" { - return MeshRequestResult { - success: false, - message: format!("Task is not running (status: {})", task.status), - data: None, - }; - } - - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::SendMessage { task_id, message }; - match state.send_daemon_command(daemon_id, command).await { - Ok(()) => MeshRequestResult { - success: true, - message: "Message sent to task".to_string(), - data: Some(json!({ "taskId": task_id })), - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to send message: {}", e), - data: None, - }, - } - } else { - MeshRequestResult { - success: false, - message: "Task has no daemon assigned".to_string(), - data: None, - } - } - } - - MeshToolRequest::UpdateTaskPlan { - task_id, - new_plan, - interrupt_if_running, - } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - // Interrupt if running and requested - if task.status == "running" && interrupt_if_running { - if let Some(daemon_id) = task.daemon_id { - let command = DaemonCommand::InterruptTask { - task_id, - graceful: true, - }; - let _ = state.send_daemon_command(daemon_id, command).await; - } - } - - let update_req = crate::db::models::UpdateTaskRequest { - plan: Some(new_plan), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: updated.status.clone(), - updated_fields: vec!["plan".to_string()], - updated_by: "system".to_string(), - }); - MeshRequestResult { - success: true, - message: "Task plan updated".to_string(), - data: Some(json!({ "taskId": task_id })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to update task: {}", e), - data: None, - }, - } - } - - // Overlay operations - these require daemon communication - // For now, return placeholder responses since daemon implementation is separate - MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult { - success: false, - message: format!( - "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", - sibling_task_id - ), - data: None, - }, - - MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult { - success: false, - message: format!( - "Overlay operations require a connected daemon. Task {} may not have overlay data yet.", - task_id - ), - data: None, - }, - - MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult { - success: false, - message: format!( - "Merge preview requires a connected daemon. Task {} may not have overlay data yet.", - task_id - ), - data: None, - }, - - MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult { - success: false, - message: format!( - "Merge operations require a connected daemon. Task {}", - task_id - ), - data: None, - }, - - MeshToolRequest::CompleteTask { task_id } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - // Update status to done - let update_req = crate::db::models::UpdateTaskRequest { - status: Some("done".to_string()), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: "done".to_string(), - updated_fields: vec!["status".to_string()], - updated_by: "system".to_string(), - }); - let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string()); - MeshRequestResult { - success: true, - message: format!( - "Task {} completed. Merge mode: {}", - task_id, - &merge_mode - ), - data: Some(json!({ - "taskId": task_id, - "status": "done", - "mergeMode": merge_mode, - })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to complete task: {}", e), - data: None, - }, - } - } - - MeshToolRequest::SetMergeMode { task_id, mode } => { - let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { - Ok(Some(t)) => t, - Ok(None) => { - return MeshRequestResult { - success: false, - message: format!("Task {} not found", task_id), - data: None, - } - } - Err(e) => { - return MeshRequestResult { - success: false, - message: format!("Database error: {}", e), - data: None, - } - } - }; - - let update_req = crate::db::models::UpdateTaskRequest { - merge_mode: Some(mode.clone()), - version: Some(task.version), - ..Default::default() - }; - - match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { - Ok(Some(updated)) => { - state.broadcast_task_update(TaskUpdateNotification { - task_id, - owner_id: Some(task.owner_id), - version: updated.version, - status: updated.status, - updated_fields: vec!["merge_mode".to_string()], - updated_by: "system".to_string(), - }); - MeshRequestResult { - success: true, - message: format!("Merge mode set to '{}'", mode), - data: Some(json!({ "taskId": task_id, "mergeMode": mode })), - } - } - Ok(None) => MeshRequestResult { - success: false, - message: "Task not found".to_string(), - data: None, - }, - Err(e) => MeshRequestResult { - success: false, - message: format!("Failed to update merge mode: {}", e), - data: None, - }, - } - } - - // Supervisor-only tools - these should be handled via the supervisor.sh script, - // not through the mesh chat. Return an informative error. - MeshToolRequest::GetAllContractTasks { contract_id } => { - MeshRequestResult { - success: false, - message: format!( - "get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}", - contract_id - ), - data: None, - } - } - MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => { - MeshRequestResult { - success: false, - message: format!( - "wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s", - task_id, timeout_seconds - ), - data: None, - } - } - MeshToolRequest::ReadTaskWorktree { task_id, file_path } => { - MeshRequestResult { - success: false, - message: format!( - "read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}", - task_id, file_path - ), - data: None, - } - } - MeshToolRequest::SpawnTask { name, .. } => { - MeshRequestResult { - success: false, - message: format!( - "spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}", - name - ), - data: None, - } - } - MeshToolRequest::CreateCheckpoint { task_id, message } => { - MeshRequestResult { - success: false, - message: format!( - "create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}", - task_id, message - ), - data: None, - } - } - MeshToolRequest::ListTaskCheckpoints { task_id } => { - MeshRequestResult { - success: false, - message: format!( - "list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", - task_id - ), - data: None, - } - } - MeshToolRequest::GetTaskTree { task_id } => { - MeshRequestResult { - success: false, - message: format!( - "get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", - task_id - ), - data: None, - } - } - } -} - -// ============================================================================= -// Chat History Endpoints -// ============================================================================= - -use crate::db::models::MeshChatHistoryResponse; - -/// Get chat history for the current conversation (requires authentication) -#[utoipa::path( - get, - path = "/api/v1/mesh/chat/history", - responses( - (status = 200, description = "Chat history", body = MeshChatHistoryResponse), - (status = 401, description = "Unauthorized"), - (status = 503, description = "Database not configured"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn get_chat_history( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await { - Ok(c) => c, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response() - } - }; - - let messages = match repository::list_chat_messages(pool, conversation.id, None).await { - Ok(m) => m, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response() - } - }; - - ( - StatusCode::OK, - Json(MeshChatHistoryResponse { - conversation_id: conversation.id, - messages, - }), - ) - .into_response() -} - -/// Clear chat history (archives current conversation and starts new, requires authentication) -#[utoipa::path( - delete, - path = "/api/v1/mesh/chat/history", - responses( - (status = 200, description = "History cleared"), - (status = 401, description = "Unauthorized"), - (status = 503, description = "Database not configured"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []), - ("api_key" = []) - ), - tag = "Mesh" -)] -pub async fn clear_chat_history( - State(state): State<SharedState>, - Authenticated(auth): Authenticated, -) -> impl IntoResponse { - let Some(ref pool) = state.db_pool else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ "error": "Database not configured" })), - ) - .into_response(); - }; - - match repository::clear_conversation(pool, auth.owner_id).await { - Ok(new_conv) => ( - StatusCode::OK, - Json(json!({ "success": true, "conversationId": new_conv.id })), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": e.to_string() })), - ) - .into_response(), - } -} diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index e5f0a81..19d2166 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -24,7 +24,6 @@ use uuid::Uuid; use crate::db::models::Task; use crate::db::repository; -use crate::llm::{check_deliverables_met, TaskInfo}; use crate::server::auth::{hash_api_key, API_KEY_HEADER}; use crate::server::messages::ApiError; use crate::server::state::{ @@ -609,71 +608,12 @@ struct DaemonAuthResult { owner_id: Uuid, } -/// Compute an action directive for the supervisor based on deliverable status. -/// Returns an [ACTION REQUIRED] message if all deliverables are met. -async fn compute_action_directive( - pool: &sqlx::PgPool, - contract_id: Uuid, - owner_id: Uuid, -) -> Option<String> { - // Get contract - let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(c)) => c, - _ => return None, - }; - - // Get tasks (non-supervisor only) - let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(t) => t.into_iter().filter(|t| !t.is_supervisor).collect::<Vec<_>>(), - _ => return None, - }; - - // Get repositories - let repos = match repository::list_contract_repositories(pool, contract_id).await { - Ok(r) => r, - _ => return None, - }; - - // Get completed deliverables for the current phase - let completed_deliverables = contract.get_completed_deliverables(&contract.phase); - - let task_infos: Vec<TaskInfo> = tasks - .iter() - .map(|t| TaskInfo { - name: t.name.clone(), - status: t.status.clone(), - }) - .collect(); - - let has_repository = !repos.is_empty(); - - // Check deliverables (unused, but kept for future reference) - let _check = check_deliverables_met( - &contract.phase, - &contract.contract_type, - &completed_deliverables, - &task_infos, - has_repository, - ); - - // Generate directive based on deliverable status - if contract.phase == "execute" { - // Check if all tasks are done but PR deliverable is not marked complete - let all_tasks_done = !task_infos.is_empty() - && task_infos.iter().all(|t| t.status == "done"); - let pr_deliverable_complete = completed_deliverables.contains(&"pull-request".to_string()); - - if all_tasks_done && !pr_deliverable_complete { - let done_count = task_infos.len(); - return Some(format!( - "[INFO] All {} task(s) completed. System is auto-creating PR.", - done_count - )); - } - } - - None -} +// compute_action_directive removed alongside the LLM module — it used +// check_deliverables_met / TaskInfo from src/llm/phase_guidance.rs to +// nudge the supervisor with an "[INFO] all N tasks completed" message +// in the execute phase. Supervisors now receive `None` for the +// action_directive field; the auto-PR path below still fires when +// every non-supervisor task is done, so no behaviour is lost. /// Automatically create a PR when all non-supervisor tasks for a contract are done. /// Only applies to remote-repo contracts in the "execute" phase. @@ -1394,13 +1334,11 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // Don't notify for supervisor tasks (they don't report to themselves) if !updated_task.is_supervisor { if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - // Compute action directive if task completed successfully - let action_directive = if updated_task.status == "done" { - compute_action_directive(&pool, contract_id, owner_id).await - } else { - None - }; - + // action_directive used to come from + // compute_action_directive (now removed alongside the + // LLM module). Passing None preserves the existing + // supervisor protocol; the auto-PR path below still + // fires when every task is done. state.notify_supervisor_of_task_completion( supervisor.id, supervisor.daemon_id, @@ -1409,7 +1347,7 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re &updated_task.status, updated_task.progress_summary.as_deref(), updated_task.error_message.as_deref(), - action_directive.as_deref(), + None, ).await; } } @@ -1812,8 +1750,14 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // The request_id is the file_id we want to update if success { if let (Some(pool), Some(content)) = (&state.db_pool, content) { - // Convert markdown to body elements - let body = crate::llm::markdown_to_body(&content); + // Markdown → body. The full markdown parser lived in the + // (deleted) LLM module; we now wrap the raw markdown in a + // single Markdown body element so File records still round-trip. + // Lossless for the daemon-fetch flow because the editor + // re-parses the markdown content on display. + let body = vec![crate::db::models::BodyElement::Markdown { + content: content.clone(), + }]; // Update file in database let update_req = crate::db::models::UpdateFileRequest { diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index a39a4c0..5737360 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -2,9 +2,12 @@ //! //! Phase 5 removed: contract_chat, contract_daemon, contract_discuss, //! contracts, transcript_analysis. Contracts subsystem is gone. +//! +//! LLM removal removed: chat, mesh_chat, templates. LLM module is gone; +//! the chat-based UIs (file chat, mesh chat, discuss-contract, +//! contract-type templates) were the only consumers. pub mod api_keys; -pub mod chat; pub mod daemon_download; pub mod directive_documents; pub mod directives; @@ -13,7 +16,6 @@ pub mod files; pub mod history; pub mod listen; pub mod mesh; -pub mod mesh_chat; pub mod orders; pub mod mesh_daemon; pub mod mesh_merge; @@ -21,7 +23,6 @@ pub mod mesh_supervisor; pub mod mesh_ws; pub mod repository_history; pub mod speak; -pub mod templates; pub mod voice; pub mod users; pub mod versions; diff --git a/makima/src/server/handlers/templates.rs b/makima/src/server/handlers/templates.rs deleted file mode 100644 index aa97876..0000000 --- a/makima/src/server/handlers/templates.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Contract types API handler. -//! Only returns built-in contract types (simple, specification, execute). - -use axum::{ - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::Serialize; -use utoipa::ToSchema; - -use crate::llm::templates; -use crate::llm::templates::ContractTypeTemplate; - -// ============================================================================= -// Contract Type Templates (Built-in Only) -// ============================================================================= - -/// Response for listing contract types -#[derive(Debug, Serialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ListContractTypesResponse { - pub contract_types: Vec<ContractTypeTemplate>, -} - -/// List all available contract type templates (built-in only) -#[utoipa::path( - get, - path = "/api/v1/contract-types", - responses( - (status = 200, description = "Contract types retrieved successfully", body = ListContractTypesResponse) - ), - tag = "templates" -)] -pub async fn list_contract_types() -> impl IntoResponse { - // Only return built-in types (simple, specification, execute) - let contract_types = templates::all_contract_types(); - ( - StatusCode::OK, - Json(ListContractTypesResponse { contract_types }), - ) - .into_response() -} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index a6c7787..bd48a8f 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -18,7 +18,7 @@ use tower_http::trace::TraceLayer; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use crate::server::handlers::{api_keys, chat, daemon_download, directive_documents, directives, file_ws, files, history, listen, mesh, mesh_chat, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, templates, users, versions}; +use crate::server::handlers::{api_keys, daemon_download, directive_documents, directives, file_ws, files, history, listen, mesh, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, users, versions}; use crate::server::openapi::ApiDoc; use crate::server::state::SharedState; @@ -55,7 +55,6 @@ pub fn make_router(state: SharedState) -> Router { .put(files::update_file) .delete(files::delete_file), ) - .route("/files/{id}/chat", post(chat::chat_handler)) .route("/files/{id}/sync-from-repo", post(files::sync_file_from_repo)) // Version history endpoints .route("/files/{id}/versions", get(versions::list_versions)) @@ -88,12 +87,6 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists)) .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task)) .route("/mesh/tasks/{id}/continue", post(mesh::continue_task)) - .route("/mesh/chat", post(mesh_chat::mesh_toplevel_chat_handler)) - .route( - "/mesh/chat/history", - get(mesh_chat::get_chat_history).delete(mesh_chat::clear_chat_history), - ) - .route("/mesh/tasks/{id}/chat", post(mesh_chat::mesh_chat_handler)) .route("/mesh/daemons", get(mesh::list_daemons)) .route("/mesh/daemons/directories", get(mesh::get_daemon_directories)) .route("/mesh/daemons/{id}", get(mesh::get_daemon)) @@ -279,8 +272,6 @@ pub fn make_router(state: SharedState) -> Router { .route("/orders/{id}/convert-to-step", post(orders::convert_to_step)) // Timeline endpoint (unified history for user) .route("/timeline", get(history::get_timeline)) - // Contract type templates (built-in only) - .route("/contract-types", get(templates::list_contract_types)) // Settings endpoints .route( "/settings/repository-history", diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 5bbd0fe..13ba787 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -6,7 +6,7 @@ use crate::db::models::{ AddLocalRepositoryRequest, AddRemoteRepositoryRequest, BranchInfo, BranchListResponse, BranchTaskRequest, BranchTaskResponse, ChangePhaseRequest, - Contract, ContractChatHistoryResponse, ContractChatMessageRecord, ContractEvent, + Contract, ContractEvent, ContractListResponse, ContractRepository, ContractSummary, ContractWithRelations, CleanupResponse, CreateContractRequest, CreateDirectiveRequest, CreateDirectiveStepRequest, CreateFileRequest, @@ -31,7 +31,7 @@ use crate::server::auth::{ ApiKey, ApiKeyInfoResponse, CreateApiKeyRequest, CreateApiKeyResponse, RefreshApiKeyRequest, RefreshApiKeyResponse, RevokeApiKeyResponse, }; -use crate::server::handlers::{api_keys, directive_documents, directives, files, listen, mesh, mesh_chat, mesh_merge, orders, repository_history, users}; +use crate::server::handlers::{api_keys, directive_documents, directives, files, listen, mesh, mesh_merge, orders, repository_history, users}; use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage, TranscriptMessage}; #[derive(OpenApi)] @@ -70,8 +70,6 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage mesh::check_target_exists, mesh::get_task_patch_data, mesh::branch_task, - mesh_chat::get_chat_history, - mesh_chat::clear_chat_history, // Merge endpoints mesh_merge::list_branches, mesh_merge::merge_start, |
