//! Chat endpoint for LLM-powered task orchestration.
//!
//! This handler provides an agentic loop for managing tasks, daemons, and
//! overlay operations through LLM tool calling.
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use utoipa::ToSchema;
use uuid::Uuid;
use crate::db::{models::CreateTaskRequest, repository};
use crate::llm::{
claude::{self, ClaudeClient, ClaudeError, ClaudeModel},
groq::{GroqClient, GroqError, Message, ToolCallResponse},
parse_mesh_tool_call, LlmModel, MeshToolRequest, ToolCall, ToolResult, UserQuestion,
MESH_TOOLS,
};
use crate::server::auth::Authenticated;
use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification};
/// Maximum number of tool-calling rounds to prevent infinite loops.
///
/// One round is one LLM call plus the execution of the tool calls it asked
/// for; the agentic loop stops after this many rounds even if the model keeps
/// requesting tools.
const MAX_TOOL_ROUNDS: usize = 30;
/// A single prior conversation turn supplied by the client in
/// [`MeshChatRequest::history`].
#[derive(Debug, Clone, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MeshChatHistoryMessage {
/// Role: "user" or "assistant"
pub role: String,
/// Message content
pub content: String,
}
/// Request body accepted by the mesh chat endpoints.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MeshChatRequest {
/// The user's message/instruction
pub message: String,
/// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq".
/// A value that is not recognized falls back to the default.
#[serde(default)]
pub model: Option<String>,
/// Optional conversation history for context continuity (deprecated - now loaded from DB).
/// When provided, it is used instead of the stored history and the exchange is not saved.
#[serde(default)]
pub history: Option<Vec<MeshChatHistoryMessage>>,
/// Context type: "mesh", "task", or "subtask" (stored as "mesh" when omitted)
#[serde(default)]
pub context_type: Option<String>,
/// Task ID if context is task/subtask (ignored when the task cannot be found for the caller)
#[serde(default)]
pub context_task_id: Option<Uuid>,
}
/// Response body returned by the mesh chat endpoints once the agentic loop ends.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MeshChatResponse {
/// The LLM's response message
pub response: String,
/// Tool calls that were executed
pub tool_calls: Vec<MeshToolCallInfo>,
/// Questions pending user answers (pauses conversation); omitted from the JSON when there are none
#[serde(skip_serializing_if = "Option::is_none")]
pub pending_questions: Option<Vec<UserQuestion>>,
}
/// Summary of one tool call executed while handling a chat request.
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct MeshToolCallInfo {
/// Name of the tool the LLM invoked
pub name: String,
/// Outcome of the tool call (success flag and message)
pub result: ToolResult,
}
/// Enum to hold LLM clients, so the agentic loop can take either provider
/// through a single parameter.
enum LlmClient {
/// Client for the Groq API
Groq(GroqClient),
/// Client for the Claude API
Claude(ClaudeClient),
}
/// Unified result from LLM call, filled from either provider's response.
struct LlmResult {
/// Text content of the reply, if the model produced any
content: Option<String>,
/// Tool calls requested by the model, in the form passed to `parse_mesh_tool_call`
tool_calls: Vec<ToolCall>,
/// The same tool calls in Groq message format, echoed back in the assistant message
raw_tool_calls: Vec<ToolCallResponse>,
/// Provider-reported finish/stop reason
finish_reason: String,
}
/// Chat with mesh orchestrator at the top level (no specific task context)
#[utoipa::path(
post,
path = "/api/v1/mesh/chat",
request_body = MeshChatRequest,
responses(
(status = 200, description = "Chat completed successfully", body = MeshChatResponse),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn mesh_toplevel_chat_handler(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Json(request): Json<MeshChatRequest>,
) -> impl IntoResponse {
// Check if database is configured
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "Database not configured" })),
)
.into_response();
};
// Parse model selection (default to Claude Sonnet)
let model = request
.model
.as_ref()
.and_then(|m| LlmModel::from_str(m))
.unwrap_or(LlmModel::ClaudeSonnet);
tracing::info!("Mesh top-level chat using LLM model: {:?}", model);
// Initialize the appropriate LLM client
let llm_client = match model {
LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
Ok(client) => LlmClient::Claude(client),
Err(ClaudeError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Claude client error: {}", e) })),
)
.into_response();
}
},
LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
Ok(client) => LlmClient::Claude(client),
Err(ClaudeError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Claude client error: {}", e) })),
)
.into_response();
}
},
LlmModel::GroqKimi => match GroqClient::from_env() {
Ok(client) => LlmClient::Groq(client),
Err(GroqError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "GROQ_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Groq client error: {}", e) })),
)
.into_response();
}
},
};
// Build context about all tasks and daemons
let mesh_context = build_mesh_overview_context(pool, &state, auth.owner_id).await;
// Build agentic system prompt for top-level mesh orchestration
let system_prompt = format!(
r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
## Your Capabilities
You have access to tools for:
- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
- **File Access**: list_files, read_file (read documents from the files system)
- **Task Communication**: send_message_to_task, update_task_plan
- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
## Current Mesh Overview
{mesh_context}
## Agentic Behavior Guidelines
### 1. Analyze Before Acting
- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
- Understand the current state before making changes
- For simple, direct requests (e.g., "create a new task"), you can act immediately
### 2. Plan Multi-Step Operations
- Break complex orchestration into logical steps
- For parallel execution: create multiple subtasks, then run them on different daemons
- For sequential execution: create subtasks and run them in order
### 3. Create and Manage Tasks
- Use create_task to create new top-level tasks or subtasks
- Assign appropriate priorities and plans
- **Repository Default**: When creating tasks, use the daemon's working directory as the repository_url by default (shown as "Default Repository" above). Only omit repository_url if the task doesn't involve code, or use a different URL if the user explicitly requests it.
- If a working directory is a git repository, use it as the repository_url for code-related tasks
### 4. Coordinate Multiple Tasks
- Use list_tasks to see all tasks and their statuses
- Use list_daemons to see available compute resources
- Balance workload across daemons
### 5. Be Efficient
- Don't over-analyze simple requests
- Use the minimum number of tool calls needed
- Provide clear summaries of actions taken
## Important Notes
- Task IDs are UUIDs - ensure you use the correct format
- Running a task requires at least one connected daemon
- When creating subtasks, specify the parent_task_id
- Always confirm destructive operations (discard_task) with the user"#,
mesh_context = mesh_context
);
// Run the shared agentic loop
run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
}
/// Chat with task mesh orchestrator using LLM tool calling (scoped by owner)
#[utoipa::path(
post,
path = "/api/v1/mesh/tasks/{id}/chat",
request_body = MeshChatRequest,
responses(
(status = 200, description = "Chat completed successfully", body = MeshChatResponse),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error")
),
params(
("id" = Uuid, Path, description = "Task ID (context for orchestration)")
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn mesh_chat_handler(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
Path(task_id): Path<Uuid>,
Json(request): Json<MeshChatRequest>,
) -> impl IntoResponse {
// Check if database is configured
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "Database not configured" })),
)
.into_response();
};
// Get the context task (scoped by owner)
let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
Ok(Some(task)) => task,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({ "error": "Task not found" })),
)
.into_response();
}
Err(e) => {
tracing::error!("Database error: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Database error: {}", e) })),
)
.into_response();
}
};
// Parse model selection (default to Claude Sonnet)
let model = request
.model
.as_ref()
.and_then(|m| LlmModel::from_str(m))
.unwrap_or(LlmModel::ClaudeSonnet);
tracing::info!("Mesh chat using LLM model: {:?}", model);
// Initialize the appropriate LLM client
let llm_client = match model {
LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) {
Ok(client) => LlmClient::Claude(client),
Err(ClaudeError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Claude client error: {}", e) })),
)
.into_response();
}
},
LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) {
Ok(client) => LlmClient::Claude(client),
Err(ClaudeError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Claude client error: {}", e) })),
)
.into_response();
}
},
LlmModel::GroqKimi => match GroqClient::from_env() {
Ok(client) => LlmClient::Groq(client),
Err(GroqError::MissingApiKey) => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "GROQ_API_KEY not configured" })),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Groq client error: {}", e) })),
)
.into_response();
}
},
};
// Build context about the current task and mesh state
let task_context = build_task_context(&task);
// Build agentic system prompt for task orchestration
let system_prompt = format!(
r#"You are an intelligent task orchestration agent. You help users manage and coordinate tasks running on connected daemons with Claude Code containers.
## Your Capabilities
You have access to tools for:
- **Task Lifecycle**: create_task, run_task, pause_task, resume_task, interrupt_task, discard_task
- **Task Queries**: query_task_status, list_tasks, list_subtasks, list_siblings, list_daemons
- **File Access**: list_files, read_file (read documents from the files system)
- **Task Communication**: send_message_to_task, update_task_plan
- **Overlay/Merge Operations**: peek_sibling_overlay, get_overlay_diff, preview_merge, merge_subtask, complete_task, set_merge_mode
## Current Context
{task_context}
## Agentic Behavior Guidelines
### 1. Analyze Before Acting
- For complex orchestration requests, first gather information using query_task_status, list_tasks, or list_daemons
- Understand the current state before making changes
- For simple, direct requests (e.g., "pause this task"), you can act immediately
### 2. Plan Multi-Step Operations
- Break complex orchestration into logical steps
- For parallel execution: create multiple subtasks, then run them on different daemons
- For sequential execution: create subtasks and run them in order
### 3. Monitor Task Progress
- Use query_task_status to check on running tasks
- Watch for status changes and react accordingly
- Handle failures gracefully (retry, escalate, or report)
### 4. Coordinate Sibling Tasks
- Use peek_sibling_overlay to see what other tasks have changed
- Preview merges before completing to catch conflicts
- Coordinate timing when multiple tasks need to merge
### 5. Be Efficient
- Don't over-analyze simple requests
- Use the minimum number of tool calls needed
- Provide clear summaries of actions taken
## Important Notes
- Task IDs are UUIDs - ensure you use the correct format
- Running a task requires at least one connected daemon
- Overlay operations require the task to have been run at least once
- Always confirm destructive operations (discard_task) with the user
- When creating subtasks for this task, use parent_task_id: {task_id}"#,
task_context = task_context,
task_id = task_id
);
// Run the shared agentic loop
run_mesh_agentic_loop(pool, &state, &llm_client, system_prompt, &request, auth.owner_id).await
}
/// Render a plain-text description of `task` for inclusion in the system prompt.
///
/// Always emits the name, ID, status, priority and a plan preview; the
/// description, progress summary, error message, repository URL and base
/// branch are appended only when present. Every entry ends with a newline.
///
/// The plan preview is cut to at most 200 bytes (followed by `...`). The cut
/// is moved back to the nearest UTF-8 character boundary so a multi-byte
/// character straddling the limit cannot cause a slicing panic.
fn build_task_context(task: &crate::db::models::Task) -> String {
    /// Maximum number of plan bytes shown before the preview is truncated.
    const PLAN_PREVIEW_MAX_BYTES: usize = 200;

    let mut context = format!(
        "Current Task: {} (ID: {})\n",
        task.name, task.id
    );
    context.push_str(&format!("Status: {}\n", task.status));
    context.push_str(&format!("Priority: {}\n", task.priority));
    if let Some(ref desc) = task.description {
        context.push_str(&format!("Description: {}\n", desc));
    }

    // Truncate plan preview if too long
    let plan_preview = if task.plan.len() > PLAN_PREVIEW_MAX_BYTES {
        // Byte offsets are not always character boundaries; back off until the
        // cut is valid. Offset 0 is always a boundary, so this terminates.
        let mut cut = PLAN_PREVIEW_MAX_BYTES;
        while !task.plan.is_char_boundary(cut) {
            cut -= 1;
        }
        format!("{}...", &task.plan[..cut])
    } else {
        task.plan.clone()
    };
    context.push_str(&format!("Plan: {}\n", plan_preview));

    if let Some(ref summary) = task.progress_summary {
        context.push_str(&format!("Progress: {}\n", summary));
    }
    if let Some(ref error) = task.error_message {
        context.push_str(&format!("Error: {}\n", error));
    }

    // Repository info
    if let Some(ref url) = task.repository_url {
        context.push_str(&format!("Repository: {}\n", url));
    }
    if let Some(ref branch) = task.base_branch {
        context.push_str(&format!("Base branch: {}\n", branch));
    }
    context
}
/// Summarise the caller's tasks and connected daemons for the top-level system prompt.
///
/// The text contains per-status task counts, the first five tasks returned by
/// the repository, the number of daemons connected for `owner_id` with the
/// first three listed, and a "Default Repository" hint when exactly one of
/// those daemons reports a working directory. A task-listing failure is
/// written into the text instead of being returned as an error.
async fn build_mesh_overview_context(pool: &sqlx::PgPool, state: &SharedState, owner_id: Uuid) -> String {
    /// How many tasks are listed individually.
    const LISTED_TASKS: usize = 5;
    /// How many daemons are listed individually.
    const LISTED_DAEMONS: usize = 3;

    let mut overview = String::new();

    // Task section: counts by status, then a short listing.
    match repository::list_tasks_for_owner(pool, owner_id).await {
        Err(err) => {
            overview.push_str(&format!("Error fetching tasks: {}\n", err));
        }
        Ok(tasks) => {
            let count_with_status =
                |wanted: &str| tasks.iter().filter(|task| task.status == wanted).count();
            overview.push_str(&format!(
                "Tasks: {} total ({} pending, {} running, {} paused, {} done, {} failed)\n",
                tasks.len(),
                count_with_status("pending"),
                count_with_status("running"),
                count_with_status("paused"),
                count_with_status("done"),
                count_with_status("failed"),
            ));

            if !tasks.is_empty() {
                overview.push_str("\nRecent Tasks:\n");
                for task in tasks.iter().take(LISTED_TASKS) {
                    overview.push_str(&format!(
                        " - {} (ID: {}, Status: {})\n",
                        task.name, task.id, task.status
                    ));
                }
                if tasks.len() > LISTED_TASKS {
                    overview.push_str(&format!(
                        " ... and {} more\n",
                        tasks.len() - LISTED_TASKS
                    ));
                }
            }
        }
    }

    // Daemon section: only connections that belong to this owner.
    let connected: Vec<_> = state
        .daemon_connections
        .iter()
        .filter(|conn| conn.value().owner_id == owner_id)
        .collect();
    overview.push_str(&format!("\nConnected Daemons: {}\n", connected.len()));
    for conn in connected.iter().take(LISTED_DAEMONS) {
        let info = conn.value();
        overview.push_str(&format!(
            " - {} (ID: {}, Working Directory: {})\n",
            info.hostname.as_deref().unwrap_or("unknown"),
            info.id,
            info.working_directory.as_deref().unwrap_or("not set")
        ));
    }

    // Suggest a default repository only when the choice is unambiguous, i.e.
    // exactly one of the owner's daemons has a working directory.
    let mut working_dirs = connected
        .iter()
        .filter_map(|conn| conn.value().working_directory.as_ref());
    if let (Some(only_dir), None) = (working_dirs.next(), working_dirs.next()) {
        overview.push_str(&format!(
            "\nDefault Repository: {} (use this as repository_url when creating tasks unless user specifies otherwise)\n",
            only_dir
        ));
    }

    overview
}
/// Run the shared agentic loop for mesh chat.
///
/// Builds the message list (system prompt, conversation history, current user
/// message), then alternates between calling the LLM and executing the tool
/// calls it requests. The loop ends when the model answers without tool calls,
/// a tool result carries pending user questions, the finish reason is "stop"
/// or "end_turn", `MAX_CONSECUTIVE_FAILURES` tool calls have failed in a row,
/// or `MAX_TOOL_ROUNDS` rounds have run. Unless the request supplied legacy
/// `history`, the user message and the final assistant message are then saved
/// to the owner's active conversation.
///
/// # Parameters
/// - `pool`: database pool used for conversation storage and tool execution.
/// - `state`: shared server state, passed through to `handle_mesh_request`.
/// - `llm_client`: the provider client to call each round.
/// - `system_prompt`: becomes the first ("system") message.
/// - `request`: the incoming chat request (message, optional history/context).
/// - `owner_id`: owner used to scope the conversation and tool requests.
///
/// # Returns
/// `200 OK` with a `MeshChatResponse` when the loop finishes, or
/// `500 Internal Server Error` with a JSON error body when the conversation
/// cannot be initialized or an LLM API call fails. Failures to load history
/// or to save messages are only logged.
async fn run_mesh_agentic_loop(
pool: &sqlx::PgPool,
state: &SharedState,
llm_client: &LlmClient,
system_prompt: String,
request: &MeshChatRequest,
owner_id: Uuid,
) -> axum::response::Response {
// Get or create conversation for storing messages
let conversation = match repository::get_or_create_active_conversation(pool, owner_id).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to get/create conversation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })),
)
.into_response();
}
};
// Build initial messages; the system prompt is always first
let mut messages = vec![Message {
role: "system".to_string(),
content: Some(system_prompt),
tool_calls: None,
tool_call_id: None,
}];
// Load conversation history from database (or use provided for backwards compatibility)
if let Some(history) = &request.history {
// Legacy: use provided history (roles and content are copied as given)
for hist_msg in history {
messages.push(Message {
role: hist_msg.role.clone(),
content: Some(hist_msg.content.clone()),
tool_calls: None,
tool_call_id: None,
});
}
tracing::info!(
history_messages = history.len(),
"Loaded mesh conversation history from request (legacy)"
);
} else {
// New: load from database
// NOTE(review): `Some(50)` is presumably a row limit — confirm against the repository
match repository::list_chat_messages(pool, conversation.id, Some(50)).await {
Ok(db_messages) => {
for msg in db_messages {
messages.push(Message {
role: msg.role.clone(),
content: Some(msg.content.clone()),
tool_calls: None,
tool_call_id: None,
});
}
tracing::info!(
history_messages = messages.len() - 1, // minus system message
"Loaded mesh conversation history from database"
);
}
Err(e) => {
tracing::warn!("Failed to load chat history: {}", e);
// Continue without history
}
}
}
// Add current user message
messages.push(Message {
role: "user".to_string(),
content: Some(request.message.clone()),
tool_calls: None,
tool_call_id: None,
});
// State for tracking across rounds
let mut all_tool_call_infos: Vec<MeshToolCallInfo> = Vec::new();
let mut final_response: Option<String> = None;
// Counts failed tool calls since the last successful one (not failed rounds)
let mut consecutive_failures = 0;
const MAX_CONSECUTIVE_FAILURES: usize = 3;
let mut pending_questions: Option<Vec<UserQuestion>> = None;
// Multi-turn agentic tool calling loop
for round in 0..MAX_TOOL_ROUNDS {
tracing::info!(
round = round,
total_tool_calls = all_tool_call_infos.len(),
"Mesh agentic loop iteration"
);
// Check consecutive failures (only evaluated at the start of a round)
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
tracing::warn!(
"Breaking mesh loop due to {} consecutive failures",
consecutive_failures
);
final_response = Some(
"I encountered multiple consecutive errors and stopped. \
Please check the task state and try again."
.to_string(),
);
break;
}
// Call the appropriate LLM API and normalize the reply into `LlmResult`
let result = match llm_client {
LlmClient::Groq(groq) => {
match groq.chat_with_tools(messages.clone(), &MESH_TOOLS).await {
Ok(r) => LlmResult {
content: r.content,
tool_calls: r.tool_calls,
raw_tool_calls: r.raw_tool_calls,
finish_reason: r.finish_reason,
},
Err(e) => {
tracing::error!("Groq API error: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("LLM API error: {}", e) })),
)
.into_response();
}
}
}
LlmClient::Claude(claude_client) => {
// The message list is kept in Groq format; convert it for each Claude call
let claude_messages = claude::groq_messages_to_claude(&messages);
match claude_client
.chat_with_tools(claude_messages, &MESH_TOOLS)
.await
{
Ok(r) => {
// Rebuild Groq-style tool call entries so the assistant message
// pushed below has the same shape for both providers
let raw_tool_calls: Vec<ToolCallResponse> = r
.tool_calls
.iter()
.map(|tc| ToolCallResponse {
id: tc.id.clone(),
call_type: "function".to_string(),
function: crate::llm::groq::FunctionCall {
name: tc.name.clone(),
arguments: tc.arguments.to_string(),
},
})
.collect();
LlmResult {
content: r.content,
tool_calls: r.tool_calls,
raw_tool_calls,
finish_reason: r.stop_reason,
}
}
Err(e) => {
tracing::error!("Claude API error: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("LLM API error: {}", e) })),
)
.into_response();
}
}
}
};
// No tool calls means the model has produced its final answer
if result.tool_calls.is_empty() {
final_response = result.content;
break;
}
// Add assistant message with tool calls to conversation
messages.push(Message {
role: "assistant".to_string(),
content: result.content.clone(),
tool_calls: Some(result.raw_tool_calls.clone()),
tool_call_id: None,
});
// Execute each tool call
for (i, tool_call) in result.tool_calls.iter().enumerate() {
tracing::info!(tool = %tool_call.name, round = round, "Executing mesh tool call");
// Parse the tool call
let mut execution_result = parse_mesh_tool_call(tool_call);
// Handle async mesh tool requests; the handler's outcome replaces the parsed one
if let Some(mesh_request) = execution_result.request.take() {
let async_result = handle_mesh_request(pool, state, mesh_request, owner_id).await;
execution_result.success = async_result.success;
execution_result.message = async_result.message;
execution_result.data = async_result.data;
}
// Track consecutive failures (any success resets the counter)
if execution_result.success {
consecutive_failures = 0;
} else {
consecutive_failures += 1;
tracing::warn!(
tool = %tool_call.name,
consecutive_failures = consecutive_failures,
"Mesh tool call failed"
);
}
// Check for pending user questions; remaining tool calls in this round are skipped
if let Some(questions) = execution_result.pending_questions {
tracing::info!(
question_count = questions.len(),
"Mesh LLM requesting user input"
);
pending_questions = Some(questions);
all_tool_call_infos.push(MeshToolCallInfo {
name: tool_call.name.clone(),
result: ToolResult {
success: execution_result.success,
message: execution_result.message.clone(),
},
});
break;
}
// Build tool result message; "data" is included only when the tool returned some
let result_content = if let Some(data) = &execution_result.data {
json!({
"success": execution_result.success,
"message": execution_result.message,
"data": data
})
.to_string()
} else {
json!({
"success": execution_result.success,
"message": execution_result.message
})
.to_string()
};
// Add tool result message
// NOTE(review): indexing assumes `raw_tool_calls` is parallel to `tool_calls`
// for Groq responses — confirm, since a length mismatch would panic here
let tool_call_id = match llm_client {
LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(),
LlmClient::Claude(_) => tool_call.id.clone(),
};
messages.push(Message {
role: "tool".to_string(),
content: Some(result_content),
tool_calls: None,
tool_call_id: Some(tool_call_id),
});
// Track for response
all_tool_call_infos.push(MeshToolCallInfo {
name: tool_call.name.clone(),
result: ToolResult {
success: execution_result.success,
message: execution_result.message,
},
});
}
// If user questions are pending, pause
if pending_questions.is_some() {
final_response = result.content;
break;
}
// If finish reason indicates completion, exit loop
let finish_lower = result.finish_reason.to_lowercase();
if finish_lower == "stop" || finish_lower == "end_turn" {
final_response = result.content;
break;
}
}
// Build response; fall back to a generic message when the model gave no text
let response_text = final_response.unwrap_or_else(|| {
if all_tool_call_infos.is_empty() {
"I couldn't understand your request. Please try rephrasing.".to_string()
} else {
format!(
"Done! Executed {} tool{}.",
all_tool_call_infos.len(),
if all_tool_call_infos.len() == 1 {
""
} else {
"s"
}
)
}
});
// Save messages to database (only if not using legacy history mode)
if request.history.is_none() {
let context_type = request.context_type.clone().unwrap_or_else(|| "mesh".to_string());
// Validate context_task_id exists before using it (to avoid FK constraint violation)
let context_task_id = if let Some(task_id) = request.context_task_id {
match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(_)) => Some(task_id),
Ok(None) => {
tracing::warn!("context_task_id {} not found, ignoring", task_id);
None
}
Err(e) => {
tracing::warn!("Failed to validate context_task_id {}: {}", task_id, e);
None
}
}
} else {
None
};
// Save user message (a failure is logged, not returned to the caller)
if let Err(e) = repository::add_chat_message(
pool,
conversation.id,
"user",
&request.message,
&context_type,
context_task_id,
None,
None,
)
.await
{
tracing::warn!("Failed to save user message to DB: {}", e);
}
// Serialize tool calls for storage
let tool_calls_json = if all_tool_call_infos.is_empty() {
None
} else {
Some(serde_json::to_value(&all_tool_call_infos).unwrap_or_default())
};
// Serialize pending questions for storage
let pending_questions_json = pending_questions
.as_ref()
.map(|q| serde_json::to_value(q).unwrap_or_default());
// Save assistant message (a failure is logged, not returned to the caller)
if let Err(e) = repository::add_chat_message(
pool,
conversation.id,
"assistant",
&response_text,
&context_type,
context_task_id,
tool_calls_json,
pending_questions_json,
)
.await
{
tracing::warn!("Failed to save assistant message to DB: {}", e);
}
// This is logged even when one of the saves above failed
tracing::info!(
conversation_id = %conversation.id,
context_type = %context_type,
"Saved mesh chat messages to database"
);
}
(
StatusCode::OK,
Json(MeshChatResponse {
response: response_text,
tool_calls: all_tool_call_infos,
pending_questions,
}),
)
.into_response()
}
/// Result from handling an async mesh tool request.
///
/// The agentic loop copies these three fields over the parsed tool call's
/// execution result.
struct MeshRequestResult {
/// Whether the request succeeded
success: bool,
/// Human-readable outcome or error description
message: String,
/// Optional structured payload included in the tool result sent back to the LLM
data: Option<serde_json::Value>,
}
/// Handle async mesh tool requests that require database/daemon access
async fn handle_mesh_request(
pool: &sqlx::PgPool,
state: &SharedState,
request: MeshToolRequest,
owner_id: Uuid,
) -> MeshRequestResult {
match request {
MeshToolRequest::CreateTask {
name,
plan,
parent_task_id,
repository_url,
base_branch,
merge_mode,
priority,
} => {
// Subtasks inherit contract_id from parent task
let contract_id = if let Some(parent_id) = parent_task_id {
match repository::get_task(pool, parent_id).await {
Ok(Some(parent_task)) => {
match parent_task.contract_id {
Some(cid) => cid,
None => {
return MeshRequestResult {
success: false,
message: "Parent task has no contract_id".to_string(),
data: None,
};
}
}
}
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Parent task {} not found", parent_id),
data: None,
};
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Failed to look up parent task: {}", e),
data: None,
};
}
}
} else {
// Root tasks created via LLM chat require a contract_id
// TODO: Add contract_id to create_task tool definition
return MeshRequestResult {
success: false,
message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(),
data: None,
};
};
// Check if repository_url matches a daemon's working directory (for this owner)
let is_daemon_working_dir = repository_url.as_ref().map(|url| {
state.daemon_connections.iter().any(|entry| {
entry.value().owner_id == owner_id &&
entry.value().working_directory.as_ref() == Some(url)
})
}).unwrap_or(false);
// Derive completion_action from merge_mode, or default to "branch" if using daemon working dir
let (completion_action, target_repo_path) = if let Some(ref mode) = merge_mode {
// Explicit merge_mode provided - derive from it
let action = match mode.as_str() {
"pr" => "pr".to_string(),
"auto" => "merge".to_string(),
"manual" => "branch".to_string(),
_ => "none".to_string(),
};
// If using daemon working dir and action involves the repo, set target_repo_path
let target = if is_daemon_working_dir && action != "none" {
repository_url.clone()
} else {
None
};
(Some(action), target)
} else if is_daemon_working_dir {
// No merge_mode but using daemon working dir - default to "pr"
(Some("pr".to_string()), repository_url.clone())
} else {
(None, None)
};
let create_req = CreateTaskRequest {
contract_id: Some(contract_id),
name: name.clone(),
description: None,
plan,
parent_task_id,
repository_url,
base_branch,
target_branch: None,
merge_mode,
priority: priority.unwrap_or(0),
target_repo_path,
completion_action,
continue_from_task_id: None,
copy_files: None,
is_supervisor: false,
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
Ok(task) => MeshRequestResult {
success: true,
message: format!("Created task '{}' with ID {}", name, task.id),
data: Some(json!({
"taskId": task.id,
"name": task.name,
"status": task.status,
})),
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to create task: {}", e),
data: None,
},
}
}
MeshToolRequest::RunTask { task_id, daemon_id } => {
// Get task to check status
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
if task.status != "pending" && task.status != "paused" {
return MeshRequestResult {
success: false,
message: format!(
"Task cannot be run - status is '{}' (must be 'pending' or 'paused')",
task.status
),
data: None,
};
}
// Find a daemon to run on (must belong to this owner)
let target_daemon_id = if let Some(id) = daemon_id {
// Verify the specified daemon belongs to this owner
if !state.daemon_connections.iter().any(|d| d.value().id == id && d.value().owner_id == owner_id) {
return MeshRequestResult {
success: false,
message: "Specified daemon not found or not accessible.".to_string(),
data: None,
};
}
id
} else {
// Find any connected daemon for this owner
let daemon = state.daemon_connections.iter().find(|d| d.value().owner_id == owner_id);
match daemon {
Some(d) => d.value().id,
None => {
return MeshRequestResult {
success: false,
message: "No daemons connected for your account. Cannot run task.".to_string(),
data: None,
}
}
}
};
// Check if this is an orchestrator (depth 0 with subtasks)
let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
Ok(subtasks) => subtasks.len(),
Err(_) => 0,
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
// This prevents race conditions where the task starts but daemon_id is not set
let update_req = crate::db::models::UpdateTaskRequest {
status: Some("starting".to_string()),
daemon_id: Some(target_daemon_id),
version: Some(task.version),
..Default::default()
};
let updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Failed to update task: {}", e),
data: None,
}
}
};
// Get local_only and auto_merge_local from contract if task has one
let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
_ => (false, false),
}
} else {
(false, false)
};
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
task_id,
task_name: task.name.clone(),
plan: task.plan.clone(),
repo_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
target_branch: task.target_branch.clone(),
parent_task_id: task.parent_task_id,
depth: task.depth,
is_orchestrator,
target_repo_path: task.target_repo_path.clone(),
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
patch_data: None,
patch_base_sha: None,
local_only,
auto_merge_local,
supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
match state.send_daemon_command(target_daemon_id, command).await {
Ok(()) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated_task.version,
status: "starting".to_string(),
updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
updated_by: "system".to_string(),
});
MeshRequestResult {
success: true,
message: format!("Task {} is now running on daemon {}", task_id, target_daemon_id),
data: Some(json!({
"taskId": task_id,
"daemonId": target_daemon_id,
"status": "starting",
})),
}
}
Err(e) => {
// Rollback: clear daemon_id and reset status since command failed
let rollback_req = crate::db::models::UpdateTaskRequest {
status: Some("pending".to_string()),
clear_daemon_id: true,
..Default::default()
};
let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await;
MeshRequestResult {
success: false,
message: format!("Failed to start task: {}", e),
data: None,
}
}
}
}
MeshToolRequest::PauseTask { task_id } => {
// Get task and its daemon
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
if task.status != "running" {
return MeshRequestResult {
success: false,
message: format!("Task is not running (status: {})", task.status),
data: None,
};
}
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::PauseTask { task_id };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
return MeshRequestResult {
success: false,
message: format!("Failed to pause task: {}", e),
data: None,
};
}
}
// Update status
let update_req = crate::db::models::UpdateTaskRequest {
status: Some("paused".to_string()),
version: Some(task.version),
..Default::default()
};
if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: "paused".to_string(),
updated_fields: vec!["status".to_string()],
updated_by: "system".to_string(),
});
}
MeshRequestResult {
success: true,
message: format!("Task {} paused", task_id),
data: Some(json!({ "taskId": task_id, "status": "paused" })),
}
}
MeshToolRequest::ResumeTask { task_id } => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
if task.status != "paused" {
return MeshRequestResult {
success: false,
message: format!("Task is not paused (status: {})", task.status),
data: None,
};
}
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::ResumeTask { task_id };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
return MeshRequestResult {
success: false,
message: format!("Failed to resume task: {}", e),
data: None,
};
}
}
// Update status
let update_req = crate::db::models::UpdateTaskRequest {
status: Some("running".to_string()),
version: Some(task.version),
..Default::default()
};
if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: "running".to_string(),
updated_fields: vec!["status".to_string()],
updated_by: "system".to_string(),
});
}
MeshRequestResult {
success: true,
message: format!("Task {} resumed", task_id),
data: Some(json!({ "taskId": task_id, "status": "running" })),
}
}
MeshToolRequest::InterruptTask { task_id, graceful } => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::InterruptTask { task_id, graceful };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
return MeshRequestResult {
success: false,
message: format!("Failed to interrupt task: {}", e),
data: None,
};
}
}
// Update status
let update_req = crate::db::models::UpdateTaskRequest {
status: Some("paused".to_string()),
version: Some(task.version),
..Default::default()
};
if let Ok(Some(updated)) = repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: "paused".to_string(),
updated_fields: vec!["status".to_string()],
updated_by: "system".to_string(),
});
}
MeshRequestResult {
success: true,
message: format!(
"Task {} {}interrupted",
task_id,
if graceful { "gracefully " } else { "" }
),
data: Some(json!({ "taskId": task_id, "status": "paused" })),
}
}
MeshToolRequest::DiscardTask { task_id } => {
match repository::delete_task_for_owner(pool, task_id, owner_id).await {
Ok(true) => MeshRequestResult {
success: true,
message: format!("Task {} discarded", task_id),
data: Some(json!({ "taskId": task_id, "deleted": true })),
},
Ok(false) => MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to delete task: {}", e),
data: None,
},
}
}
MeshToolRequest::QueryTaskStatus { task_id } => {
match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(task)) => MeshRequestResult {
success: true,
message: format!("Task '{}' is {}", task.name, task.status),
data: Some(json!({
"taskId": task.id,
"name": task.name,
"status": task.status,
"priority": task.priority,
"description": task.description,
"plan": task.plan,
"progressSummary": task.progress_summary,
"errorMessage": task.error_message,
"repositoryUrl": task.repository_url,
"baseBranch": task.base_branch,
"targetBranch": task.target_branch,
"mergeMode": task.merge_mode,
"prUrl": task.pr_url,
"daemonId": task.daemon_id,
"containerId": task.container_id,
"createdAt": task.created_at,
"startedAt": task.started_at,
"completedAt": task.completed_at,
})),
},
Ok(None) => MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::ListTasks {
status_filter,
parent_task_id,
} => {
// TODO: Add filtering support to repository
match repository::list_tasks_for_owner(pool, owner_id).await {
Ok(mut tasks) => {
// Apply filters
if let Some(ref status) = status_filter {
tasks.retain(|t| &t.status == status);
}
if let Some(ref parent_id) = parent_task_id {
tasks.retain(|t| t.parent_task_id.as_ref() == Some(parent_id));
}
let task_data: Vec<serde_json::Value> = tasks
.iter()
.map(|t| {
json!({
"taskId": t.id,
"name": t.name,
"status": t.status,
"priority": t.priority,
"parentTaskId": t.parent_task_id,
})
})
.collect();
MeshRequestResult {
success: true,
message: format!("Found {} tasks", tasks.len()),
data: Some(json!({ "tasks": task_data })),
}
}
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::ListSubtasks { task_id } => {
match repository::list_subtasks_for_owner(pool, task_id, owner_id).await {
Ok(subtasks) => {
let subtask_data: Vec<serde_json::Value> = subtasks
.iter()
.map(|t| {
json!({
"taskId": t.id,
"name": t.name,
"status": t.status,
"priority": t.priority,
})
})
.collect();
MeshRequestResult {
success: true,
message: format!("Found {} subtasks", subtasks.len()),
data: Some(json!({ "subtasks": subtask_data })),
}
}
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::ListSiblings { task_id } => {
// Get task to find parent
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
let Some(parent_id) = task.parent_task_id else {
return MeshRequestResult {
success: true,
message: "Task has no parent, so no siblings".to_string(),
data: Some(json!({ "siblings": [] })),
};
};
// Get all subtasks of parent, excluding current task
match repository::list_subtasks_for_owner(pool, parent_id, owner_id).await {
Ok(siblings) => {
let sibling_data: Vec<serde_json::Value> = siblings
.iter()
.filter(|t| t.id != task_id)
.map(|t| {
json!({
"taskId": t.id,
"name": t.name,
"status": t.status,
"priority": t.priority,
})
})
.collect();
MeshRequestResult {
success: true,
message: format!("Found {} sibling tasks", sibling_data.len()),
data: Some(json!({ "siblings": sibling_data })),
}
}
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::ListDaemons => {
// Only list daemons belonging to this owner
let daemons: Vec<serde_json::Value> = state
.daemon_connections
.iter()
.filter(|entry| entry.value().owner_id == owner_id)
.map(|entry| {
let d = entry.value();
json!({
"daemonId": d.id,
"connectionId": d.connection_id,
"hostname": d.hostname,
"machineId": d.machine_id,
})
})
.collect();
MeshRequestResult {
success: true,
message: format!("{} daemon(s) connected", daemons.len()),
data: Some(json!({ "daemons": daemons })),
}
}
MeshToolRequest::ListDaemonDirectories => {
let mut directories: Vec<serde_json::Value> = Vec::new();
// Only list directories from daemons belonging to this owner
for entry in state.daemon_connections.iter() {
let daemon = entry.value();
// Only include daemons belonging to this owner
if daemon.owner_id != owner_id {
continue;
}
// Add working directory if available
if let Some(ref working_dir) = daemon.working_directory {
directories.push(json!({
"path": working_dir,
"label": "Working Directory",
"directoryType": "working",
"hostname": daemon.hostname,
}));
}
// Add home directory if available
if let Some(ref home_dir) = daemon.home_directory {
directories.push(json!({
"path": home_dir,
"label": "Makima Home",
"directoryType": "home",
"hostname": daemon.hostname,
}));
}
}
MeshRequestResult {
success: true,
message: format!("Found {} available directories", directories.len()),
data: Some(json!({ "directories": directories })),
}
}
MeshToolRequest::ListFiles => {
match repository::list_files_for_owner(pool, owner_id).await {
Ok(files) => {
let file_data: Vec<serde_json::Value> = files
.iter()
.map(|f| {
json!({
"fileId": f.id,
"name": f.name,
"description": f.description,
"createdAt": f.created_at,
"updatedAt": f.updated_at,
})
})
.collect();
MeshRequestResult {
success: true,
message: format!("Found {} files", files.len()),
data: Some(json!({ "files": file_data })),
}
}
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::ReadFile { file_id } => {
match repository::get_file_for_owner(pool, file_id, owner_id).await {
Ok(Some(file)) => {
// Convert body elements to readable text
let body_content: Vec<serde_json::Value> = file
.body
.iter()
.map(|elem| {
match elem {
crate::db::models::BodyElement::Heading { level, text } => {
json!({ "type": "heading", "level": level, "text": text })
}
crate::db::models::BodyElement::Paragraph { text } => {
json!({ "type": "paragraph", "text": text })
}
crate::db::models::BodyElement::Code { language, content } => {
json!({ "type": "code", "language": language, "content": content })
}
crate::db::models::BodyElement::List { ordered, items } => {
json!({ "type": "list", "ordered": ordered, "items": items })
}
crate::db::models::BodyElement::Chart { chart_type, title, data, config: _ } => {
let data_count = data.as_array().map(|arr| arr.len()).unwrap_or(0);
json!({ "type": "chart", "chartType": chart_type, "title": title, "dataPoints": data_count })
}
crate::db::models::BodyElement::Image { src, alt, caption } => {
json!({ "type": "image", "src": src, "alt": alt, "caption": caption })
}
crate::db::models::BodyElement::Markdown { content } => {
json!({ "type": "markdown", "content": content })
}
}
})
.collect();
// Also build a plain text version for easier reading
let plain_text: String = file
.body
.iter()
.filter_map(|elem| {
match elem {
crate::db::models::BodyElement::Heading { level, text } => {
Some(format!("{} {}", "#".repeat(*level as usize), text))
}
crate::db::models::BodyElement::Paragraph { text } => {
Some(text.clone())
}
crate::db::models::BodyElement::Code { language, content } => {
let lang = language.as_deref().unwrap_or("");
Some(format!("```{}\n{}\n```", lang, content))
}
crate::db::models::BodyElement::List { ordered, items } => {
let list_text: Vec<String> = items.iter().enumerate().map(|(i, item)| {
if *ordered {
format!("{}. {}", i + 1, item)
} else {
format!("- {}", item)
}
}).collect();
Some(list_text.join("\n"))
}
crate::db::models::BodyElement::Markdown { content } => {
Some(content.clone())
}
_ => None,
}
})
.collect::<Vec<_>>()
.join("\n\n");
// Convert transcript entries to JSON
let transcript: Vec<serde_json::Value> = file
.transcript
.iter()
.map(|entry| {
json!({
"id": entry.id,
"speaker": entry.speaker,
"start": entry.start,
"end": entry.end,
"text": entry.text,
})
})
.collect();
// Build a plain text transcript for easier reading
let transcript_text: String = file
.transcript
.iter()
.map(|entry| {
format!("[{:.1}s] {}: {}", entry.start, entry.speaker, entry.text)
})
.collect::<Vec<_>>()
.join("\n");
MeshRequestResult {
success: true,
message: format!("Read file '{}'", file.name),
data: Some(json!({
"fileId": file.id,
"name": file.name,
"description": file.description,
"summary": file.summary,
"body": body_content,
"plainText": plain_text,
"transcript": transcript,
"transcriptText": transcript_text,
"transcriptCount": file.transcript.len(),
"createdAt": file.created_at,
"updatedAt": file.updated_at,
})),
}
}
Ok(None) => MeshRequestResult {
success: false,
message: format!("File {} not found", file_id),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
},
}
}
MeshToolRequest::SendMessageToTask { task_id, message } => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
if task.status != "running" {
return MeshRequestResult {
success: false,
message: format!("Task is not running (status: {})", task.status),
data: None,
};
}
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::SendMessage { task_id, message };
match state.send_daemon_command(daemon_id, command).await {
Ok(()) => MeshRequestResult {
success: true,
message: "Message sent to task".to_string(),
data: Some(json!({ "taskId": task_id })),
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to send message: {}", e),
data: None,
},
}
} else {
MeshRequestResult {
success: false,
message: "Task has no daemon assigned".to_string(),
data: None,
}
}
}
MeshToolRequest::UpdateTaskPlan {
task_id,
new_plan,
interrupt_if_running,
} => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
// Interrupt if running and requested
if task.status == "running" && interrupt_if_running {
if let Some(daemon_id) = task.daemon_id {
let command = DaemonCommand::InterruptTask {
task_id,
graceful: true,
};
let _ = state.send_daemon_command(daemon_id, command).await;
}
}
let update_req = crate::db::models::UpdateTaskRequest {
plan: Some(new_plan),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
Ok(Some(updated)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: updated.status.clone(),
updated_fields: vec!["plan".to_string()],
updated_by: "system".to_string(),
});
MeshRequestResult {
success: true,
message: "Task plan updated".to_string(),
data: Some(json!({ "taskId": task_id })),
}
}
Ok(None) => MeshRequestResult {
success: false,
message: "Task not found".to_string(),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to update task: {}", e),
data: None,
},
}
}
// Overlay operations - these require daemon communication
// For now, return placeholder responses since daemon implementation is separate
MeshToolRequest::PeekSiblingOverlay { sibling_task_id } => MeshRequestResult {
success: false,
message: format!(
"Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
sibling_task_id
),
data: None,
},
MeshToolRequest::GetOverlayDiff { task_id } => MeshRequestResult {
success: false,
message: format!(
"Overlay operations require a connected daemon. Task {} may not have overlay data yet.",
task_id
),
data: None,
},
MeshToolRequest::PreviewMerge { task_id } => MeshRequestResult {
success: false,
message: format!(
"Merge preview requires a connected daemon. Task {} may not have overlay data yet.",
task_id
),
data: None,
},
MeshToolRequest::MergeSubtask { task_id } => MeshRequestResult {
success: false,
message: format!(
"Merge operations require a connected daemon. Task {}",
task_id
),
data: None,
},
MeshToolRequest::CompleteTask { task_id } => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
// Update status to done
let update_req = crate::db::models::UpdateTaskRequest {
status: Some("done".to_string()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
Ok(Some(updated)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: "done".to_string(),
updated_fields: vec!["status".to_string()],
updated_by: "system".to_string(),
});
let merge_mode = task.merge_mode.unwrap_or_else(|| "pr".to_string());
MeshRequestResult {
success: true,
message: format!(
"Task {} completed. Merge mode: {}",
task_id,
&merge_mode
),
data: Some(json!({
"taskId": task_id,
"status": "done",
"mergeMode": merge_mode,
})),
}
}
Ok(None) => MeshRequestResult {
success: false,
message: "Task not found".to_string(),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to complete task: {}", e),
data: None,
},
}
}
MeshToolRequest::SetMergeMode { task_id, mode } => {
let task = match repository::get_task_for_owner(pool, task_id, owner_id).await {
Ok(Some(t)) => t,
Ok(None) => {
return MeshRequestResult {
success: false,
message: format!("Task {} not found", task_id),
data: None,
}
}
Err(e) => {
return MeshRequestResult {
success: false,
message: format!("Database error: {}", e),
data: None,
}
}
};
let update_req = crate::db::models::UpdateTaskRequest {
merge_mode: Some(mode.clone()),
version: Some(task.version),
..Default::default()
};
match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await {
Ok(Some(updated)) => {
state.broadcast_task_update(TaskUpdateNotification {
task_id,
owner_id: Some(task.owner_id),
version: updated.version,
status: updated.status,
updated_fields: vec!["merge_mode".to_string()],
updated_by: "system".to_string(),
});
MeshRequestResult {
success: true,
message: format!("Merge mode set to '{}'", mode),
data: Some(json!({ "taskId": task_id, "mergeMode": mode })),
}
}
Ok(None) => MeshRequestResult {
success: false,
message: "Task not found".to_string(),
data: None,
},
Err(e) => MeshRequestResult {
success: false,
message: format!("Failed to update merge mode: {}", e),
data: None,
},
}
}
// Supervisor-only tools - these should be handled via the supervisor.sh script,
// not through the mesh chat. Return an informative error.
MeshToolRequest::GetAllContractTasks { contract_id } => {
MeshRequestResult {
success: false,
message: format!(
"get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}",
contract_id
),
data: None,
}
}
MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => {
MeshRequestResult {
success: false,
message: format!(
"wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s",
task_id, timeout_seconds
),
data: None,
}
}
MeshToolRequest::ReadTaskWorktree { task_id, file_path } => {
MeshRequestResult {
success: false,
message: format!(
"read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}",
task_id, file_path
),
data: None,
}
}
MeshToolRequest::SpawnTask { name, .. } => {
MeshRequestResult {
success: false,
message: format!(
"spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}",
name
),
data: None,
}
}
MeshToolRequest::CreateCheckpoint { task_id, message } => {
MeshRequestResult {
success: false,
message: format!(
"create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}",
task_id, message
),
data: None,
}
}
MeshToolRequest::ListTaskCheckpoints { task_id } => {
MeshRequestResult {
success: false,
message: format!(
"list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}",
task_id
),
data: None,
}
}
MeshToolRequest::GetTaskTree { task_id } => {
MeshRequestResult {
success: false,
message: format!(
"get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}",
task_id
),
data: None,
}
}
}
}
// =============================================================================
// Chat History Endpoints
// =============================================================================
use crate::db::models::MeshChatHistoryResponse;
/// Get chat history for the current conversation (requires authentication)
#[utoipa::path(
get,
path = "/api/v1/mesh/chat/history",
responses(
(status = 200, description = "Chat history", body = MeshChatHistoryResponse),
(status = 401, description = "Unauthorized"),
(status = 503, description = "Database not configured"),
(status = 500, description = "Internal server error")
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn get_chat_history(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "Database not configured" })),
)
.into_response();
};
let conversation = match repository::get_or_create_active_conversation(pool, auth.owner_id).await {
Ok(c) => c,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": e.to_string() })),
)
.into_response()
}
};
let messages = match repository::list_chat_messages(pool, conversation.id, None).await {
Ok(m) => m,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": e.to_string() })),
)
.into_response()
}
};
(
StatusCode::OK,
Json(MeshChatHistoryResponse {
conversation_id: conversation.id,
messages,
}),
)
.into_response()
}
/// Clear chat history (archives current conversation and starts new, requires authentication)
#[utoipa::path(
delete,
path = "/api/v1/mesh/chat/history",
responses(
(status = 200, description = "History cleared"),
(status = 401, description = "Unauthorized"),
(status = 503, description = "Database not configured"),
(status = 500, description = "Internal server error")
),
security(
("bearer_auth" = []),
("api_key" = [])
),
tag = "Mesh"
)]
pub async fn clear_chat_history(
State(state): State<SharedState>,
Authenticated(auth): Authenticated,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "Database not configured" })),
)
.into_response();
};
match repository::clear_conversation(pool, auth.owner_id).await {
Ok(new_conv) => (
StatusCode::OK,
Json(json!({ "success": true, "conversationId": new_conv.id })),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": e.to_string() })),
)
.into_response(),
}
}