From 87044a747b47bd83249d61a45842c7f7b2eae56d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 11 Jan 2026 05:52:14 +0000 Subject: Contract system --- makima/src/server/handlers/chat.rs | 45 +- makima/src/server/handlers/contract_chat.rs | 2592 +++++++++++++++++++++++++ makima/src/server/handlers/contract_daemon.rs | 960 +++++++++ makima/src/server/handlers/contracts.rs | 1284 ++++++++++++ makima/src/server/handlers/files.rs | 219 ++- makima/src/server/handlers/listen.rs | 108 +- makima/src/server/handlers/mesh.rs | 105 +- makima/src/server/handlers/mesh_chat.rs | 124 ++ makima/src/server/handlers/mesh_daemon.rs | 211 +- makima/src/server/handlers/mesh_supervisor.rs | 1153 +++++++++++ makima/src/server/handlers/mod.rs | 5 + makima/src/server/handlers/templates.rs | 107 + 12 files changed, 6884 insertions(+), 29 deletions(-) create mode 100644 makima/src/server/handlers/contract_chat.rs create mode 100644 makima/src/server/handlers/contract_daemon.rs create mode 100644 makima/src/server/handlers/contracts.rs create mode 100644 makima/src/server/handlers/mesh_supervisor.rs create mode 100644 makima/src/server/handlers/templates.rs (limited to 'makima/src/server/handlers') diff --git a/makima/src/server/handlers/chat.rs b/makima/src/server/handlers/chat.rs index dfdb64e..9d8cd19 100644 --- a/makima/src/server/handlers/chat.rs +++ b/makima/src/server/handlers/chat.rs @@ -245,11 +245,12 @@ pub async fn chat_handler( ## Your Capabilities You have access to tools for: - **Viewing content**: view_body (see all elements), read_element (inspect specific element), view_transcript (read full transcript) -- **Adding content**: add_heading, add_paragraph, add_chart +- **Adding content**: add_heading, add_paragraph, add_code, add_list, add_chart - **Modifying content**: update_element, remove_element, reorder_elements, clear_body - **Document metadata**: set_summary - **Data processing**: parse_csv (convert CSV to JSON), jq (transform JSON data) - **Version history**: list_versions, read_version, restore_version +- **Templates**: suggest_templates (get phase-appropriate templates), apply_template (apply a template structure) ## Agentic Behavior Guidelines @@ -611,6 +612,7 @@ You have access to tools for: summary: current_summary.clone(), body: Some(current_body.clone()), version: None, // Internal update, skip version check + repo_file_path: None, }; match repository::update_file(pool, id, update_req).await { @@ -687,7 +689,27 @@ fn build_file_context(file: &crate::db::models::File) -> String { context.push_str(&format!("Summary: {}\n", summary)); } - context.push_str(&format!("Transcript entries: {}\n", file.transcript.len())); + // Include contract phase context if file belongs to a contract + if let Some(ref phase) = file.contract_phase { + context.push_str(&format!("\n## Contract Context\n")); + context.push_str(&format!("This file belongs to a contract in the '{}' phase.\n", phase)); + context.push_str("You can use 'suggest_templates' to get phase-appropriate templates, "); + context.push_str("or 'apply_template' to apply a template structure.\n"); + context.push_str(&format!( + "Templates for '{}' phase include: {}\n", + phase, + match phase.as_str() { + "research" => "research-notes, competitor-analysis, user-research", + "specify" => "requirements, user-stories, acceptance-criteria", + "plan" => "architecture, technical-design, task-breakdown", + "execute" => "dev-notes, test-plan, implementation-log", + "review" => "review-checklist, release-notes, retrospective", + _ => "(use suggest_templates to see available)", + } + )); + } + + context.push_str(&format!("\nTranscript entries: {}\n", file.transcript.len())); context.push_str(&format!("Body elements: {}\n", file.body.len())); // Add body overview @@ -727,6 +749,14 @@ fn build_file_context(file: &crate::db::models::File) -> String { BodyElement::Image { alt, .. } => { format!("Image{}", alt.as_ref().map(|a| format!(": {}", a)).unwrap_or_default()) } + BodyElement::Markdown { content } => { + let preview: String = content.chars().take(50).collect(); + if content.chars().count() > 50 { + format!("Markdown: {}...", preview) + } else { + format!("Markdown: {}", preview) + } + } }; context.push_str(&format!(" [{}] {}\n", i, desc)); } @@ -788,6 +818,9 @@ fn build_focused_element_context(body: &[BodyElement], focused_index: Option { + ("Markdown".to_string(), content.clone()) + } }; format!( @@ -903,6 +936,14 @@ async fn handle_version_request( BodyElement::Image { alt, .. } => { format!("Image{}", alt.as_ref().map(|a| format!(": {}", a)).unwrap_or_default()) } + BodyElement::Markdown { content } => { + let preview: String = content.chars().take(100).collect(); + if content.chars().count() > 100 { + format!("Markdown: {}...", preview) + } else { + format!("Markdown: {}", preview) + } + } }; format!("[{}] {}", i, desc) }) diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs new file mode 100644 index 0000000..d090999 --- /dev/null +++ b/makima/src/server/handlers/contract_chat.rs @@ -0,0 +1,2592 @@ +//! Chat endpoint for LLM-powered contract management. +//! +//! This handler provides an agentic loop for managing contracts: creating tasks, +//! adding files, managing repositories, and handling phase transitions. + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::db::{ + models::{ContractChatHistoryResponse, ContractWithRelations, CreateTaskRequest, UpdateFileRequest}, + repository, +}; +use crate::llm::{ + all_templates, analyze_task_output, body_to_markdown, format_checklist_markdown, + format_parsed_tasks, get_phase_checklist, parse_tasks_from_breakdown, + claude::{self, ClaudeClient, ClaudeError, ClaudeModel}, + groq::{GroqClient, GroqError, Message, ToolCallResponse}, + parse_contract_tool_call, templates_for_phase, ContractToolRequest, FileInfo, + LlmModel, TaskInfo, ToolCall, ToolResult, UserQuestion, CONTRACT_TOOLS, +}; +use crate::server::auth::Authenticated; +use crate::server::state::{DaemonCommand, SharedState}; + +/// Maximum number of tool-calling rounds to prevent infinite loops +const MAX_TOOL_ROUNDS: usize = 30; + +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatHistoryMessage { + /// Role: "user" or "assistant" + pub role: String, + /// Message content + pub content: String, +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatRequest { + /// The user's message/instruction + pub message: String, + /// Optional model selection: "claude-sonnet" (default), "claude-opus", or "groq" + #[serde(default)] + pub model: Option, + /// Optional conversation history for context continuity + #[serde(default)] + pub history: Option>, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractChatResponse { + /// The LLM's response message + pub response: String, + /// Tool calls that were executed + pub tool_calls: Vec, + /// Questions pending user answers (pauses conversation) + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_questions: Option>, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractToolCallInfo { + pub name: String, + pub result: ToolResult, +} + +/// Enum to hold LLM clients +enum LlmClient { + Groq(GroqClient), + Claude(ClaudeClient), +} + +/// Unified result from LLM call +struct LlmResult { + content: Option, + tool_calls: Vec, + raw_tool_calls: Vec, + finish_reason: String, +} + +/// Helper to get contract with all relations +async fn get_contract_with_relations( + pool: &sqlx::PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result, sqlx::Error> { + let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await? { + Some(c) => c, + None => return Ok(None), + }; + + let repositories = repository::list_contract_repositories(pool, contract_id) + .await + .unwrap_or_default(); + + let files = repository::list_files_in_contract(pool, contract_id, owner_id) + .await + .unwrap_or_default(); + + let tasks = repository::list_tasks_in_contract(pool, contract_id, owner_id) + .await + .unwrap_or_default(); + + Ok(Some(ContractWithRelations { + contract, + repositories, + files, + tasks, + })) +} + +/// Chat with a contract using LLM tool calling for management +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/chat", + request_body = ContractChatRequest, + responses( + (status = 200, description = "Chat completed successfully", body = ContractChatResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Contract not found"), + (status = 500, description = "Internal server error") + ), + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn contract_chat_handler( + State(state): State, + Authenticated(auth): Authenticated, + Path(contract_id): Path, + Json(request): Json, +) -> impl IntoResponse { + // Check if database is configured + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + // Get the contract (scoped by owner) + let contract = match get_contract_with_relations(pool, contract_id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "Contract not found" })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Database error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Database error: {}", e) })), + ) + .into_response(); + } + }; + + // Parse model selection (default to Claude Sonnet) + let model = request + .model + .as_ref() + .and_then(|m| LlmModel::from_str(m)) + .unwrap_or(LlmModel::ClaudeSonnet); + + tracing::info!("Contract chat using LLM model: {:?}", model); + + // Initialize the appropriate LLM client + let llm_client = match model { + LlmModel::ClaudeSonnet => match ClaudeClient::from_env(ClaudeModel::Sonnet) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::ClaudeOpus => match ClaudeClient::from_env(ClaudeModel::Opus) { + Ok(client) => LlmClient::Claude(client), + Err(ClaudeError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "ANTHROPIC_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Claude client error: {}", e) })), + ) + .into_response(); + } + }, + LlmModel::GroqKimi => match GroqClient::from_env() { + Ok(client) => LlmClient::Groq(client), + Err(GroqError::MissingApiKey) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "GROQ_API_KEY not configured" })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Groq client error: {}", e) })), + ) + .into_response(); + } + }, + }; + + // Build contract context + let contract_context = build_contract_context(&contract); + + // Build system prompt for contract management + let system_prompt = format!( + r#"You are an intelligent contract management agent. You guide users through the contract lifecycle from research to completion, helping them organize work, create documentation, set up repositories, and execute tasks. + +## Your Capabilities +You have access to tools for: +- **Query**: get_contract_status, list_contract_files, list_contract_tasks, list_contract_repositories, read_file +- **File Management**: create_file_from_template, create_empty_file, list_available_templates +- **Task Management**: create_contract_task, delegate_content_generation, start_task +- **Phase Management**: get_phase_info, suggest_phase_transition, advance_phase +- **Repository Management**: list_daemon_directories, add_repository, set_primary_repository +- **Interactive**: ask_user + +## Content Generation Deferral +When asked to write substantial content, fill templates, or generate documentation: +- **Use delegate_content_generation** to create a task for the content generation +- This delegates the work to a task agent that can do more thorough research and writing + +**Use delegation for:** +- Filling in template content with real data +- Writing documentation based on requirements +- Generating user stories or specifications +- Creating detailed design documents +- Any substantial writing that requires research or analysis + +**Direct actions (no delegation needed):** +- Listing files/tasks/repos +- Reading files +- Phase transitions +- Creating empty files or templates +- Simple queries and status checks +- Asking user questions + +## Contract Lifecycle Phases + +### 1. RESEARCH Phase +**Purpose**: Gather information and understand the problem space +**Key Activities**: +- Conduct user research and interviews +- Analyze competitors and existing solutions +- Document findings and insights +- Identify opportunities and constraints +**Suggested Actions**: +- Create a "Research Notes" document to capture findings +- Create a "Competitor Analysis" document +- When research is complete, suggest transitioning to Specify phase + +### 2. SPECIFY Phase +**Purpose**: Define what needs to be built +**Key Activities**: +- Write clear requirements +- Create user stories with acceptance criteria +- Define scope and constraints +- Document technical constraints +**Suggested Actions**: +- Create a "Requirements" document +- Create "User Stories" with acceptance criteria +- When specifications are clear, suggest transitioning to Plan phase + +### 3. PLAN Phase +**Purpose**: Design the solution and break down the work +**Key Activities**: +- Design system architecture +- Create technical specifications +- Break work into implementable tasks +- Set up repositories for development +**Suggested Actions**: +- Create an "Architecture" document +- Create a "Task Breakdown" document +- **IMPORTANT**: Help set up a repository if not already configured +- When planning is complete and a repository is set, suggest transitioning to Execute phase + +### 4. EXECUTE Phase +**Purpose**: Implement the solution +**Key Activities**: +- Create and run tasks to implement features +- Write and run tests +- Track progress +- Document implementation decisions +**Suggested Actions**: +- Create tasks based on the task breakdown +- Monitor task progress and help resolve blockers +- When all tasks are complete, suggest transitioning to Review phase + +### 5. REVIEW Phase +**Purpose**: Validate and document the completed work +**Key Activities**: +- Review completed work +- Create release notes +- Conduct retrospective +- Document learnings +**Suggested Actions**: +- Create a "Release Notes" document +- Create a "Retrospective" document +- Help mark the contract as complete when review is done + +## Current Contract +{contract_context} + +## Proactive Guidance + +### Repository Setup (Critical for Plan/Execute phases) +When the user wants to add a local repository or set up for execution: +1. **First call list_daemon_directories** to get available paths from connected agents +2. Present the suggested directories to the user +3. Ask which path they want to use, or let them specify a custom path +4. Then call add_repository with the chosen path + +Example flow: +``` +User: "Set up a repository for this contract" +You: Call list_daemon_directories first +You: "I found these directories from your connected agent: + - /Users/alice/projects (Working Directory) + - /Users/alice/.makima/home (Makima Home) + Which would you like to use, or provide a custom path?" +``` + +### Phase Transitions +- Phases progress in order: research -> specify -> plan -> execute -> review +- You can ONLY advance forward one step at a time to the NEXT phase +- ALWAYS use suggest_phase_transition FIRST to get the exact nextPhase value +- Then use advance_phase with that exact nextPhase value +- Example: If currentPhase is "specify", nextPhase will be "plan" - use advance_phase with new_phase="plan" +- NEVER suggest advancing to the same phase the contract is already in + +### New Users +When a new contract is created or the user seems unsure: +1. Explain the current phase and what should be done +2. Suggest creating appropriate documents +3. Guide them toward the next milestone + +## Agentic Behavior Guidelines + +### 1. Understand Before Acting +- For complex requests, first gather information about the contract's current state +- Use get_contract_status or list_contract_files to understand what exists +- Consider the current phase when suggesting actions + +### 2. Phase-Appropriate Suggestions +- Suggest templates and actions appropriate for the current phase +- When creating files, prefer templates that match the contract's phase +- Advise when the contract might be ready for the next phase + +### 3. Help Plan Work +- When asked to plan work, read existing files to understand context +- Suggest creating tasks based on requirements or plans in files +- Offer to create task breakdowns from design documents + +### 4. Repository Management +- When adding local repositories, ALWAYS use list_daemon_directories first to get suggestions +- This provides the user with valid paths from their connected agents +- Don't ask users to manually type paths when suggestions are available + +### 5. Task Creation and Execution +- When creating tasks, derive plans from existing contract files when possible +- Use the contract's primary repository for tasks by default +- Create clear, actionable task plans +- After creating a task, you can use **start_task** to immediately begin execution +- A daemon must be connected for start_task to work + +### 6. Be Proactive but Efficient +- Guide users through the contract flow +- Don't over-analyze simple requests +- Use the minimum number of tool calls needed +- Provide clear summaries of actions taken + +## Important Notes +- This contract's ID is: {contract_id} +- All operations are scoped to this contract +- When creating tasks or files, they are automatically associated with this contract"#, + contract_context = contract_context, + contract_id = contract_id + ); + + // Run the agentic loop + run_contract_agentic_loop( + pool, + &state, + &llm_client, + system_prompt, + &request, + contract_id, + auth.owner_id, + ) + .await +} + +fn build_contract_context(contract: &crate::db::models::ContractWithRelations) -> String { + let c = &contract.contract; + let mut context = format!( + "Name: {}\nID: {}\nPhase: {}\nStatus: {}\n", + c.name, c.id, c.phase, c.status + ); + + if let Some(ref desc) = c.description { + context.push_str(&format!("Description: {}\n", desc)); + } + + // Build phase checklist + let file_infos: Vec = contract.files.iter().map(|f| FileInfo { + id: f.id, + name: f.name.clone(), + contract_phase: f.contract_phase.clone(), + }).collect(); + + let task_infos: Vec = contract.tasks.iter().map(|t| TaskInfo { + id: t.id, + name: t.name.clone(), + status: t.status.clone(), + }).collect(); + + let has_repository = !contract.repositories.is_empty(); + let phase_checklist = get_phase_checklist(&c.phase, &file_infos, &task_infos, has_repository); + + // Add phase checklist to context + context.push_str("\n"); + context.push_str(&format_checklist_markdown(&phase_checklist)); + + // Files summary + context.push_str(&format!("\n### Files ({} total)\n", contract.files.len())); + if !contract.files.is_empty() { + for file in contract.files.iter().take(5) { + let phase_label = file.contract_phase.as_deref().unwrap_or("none"); + context.push_str(&format!("- {} [{}] (ID: {})\n", file.name, phase_label, file.id)); + } + if contract.files.len() > 5 { + context.push_str(&format!("... and {} more\n", contract.files.len() - 5)); + } + } + + // Tasks summary + context.push_str(&format!("\n### Tasks ({} total)\n", contract.tasks.len())); + if !contract.tasks.is_empty() { + let pending = contract.tasks.iter().filter(|t| t.status == "pending").count(); + let running = contract.tasks.iter().filter(|t| t.status == "running").count(); + let done = contract.tasks.iter().filter(|t| t.status == "done").count(); + context.push_str(&format!("{} pending, {} running, {} done\n", pending, running, done)); + for task in contract.tasks.iter().take(5) { + context.push_str(&format!("- {} ({}) - ID: {}\n", task.name, task.status, task.id)); + } + if contract.tasks.len() > 5 { + context.push_str(&format!("... and {} more\n", contract.tasks.len() - 5)); + } + } + + // Repositories summary + context.push_str(&format!("\n### Repositories ({} total)\n", contract.repositories.len())); + if !contract.repositories.is_empty() { + for repo in &contract.repositories { + let primary = if repo.is_primary { " (primary)" } else { "" }; + let url_or_path = repo.repository_url.as_deref() + .or(repo.local_path.as_deref()) + .unwrap_or("managed"); + context.push_str(&format!("- {}: {}{}\n", repo.name, url_or_path, primary)); + } + } + + context +} + +/// Summarize older conversation history to reduce token usage +async fn summarize_conversation_history( + llm_client: &LlmClient, + messages: &[&crate::db::models::ContractChatMessageRecord], +) -> String { + // Build conversation text for summarization + let mut conversation_text = String::new(); + for msg in messages { + let role_label = if msg.role == "user" { "User" } else { "Assistant" }; + // Limit each message to avoid overwhelming the summarizer + let content = if msg.content.len() > 500 { + format!("{}...", &msg.content[..500]) + } else { + msg.content.clone() + }; + conversation_text.push_str(&format!("{}: {}\n", role_label, content)); + } + + // Limit total text to summarize + if conversation_text.len() > 8000 { + conversation_text = format!("{}...", &conversation_text[..8000]); + } + + let summary_prompt = format!( + "Summarize this conversation history in 2-3 sentences, focusing on key decisions, actions taken, and current state:\n\n{}", + conversation_text + ); + + // Use a simple chat call without tools for summarization + let summary = match llm_client { + LlmClient::Claude(client) => { + let claude_messages = vec![claude::Message { + role: "user".to_string(), + content: claude::MessageContent::Text(summary_prompt.clone()), + }]; + match client.chat_with_tools(claude_messages, &[]).await { + Ok(response) => response.content.unwrap_or_default(), + Err(e) => { + tracing::warn!("Failed to summarize conversation: {}", e); + "Previous conversation covered contract management tasks.".to_string() + } + } + } + LlmClient::Groq(client) => { + let groq_messages = vec![Message { + role: "user".to_string(), + content: Some(summary_prompt.clone()), + tool_calls: None, + tool_call_id: None, + }]; + match client.chat_with_tools(groq_messages, &[]).await { + Ok(response) => response.content.unwrap_or_default(), + Err(e) => { + tracing::warn!("Failed to summarize conversation: {}", e); + "Previous conversation covered contract management tasks.".to_string() + } + } + } + }; + + // Limit summary length + if summary.len() > 500 { + format!("{}...", &summary[..500]) + } else { + summary + } +} + +/// Run the agentic loop for contract chat +async fn run_contract_agentic_loop( + pool: &sqlx::PgPool, + state: &SharedState, + llm_client: &LlmClient, + system_prompt: String, + request: &ContractChatRequest, + contract_id: Uuid, + owner_id: Uuid, +) -> axum::response::Response { + // Get or create the conversation for persistent history + let conversation = match repository::get_or_create_contract_conversation(pool, contract_id, owner_id).await { + Ok(conv) => conv, + Err(e) => { + tracing::error!("Failed to get/create contract conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to initialize conversation: {}", e) })), + ) + .into_response(); + } + }; + + // Load ALL existing messages from database + let saved_messages = match repository::list_contract_chat_messages(pool, conversation.id, None).await { + Ok(msgs) => msgs, + Err(e) => { + tracing::warn!("Failed to load contract chat history: {}", e); + Vec::new() + } + }; + + // Build initial messages + let mut messages = vec![Message { + role: "system".to_string(), + content: Some(system_prompt), + tool_calls: None, + tool_call_id: None, + }]; + + // Add saved conversation history, summarizing older messages if needed + // to stay under rate limits (~25k chars ≈ ~6k tokens for history) + const MAX_HISTORY_CHARS: usize = 25000; + const RECENT_MESSAGES_TO_KEEP: usize = 6; // Keep last 3 turns intact + + // Filter to user/assistant messages only + let history_messages: Vec<_> = saved_messages + .iter() + .filter(|m| m.role == "user" || m.role == "assistant") + .collect(); + + // Calculate total character count + let total_chars: usize = history_messages.iter().map(|m| m.content.len()).sum(); + + if total_chars > MAX_HISTORY_CHARS && history_messages.len() > RECENT_MESSAGES_TO_KEEP { + // Need to summarize older messages + let split_point = history_messages.len().saturating_sub(RECENT_MESSAGES_TO_KEEP); + let older_messages = &history_messages[..split_point]; + let recent_messages = &history_messages[split_point..]; + + // Generate summary of older conversation + let summary = summarize_conversation_history(&llm_client, older_messages).await; + + // Add summary as context + messages.push(Message { + role: "user".to_string(), + content: Some(format!("[Previous conversation summary: {}]", summary)), + tool_calls: None, + tool_call_id: None, + }); + messages.push(Message { + role: "assistant".to_string(), + content: Some("I understand the previous context. Let's continue.".to_string()), + tool_calls: None, + tool_call_id: None, + }); + + // Add recent messages in full + for saved_msg in recent_messages { + messages.push(Message { + role: saved_msg.role.clone(), + content: Some(saved_msg.content.clone()), + tool_calls: None, + tool_call_id: None, + }); + } + + tracing::info!( + total_messages = history_messages.len(), + summarized = older_messages.len(), + kept_recent = recent_messages.len(), + "Summarized older conversation history" + ); + } else { + // Add all messages directly + for saved_msg in history_messages { + messages.push(Message { + role: saved_msg.role.clone(), + content: Some(saved_msg.content.clone()), + tool_calls: None, + tool_call_id: None, + }); + } + } + + // Add current user message + messages.push(Message { + role: "user".to_string(), + content: Some(request.message.clone()), + tool_calls: None, + tool_call_id: None, + }); + + // Save the user message to database + if let Err(e) = repository::add_contract_chat_message( + pool, + conversation.id, + "user", + &request.message, + None, + None, + ).await { + tracing::warn!("Failed to save user message to contract chat history: {}", e); + } + + // State for tracking + let mut all_tool_call_infos: Vec = Vec::new(); + let mut final_response: Option = None; + let mut consecutive_failures = 0; + const MAX_CONSECUTIVE_FAILURES: usize = 3; + let mut pending_questions: Option> = None; + + // Multi-turn agentic tool calling loop + for round in 0..MAX_TOOL_ROUNDS { + tracing::info!( + round = round, + total_tool_calls = all_tool_call_infos.len(), + "Contract agentic loop iteration" + ); + + // Check consecutive failures + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + tracing::warn!( + "Breaking contract loop due to {} consecutive failures", + consecutive_failures + ); + final_response = Some( + "I encountered multiple consecutive errors and stopped. \ + Please check the contract state and try again." + .to_string(), + ); + break; + } + + // Call the appropriate LLM API + let result = match llm_client { + LlmClient::Groq(groq) => { + match groq.chat_with_tools(messages.clone(), &CONTRACT_TOOLS).await { + Ok(r) => LlmResult { + content: r.content, + tool_calls: r.tool_calls, + raw_tool_calls: r.raw_tool_calls, + finish_reason: r.finish_reason, + }, + Err(e) => { + tracing::error!("Groq API error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("LLM API error: {}", e) })), + ) + .into_response(); + } + } + } + LlmClient::Claude(claude_client) => { + let claude_messages = claude::groq_messages_to_claude(&messages); + match claude_client + .chat_with_tools(claude_messages, &CONTRACT_TOOLS) + .await + { + Ok(r) => { + let raw_tool_calls: Vec = r + .tool_calls + .iter() + .map(|tc| ToolCallResponse { + id: tc.id.clone(), + call_type: "function".to_string(), + function: crate::llm::groq::FunctionCall { + name: tc.name.clone(), + arguments: tc.arguments.to_string(), + }, + }) + .collect(); + + LlmResult { + content: r.content, + tool_calls: r.tool_calls, + raw_tool_calls, + finish_reason: r.stop_reason, + } + } + Err(e) => { + tracing::error!("Claude API error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("LLM API error: {}", e) })), + ) + .into_response(); + } + } + } + }; + + // Check if there are tool calls to execute + if result.tool_calls.is_empty() { + final_response = result.content; + break; + } + + // Add assistant message with tool calls to conversation + messages.push(Message { + role: "assistant".to_string(), + content: result.content.clone(), + tool_calls: Some(result.raw_tool_calls.clone()), + tool_call_id: None, + }); + + // Execute each tool call + for (i, tool_call) in result.tool_calls.iter().enumerate() { + tracing::info!(tool = %tool_call.name, round = round, "Executing contract tool call"); + + // Parse the tool call + let mut execution_result = parse_contract_tool_call(tool_call); + + // Handle async contract tool requests + if let Some(contract_request) = execution_result.request.take() { + let async_result = + handle_contract_request(pool, &state.daemon_connections, contract_request, contract_id, owner_id).await; + execution_result.success = async_result.success; + execution_result.message = async_result.message; + execution_result.data = async_result.data; + } + + // Track consecutive failures + if execution_result.success { + consecutive_failures = 0; + } else { + consecutive_failures += 1; + tracing::warn!( + tool = %tool_call.name, + consecutive_failures = consecutive_failures, + "Contract tool call failed" + ); + } + + // Check for pending user questions + if let Some(questions) = execution_result.pending_questions { + tracing::info!( + question_count = questions.len(), + "Contract LLM requesting user input" + ); + pending_questions = Some(questions); + all_tool_call_infos.push(ContractToolCallInfo { + name: tool_call.name.clone(), + result: ToolResult { + success: execution_result.success, + message: execution_result.message.clone(), + }, + }); + break; + } + + // Build tool result message + let result_content = if let Some(data) = &execution_result.data { + json!({ + "success": execution_result.success, + "message": execution_result.message, + "data": data + }) + .to_string() + } else { + json!({ + "success": execution_result.success, + "message": execution_result.message + }) + .to_string() + }; + + // Add tool result message + let tool_call_id = match llm_client { + LlmClient::Groq(_) => result.raw_tool_calls[i].id.clone(), + LlmClient::Claude(_) => tool_call.id.clone(), + }; + + messages.push(Message { + role: "tool".to_string(), + content: Some(result_content), + tool_calls: None, + tool_call_id: Some(tool_call_id), + }); + + // Track for response + all_tool_call_infos.push(ContractToolCallInfo { + name: tool_call.name.clone(), + result: ToolResult { + success: execution_result.success, + message: execution_result.message, + }, + }); + } + + // If user questions are pending, pause + if pending_questions.is_some() { + final_response = result.content; + break; + } + + // If finish reason indicates completion, exit loop + let finish_lower = result.finish_reason.to_lowercase(); + if finish_lower == "stop" || finish_lower == "end_turn" { + final_response = result.content; + break; + } + } + + // Build response + let response_text = final_response.unwrap_or_else(|| { + if all_tool_call_infos.is_empty() { + "I couldn't understand your request. Please try rephrasing.".to_string() + } else { + format!( + "Done! Executed {} tool{}.", + all_tool_call_infos.len(), + if all_tool_call_infos.len() == 1 { "" } else { "s" } + ) + } + }); + + // Save assistant response to database + let tool_calls_json = if all_tool_call_infos.is_empty() { + None + } else { + serde_json::to_value(&all_tool_call_infos).ok() + }; + + let pending_questions_json = pending_questions.as_ref().and_then(|q| serde_json::to_value(q).ok()); + + if let Err(e) = repository::add_contract_chat_message( + pool, + conversation.id, + "assistant", + &response_text, + tool_calls_json, + pending_questions_json, + ).await { + tracing::warn!("Failed to save assistant response to contract chat history: {}", e); + } + + ( + StatusCode::OK, + Json(ContractChatResponse { + response: response_text, + tool_calls: all_tool_call_infos, + pending_questions, + }), + ) + .into_response() +} + +/// Result from handling an async contract tool request +struct ContractRequestResult { + success: bool, + message: String, + data: Option, +} + +/// Handle async contract tool requests that require database access +async fn handle_contract_request( + pool: &sqlx::PgPool, + daemon_connections: &dashmap::DashMap, + request: ContractToolRequest, + contract_id: Uuid, + owner_id: Uuid, +) -> ContractRequestResult { + match request { + ContractToolRequest::ListDaemonDirectories => { + let mut directories = Vec::new(); + + // Iterate over connected daemons belonging to this owner + for entry in daemon_connections.iter() { + let daemon = entry.value(); + + // Only include daemons belonging to this owner + if daemon.owner_id != owner_id { + continue; + } + + // Add working directory if available + if let Some(ref working_dir) = daemon.working_directory { + directories.push(json!({ + "path": working_dir, + "label": "Working Directory", + "type": "working", + "hostname": daemon.hostname, + })); + } + + // Add home directory if available + if let Some(ref home_dir) = daemon.home_directory { + directories.push(json!({ + "path": home_dir, + "label": "Makima Home", + "type": "home", + "hostname": daemon.hostname, + })); + } + } + + if directories.is_empty() { + ContractRequestResult { + success: true, + message: "No daemon directories available. Connect a daemon to get directory suggestions.".to_string(), + data: Some(json!({ "directories": [] })), + } + } else { + ContractRequestResult { + success: true, + message: format!("Found {} suggested directories from connected daemons", directories.len()), + data: Some(json!({ "directories": directories })), + } + } + } + + ContractToolRequest::GetContractStatus => { + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(cwr)) => { + let c = &cwr.contract; + ContractRequestResult { + success: true, + message: format!( + "Contract '{}' is in '{}' phase with status '{}'", + c.name, c.phase, c.status + ), + data: Some(json!({ + "name": c.name, + "phase": c.phase, + "status": c.status, + "description": c.description, + "fileCount": cwr.files.len(), + "taskCount": cwr.tasks.len(), + "repositoryCount": cwr.repositories.len(), + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::ListContractFiles => { + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(cwr)) => { + let files: Vec = cwr + .files + .iter() + .map(|f| { + json!({ + "fileId": f.id, + "name": f.name, + "description": f.description, + "phase": f.contract_phase, + }) + }) + .collect(); + + ContractRequestResult { + success: true, + message: format!("Found {} files", files.len()), + data: Some(json!({ "files": files })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::ListContractTasks => { + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(cwr)) => { + let tasks: Vec = cwr + .tasks + .iter() + .map(|t| { + json!({ + "taskId": t.id, + "name": t.name, + "status": t.status, + "priority": t.priority, + }) + }) + .collect(); + + ContractRequestResult { + success: true, + message: format!("Found {} tasks", tasks.len()), + data: Some(json!({ "tasks": tasks })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::ListContractRepositories => { + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(cwr)) => { + let repos: Vec = cwr + .repositories + .iter() + .map(|r| { + json!({ + "repositoryId": r.id, + "name": r.name, + "repositoryUrl": r.repository_url, + "localPath": r.local_path, + "isPrimary": r.is_primary, + }) + }) + .collect(); + + ContractRequestResult { + success: true, + message: format!("Found {} repositories", repos.len()), + data: Some(json!({ "repositories": repos })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::ReadFile { file_id } => { + match repository::get_file_for_owner(pool, file_id, owner_id).await { + Ok(Some(file)) => { + // Verify file belongs to this contract + if file.contract_id != Some(contract_id) { + return ContractRequestResult { + success: false, + message: "File does not belong to this contract".to_string(), + data: None, + }; + } + + // Convert body to markdown for LLM consumption + let markdown = body_to_markdown(&file.body); + + ContractRequestResult { + success: true, + message: format!("Read file '{}'", file.name), + data: Some(json!({ + "fileId": file.id, + "name": file.name, + "description": file.description, + "summary": file.summary, + "plainText": markdown, + "phase": file.contract_phase, + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "File not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::CreateFileFromTemplate { + template_id, + name, + description, + } => { + // Find the template + let templates = all_templates(); + let template = templates.iter().find(|t| t.id == template_id); + + let Some(template) = template else { + return ContractRequestResult { + success: false, + message: format!("Template '{}' not found", template_id), + data: None, + }; + }; + + // Verify contract exists and get current phase + let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + // Use template's phase if available, otherwise use contract's current phase + let contract_phase = Some(template.phase.clone()).or(Some(contract.phase.clone())); + + // Create the file (contract_id is now required) + let create_req = crate::db::models::CreateFileRequest { + contract_id, + name: Some(name.clone()), + description, + body: template.suggested_body.clone(), + transcript: Vec::new(), + location: None, + repo_file_path: None, + contract_phase, + }; + + match repository::create_file_for_owner(pool, owner_id, create_req).await { + Ok(file) => ContractRequestResult { + success: true, + message: format!( + "Created file '{}' from template '{}'", + name, template.name + ), + data: Some(json!({ + "fileId": file.id, + "name": file.name, + "templateId": template_id, + })), + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to create file: {}", e), + data: None, + }, + } + } + + ContractToolRequest::CreateEmptyFile { name, description } => { + // Verify contract exists and get current phase + let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + // Create the file with current contract phase + let create_req = crate::db::models::CreateFileRequest { + contract_id, + name: Some(name.clone()), + description, + body: Vec::new(), + transcript: Vec::new(), + location: None, + repo_file_path: None, + contract_phase: Some(contract.phase.clone()), + }; + + match repository::create_file_for_owner(pool, owner_id, create_req).await { + Ok(file) => ContractRequestResult { + success: true, + message: format!("Created empty file '{}'", name), + data: Some(json!({ + "fileId": file.id, + "name": file.name, + })), + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to create file: {}", e), + data: None, + }, + } + } + + ContractToolRequest::ListAvailableTemplates { phase } => { + let templates = if let Some(p) = phase { + templates_for_phase(&p) + } else { + all_templates() + }; + + let template_data: Vec = templates + .iter() + .map(|t| { + json!({ + "id": t.id, + "name": t.name, + "phase": t.phase, + "description": t.description, + }) + }) + .collect(); + + ContractRequestResult { + success: true, + message: format!("Found {} templates", templates.len()), + data: Some(json!({ "templates": template_data })), + } + } + + ContractToolRequest::CreateContractTask { + name, + plan, + repository_url, + base_branch, + } => { + // Get primary repository if not specified + let repo_url = if repository_url.is_some() { + repository_url + } else { + // Find primary repository + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(contract)) => { + contract + .repositories + .iter() + .find(|r| r.is_primary) + .and_then(|r| r.repository_url.clone().or(r.local_path.clone())) + } + _ => None, + } + }; + + let create_req = CreateTaskRequest { + contract_id, + name: name.clone(), + description: None, + plan, + parent_task_id: None, + repository_url: repo_url, + base_branch, + target_branch: None, + merge_mode: None, + priority: 0, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: false, + checkpoint_sha: None, + }; + + match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(task) => ContractRequestResult { + success: true, + message: format!("Created task '{}' in contract", name), + data: Some(json!({ + "taskId": task.id, + "name": task.name, + "status": task.status, + })), + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to create task: {}", e), + data: None, + }, + } + } + + ContractToolRequest::DelegateContentGeneration { + file_id, + instruction, + context, + } => { + // Build a task plan that includes the content generation instruction + let mut plan = format!( + "Content Generation Task\n\n\ + ## Instruction\n{}\n\n", + instruction + ); + + if let Some(ctx) = context { + plan.push_str(&format!("## Context\n{}\n\n", ctx)); + } + + // If file_id is provided, get file details and include them + let (file_name, file_info) = if let Some(fid) = file_id { + match repository::get_file_for_owner(pool, fid, owner_id).await { + Ok(Some(file)) => { + let info = format!( + "## Target File\n\ + - File ID: {}\n\ + - Name: {}\n\ + - Description: {}\n\n\ + The generated content should be structured to update this file.\n", + fid, + file.name, + file.description.as_deref().unwrap_or("(no description)") + ); + (Some(file.name.clone()), Some(info)) + } + _ => (None, None), + } + } else { + (None, None) + }; + + if let Some(info) = file_info { + plan.push_str(&info); + } + + // Get primary repository + let repo_url = match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(contract)) => contract + .repositories + .iter() + .find(|r| r.is_primary) + .and_then(|r| r.repository_url.clone().or(r.local_path.clone())), + _ => None, + }; + + let task_name = format!( + "Generate content{}", + file_name.map(|n| format!(": {}", n)).unwrap_or_default() + ); + + let create_req = CreateTaskRequest { + contract_id, + name: task_name.clone(), + description: Some(instruction.clone()), + plan, + parent_task_id: None, + repository_url: repo_url, + base_branch: None, + target_branch: None, + merge_mode: None, + priority: 0, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: false, + checkpoint_sha: None, + }; + + match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(task) => ContractRequestResult { + success: true, + message: format!( + "Created content generation task '{}'. Start the task to generate the content.", + task_name + ), + data: Some(json!({ + "taskId": task.id, + "name": task.name, + "status": task.status, + "targetFileId": file_id, + })), + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to create content generation task: {}", e), + data: None, + }, + } + } + + ContractToolRequest::StartTask { task_id } => { + // Get the task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Failed to get task: {}", e), + data: None, + } + } + }; + + // Check if task can be started + let startable_statuses = ["pending", "failed", "interrupted", "done", "merged"]; + if !startable_statuses.contains(&task.status.as_str()) { + return ContractRequestResult { + success: false, + message: format!("Task cannot be started from status: {}", task.status), + data: None, + }; + } + + // Find a connected daemon for this owner + let daemon_entry = daemon_connections + .iter() + .find(|d| d.value().owner_id == owner_id); + + let (target_daemon_id, command_sender) = match daemon_entry { + Some(entry) => { + let daemon = entry.value(); + (daemon.id, daemon.command_sender.clone()) + } + None => { + return ContractRequestResult { + success: false, + message: "No daemon connected. Start a daemon to run tasks.".to_string(), + data: None, + }; + } + }; + + // Check if this is an orchestrator + let subtask_count = match repository::list_subtasks_for_owner(pool, task_id, owner_id).await { + Ok(subtasks) => subtasks.len(), + Err(_) => 0, + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + // Update task status to 'starting' and assign daemon_id + let update_req = crate::db::models::UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(task.version), + ..Default::default() + }; + + let _updated_task = match repository::update_task_for_owner(pool, task_id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }; + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Failed to update task: {}", e), + data: None, + }; + } + }; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + if let Err(e) = command_sender.send(command).await { + // Rollback: reset status since command failed + let rollback_req = crate::db::models::UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task_id, owner_id, rollback_req).await; + return ContractRequestResult { + success: false, + message: format!("Failed to send task to daemon: {}", e), + data: None, + }; + } + + // Note: TaskUpdateNotification broadcast is handled by the mesh handler when daemon reports status + ContractRequestResult { + success: true, + message: format!("Started task '{}'. The task is now running on a connected daemon.", task.name), + data: Some(json!({ + "taskId": task_id, + "name": task.name, + "status": "starting", + })), + } + } + + ContractToolRequest::GetPhaseInfo => { + let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + let phase_info = get_phase_description(&contract.phase); + let templates = templates_for_phase(&contract.phase); + let template_names: Vec = templates.iter().map(|t| t.name.clone()).collect(); + + ContractRequestResult { + success: true, + message: format!("Contract is in '{}' phase", contract.phase), + data: Some(json!({ + "phase": contract.phase, + "description": phase_info.0, + "activities": phase_info.1, + "suggestedTemplates": template_names, + "nextPhase": get_next_phase(&contract.phase), + })), + } + } + + ContractToolRequest::SuggestPhaseTransition => { + let contract = match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + let analysis = analyze_phase_readiness(&contract); + + ContractRequestResult { + success: true, + message: analysis.summary.clone(), + data: Some(json!({ + "currentPhase": contract.contract.phase, + "nextPhase": get_next_phase(&contract.contract.phase), + "ready": analysis.ready, + "summary": analysis.summary, + "reasons": analysis.reasons, + "suggestions": analysis.suggestions, + })), + } + } + + ContractToolRequest::AdvancePhase { new_phase } => { + let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + } + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + } + } + }; + + // Validate phase transition + let current_phase = &contract.phase; + let valid_next = get_next_phase(current_phase); + + if valid_next.as_deref() != Some(&new_phase) { + return ContractRequestResult { + success: false, + message: format!( + "Cannot transition from '{}' to '{}'. Next valid phase is: {:?}", + current_phase, new_phase, valid_next + ), + data: None, + }; + } + + // Update phase + match repository::change_contract_phase_for_owner(pool, contract_id, owner_id, &new_phase).await { + Ok(Some(updated)) => { + // Get deliverables for the new phase + let deliverables = crate::llm::get_phase_deliverables(&new_phase); + + // Build suggested files list + let suggested_files: Vec = deliverables + .recommended_files + .iter() + .map(|f| json!({ + "templateId": f.template_id, + "name": f.name_suggestion, + "priority": format!("{:?}", f.priority).to_lowercase(), + "description": f.description, + })) + .collect(); + + ContractRequestResult { + success: true, + message: format!( + "Advanced contract from '{}' to '{}' phase. {}", + current_phase, new_phase, deliverables.guidance + ), + data: Some(json!({ + "previousPhase": current_phase, + "newPhase": updated.phase, + "phaseGuidance": deliverables.guidance, + "suggestedFiles": suggested_files, + "requiresRepository": deliverables.requires_repository, + "requiresTasks": deliverables.requires_tasks, + })), + } + }, + Ok(None) => ContractRequestResult { + success: false, + message: "Failed to update phase".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to update phase: {}", e), + data: None, + }, + } + } + + ContractToolRequest::AddRepository { + repo_type, + name, + url, + is_primary, + } => { + let add_result = match repo_type.as_str() { + "remote" => { + let url = url.unwrap_or_default(); + repository::add_remote_repository( + pool, + contract_id, + &name, + &url, + is_primary, + ) + .await + } + "local" => { + let path = url.unwrap_or_default(); + repository::add_local_repository( + pool, + contract_id, + &name, + &path, + is_primary, + ) + .await + } + "managed" => { + repository::create_managed_repository(pool, contract_id, &name, is_primary) + .await + } + _ => { + return ContractRequestResult { + success: false, + message: format!("Invalid repository type: {}", repo_type), + data: None, + } + } + }; + + match add_result { + Ok(repo) => ContractRequestResult { + success: true, + message: format!("Added {} repository '{}'", repo_type, name), + data: Some(json!({ + "repositoryId": repo.id, + "name": repo.name, + "isPrimary": repo.is_primary, + })), + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to add repository: {}", e), + data: None, + }, + } + } + + ContractToolRequest::SetPrimaryRepository { repository_id } => { + match repository::set_repository_primary(pool, repository_id, contract_id).await { + Ok(true) => ContractRequestResult { + success: true, + message: "Set repository as primary".to_string(), + data: Some(json!({ + "repositoryId": repository_id, + })), + }, + Ok(false) => ContractRequestResult { + success: false, + message: "Repository not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Failed to set primary repository: {}", e), + data: None, + }, + } + } + + // ============================================================================= + // Phase Guidance Handlers + // ============================================================================= + + ContractToolRequest::GetPhaseChecklist => { + match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(cwr)) => { + let file_infos: Vec = cwr.files.iter().map(|f| FileInfo { + id: f.id, + name: f.name.clone(), + contract_phase: f.contract_phase.clone(), + }).collect(); + + let task_infos: Vec = cwr.tasks.iter().map(|t| TaskInfo { + id: t.id, + name: t.name.clone(), + status: t.status.clone(), + }).collect(); + + let has_repository = !cwr.repositories.is_empty(); + let checklist = get_phase_checklist(&cwr.contract.phase, &file_infos, &task_infos, has_repository); + + ContractRequestResult { + success: true, + message: checklist.summary.clone(), + data: Some(json!({ + "phase": checklist.phase, + "completionPercentage": checklist.completion_percentage, + "deliverables": checklist.file_deliverables, + "hasRepository": checklist.has_repository, + "repositoryRequired": checklist.repository_required, + "taskStats": checklist.task_stats, + "suggestions": checklist.suggestions, + "summary": checklist.summary, + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Contract not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + // ============================================================================= + // Task Derivation Handlers + // ============================================================================= + + ContractToolRequest::DeriveTasksFromFile { file_id } => { + // First get the file + match repository::get_file_for_owner(pool, file_id, owner_id).await { + Ok(Some(file)) => { + // Verify file belongs to this contract + if file.contract_id != Some(contract_id) { + return ContractRequestResult { + success: false, + message: "File does not belong to this contract".to_string(), + data: None, + }; + } + + // Convert body to markdown for task parsing + let markdown = body_to_markdown(&file.body); + + // Parse tasks from the content + let parse_result = parse_tasks_from_breakdown(&markdown); + + ContractRequestResult { + success: true, + message: format!("Found {} tasks in file '{}'", parse_result.total, file.name), + data: Some(json!({ + "fileId": file_id, + "fileName": file.name, + "tasks": parse_result.tasks, + "groups": parse_result.groups, + "total": parse_result.total, + "warnings": parse_result.warnings, + "formatted": format_parsed_tasks(&parse_result), + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "File not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::CreateChainedTasks { tasks } => { + // Get primary repository for tasks + let repo_url = match get_contract_with_relations(pool, contract_id, owner_id).await { + Ok(Some(contract)) => { + contract + .repositories + .iter() + .find(|r| r.is_primary) + .and_then(|r| r.repository_url.clone().or(r.local_path.clone())) + } + _ => None, + }; + + let mut created_tasks = Vec::new(); + let mut previous_task_id: Option = None; + + for task_def in &tasks { + let create_req = CreateTaskRequest { + contract_id, + name: task_def.name.clone(), + description: None, + plan: task_def.plan.clone(), + parent_task_id: None, + repository_url: repo_url.clone(), + base_branch: None, + target_branch: None, + merge_mode: None, + priority: 0, + target_repo_path: None, + completion_action: None, + continue_from_task_id: previous_task_id, + copy_files: None, + is_supervisor: false, + checkpoint_sha: None, + }; + + match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(task) => { + created_tasks.push(json!({ + "taskId": task.id, + "name": task.name, + "status": task.status, + "chainedFrom": previous_task_id, + })); + previous_task_id = Some(task.id); + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Failed to create task '{}': {}", task_def.name, e), + data: Some(json!({ + "createdSoFar": created_tasks, + })), + }; + } + } + } + + ContractRequestResult { + success: true, + message: format!("Created {} chained tasks", created_tasks.len()), + data: Some(json!({ + "tasks": created_tasks, + "total": created_tasks.len(), + })), + } + } + + // ============================================================================= + // Task Completion Processing Handlers + // ============================================================================= + + ContractToolRequest::ProcessTaskCompletion { task_id } => { + // Get the task + match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(task)) => { + // Verify task belongs to this contract + if task.contract_id != Some(contract_id) { + return ContractRequestResult { + success: false, + message: "Task does not belong to this contract".to_string(), + data: None, + }; + } + + // Get contract for context + let contract = get_contract_with_relations(pool, contract_id, owner_id).await.ok().flatten(); + + let total_tasks = contract.as_ref().map(|c| c.tasks.len()).unwrap_or(0); + let completed_tasks = contract.as_ref() + .map(|c| c.tasks.iter().filter(|t| t.status == "done").count()) + .unwrap_or(0); + + // Note: Finding next chained task would require querying full Task objects + // Since TaskSummary doesn't have continue_from_task_id, we skip this for now + let next_task: Option<(Uuid, String)> = None; + + // Find Dev Notes file if exists + let dev_notes = if let Some(ref c) = contract { + c.files.iter() + .find(|f| f.name.to_lowercase().contains("dev") && f.name.to_lowercase().contains("notes")) + .map(|f| (f.id, f.name.clone())) + } else { + None + }; + + let contract_phase = contract.as_ref() + .map(|c| c.contract.phase.clone()) + .unwrap_or_else(|| "execute".to_string()); + + // Analyze the task output + let analysis = analyze_task_output( + task_id, + &task.name, + task.last_output.as_deref(), + task.progress_summary.as_deref(), + &contract_phase, + total_tasks, + completed_tasks, + next_task, + dev_notes, + ); + + ContractRequestResult { + success: true, + message: format!("Analyzed completion of task '{}'", task.name), + data: Some(json!({ + "taskId": task_id, + "taskName": task.name, + "taskStatus": task.status, + "summary": analysis.summary, + "filesAffected": analysis.files_affected, + "nextSteps": analysis.next_steps, + "phaseImpact": analysis.phase_impact, + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + + ContractToolRequest::UpdateFileFromTask { file_id, task_id, section_title } => { + // Get the task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "Task not found".to_string(), + data: None, + }; + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }; + } + }; + + // Get the file + let file = match repository::get_file_for_owner(pool, file_id, owner_id).await { + Ok(Some(f)) => f, + Ok(None) => { + return ContractRequestResult { + success: false, + message: "File not found".to_string(), + data: None, + }; + } + Err(e) => { + return ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }; + } + }; + + // Verify file belongs to this contract + if file.contract_id != Some(contract_id) { + return ContractRequestResult { + success: false, + message: "File does not belong to this contract".to_string(), + data: None, + }; + } + + // Build the section to add + let title = section_title.unwrap_or_else(|| format!("Task: {}", task.name)); + let result_text = task.last_output.as_deref().unwrap_or("Task completed"); + + // Create new body elements to append + let mut new_body = file.body.clone(); + new_body.push(crate::db::models::BodyElement::Heading { + level: 2, + text: title, + }); + new_body.push(crate::db::models::BodyElement::Paragraph { + text: format!("Status: {}", task.status), + }); + new_body.push(crate::db::models::BodyElement::Paragraph { + text: result_text.to_string(), + }); + + // Update the file using UpdateFileRequest + let update_req = UpdateFileRequest { + name: None, + description: None, + transcript: None, + summary: None, + body: Some(new_body), + version: None, // Don't require version for this update + repo_file_path: None, + }; + + match repository::update_file_for_owner(pool, file_id, owner_id, update_req).await { + Ok(Some(updated_file)) => { + ContractRequestResult { + success: true, + message: format!("Updated file '{}' with task summary", file.name), + data: Some(json!({ + "fileId": file_id, + "fileName": updated_file.name, + "taskId": task_id, + "taskName": task.name, + })), + } + } + Ok(None) => ContractRequestResult { + success: false, + message: "Failed to update file".to_string(), + data: None, + }, + Err(e) => ContractRequestResult { + success: false, + message: format!("Database error: {}", e), + data: None, + }, + } + } + } +} + +/// Get description and activities for a phase +fn get_phase_description(phase: &str) -> (String, Vec) { + match phase { + "research" => ( + "Gather information, analyze competitors, and understand user needs".to_string(), + vec![ + "Conduct user research".to_string(), + "Analyze competitors".to_string(), + "Document findings".to_string(), + "Identify opportunities".to_string(), + ], + ), + "specify" => ( + "Define requirements, user stories, and acceptance criteria".to_string(), + vec![ + "Write requirements".to_string(), + "Create user stories".to_string(), + "Define acceptance criteria".to_string(), + "Document constraints".to_string(), + ], + ), + "plan" => ( + "Design architecture, create task breakdowns, and technical designs".to_string(), + vec![ + "Design system architecture".to_string(), + "Create technical specifications".to_string(), + "Break down into tasks".to_string(), + "Plan implementation order".to_string(), + ], + ), + "execute" => ( + "Implement features, write code, and run tasks".to_string(), + vec![ + "Implement features".to_string(), + "Write tests".to_string(), + "Track progress".to_string(), + "Document implementation details".to_string(), + ], + ), + "review" => ( + "Review work, create release notes, and conduct retrospectives".to_string(), + vec![ + "Review code and features".to_string(), + "Create release notes".to_string(), + "Conduct retrospective".to_string(), + "Document learnings".to_string(), + ], + ), + _ => ( + "Unknown phase".to_string(), + vec![], + ), + } +} + +/// Get the next phase in the lifecycle +fn get_next_phase(current: &str) -> Option { + match current { + "research" => Some("specify".to_string()), + "specify" => Some("plan".to_string()), + "plan" => Some("execute".to_string()), + "execute" => Some("review".to_string()), + "review" => None, // Final phase + _ => None, + } +} + +/// Phase readiness analysis result +struct PhaseReadinessAnalysis { + ready: bool, + summary: String, + reasons: Vec, + suggestions: Vec, +} + +/// Analyze if the contract is ready to transition to the next phase +fn analyze_phase_readiness(contract: &crate::db::models::ContractWithRelations) -> PhaseReadinessAnalysis { + let mut reasons = Vec::new(); + let mut suggestions = Vec::new(); + + match contract.contract.phase.as_str() { + "research" => { + // Check for research files + let research_files = contract.files.iter() + .filter(|f| f.contract_phase.as_deref() == Some("research")) + .count(); + + if research_files == 0 { + reasons.push("No research documents created yet".to_string()); + suggestions.push("Create research notes or competitor analysis documents".to_string()); + } else { + reasons.push(format!("{} research document(s) created", research_files)); + } + + let ready = research_files > 0; + PhaseReadinessAnalysis { + ready, + summary: if ready { + "Research phase has documentation. Consider transitioning to Specify phase.".to_string() + } else { + "Research phase needs more documentation before transitioning.".to_string() + }, + reasons, + suggestions, + } + } + "specify" => { + let spec_files = contract.files.iter() + .filter(|f| f.contract_phase.as_deref() == Some("specify")) + .count(); + + if spec_files == 0 { + reasons.push("No specification documents created yet".to_string()); + suggestions.push("Create requirements or user stories documents".to_string()); + } else { + reasons.push(format!("{} specification document(s) created", spec_files)); + } + + let ready = spec_files > 0; + PhaseReadinessAnalysis { + ready, + summary: if ready { + "Specification phase has documentation. Consider transitioning to Plan phase.".to_string() + } else { + "Specification phase needs requirements or user stories.".to_string() + }, + reasons, + suggestions, + } + } + "plan" => { + let plan_files = contract.files.iter() + .filter(|f| f.contract_phase.as_deref() == Some("plan")) + .count(); + + let has_repos = !contract.repositories.is_empty(); + + if plan_files == 0 { + reasons.push("No planning documents created yet".to_string()); + suggestions.push("Create architecture or task breakdown documents".to_string()); + } else { + reasons.push(format!("{} planning document(s) created", plan_files)); + } + + if !has_repos { + reasons.push("No repositories configured".to_string()); + suggestions.push("Add a repository for task execution".to_string()); + } else { + reasons.push(format!("{} repository(ies) configured", contract.repositories.len())); + } + + let ready = plan_files > 0 && has_repos; + PhaseReadinessAnalysis { + ready, + summary: if ready { + "Planning phase complete with documents and repositories. Ready for Execute phase.".to_string() + } else { + "Planning phase needs documentation and/or repository configuration.".to_string() + }, + reasons, + suggestions, + } + } + "execute" => { + let total_tasks = contract.tasks.len(); + let done_tasks = contract.tasks.iter().filter(|t| t.status == "done").count(); + let running_tasks = contract.tasks.iter().filter(|t| t.status == "running").count(); + + if total_tasks == 0 { + reasons.push("No tasks created yet".to_string()); + suggestions.push("Create tasks to implement the planned work".to_string()); + } else { + reasons.push(format!("{} of {} tasks completed", done_tasks, total_tasks)); + } + + if running_tasks > 0 { + reasons.push(format!("{} task(s) still running", running_tasks)); + suggestions.push("Wait for running tasks to complete".to_string()); + } + + let ready = total_tasks > 0 && done_tasks == total_tasks; + PhaseReadinessAnalysis { + ready, + summary: if ready { + "All tasks completed. Ready for Review phase.".to_string() + } else if total_tasks == 0 { + "No tasks created yet. Create and complete tasks before reviewing.".to_string() + } else { + format!("{}/{} tasks complete. Finish remaining tasks before review.", done_tasks, total_tasks) + }, + reasons, + suggestions, + } + } + "review" => { + let review_files = contract.files.iter() + .filter(|f| f.contract_phase.as_deref() == Some("review")) + .count(); + + if review_files == 0 { + suggestions.push("Create review checklist or release notes".to_string()); + } + + PhaseReadinessAnalysis { + ready: false, + summary: "Review is the final phase. Contract can be marked as complete when review is done.".to_string(), + reasons: vec!["Review phase is the final phase".to_string()], + suggestions, + } + } + _ => PhaseReadinessAnalysis { + ready: false, + summary: "Unknown phase".to_string(), + reasons: vec!["Phase not recognized".to_string()], + suggestions: vec![], + }, + } +} + +// ============================================================================= +// Contract Chat History Endpoints +// ============================================================================= + +/// Get contract chat history +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/chat/history", + responses( + (status = 200, description = "Chat history retrieved successfully", body = ContractChatHistoryResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Contract not found"), + (status = 500, description = "Internal server error") + ), + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_contract_chat_history( + State(state): State, + Authenticated(auth): Authenticated, + Path(contract_id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "Contract not found" })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Database error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Database error: {}", e) })), + ) + .into_response(); + } + } + + // Get or create conversation + let conversation = match repository::get_or_create_contract_conversation(pool, contract_id, auth.owner_id).await { + Ok(conv) => conv, + Err(e) => { + tracing::error!("Failed to get contract conversation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to get conversation: {}", e) })), + ) + .into_response(); + } + }; + + // Get messages + let messages = match repository::list_contract_chat_messages(pool, conversation.id, Some(100)).await { + Ok(msgs) => msgs, + Err(e) => { + tracing::error!("Failed to list contract chat messages: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to list messages: {}", e) })), + ) + .into_response(); + } + }; + + ( + StatusCode::OK, + Json(ContractChatHistoryResponse { + contract_id, + conversation_id: conversation.id, + messages, + }), + ) + .into_response() +} + +/// Clear contract chat history (creates a new conversation) +#[utoipa::path( + delete, + path = "/api/v1/contracts/{id}/chat/history", + responses( + (status = 200, description = "Chat history cleared successfully"), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Contract not found"), + (status = 500, description = "Internal server error") + ), + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn clear_contract_chat_history( + State(state): State, + Authenticated(auth): Authenticated, + Path(contract_id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ "error": "Database not configured" })), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "Contract not found" })), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Database error: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Database error: {}", e) })), + ) + .into_response(); + } + } + + // Clear conversation (archives existing and creates new) + match repository::clear_contract_conversation(pool, contract_id, auth.owner_id).await { + Ok(new_conversation) => { + ( + StatusCode::OK, + Json(json!({ + "message": "Chat history cleared", + "newConversationId": new_conversation.id + })), + ) + .into_response() + } + Err(e) => { + tracing::error!("Failed to clear contract conversation: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to clear history: {}", e) })), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/contract_daemon.rs b/makima/src/server/handlers/contract_daemon.rs new file mode 100644 index 0000000..13c5640 --- /dev/null +++ b/makima/src/server/handlers/contract_daemon.rs @@ -0,0 +1,960 @@ +//! HTTP handlers for daemon-to-contract interaction. +//! +//! These endpoints allow tasks running in daemons to interact with their +//! associated contracts via the contract.sh script. Authentication is via +//! tool keys registered by the daemon when starting a task. + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::db::{models::FileSummary, repository}; +use crate::llm::phase_guidance::{self, FileInfo, PhaseChecklist, TaskInfo}; +use crate::server::auth::Authenticated; +use crate::server::messages::ApiError; +use crate::server::state::SharedState; + +// ============================================================================= +// Request/Response Types +// ============================================================================= + +/// Contract status response for daemon. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractStatusResponse { + pub id: Uuid, + pub name: String, + pub phase: String, + pub status: String, + pub description: Option, +} + +/// Contract goals response. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContractGoalsResponse { + /// Description serves as goals for the contract + pub description: Option, + pub phase: String, + pub phase_guidance: String, +} + +/// Progress report request from daemon. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ProgressReportRequest { + pub message: String, + #[serde(default)] + pub task_id: Option, +} + +/// Suggested action from server. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SuggestedActionResponse { + pub action: String, + pub description: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +/// Completion action request. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CompletionActionRequest { + #[serde(default)] + pub task_id: Option, + #[serde(default)] + pub files_modified: Vec, + #[serde(default)] + pub lines_added: i32, + #[serde(default)] + pub lines_removed: i32, + #[serde(default)] + pub has_code_changes: bool, +} + +/// Recommended completion action. +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum CompletionAction { + Branch, + Merge, + Pr, + None, +} + +impl std::fmt::Display for CompletionAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CompletionAction::Branch => write!(f, "branch"), + CompletionAction::Merge => write!(f, "merge"), + CompletionAction::Pr => write!(f, "pr"), + CompletionAction::None => write!(f, "none"), + } + } +} + +/// Completion action response. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CompletionActionResponse { + pub action: String, + pub reason: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub branch_name: Option, +} + +/// Create file request from daemon. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateFileRequest { + pub name: String, + pub content: String, + #[serde(default)] + pub template_id: Option, +} + +/// Update file request from daemon. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DaemonUpdateFileRequest { + /// Content to update in the file (as markdown body element) + pub content: String, +} + +// ============================================================================= +// Handlers +// ============================================================================= + +/// Get contract status for daemon. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/daemon/status", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Contract status", body = ContractStatusResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_contract_status( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(contract)) => Json(ContractStatusResponse { + id: contract.id, + name: contract.name, + phase: contract.phase, + status: contract.status, + description: contract.description, + }) + .into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get phase deliverables checklist. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/daemon/checklist", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Phase checklist", body = PhaseChecklist), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_contract_checklist( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get files for this contract + let files = match repository::list_files_in_contract(pool, id, auth.owner_id).await { + Ok(f) => f + .into_iter() + .map(|f| FileInfo { + id: f.id, + name: f.name, + contract_phase: f.contract_phase, + }) + .collect::>(), + Err(e) => { + tracing::warn!("Failed to get files for contract {}: {}", id, e); + Vec::new() + } + }; + + // Get tasks for this contract + let tasks = match repository::list_tasks_in_contract(pool, id, auth.owner_id).await { + Ok(t) => t + .into_iter() + .map(|t| TaskInfo { + id: t.id, + name: t.name, + status: t.status, + }) + .collect::>(), + Err(e) => { + tracing::warn!("Failed to get tasks for contract {}: {}", id, e); + Vec::new() + } + }; + + // Check if repository is configured + let has_repository = match repository::list_contract_repositories(pool, id).await { + Ok(repos) => !repos.is_empty(), + Err(_) => false, + }; + + let checklist = phase_guidance::get_phase_checklist(&contract.phase, &files, &tasks, has_repository); + + Json(checklist).into_response() +} + +/// Get contract goals. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/daemon/goals", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Contract goals", body = ContractGoalsResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_contract_goals( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(contract)) => { + let deliverables = phase_guidance::get_phase_deliverables(&contract.phase); + Json(ContractGoalsResponse { + description: contract.description, + phase: contract.phase, + phase_guidance: deliverables.guidance, + }) + .into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Post progress report to contract. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/daemon/report", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = ProgressReportRequest, + responses( + (status = 200, description = "Report received"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn post_progress_report( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Log the report as a contract event + let event_type = "progress_report"; + let payload = serde_json::json!({ + "message": req.message, + "task_id": req.task_id, + }); + + if let Err(e) = repository::record_contract_event(pool, id, event_type, Some(payload)).await { + tracing::warn!("Failed to create contract event: {}", e); + } + + Json(serde_json::json!({"status": "received"})).into_response() +} + +/// Get suggested action based on contract state. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/daemon/suggest-action", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Suggested action", body = SuggestedActionResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_suggest_action( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get files and tasks for checklist + let files = repository::list_files_in_contract(pool, id, auth.owner_id) + .await + .unwrap_or_default() + .into_iter() + .map(|f| FileInfo { + id: f.id, + name: f.name, + contract_phase: f.contract_phase, + }) + .collect::>(); + + let tasks = repository::list_tasks_in_contract(pool, id, auth.owner_id) + .await + .unwrap_or_default() + .into_iter() + .map(|t| TaskInfo { + id: t.id, + name: t.name, + status: t.status, + }) + .collect::>(); + + let has_repository = repository::list_contract_repositories(pool, id) + .await + .map(|r| !r.is_empty()) + .unwrap_or(false); + + let checklist = phase_guidance::get_phase_checklist(&contract.phase, &files, &tasks, has_repository); + + // Determine suggested action based on checklist + let (action, description) = if !checklist.suggestions.is_empty() { + ("follow_suggestion", checklist.suggestions.first().unwrap().clone()) + } else if checklist.completion_percentage >= 100 { + ("advance_phase", format!("Phase {} is complete, consider advancing to next phase", contract.phase)) + } else { + ("continue", format!("Continue working on {} phase ({}% complete)", contract.phase, checklist.completion_percentage)) + }; + + Json(SuggestedActionResponse { + action: action.to_string(), + description, + data: None, + }) + .into_response() +} + +/// Get recommended completion action. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/daemon/completion-action", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = CompletionActionRequest, + responses( + (status = 200, description = "Recommended completion action", body = CompletionActionResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_completion_action( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get contract + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Determine completion action based on phase and changes + let has_changes = !req.files_modified.is_empty() || req.lines_added > 0 || req.lines_removed > 0; + let has_significant_changes = req.lines_added + req.lines_removed > 50; + + let (action, reason) = match contract.phase.as_str() { + "research" | "specify" => { + if has_changes { + (CompletionAction::Merge, "Early phase changes can be merged directly".to_string()) + } else { + (CompletionAction::None, "No changes to commit".to_string()) + } + } + "plan" => { + if has_significant_changes { + (CompletionAction::Pr, "Significant planning changes require review".to_string()) + } else if has_changes { + (CompletionAction::Merge, "Minor planning changes can be merged".to_string()) + } else { + (CompletionAction::None, "No changes to commit".to_string()) + } + } + "execute" => { + if req.has_code_changes { + (CompletionAction::Pr, "Code changes in execute phase require review".to_string()) + } else if has_changes { + (CompletionAction::Branch, "Documentation changes can be branched".to_string()) + } else { + (CompletionAction::None, "No changes to commit".to_string()) + } + } + "review" => { + if has_changes { + (CompletionAction::Pr, "Review phase changes should be reviewed".to_string()) + } else { + (CompletionAction::None, "No changes to commit".to_string()) + } + } + _ => (CompletionAction::None, "Unknown phase".to_string()), + }; + + // Generate branch name based on contract + let branch_name = if matches!(action, CompletionAction::Branch | CompletionAction::Pr) { + let slug = contract.name.to_lowercase().replace(' ', "-"); + Some(format!("contract/{}", slug)) + } else { + None + }; + + Json(CompletionActionResponse { + action: action.to_string(), + reason, + branch_name, + }) + .into_response() +} + +/// List contract files for daemon. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/daemon/files", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "List of contract files", body = Vec), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn list_contract_files( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::list_files_in_contract(pool, id, auth.owner_id).await { + Ok(files) => Json(files).into_response(), + Err(e) => { + tracing::error!("Failed to list files for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get a specific contract file. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/daemon/files/{file_id}", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("file_id" = Uuid, Path, description = "File ID") + ), + responses( + (status = 200, description = "File content"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or file not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn get_contract_file( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, file_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Get file and verify it belongs to this contract + match repository::get_file_for_owner(pool, file_id, auth.owner_id).await { + Ok(Some(file)) => { + if file.contract_id != Some(id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found in this contract")), + ) + .into_response(); + } + Json(file).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get file {}: {}", file_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Update a contract file. +#[utoipa::path( + put, + path = "/api/v1/contracts/{id}/daemon/files/{file_id}", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("file_id" = Uuid, Path, description = "File ID") + ), + request_body = DaemonUpdateFileRequest, + responses( + (status = 200, description = "File updated"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or file not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn update_contract_file( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, file_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Get file and verify it belongs to this contract + let file = match repository::get_file_for_owner(pool, file_id, auth.owner_id).await { + Ok(Some(f)) => f, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get file {}: {}", file_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + if file.contract_id != Some(id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found in this contract")), + ) + .into_response(); + } + + // Update the file with content parsed as markdown + let body = crate::llm::markdown_to_body(&req.content); + let update_req = crate::db::models::UpdateFileRequest { + name: None, + description: None, + transcript: None, + summary: None, + body: Some(body), + version: None, + repo_file_path: None, + }; + + match repository::update_file_for_owner(pool, file_id, auth.owner_id, update_req).await { + Ok(Some(updated)) => Json(updated).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update file {}: {}", file_id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", format!("{}", e))), + ) + .into_response() + } + } +} + +/// Create a new contract file. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/daemon/files", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = CreateFileRequest, + responses( + (status = 201, description = "File created"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("tool_key" = []), + ("api_key" = []) + ), + tag = "Contract Daemon" +)] +pub async fn create_contract_file( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Create the file with content parsed as markdown + let body = crate::llm::markdown_to_body(&req.content); + let create_req = crate::db::models::CreateFileRequest { + contract_id: id, + name: Some(req.name), + description: None, + transcript: vec![], + location: None, + body, + repo_file_path: None, + contract_phase: None, // Will be looked up from contract's current phase + }; + + match repository::create_file_for_owner(pool, auth.owner_id, create_req).await { + Ok(file) => (StatusCode::CREATED, Json(file)).into_response(), + Err(e) => { + tracing::error!("Failed to create file for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs new file mode 100644 index 0000000..3d726df --- /dev/null +++ b/makima/src/server/handlers/contracts.rs @@ -0,0 +1,1284 @@ +//! HTTP handlers for contract CRUD operations. + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::db::models::{ + AddLocalRepositoryRequest, AddRemoteRepositoryRequest, ChangePhaseRequest, + ContractListResponse, ContractRepository, ContractSummary, ContractWithRelations, + CreateContractRequest, CreateManagedRepositoryRequest, UpdateContractRequest, + UpdateTaskRequest, +}; +use crate::db::repository::{self, RepositoryError}; +use crate::server::auth::Authenticated; +use crate::server::messages::ApiError; +use crate::server::state::SharedState; + +/// Helper function to update the supervisor task with repository info when a primary repo is added. +/// This ensures the supervisor has access to the repository when it starts. +async fn update_supervisor_with_repo_if_needed( + pool: &sqlx::PgPool, + contract_id: uuid::Uuid, + owner_id: uuid::Uuid, + repo: &ContractRepository, +) { + // Only update for primary repositories + if !repo.is_primary { + return; + } + + // Get the supervisor task + let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await { + Ok(Some(s)) => s, + Ok(None) => { + tracing::debug!(contract_id = %contract_id, "No supervisor task found"); + return; + } + Err(e) => { + tracing::warn!(contract_id = %contract_id, error = %e, "Failed to get supervisor task"); + return; + } + }; + + // Only update if supervisor doesn't have a repository URL yet + if supervisor.repository_url.is_some() { + tracing::debug!( + supervisor_id = %supervisor.id, + "Supervisor already has repository URL" + ); + return; + } + + // Get repository URL (for remote repos) or local path (for local repos) + let repo_url = repo.repository_url.clone().or_else(|| repo.local_path.clone()); + + if repo_url.is_none() && repo.source_type != "managed" { + tracing::debug!( + supervisor_id = %supervisor.id, + "Repository has no URL or path to assign" + ); + return; + } + + // Update supervisor task with repository info + let update_req = UpdateTaskRequest { + repository_url: repo_url, + version: Some(supervisor.version), + ..Default::default() + }; + + match repository::update_task_for_owner(pool, supervisor.id, owner_id, update_req).await { + Ok(Some(updated)) => { + tracing::info!( + supervisor_id = %updated.id, + repository_url = ?updated.repository_url, + "Updated supervisor task with repository URL" + ); + } + Ok(None) => { + tracing::warn!(supervisor_id = %supervisor.id, "Supervisor task not found during update"); + } + Err(e) => { + tracing::warn!( + supervisor_id = %supervisor.id, + error = %e, + "Failed to update supervisor with repository URL" + ); + } + } +} + +/// List all root contracts (no parent) for the authenticated user's owner. +#[utoipa::path( + get, + path = "/api/v1/contracts", + responses( + (status = 200, description = "List of root contracts", body = ContractListResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn list_contracts( + State(state): State, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_contracts_for_owner(pool, auth.owner_id).await { + Ok(contracts) => { + let total = contracts.len() as i64; + Json(ContractListResponse { contracts, total }).into_response() + } + Err(e) => { + tracing::error!("Failed to list contracts: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get a contract by ID with all its relations (repositories, files, tasks). +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Contract details with relations", body = ContractWithRelations), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_contract( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the contract + let contract = match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get repositories + let repositories = match repository::list_contract_repositories(pool, id).await { + Ok(r) => r, + Err(e) => { + tracing::warn!("Failed to get repositories for {}: {}", id, e); + Vec::new() + } + }; + + // Get files + let files = match repository::list_files_in_contract(pool, id, auth.owner_id).await { + Ok(f) => f, + Err(e) => { + tracing::warn!("Failed to get files for contract {}: {}", id, e); + Vec::new() + } + }; + + // Get tasks + let tasks = match repository::list_tasks_in_contract(pool, id, auth.owner_id).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Failed to get tasks for contract {}: {}", id, e); + Vec::new() + } + }; + + Json(ContractWithRelations { + contract, + repositories, + files, + tasks, + }) + .into_response() +} + +/// Create a new contract. +#[utoipa::path( + post, + path = "/api/v1/contracts", + request_body = CreateContractRequest, + responses( + (status = 201, description = "Contract created", body = ContractSummary), + (status = 400, description = "Invalid request", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn create_contract( + State(state): State, + Authenticated(auth): Authenticated, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::create_contract_for_owner(pool, auth.owner_id, req.clone()).await { + Ok(contract) => { + // Create supervisor task for this contract + let supervisor_name = format!("{} Supervisor", contract.name); + let supervisor_plan = format!( + "You are the supervisor for contract '{}'. Your goal is to drive this contract to completion.\n\n{}", + contract.name, + contract.description.as_deref().unwrap_or("No description provided.") + ); + + // Get repository info from contract if available + let repo_url = { + // Try to get the first repository associated with this contract + match repository::list_contract_repositories(pool, contract.id).await { + Ok(repos) if !repos.is_empty() => { + let repo = &repos[0]; + repo.repository_url.clone() + } + _ => None, + } + }; + + let supervisor_req = crate::db::models::CreateTaskRequest { + name: supervisor_name, + description: None, + plan: supervisor_plan, + repository_url: repo_url, + base_branch: None, + target_branch: None, + parent_task_id: None, + contract_id: contract.id, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: true, + checkpoint_sha: None, + priority: 0, + merge_mode: None, + }; + + match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await { + Ok(supervisor_task) => { + tracing::info!( + contract_id = %contract.id, + supervisor_task_id = %supervisor_task.id, + is_supervisor = supervisor_task.is_supervisor, + "Created supervisor task for contract" + ); + + // Update contract with supervisor_task_id + let update_req = crate::db::models::UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + version: Some(contract.version), + ..Default::default() + }; + if let Err(e) = repository::update_contract_for_owner(pool, contract.id, auth.owner_id, update_req).await { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to link supervisor task to contract" + ); + } + } + Err(e) => { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to create supervisor task for contract" + ); + } + } + + // Get the summary version with counts + match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await + { + Ok(Some(summary)) => (StatusCode::CREATED, Json(summary)).into_response(), + Ok(None) => { + // Shouldn't happen, but return basic info if it does + ( + StatusCode::CREATED, + Json(ContractSummary { + id: contract.id, + name: contract.name, + description: contract.description, + phase: contract.phase, + status: contract.status, + file_count: 0, + task_count: 0, + repository_count: 0, + version: contract.version, + created_at: contract.created_at, + }), + ) + .into_response() + } + Err(e) => { + tracing::warn!("Failed to get contract summary: {}", e); + ( + StatusCode::CREATED, + Json(ContractSummary { + id: contract.id, + name: contract.name, + description: contract.description, + phase: contract.phase, + status: contract.status, + file_count: 0, + task_count: 0, + repository_count: 0, + version: contract.version, + created_at: contract.created_at, + }), + ) + .into_response() + } + } + } + Err(e) => { + tracing::error!("Failed to create contract: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Update a contract. +#[utoipa::path( + put, + path = "/api/v1/contracts/{id}", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = UpdateContractRequest, + responses( + (status = 200, description = "Contract updated", body = ContractSummary), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 409, description = "Version conflict", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn update_contract( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::update_contract_for_owner(pool, id, auth.owner_id, req).await { + Ok(Some(contract)) => { + // If contract is completed, stop the supervisor task + if contract.status == "completed" { + if let Some(supervisor_task_id) = contract.supervisor_task_id { + // Get the supervisor task to find its daemon + if let Ok(Some(supervisor)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + if let Some(daemon_id) = supervisor.daemon_id { + let state_clone = state.clone(); + tokio::spawn(async move { + // Gracefully interrupt the supervisor + let cmd = crate::server::state::DaemonCommand::InterruptTask { + task_id: supervisor_task_id, + graceful: true, + }; + if let Err(e) = state_clone.send_daemon_command(daemon_id, cmd).await { + tracing::warn!( + supervisor_task_id = %supervisor_task_id, + daemon_id = %daemon_id, + error = %e, + "Failed to stop supervisor task on contract completion" + ); + } else { + tracing::info!( + supervisor_task_id = %supervisor_task_id, + contract_id = %id, + "Stopped supervisor task on contract completion" + ); + } + }); + } + } + } + } + + // Get summary with counts + match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await + { + Ok(Some(summary)) => Json(summary).into_response(), + _ => Json(ContractSummary { + id: contract.id, + name: contract.name, + description: contract.description, + phase: contract.phase, + status: contract.status, + file_count: 0, + task_count: 0, + repository_count: 0, + version: contract.version, + created_at: contract.created_at, + }) + .into_response(), + } + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(RepositoryError::VersionConflict { expected, actual }) => { + tracing::info!( + "Version conflict on contract {}: expected {}, actual {}", + id, + expected, + actual + ); + ( + StatusCode::CONFLICT, + Json(serde_json::json!({ + "code": "VERSION_CONFLICT", + "message": format!( + "Contract was modified. Expected version {}, actual version {}", + expected, actual + ), + "expectedVersion": expected, + "actualVersion": actual, + })), + ) + .into_response() + } + Err(RepositoryError::Database(e)) => { + tracing::error!("Failed to update contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Delete a contract. +#[utoipa::path( + delete, + path = "/api/v1/contracts/{id}", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 204, description = "Contract deleted"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn delete_contract( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::delete_contract_for_owner(pool, id, auth.owner_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to delete contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Repository Management +// ============================================================================= + +/// Add a remote repository to a contract. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/repositories/remote", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = AddRemoteRepositoryRequest, + responses( + (status = 201, description = "Repository added", body = ContractRepository), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn add_remote_repository( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::add_remote_repository(pool, id, &req.name, &req.repository_url, req.is_primary) + .await + { + Ok(repo) => { + // Update supervisor task with repository info if this is a primary repo + update_supervisor_with_repo_if_needed(pool, id, auth.owner_id, &repo).await; + (StatusCode::CREATED, Json(repo)).into_response() + } + Err(e) => { + tracing::error!("Failed to add remote repository to contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Add a local repository to a contract. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/repositories/local", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = AddLocalRepositoryRequest, + responses( + (status = 201, description = "Repository added", body = ContractRepository), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn add_local_repository( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::add_local_repository(pool, id, &req.name, &req.local_path, req.is_primary) + .await + { + Ok(repo) => { + // Update supervisor task with repository info if this is a primary repo + update_supervisor_with_repo_if_needed(pool, id, auth.owner_id, &repo).await; + (StatusCode::CREATED, Json(repo)).into_response() + } + Err(e) => { + tracing::error!("Failed to add local repository to contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Create a managed repository (daemon will create it). +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/repositories/managed", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = CreateManagedRepositoryRequest, + responses( + (status = 201, description = "Repository creation requested", body = ContractRepository), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn create_managed_repository( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::create_managed_repository(pool, id, &req.name, req.is_primary).await { + Ok(repo) => { + // For managed repos, the daemon will create the repo and we'll update later + // For now, just mark that this is a managed repo configuration + // The helper handles the case where repo has no URL yet + update_supervisor_with_repo_if_needed(pool, id, auth.owner_id, &repo).await; + (StatusCode::CREATED, Json(repo)).into_response() + } + Err(e) => { + tracing::error!( + "Failed to create managed repository for contract {}: {}", + id, + e + ); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Delete a repository from a contract. +#[utoipa::path( + delete, + path = "/api/v1/contracts/{id}/repositories/{repo_id}", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("repo_id" = Uuid, Path, description = "Repository ID") + ), + responses( + (status = 204, description = "Repository removed"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or repository not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn delete_repository( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, repo_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::delete_contract_repository(pool, repo_id, id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Repository not found")), + ) + .into_response(), + Err(e) => { + tracing::error!( + "Failed to delete repository {} from contract {}: {}", + repo_id, + id, + e + ); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Set a repository as primary for a contract. +#[utoipa::path( + put, + path = "/api/v1/contracts/{id}/repositories/{repo_id}/primary", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("repo_id" = Uuid, Path, description = "Repository ID") + ), + responses( + (status = 204, description = "Repository set as primary"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or repository not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn set_repository_primary( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, repo_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::set_repository_primary(pool, repo_id, id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Repository not found")), + ) + .into_response(), + Err(e) => { + tracing::error!( + "Failed to set repository {} as primary for contract {}: {}", + repo_id, + id, + e + ); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Task Association +// ============================================================================= + +/// Add a task to a contract. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/tasks/{task_id}", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 204, description = "Task added to contract"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn add_task_to_contract( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, task_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Verify task exists and belongs to owner + match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", task_id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::add_task_to_contract(pool, id, task_id, auth.owner_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to add task {} to contract {}: {}", task_id, id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Remove a task from a contract. +#[utoipa::path( + delete, + path = "/api/v1/contracts/{id}/tasks/{task_id}", + params( + ("id" = Uuid, Path, description = "Contract ID"), + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 204, description = "Task removed from contract"), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract or task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn remove_task_from_contract( + State(state): State, + Authenticated(auth): Authenticated, + Path((id, task_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::remove_task_from_contract(pool, id, task_id, auth.owner_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found in this contract")), + ) + .into_response(), + Err(e) => { + tracing::error!( + "Failed to remove task {} from contract {}: {}", + task_id, + id, + e + ); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Phase Management +// ============================================================================= + +/// Change contract phase. +#[utoipa::path( + post, + path = "/api/v1/contracts/{id}/phase", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + request_body = ChangePhaseRequest, + responses( + (status = 200, description = "Phase changed", body = ContractSummary), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn change_phase( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::change_contract_phase_for_owner(pool, id, auth.owner_id, &req.phase).await { + Ok(Some(contract)) => { + // Notify supervisor of phase change + if let Some(supervisor_task_id) = contract.supervisor_task_id { + if let Ok(Some(supervisor)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + let state_clone = state.clone(); + let contract_id = contract.id; + let new_phase = contract.phase.clone(); + tokio::spawn(async move { + state_clone.notify_supervisor_of_phase_change( + supervisor.id, + supervisor.daemon_id, + contract_id, + &new_phase, + ).await; + }); + } + } + + // Get summary with counts + match repository::get_contract_summary_for_owner(pool, contract.id, auth.owner_id).await + { + Ok(Some(summary)) => Json(summary).into_response(), + _ => Json(ContractSummary { + id: contract.id, + name: contract.name, + description: contract.description, + phase: contract.phase, + status: contract.status, + file_count: 0, + task_count: 0, + repository_count: 0, + version: contract.version, + created_at: contract.created_at, + }) + .into_response(), + } + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to change phase for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Events +// ============================================================================= + +/// Get contract event history. +#[utoipa::path( + get, + path = "/api/v1/contracts/{id}/events", + params( + ("id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Event history", body = Vec), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Contracts" +)] +pub async fn get_events( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify contract exists and belongs to owner + match repository::get_contract_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get contract {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + match repository::list_contract_events(pool, id).await { + Ok(events) => Json(events).into_response(), + Err(e) => { + tracing::error!("Failed to get events for contract {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs index 9634b73..05e871c 100644 --- a/makima/src/server/handlers/files.rs +++ b/makima/src/server/handlers/files.rs @@ -8,11 +8,11 @@ use axum::{ }; use uuid::Uuid; -use crate::db::models::{CreateFileRequest, FileListResponse, FileSummary, UpdateFileRequest}; +use crate::db::models::{CreateFileRequest, FileListResponse, UpdateFileRequest}; use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; -use crate::server::state::{FileUpdateNotification, SharedState}; +use crate::server::state::{DaemonCommand, FileUpdateNotification, SharedState}; /// List all files for the authenticated user's owner. #[utoipa::path( @@ -42,9 +42,8 @@ pub async fn list_files( .into_response(); }; - match repository::list_files_for_owner(pool, auth.owner_id).await { - Ok(files) => { - let summaries: Vec = files.into_iter().map(FileSummary::from).collect(); + match repository::list_file_summaries_for_owner(pool, auth.owner_id).await { + Ok(summaries) => { let total = summaries.len() as i64; Json(FileListResponse { files: summaries, @@ -114,7 +113,7 @@ pub async fn get_file( } } -/// Create a new file. +/// Create a new file. Files must belong to a contract. #[utoipa::path( post, path = "/api/v1/files", @@ -123,6 +122,7 @@ pub async fn get_file( (status = 201, description = "File created", body = crate::db::models::File), (status = 400, description = "Invalid request", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Contract not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), @@ -145,6 +145,26 @@ pub async fn create_file( .into_response(); }; + // Verify the contract exists and belongs to the owner + match repository::get_contract_for_owner(pool, req.contract_id, auth.owner_id).await { + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("CONTRACT_NOT_FOUND", "Contract not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to verify contract: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + Ok(Some(_)) => {} // Contract exists, proceed + } + match repository::create_file_for_owner(pool, auth.owner_id, req).await { Ok(file) => (StatusCode::CREATED, Json(file)).into_response(), Err(e) => { @@ -310,3 +330,190 @@ pub async fn delete_file( } } } + +/// Sync a file from its linked repository file. +/// +/// This endpoint triggers an async sync operation. The file must have a +/// repo_file_path set, and its contract must have a linked repository. +/// A connected daemon will read the file and update the file content. +#[utoipa::path( + post, + path = "/api/v1/files/{id}/sync-from-repo", + params( + ("id" = Uuid, Path, description = "File ID") + ), + responses( + (status = 202, description = "Sync operation started"), + (status = 400, description = "File not linked to repository", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "File not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Files" +)] +pub async fn sync_file_from_repo( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the file and verify it has a repo_file_path + let file = match repository::get_file_for_owner(pool, id, auth.owner_id).await { + Ok(Some(f)) => f, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get file {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if file has a repo path and contract_id + let contract_id = match file.contract_id { + Some(id) => id, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_CONTRACT", + "File is not associated with a contract", + )), + ) + .into_response(); + } + }; + + let repo_file_path = match file.repo_file_path { + Some(ref path) if !path.is_empty() => path.clone(), + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NOT_LINKED", + "File is not linked to a repository file", + )), + ) + .into_response(); + } + }; + + // Get contract repositories + let repositories = match repository::list_contract_repositories(pool, contract_id).await { + Ok(repos) => repos, + Err(e) => { + tracing::error!("Failed to get contract repositories: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if contract has repositories + if repositories.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_REPOSITORY", + "Contract has no linked repositories", + )), + ) + .into_response(); + } + + // Use the first repository's local path + let repo = &repositories[0]; + let repo_local_path = match &repo.local_path { + Some(path) if !path.is_empty() => path.clone(), + _ => { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "NO_LOCAL_PATH", + "Repository has no local path configured", + )), + ) + .into_response(); + } + }; + + // Find a connected daemon for this owner + let daemon_id = state + .daemon_connections + .iter() + .find(|entry| entry.value().owner_id == auth.owner_id) + .map(|entry| entry.value().id); + + let daemon_id = match daemon_id { + Some(id) => id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon connected. Start a daemon to sync files from repository.", + )), + ) + .into_response(); + } + }; + + // Send ReadRepoFile command to daemon + // Use the file ID as the request_id so we can match the response + let command = DaemonCommand::ReadRepoFile { + request_id: id, + contract_id, + file_path: repo_file_path, + repo_path: repo_local_path, + }; + + if let Err(e) = state.send_daemon_command(daemon_id, command).await { + tracing::error!("Failed to send ReadRepoFile command: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Update status to indicate sync in progress + if let Err(e) = sqlx::query("UPDATE files SET repo_sync_status = 'syncing' WHERE id = $1") + .bind(id) + .execute(pool) + .await + { + tracing::warn!("Failed to update repo_sync_status: {}", e); + } + + // Return 202 Accepted - the sync happens asynchronously + ( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "message": "Sync operation started", + "fileId": id, + })), + ) + .into_response() +} diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs index a26c208..524c48a 100644 --- a/makima/src/server/handlers/listen.rs +++ b/makima/src/server/handlers/listen.rs @@ -9,13 +9,13 @@ use tokio::sync::mpsc; use uuid::Uuid; use crate::audio::{resample_and_mixdown, TARGET_CHANNELS, TARGET_SAMPLE_RATE}; -use crate::db::models::{CreateFileRequest, TranscriptEntry, UpdateFileRequest}; +use crate::db::models::{TranscriptEntry, UpdateFileRequest}; use crate::db::repository; use crate::listen::{align_speakers, samples_per_chunk, DialogueSegment, TimestampMode}; use crate::server::messages::{ AudioEncoding, ClientMessage, ServerMessage, StartMessage, TranscriptMessage, }; -use crate::server::state::SharedState; +use crate::server::state::{MlModels, SharedState}; /// Chunk size in milliseconds for triggering transcription processing. const STREAM_CHUNK_MS: u32 = 5_000; @@ -77,6 +77,23 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { } }); + // Lazy-load ML models on first Listen connection + let ml_models = match state.get_ml_models().await { + Ok(models) => models, + Err(e) => { + tracing::error!(session_id = %session_id, error = %e, "Failed to load ML models"); + let _ = response_tx + .send(ServerMessage::Error { + code: "MODEL_LOAD_ERROR".into(), + message: format!("Failed to load ML models: {}", e), + }) + .await; + drop(response_tx); + let _ = sender_task.await; + return; + } + }; + // Send ready message let _ = response_tx .send(ServerMessage::Ready { @@ -106,9 +123,13 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { let mut transcript_entries: Vec = Vec::new(); let mut transcript_counter: u32 = 0; + // Auth state (set when Start message includes valid auth_token and contract_id) + let mut authenticated_owner_id: Option = None; + let mut target_contract_id: Option = None; + // Reset Sortformer state for new session { - let mut sortformer = state.sortformer.lock().await; + let mut sortformer = ml_models.sortformer.lock().await; sortformer.reset_state(); } @@ -132,8 +153,51 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { sample_rate = start.sample_rate, channels = start.channels, encoding = ?start.encoding, + contract_id = ?start.contract_id, + has_auth = start.auth_token.is_some(), "Session started" ); + + // Validate auth and contract if provided + if let (Some(token), Some(contract_id_str)) = (&start.auth_token, &start.contract_id) { + // Parse contract ID + if let Ok(contract_id) = Uuid::parse_str(contract_id_str) { + // Validate JWT token + if let Some(ref verifier) = state.jwt_verifier { + match verifier.verify(token) { + Ok(claims) => { + authenticated_owner_id = Some(claims.sub); + target_contract_id = Some(contract_id); + tracing::info!( + session_id = %session_id, + owner_id = %claims.sub, + contract_id = %contract_id, + "Authenticated session - transcripts will be saved to contract" + ); + } + Err(e) => { + tracing::warn!( + session_id = %session_id, + error = %e, + "Invalid auth token - transcripts will not be saved" + ); + } + } + } else { + tracing::debug!( + session_id = %session_id, + "No JWT verifier configured - transcripts will not be saved" + ); + } + } else { + tracing::warn!( + session_id = %session_id, + contract_id = contract_id_str, + "Invalid contract ID format" + ); + } + } + audio_format = Some(start); audio_buffer.clear(); eou_buffer.clear(); @@ -143,9 +207,12 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { last_processed_len = 0; audio_offset = 0.0; finalized_segments.clear(); + file_id = None; + authenticated_owner_id = authenticated_owner_id; // Keep from above + target_contract_id = target_contract_id; // Keep from above // Reset models for new session - let mut sortformer = state.sortformer.lock().await; + let mut sortformer = ml_models.sortformer.lock().await; sortformer.reset_state(); } Ok(ClientMessage::Stop(stop)) => { @@ -165,7 +232,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { ); // Process remaining audio with sliding window - match process_audio_window(&audio_buffer, audio_offset, &state).await { + match process_audio_window(&audio_buffer, audio_offset, ml_models).await { Ok(segments) => { tracing::debug!( session_id = %session_id, @@ -291,7 +358,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { while eou_buffer.len() >= EOU_CHUNK_SIZE { let chunk: Vec = eou_buffer.drain(..EOU_CHUNK_SIZE).collect(); - let mut eou = state.parakeet_eou.lock().await; + let mut eou = ml_models.parakeet_eou.lock().await; if let Ok(text) = eou.transcribe(&chunk, false) { // Detect utterance boundary (sentence-ending punctuation) if !text.is_empty() && text != last_eou_text { @@ -325,7 +392,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { "Processing audio with sliding window" ); - match process_audio_window(&audio_buffer, audio_offset, &state).await { + match process_audio_window(&audio_buffer, audio_offset, ml_models).await { Ok(segments) => { tracing::debug!( session_id = %session_id, @@ -339,21 +406,29 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { let adjusted_start = seg.start + audio_offset; let adjusted_end = seg.end + audio_offset; if adjusted_end > last_sent_end_time { - // Create file on first transcript if database is available + // Create file on first transcript if authenticated with contract if file_id.is_none() { - if let Some(ref pool) = state.db_pool { - match repository::create_file(pool, CreateFileRequest { + if let (Some(owner_id), Some(contract_id), Some(pool)) = + (authenticated_owner_id, target_contract_id, &state.db_pool) + { + let create_req = crate::db::models::CreateFileRequest { + contract_id, name: None, // Auto-generated - description: None, + description: Some("Live transcription".to_string()), transcript: vec![], location: None, - }).await { + body: vec![], + repo_file_path: None, + contract_phase: None, // Will be looked up from contract + }; + match repository::create_file_for_owner(pool, owner_id, create_req).await { Ok(file) => { file_id = Some(file.id); tracing::info!( session_id = %session_id, file_id = %file.id, - "Created file for session" + contract_id = %contract_id, + "Created file for session in contract" ); } Err(e) => { @@ -468,6 +543,7 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { summary: None, body: None, version: None, // Internal update, skip version check + repo_file_path: None, }).await { Ok(_) => { tracing::info!( @@ -649,7 +725,7 @@ fn text_similarity(a: &str, b: &str) -> f32 { async fn process_audio_window( samples: &[f32], _audio_offset: f32, - state: &SharedState, + ml_models: &MlModels, ) -> Result, Box> { // Apply sliding window - only process the last 30 seconds let window_start = samples.len().saturating_sub(MAX_WINDOW_SAMPLES); @@ -663,8 +739,8 @@ async fn process_audio_window( ); // Acquire model locks and run inference - let mut parakeet = state.parakeet.lock().await; - let mut sortformer = state.sortformer.lock().await; + let mut parakeet = ml_models.parakeet.lock().await; + let mut sortformer = ml_models.sortformer.lock().await; // Run streaming diarization (maintains speaker cache across calls) let diarization_segments = diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 760740c..2d90a04 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -214,7 +214,27 @@ pub async fn create_task( }; match repository::create_task_for_owner(pool, auth.owner_id, req).await { - Ok(task) => (StatusCode::CREATED, Json(task)).into_response(), + Ok(task) => { + // Notify supervisor of new task creation if task belongs to a contract + if let Some(contract_id) = task.contract_id { + if !task.is_supervisor { + let pool = pool.clone(); + let state_clone = state.clone(); + let task_clone = task.clone(); + tokio::spawn(async move { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state_clone.notify_supervisor_of_task_created( + supervisor.id, + supervisor.daemon_id, + task_clone.id, + &task_clone.name, + ).await; + } + }); + } + } + (StatusCode::CREATED, Json(task)).into_response() + } Err(e) => { tracing::error!("Failed to create task: {}", e); ( @@ -262,6 +282,26 @@ pub async fn update_task( .into_response(); }; + // Check if trying to set a supervisor task to a terminal status + if let Some(ref new_status) = req.status { + let terminal_statuses = ["done", "failed", "merged"]; + if terminal_statuses.contains(&new_status.as_str()) { + // Get the task to check if it's a supervisor + if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { + if task.is_supervisor { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "SUPERVISOR_CANNOT_COMPLETE", + "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", + )), + ) + .into_response(); + } + } + } + } + // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { @@ -288,6 +328,8 @@ pub async fn update_task( match repository::update_task_for_owner(pool, id, auth.owner_id, req).await { Ok(Some(task)) => { + let updated_fields_clone = updated_fields.clone(); + // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: task.id, @@ -297,6 +339,28 @@ pub async fn update_task( updated_fields, updated_by: "user".to_string(), }); + + // Notify supervisor of status change if task belongs to a contract + if let Some(contract_id) = task.contract_id { + if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { + let pool = pool.clone(); + let state_clone = state.clone(); + let task_clone = task.clone(); + tokio::spawn(async move { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state_clone.notify_supervisor_of_task_update( + supervisor.id, + supervisor.daemon_id, + task_clone.id, + &task_clone.name, + &task_clone.status, + &updated_fields_clone, + ).await; + } + }); + } + } + Json(task).into_response() } Ok(None) => ( @@ -556,7 +620,8 @@ pub async fn start_task( task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, - "Starting task with orchestrator determination" + is_supervisor = task.is_supervisor, + "Starting task with orchestrator/supervisor determination" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command @@ -602,8 +667,18 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, }; + tracing::info!( + task_id = %id, + is_supervisor = task.is_supervisor, + is_orchestrator = is_orchestrator, + daemon_id = %target_daemon_id, + "Sending SpawnTask command to daemon" + ); + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send SpawnTask command: {}", e); // Rollback: clear daemon_id and reset status since command failed @@ -884,8 +959,11 @@ pub async fn send_message( } }; - // Check if task is running - if task.status != "running" { + // Check if task is running (except for AUTH_CODE messages and supervisor tasks) + // Supervisor tasks can receive messages even when not running - daemon will respawn Claude + let is_auth_code = req.message.starts_with("AUTH_CODE:"); + let is_supervisor = task.is_supervisor; + if task.status != "running" && !is_auth_code && !is_supervisor { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( @@ -900,8 +978,27 @@ pub async fn send_message( } // Find the daemon running this task + // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id + } else if is_supervisor { + // Supervisor without daemon - find one + match state.daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon available. Please start a daemon.", + )), + ) + .into_response(); + } + } } else { return ( StatusCode::SERVICE_UNAVAILABLE, diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs index 5d6d2ee..3f650bc 100644 --- a/makima/src/server/handlers/mesh_chat.rs +++ b/makima/src/server/handlers/mesh_chat.rs @@ -930,6 +930,46 @@ async fn handle_mesh_request( merge_mode, priority, } => { + // Subtasks inherit contract_id from parent task + let contract_id = if let Some(parent_id) = parent_task_id { + match repository::get_task(pool, parent_id).await { + Ok(Some(parent_task)) => { + match parent_task.contract_id { + Some(cid) => cid, + None => { + return MeshRequestResult { + success: false, + message: "Parent task has no contract_id".to_string(), + data: None, + }; + } + } + } + Ok(None) => { + return MeshRequestResult { + success: false, + message: format!("Parent task {} not found", parent_id), + data: None, + }; + } + Err(e) => { + return MeshRequestResult { + success: false, + message: format!("Failed to look up parent task: {}", e), + data: None, + }; + } + } + } else { + // Root tasks created via LLM chat require a contract_id + // TODO: Add contract_id to create_task tool definition + return MeshRequestResult { + success: false, + message: "Cannot create root task without contract_id. Use parent_task_id to create subtasks.".to_string(), + data: None, + }; + }; + // Check if repository_url matches a daemon's working directory (for this owner) let is_daemon_working_dir = repository_url.as_ref().map(|url| { state.daemon_connections.iter().any(|entry| { @@ -962,6 +1002,7 @@ async fn handle_mesh_request( }; let create_req = CreateTaskRequest { + contract_id, name: name.clone(), description: None, plan, @@ -975,6 +1016,8 @@ async fn handle_mesh_request( completion_action, continue_from_task_id: None, copy_files: None, + is_supervisor: false, + checkpoint_sha: None, }; match repository::create_task_for_owner(pool, owner_id, create_req).await { @@ -1074,6 +1117,8 @@ async fn handle_mesh_request( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, }; match state.send_daemon_command(target_daemon_id, command).await { @@ -1610,6 +1655,9 @@ async fn handle_mesh_request( crate::db::models::BodyElement::Image { src, alt, caption } => { json!({ "type": "image", "src": src, "alt": alt, "caption": caption }) } + crate::db::models::BodyElement::Markdown { content } => { + json!({ "type": "markdown", "content": content }) + } } }) .collect(); @@ -1640,6 +1688,9 @@ async fn handle_mesh_request( }).collect(); Some(list_text.join("\n")) } + crate::db::models::BodyElement::Markdown { content } => { + Some(content.clone()) + } _ => None, } }) @@ -1976,6 +2027,79 @@ async fn handle_mesh_request( }, } } + + // Supervisor-only tools - these should be handled via the supervisor.sh script, + // not through the mesh chat. Return an informative error. + MeshToolRequest::GetAllContractTasks { contract_id } => { + MeshRequestResult { + success: false, + message: format!( + "get_all_contract_tasks is a supervisor-only tool. Use supervisor.sh to access this functionality. Contract: {}", + contract_id + ), + data: None, + } + } + MeshToolRequest::WaitForTaskCompletion { task_id, timeout_seconds } => { + MeshRequestResult { + success: false, + message: format!( + "wait_for_task_completion is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Timeout: {}s", + task_id, timeout_seconds + ), + data: None, + } + } + MeshToolRequest::ReadTaskWorktree { task_id, file_path } => { + MeshRequestResult { + success: false, + message: format!( + "read_task_worktree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Path: {}", + task_id, file_path + ), + data: None, + } + } + MeshToolRequest::SpawnTask { name, plan, parent_task_id, checkpoint_sha, .. } => { + MeshRequestResult { + success: false, + message: format!( + "spawn_task is a supervisor-only tool. Only the contract supervisor can spawn new tasks. Task name: {}", + name + ), + data: None, + } + } + MeshToolRequest::CreateCheckpoint { task_id, message } => { + MeshRequestResult { + success: false, + message: format!( + "create_checkpoint is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}, Message: {}", + task_id, message + ), + data: None, + } + } + MeshToolRequest::ListTaskCheckpoints { task_id } => { + MeshRequestResult { + success: false, + message: format!( + "list_task_checkpoints is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", + task_id + ), + data: None, + } + } + MeshToolRequest::GetTaskTree { task_id } => { + MeshRequestResult { + success: false, + message: format!( + "get_task_tree is a supervisor-only tool. Use supervisor.sh to access this functionality. Task: {}", + task_id + ), + data: None, + } + } } } diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 644d0bc..178e5e1 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -301,6 +301,17 @@ pub enum DaemonMessage { #[serde(rename = "taskId")] task_id: Uuid, }, + /// Authentication required - OAuth token expired, provides login URL + AuthenticationRequired { + /// Task ID that triggered the auth error (if any) + #[serde(rename = "taskId")] + task_id: Option, + /// OAuth login URL for remote authentication + #[serde(rename = "loginUrl")] + login_url: String, + /// Hostname of the daemon requiring auth + hostname: Option, + }, /// Response to RetryCompletionAction command CompletionActionResult { #[serde(rename = "taskId")] @@ -343,6 +354,21 @@ pub enum DaemonMessage { #[serde(rename = "targetDir")] target_dir: String, }, + /// Response to ReadRepoFile command + RepoFileContent { + /// Request ID from the original command + #[serde(rename = "requestId")] + request_id: Uuid, + /// Path to the file that was read + #[serde(rename = "filePath")] + file_path: String, + /// File content (None if error occurred) + content: Option, + /// Whether the operation succeeded + success: bool, + /// Error message if operation failed + error: Option, + }, } /// Validated daemon authentication result. @@ -509,6 +535,31 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Daemon registered" ); + // Register daemon in database + if let Some(ref pool) = state.db_pool { + match repository::register_daemon( + pool, + owner_id, + &connection_id, + Some(&hostname), + Some(&machine_id), + max_concurrent_tasks as i32, + ).await { + Ok(db_daemon) => { + tracing::debug!( + daemon_id = %db_daemon.id, + "Daemon registered in database" + ); + } + Err(e) => { + tracing::error!( + error = %e, + "Failed to register daemon in database" + ); + } + } + } + // Register daemon in state with owner_id state.register_daemon( connection_id.clone(), @@ -718,6 +769,24 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ], updated_by: "daemon".into(), }); + + // Notify supervisor if this task belongs to a contract + if let Some(contract_id) = updated_task.contract_id { + // Don't notify for supervisor tasks (they don't report to themselves) + if !updated_task.is_supervisor { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state.notify_supervisor_of_task_completion( + supervisor.id, + supervisor.daemon_id, + updated_task.id, + &updated_task.name, + &updated_task.status, + updated_task.progress_summary.as_deref(), + updated_task.error_message.as_deref(), + ).await; + } + } + } } Ok(None) => { tracing::warn!( @@ -763,6 +832,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); state.revoke_tool_key(task_id); } + Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => { + tracing::warn!( + task_id = ?task_id, + login_url = %login_url, + hostname = ?hostname, + "Daemon requires authentication - OAuth token expired" + ); + + // Broadcast as task output with auth_required type so UI can display the login link + let content = format!( + "🔐 Authentication required on daemon{}. Click to login: {}", + hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(), + login_url + ); + + // Broadcast to task subscribers if we have a task_id + if let Some(tid) = task_id { + tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers"); + state.broadcast_task_output(TaskOutputNotification { + task_id: tid, + owner_id: Some(owner_id), + message_type: "auth_required".to_string(), + content: "Authentication required".to_string(), // Constant for dedup + tool_name: None, + tool_input: Some(serde_json::json!({ + "loginUrl": login_url, + "hostname": hostname, + "taskId": tid.to_string(), + })), + is_error: Some(true), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } else { + tracing::warn!("No task_id for auth_required - cannot broadcast to specific task"); + } + + // Also log the full URL for manual use + tracing::info!( + login_url = %login_url, + "OAuth login URL available - user should open this in browser" + ); + } Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => { tracing::info!( daemon_id = %daemon_uuid, @@ -874,6 +987,92 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re is_partial: false, }); } + Ok(DaemonMessage::RepoFileContent { + request_id, + file_path, + content, + success, + error, + }) => { + tracing::info!( + request_id = %request_id, + file_path = %file_path, + success = success, + content_len = content.as_ref().map(|c| c.len()), + error = ?error, + "Repo file content received from daemon" + ); + + // The request_id is the file_id we want to update + if success { + if let (Some(pool), Some(content)) = (&state.db_pool, content) { + // Convert markdown to body elements + let body = crate::llm::markdown_to_body(&content); + + // Update file in database + let update_req = crate::db::models::UpdateFileRequest { + name: None, + description: None, + transcript: None, + summary: None, + body: Some(body), + version: None, + repo_file_path: None, + }; + + match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await { + Ok(Some(_file)) => { + tracing::info!( + file_id = %request_id, + "File synced from repository successfully" + ); + + // Update repo_sync_status to 'synced' and set repo_synced_at + if let Err(e) = sqlx::query( + "UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE id = $1" + ) + .bind(request_id) + .execute(pool) + .await + { + tracing::warn!( + file_id = %request_id, + error = %e, + "Failed to update repo sync status" + ); + } + + // Broadcast file update notification + state.broadcast_file_update(crate::server::state::FileUpdateNotification { + file_id: request_id, + version: 0, // Will be updated by next fetch + updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()], + updated_by: "daemon".to_string(), + }); + } + Ok(None) => { + tracing::warn!( + file_id = %request_id, + "File not found when syncing from repository" + ); + } + Err(e) => { + tracing::error!( + file_id = %request_id, + error = %e, + "Failed to update file from repository content" + ); + } + } + } + } else { + tracing::warn!( + file_id = %request_id, + error = ?error, + "Daemon failed to read repo file" + ); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } @@ -913,10 +1112,20 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // Cleanup on disconnect state.unregister_daemon(&connection_id); - // Clear daemon_id from any tasks that were running on this daemon + // Delete daemon from database and clear tasks if let Some(ref pool) = state.db_pool { let pool = pool.clone(); + let conn_id = connection_id.clone(); tokio::spawn(async move { + // Delete daemon from database + if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await { + tracing::error!( + connection_id = %conn_id, + error = %e, + "Failed to delete daemon from database" + ); + } + // Find tasks assigned to this daemon that are still active if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await { tracing::error!( diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs new file mode 100644 index 0000000..ac59130 --- /dev/null +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -0,0 +1,1153 @@ +//! HTTP handlers for supervisor-specific mesh operations. +//! +//! These endpoints are used by supervisor tasks (via supervisor.sh) to orchestrate +//! contract work: spawning tasks, waiting for completion, reading worktree files, etc. + +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; +use crate::db::repository; +use crate::server::handlers::mesh::{extract_auth, AuthSource}; +use crate::server::messages::ApiError; +use crate::server::state::{DaemonCommand, SharedState}; + +// ============================================================================= +// Request/Response Types +// ============================================================================= + +/// Request to spawn a new task from supervisor. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SpawnTaskRequest { + pub name: String, + pub plan: String, + pub contract_id: Uuid, + pub parent_task_id: Option, + pub checkpoint_sha: Option, + /// Repository URL for the task (supervisor should provide this) + pub repository_url: Option, +} + +/// Request to wait for task completion. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct WaitForTaskRequest { + #[serde(default = "default_timeout")] + pub timeout_seconds: i32, +} + +fn default_timeout() -> i32 { + 300 +} + +/// Request to read a file from task worktree. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReadWorktreeFileRequest { + pub file_path: String, +} + +/// Request to create a checkpoint. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateCheckpointRequest { + pub message: String, +} + +/// Response for task tree. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskTreeResponse { + pub tasks: Vec, + pub supervisor_task_id: Option, + pub total_count: usize, +} + +/// Response for wait operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct WaitResponse { + pub task_id: Uuid, + pub status: String, + pub completed: bool, + pub output_summary: Option, +} + +/// Response for read file operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReadFileResponse { + pub task_id: Uuid, + pub file_path: String, + pub content: String, + pub exists: bool, +} + +/// Response for checkpoint operations. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointResponse { + pub task_id: Uuid, + pub checkpoint_number: i32, + pub commit_sha: String, + pub message: String, +} + +/// Task checkpoint info. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskCheckpoint { + pub id: Uuid, + pub task_id: Uuid, + pub checkpoint_number: i32, + pub commit_sha: String, + pub branch_name: String, + pub message: String, + pub files_changed: Option, + pub lines_added: i32, + pub lines_removed: i32, + pub created_at: chrono::DateTime, +} + +/// Response for list checkpoints. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointListResponse { + pub task_id: Uuid, + pub checkpoints: Vec, +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/// Verify the request comes from a supervisor task and extract ownership info. +async fn verify_supervisor_auth( + state: &SharedState, + headers: &HeaderMap, + contract_id: Option, +) -> Result<(Uuid, Uuid), (StatusCode, Json)> { + let auth = extract_auth(state, headers); + + let task_id = match auth { + AuthSource::ToolKey(task_id) => task_id, + _ => { + return Err(( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Supervisor endpoints require tool key auth")), + )); + } + }; + + // Get the task to verify it's a supervisor and get owner_id + let pool = state.db_pool.as_ref().ok_or_else(|| { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + })?; + + let task = repository::get_task(pool, task_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get supervisor task"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to verify supervisor")), + ) + })? + .ok_or_else(|| { + ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + })?; + + // Verify task is a supervisor + if !task.is_supervisor { + return Err(( + StatusCode::FORBIDDEN, + Json(ApiError::new("NOT_SUPERVISOR", "Only supervisor tasks can use these endpoints")), + )); + } + + // If contract_id provided, verify the supervisor belongs to that contract + if let Some(cid) = contract_id { + if task.contract_id != Some(cid) { + return Err(( + StatusCode::FORBIDDEN, + Json(ApiError::new("CONTRACT_MISMATCH", "Supervisor does not belong to this contract")), + )); + } + } + + Ok((task_id, task.owner_id)) +} + +// ============================================================================= +// Contract Task Handlers +// ============================================================================= + +/// List all tasks in a contract's tree. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tasks", + params( + ("contract_id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "List of tasks in contract", body = TaskTreeResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn list_contract_tasks( + State(state): State, + Path(contract_id): Path, + headers: HeaderMap, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(contract_id)).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get all tasks for this contract + match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { + Ok(tasks) => { + let supervisor_task_id = tasks.iter().find(|t| t.is_supervisor).map(|t| t.id); + let summaries: Vec = tasks.into_iter().map(TaskSummary::from).collect(); + let total_count = summaries.len(); + + ( + StatusCode::OK, + Json(TaskTreeResponse { + tasks: summaries, + supervisor_task_id, + total_count, + }), + ).into_response() + } + Err(e) => { + tracing::error!(error = %e, "Failed to list contract tasks"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to list tasks")), + ).into_response() + } + } +} + +/// Get full task tree structure for a contract. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/contracts/{contract_id}/tree", + params( + ("contract_id" = Uuid, Path, description = "Contract ID") + ), + responses( + (status = 200, description = "Task tree structure", body = TaskTreeResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn get_contract_tree( + State(state): State, + Path(contract_id): Path, + headers: HeaderMap, +) -> impl IntoResponse { + // Same as list_contract_tasks for now - can add tree structure later + list_contract_tasks(State(state), Path(contract_id), headers).await +} + +// ============================================================================= +// Task Spawn Handler +// ============================================================================= + +/// Spawn a new task (supervisor only). +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks", + request_body = SpawnTaskRequest, + responses( + (status = 201, description = "Task created", body = Task), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn spawn_task( + State(state): State, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, Some(request.contract_id)).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Verify contract exists + let _contract = match repository::get_contract_for_owner(pool, request.contract_id, owner_id).await { + Ok(Some(c)) => c, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Contract not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get contract"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get contract")), + ).into_response(); + } + }; + + // Get repository URL from the contract's primary repository + let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await { + Ok(repos) => { + // Prefer primary repo, fallback to first repo + repos.iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone()) + } + Err(e) => { + tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL"); + None + } + }; + + // Supervisor can override with explicit repository_url + let repo_url = request.repository_url.clone().or(repo_url); + + // Create task request + let create_req = CreateTaskRequest { + name: request.name.clone(), + description: None, + plan: request.plan.clone(), + repository_url: repo_url.clone(), + contract_id: request.contract_id, + parent_task_id: request.parent_task_id, + is_supervisor: false, + checkpoint_sha: request.checkpoint_sha.clone(), + merge_mode: Some("manual".to_string()), + priority: 0, + base_branch: None, + target_branch: None, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + }; + + // Create task in DB + let task = match repository::create_task_for_owner(pool, owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!(error = %e, "Failed to create task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to create task")), + ).into_response(); + } + }; + + tracing::info!( + supervisor_id = %supervisor_id, + task_id = %task.id, + task_name = %task.name, + "Supervisor spawned new task" + ); + + // Start task on a daemon + // Find a daemon that belongs to this owner + for entry in state.daemon_connections.iter() { + let daemon = entry.value(); + if daemon.owner_id == owner_id { + // Send spawn command to first available daemon + let cmd = DaemonCommand::SpawnTask { + task_id: task.id, + task_name: task.name.clone(), + plan: task.plan.clone(), + repo_url: repo_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator: false, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: false, + }; + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!(error = %e, daemon_id = %daemon.id, "Failed to send spawn command"); + } else { + tracing::info!(task_id = %task.id, daemon_id = %daemon.id, repo_url = ?repo_url, "Task spawn command sent"); + } + break; + } + } + + (StatusCode::CREATED, Json(task)).into_response() +} + +// ============================================================================= +// Wait for Task Handler +// ============================================================================= + +/// Wait for a task to complete. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/wait", + params( + ("task_id" = Uuid, Path, description = "Task ID to wait for") + ), + request_body = WaitForTaskRequest, + responses( + (status = 200, description = "Task completed or timed out", body = WaitResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn wait_for_task( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Verify task belongs to same owner + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Check if already done + if task.status == "done" || task.status == "failed" || task.status == "merged" { + return ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: task.status, + completed: true, + output_summary: None, + }), + ).into_response(); + } + + // Subscribe to task completions + let mut rx = state.task_completions.subscribe(); + let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); + + // Wait for completion or timeout + let result = tokio::time::timeout(timeout, async { + loop { + match rx.recv().await { + Ok(notification) => { + if notification.task_id == task_id { + return Some(notification); + } + } + Err(_) => { + // Channel closed or lagged - check DB directly + if let Ok(Some(t)) = repository::get_task(pool, task_id).await { + if t.status == "done" || t.status == "failed" || t.status == "merged" { + return Some(crate::server::state::TaskCompletionNotification { + task_id: t.id, + owner_id: Some(t.owner_id), + contract_id: t.contract_id, + parent_task_id: t.parent_task_id, + status: t.status, + output_summary: None, + worktree_path: None, + error_message: t.error_message, + }); + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + }).await; + + match result { + Ok(Some(notification)) => { + ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: notification.status, + completed: true, + output_summary: notification.output_summary, + }), + ).into_response() + } + Ok(None) | Err(_) => { + // Timeout - check final status + let final_status = repository::get_task(pool, task_id) + .await + .ok() + .flatten() + .map(|t| t.status) + .unwrap_or_else(|| "unknown".to_string()); + + ( + StatusCode::OK, + Json(WaitResponse { + task_id, + status: final_status.clone(), + completed: final_status == "done" || final_status == "failed" || final_status == "merged", + output_summary: None, + }), + ).into_response() + } + } +} + +// ============================================================================= +// Read Worktree File Handler +// ============================================================================= + +/// Read a file from a task's worktree. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/read-file", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + request_body = ReadWorktreeFileRequest, + responses( + (status = 200, description = "File content", body = ReadFileResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn read_worktree_file( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get task to verify ownership + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // TODO: Implement file reading via worktree path + // For now, return not implemented - supervisor should use local file access via worktree + let _ = (task, request); + + ( + StatusCode::NOT_IMPLEMENTED, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "Worktree file reading via API not yet implemented. Use local filesystem access via worktree path.", + )), + ).into_response() +} + +// ============================================================================= +// Checkpoint Handlers +// ============================================================================= + +/// Create a git checkpoint for a task. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{task_id}/checkpoint", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + request_body = CreateCheckpointRequest, + responses( + (status = 201, description = "Checkpoint created", body = CheckpointResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_checkpoint( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let auth = extract_auth(&state, &headers); + + let task_id_from_auth = match auth { + AuthSource::ToolKey(tid) => tid, + _ => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ).into_response(); + } + }; + + // Can only create checkpoint for own task + if task_id_from_auth != task_id { + return ( + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Can only create checkpoint for own task")), + ).into_response(); + } + + let pool = state.db_pool.as_ref().unwrap(); + + // Get task + let task = match repository::get_task(pool, task_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // TODO: Implement checkpoint creation via daemon command + // For now, checkpoints should be created by the task itself via git commands + let _ = (task, request); + + ( + StatusCode::NOT_IMPLEMENTED, + Json(ApiError::new( + "NOT_IMPLEMENTED", + "Checkpoint creation via API not yet implemented. Use git commands directly in the task.", + )), + ).into_response() +} + +/// List checkpoints for a task. +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{task_id}/checkpoints", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "List of checkpoints", body = CheckpointListResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn list_checkpoints( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, +) -> impl IntoResponse { + let auth = extract_auth(&state, &headers); + + let _task_id_from_auth = match auth { + AuthSource::ToolKey(tid) => tid, + _ => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Tool key required")), + ).into_response(); + } + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get checkpoints from DB + match repository::list_task_checkpoints(pool, task_id).await { + Ok(checkpoints) => { + let checkpoint_list: Vec = checkpoints + .into_iter() + .map(|c| TaskCheckpoint { + id: c.id, + task_id: c.task_id, + checkpoint_number: c.checkpoint_number, + commit_sha: c.commit_sha, + branch_name: c.branch_name, + message: c.message, + files_changed: c.files_changed, + lines_added: c.lines_added.unwrap_or(0), + lines_removed: c.lines_removed.unwrap_or(0), + created_at: c.created_at, + }) + .collect(); + + ( + StatusCode::OK, + Json(CheckpointListResponse { + task_id, + checkpoints: checkpoint_list, + }), + ).into_response() + } + Err(e) => { + tracing::error!(error = %e, "Failed to list checkpoints"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to list checkpoints")), + ).into_response() + } + } +} + +// ============================================================================= +// Git Operations - Request/Response Types +// ============================================================================= + +/// Request to create a new branch. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchRequest { + pub branch_name: String, + pub from_ref: Option, +} + +/// Response for branch creation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchResponse { + pub success: bool, + pub branch_name: String, + pub message: String, +} + +/// Request to merge task changes. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeTaskRequest { + pub target_branch: Option, + #[serde(default)] + pub squash: bool, +} + +/// Response for merge operation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MergeTaskResponse { + pub task_id: Uuid, + pub success: bool, + pub message: String, + pub commit_sha: Option, + pub conflicts: Option>, +} + +/// Request to create a pull request. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreatePRRequest { + pub task_id: Uuid, + pub title: String, + pub body: Option, + #[serde(default = "default_base_branch")] + pub base_branch: String, +} + +fn default_base_branch() -> String { + "main".to_string() +} + +/// Response for PR creation. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreatePRResponse { + pub task_id: Uuid, + pub success: bool, + pub message: String, + pub pr_url: Option, + pub pr_number: Option, +} + +/// Response for task diff. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskDiffResponse { + pub task_id: Uuid, + pub success: bool, + pub diff: Option, + pub error: Option, +} + +// ============================================================================= +// Git Operations - Handlers +// ============================================================================= + +/// Create a new branch from supervisor's worktree. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/branches", + request_body = CreateBranchRequest, + responses( + (status = 201, description = "Branch created", body = CreateBranchResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_branch( + State(state): State, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + // Find daemon running supervisor + let daemon_id = { + let pool = state.db_pool.as_ref().unwrap(); + match repository::get_task(pool, supervisor_id).await { + Ok(Some(task)) => task.daemon_id, + _ => None, + } + }; + + let Some(daemon_id) = daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Supervisor has no assigned daemon")), + ).into_response(); + }; + + // Send CreateBranch command to daemon + let cmd = DaemonCommand::CreateBranch { + task_id: supervisor_id, + branch_name: request.branch_name.clone(), + from_ref: request.from_ref, + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send CreateBranch command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + // Note: Real implementation would wait for daemon response + // For now, return success immediately - daemon will send response via WebSocket + ( + StatusCode::CREATED, + Json(CreateBranchResponse { + success: true, + branch_name: request.branch_name, + message: "Branch creation command sent".to_string(), + }), + ).into_response() +} + +/// Merge a task's changes to a target branch. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/merge", + params( + ("task_id" = Uuid, Path, description = "Task ID to merge") + ), + request_body = MergeTaskRequest, + responses( + (status = 200, description = "Merge initiated", body = MergeTaskResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn merge_task( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send MergeTaskToTarget command to daemon + let cmd = DaemonCommand::MergeTaskToTarget { + task_id, + target_branch: request.target_branch, + squash: request.squash, + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send MergeTaskToTarget command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::OK, + Json(MergeTaskResponse { + task_id, + success: true, + message: "Merge command sent".to_string(), + commit_sha: None, + conflicts: None, + }), + ).into_response() +} + +/// Create a pull request for a task's changes. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/pr", + request_body = CreatePRRequest, + responses( + (status = 201, description = "PR created", body = CreatePRResponse), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn create_pr( + State(state): State, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, request.task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send CreatePR command to daemon + let cmd = DaemonCommand::CreatePR { + task_id: request.task_id, + title: request.title.clone(), + body: request.body.clone(), + base_branch: request.base_branch.clone(), + }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send CreatePR command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::CREATED, + Json(CreatePRResponse { + task_id: request.task_id, + success: true, + message: "PR creation command sent".to_string(), + pr_url: None, + pr_number: None, + }), + ).into_response() +} + +/// Get the diff for a task's changes. +#[utoipa::path( + get, + path = "/api/v1/mesh/supervisor/tasks/{task_id}/diff", + params( + ("task_id" = Uuid, Path, description = "Task ID") + ), + responses( + (status = 200, description = "Task diff", body = TaskDiffResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 404, description = "Task not found"), + (status = 500, description = "Internal server error"), + ), + tag = "Mesh Supervisor" +)] +pub async fn get_task_diff( + State(state): State, + Path(task_id): Path, + headers: HeaderMap, +) -> impl IntoResponse { + let (_supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the target task + let task = match repository::get_task_for_owner(pool, task_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get task")), + ).into_response(); + } + }; + + // Get daemon running the task + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send GetTaskDiff command to daemon + let cmd = DaemonCommand::GetTaskDiff { task_id }; + + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send GetTaskDiff command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + ( + StatusCode::OK, + Json(TaskDiffResponse { + task_id, + success: true, + diff: None, + error: Some("Diff command sent - response will be streamed".to_string()), + }), + ).into_response() +} diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index 8681104..8c2cb0c 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -2,6 +2,9 @@ pub mod api_keys; pub mod chat; +pub mod contract_chat; +pub mod contract_daemon; +pub mod contracts; pub mod file_ws; pub mod files; pub mod listen; @@ -9,6 +12,8 @@ pub mod mesh; pub mod mesh_chat; pub mod mesh_daemon; pub mod mesh_merge; +pub mod mesh_supervisor; pub mod mesh_ws; +pub mod templates; pub mod users; pub mod versions; diff --git a/makima/src/server/handlers/templates.rs b/makima/src/server/handlers/templates.rs new file mode 100644 index 0000000..868d5b4 --- /dev/null +++ b/makima/src/server/handlers/templates.rs @@ -0,0 +1,107 @@ +//! Templates API handler. + +use axum::{extract::Query, http::StatusCode, response::IntoResponse, Json}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::llm::templates; + +/// Query parameters for listing templates +#[derive(Debug, Deserialize, ToSchema)] +pub struct ListTemplatesQuery { + /// Filter by contract phase (research, specify, plan, execute, review) + pub phase: Option, +} + +/// Template summary for API response +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TemplateSummary { + /// Template identifier + pub id: String, + /// Display name + pub name: String, + /// Contract phase this template is designed for + pub phase: String, + /// Brief description + pub description: String, + /// Number of body elements in the template + pub element_count: usize, +} + +/// Response for listing templates +#[derive(Debug, Serialize, ToSchema)] +pub struct ListTemplatesResponse { + pub templates: Vec, +} + +/// List available file templates +#[utoipa::path( + get, + path = "/api/v1/templates", + params( + ("phase" = Option, Query, description = "Filter by contract phase") + ), + responses( + (status = 200, description = "Templates retrieved successfully", body = ListTemplatesResponse) + ), + tag = "templates" +)] +pub async fn list_templates( + Query(query): Query, +) -> impl IntoResponse { + let template_list = match query.phase.as_deref() { + Some(phase) => templates::templates_for_phase(phase), + None => templates::all_templates(), + }; + + let summaries: Vec = template_list + .iter() + .map(|t| TemplateSummary { + id: t.id.clone(), + name: t.name.clone(), + phase: t.phase.clone(), + description: t.description.clone(), + element_count: t.suggested_body.len(), + }) + .collect(); + + ( + StatusCode::OK, + Json(ListTemplatesResponse { + templates: summaries, + }), + ) + .into_response() +} + +/// Get a specific template by ID +#[utoipa::path( + get, + path = "/api/v1/templates/{id}", + params( + ("id" = String, Path, description = "Template ID") + ), + responses( + (status = 200, description = "Template retrieved successfully", body = templates::FileTemplate), + (status = 404, description = "Template not found") + ), + tag = "templates" +)] +pub async fn get_template( + axum::extract::Path(id): axum::extract::Path, +) -> impl IntoResponse { + let all = templates::all_templates(); + let template = all.into_iter().find(|t| t.id == id); + + match template { + Some(t) => (StatusCode::OK, Json(serde_json::json!(t))).into_response(), + None => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "error": format!("Template '{}' not found", id) + })), + ) + .into_response(), + } +} -- cgit v1.2.3