//! HTTP handlers for history and conversation APIs. use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json, }; use uuid::Uuid; use crate::{ db::{ models::{ flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, SupervisorConversationResponse, TaskConversationResponse, TaskReference, }, repository, }, server::{auth::Authenticated, messages::ApiError, state::SharedState}, }; /// Query parameters for task conversation #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct TaskConversationParams { pub include_tool_calls: Option, pub include_tool_results: Option, pub limit: Option, } /// Query parameters for timeline #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct TimelineQueryFilters { pub contract_id: Option, pub task_id: Option, pub include_subtasks: Option, #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub from: Option>, #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub to: Option>, pub limit: Option, } /// GET /api/v1/contracts/{id}/history /// Returns contract history timeline with filtering and pagination #[utoipa::path( get, path = "/api/v1/contracts/{id}/history", params( ("id" = Uuid, Path, description = "Contract ID"), ("phase" = Option, Query, description = "Filter by phase"), ("event_types" = Option, Query, description = "Filter by event types (comma-separated)"), ("from" = Option, Query, description = "Start date filter"), ("to" = Option, Query, description = "End date filter"), ("limit" = Option, Query, description = "Limit results"), ), responses( (status = 200, description = "Contract history", body = ContractHistoryResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 403, description = "Forbidden", body = ApiError), (status = 404, description = "Contract not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "History" )] pub async fn get_contract_history( State(state): State, Path(contract_id): Path, Query(filters): Query, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify contract exists and user has access let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Contract not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get contract {}: {}", contract_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get history events match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await { Ok((events, total_count)) => { Json(ContractHistoryResponse { contract_id, entries: events, total_count, cursor: None, // TODO: implement cursor pagination }) .into_response() } Err(e) => { tracing::error!("Failed to get contract history: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } } /// GET /api/v1/contracts/{id}/supervisor/conversation /// Returns full supervisor conversation with spawned task references #[utoipa::path( get, path = "/api/v1/contracts/{id}/supervisor/conversation", params( ("id" = Uuid, Path, description = "Contract ID") ), responses( (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 403, description = "Forbidden", body = ApiError), (status = 404, description = "Supervisor not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "History" )] pub async fn get_supervisor_conversation( State(state): State, Path(contract_id): Path, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get contract for phase info and ownership check let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { Ok(Some(c)) => c, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Contract not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get contract {}: {}", contract_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get the supervisor state let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await { Ok(Some(s)) => s, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Supervisor not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Parse conversation history from JSONB let messages: Vec = supervisor_state .conversation_history .as_array() .map(|arr| { arr.iter() .enumerate() .map(|(i, v)| ConversationMessage { id: i.to_string(), role: v .get("role") .and_then(|r| r.as_str()) .unwrap_or("user") .to_string(), content: v .get("content") .and_then(|c| c.as_str()) .unwrap_or("") .to_string(), timestamp: supervisor_state.last_activity, tool_calls: None, tool_name: None, tool_input: None, tool_result: None, is_error: None, token_count: None, cost_usd: None, }) .collect() }) .unwrap_or_default(); // Get spawned tasks let tasks = match repository::list_tasks_by_contract(pool, contract_id, auth.owner_id).await { Ok(t) => t, Err(e) => { tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e); Vec::new() } }; let spawned_tasks: Vec = tasks .into_iter() .filter(|t| !t.is_supervisor) .map(|t| TaskReference { task_id: t.id, task_name: t.name, status: t.status, created_at: t.created_at, completed_at: t.completed_at, }) .collect(); Json(SupervisorConversationResponse { contract_id, supervisor_task_id: supervisor_state.task_id, phase: contract.phase, last_activity: supervisor_state.last_activity, pending_task_ids: supervisor_state.pending_task_ids, messages, spawned_tasks, }) .into_response() } /// GET /api/v1/mesh/tasks/{id}/conversation /// Returns task conversation history #[utoipa::path( get, path = "/api/v1/mesh/tasks/{id}/conversation", params( ("id" = Uuid, Path, description = "Task ID"), ("include_tool_calls" = Option, Query, description = "Include tool call messages"), ("include_tool_results" = Option, Query, description = "Include tool result messages"), ("limit" = Option, Query, description = "Limit messages"), ), responses( (status = 200, description = "Task conversation", body = TaskConversationResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 403, description = "Forbidden", body = ApiError), (status = 404, description = "Task not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "History" )] pub async fn get_task_conversation( State(state): State, Path(task_id): Path, Query(params): Query, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Get task and verify ownership let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await { Ok(Some(t)) => t, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Task not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get task {}: {}", task_id, e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Get conversation messages let messages = match repository::get_task_conversation( pool, task_id, params.include_tool_calls.unwrap_or(true), params.include_tool_results.unwrap_or(true), params.limit, ) .await { Ok(m) => m, Err(e) => { tracing::error!("Failed to get task conversation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response(); } }; // Calculate totals let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum(); Json(TaskConversationResponse { task_id, task_name: task.name, status: task.status, messages, total_tokens: None, total_cost: if total_cost > 0.0 { Some(total_cost) } else { None }, }) .into_response() } /// GET /api/v1/timeline /// Returns unified timeline for authenticated user #[utoipa::path( get, path = "/api/v1/timeline", params( ("contract_id" = Option, Query, description = "Filter by contract"), ("task_id" = Option, Query, description = "Filter by task"), ("include_subtasks" = Option, Query, description = "Include subtask events"), ("from" = Option, Query, description = "Start date filter"), ("to" = Option, Query, description = "End date filter"), ("limit" = Option, Query, description = "Limit results"), ), responses( (status = 200, description = "Timeline events", body = ContractHistoryResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), (status = 500, description = "Internal server error", body = ApiError), ), security( ("bearer_auth" = []), ("api_key" = []) ), tag = "History" )] pub async fn get_timeline( State(state): State, Query(filters): Query, Authenticated(auth): Authenticated, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let history_filters = HistoryQueryFilters { phase: None, event_types: None, from: filters.from, to: filters.to, limit: filters.limit, cursor: None, }; let result = if let Some(contract_id) = filters.contract_id { repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await } else if let Some(task_id) = filters.task_id { repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await } else { repository::get_timeline(pool, auth.owner_id, &history_filters).await }; match result { Ok((events, total_count)) => { Json(ContractHistoryResponse { contract_id: filters.contract_id.unwrap_or_default(), entries: events, total_count, cursor: None, }) .into_response() } Err(e) => { tracing::error!("Failed to get timeline: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", e.to_string())), ) .into_response() } } }