summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/history.rs')
-rw-r--r--makima/src/server/handlers/history.rs438
1 files changed, 438 insertions, 0 deletions
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs
new file mode 100644
index 0000000..b3dec97
--- /dev/null
+++ b/makima/src/server/handlers/history.rs
@@ -0,0 +1,438 @@
+//! HTTP handlers for history and conversation APIs.
+
+use axum::{
+ extract::{Path, Query, State},
+ http::StatusCode,
+ response::IntoResponse,
+ Json,
+};
+use uuid::Uuid;
+
+use crate::{
+ db::{
+ models::{
+ ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
+ SupervisorConversationResponse, TaskConversationResponse, TaskReference,
+ },
+ repository,
+ },
+ server::{auth::Authenticated, messages::ApiError, state::SharedState},
+};
+
+/// Query parameters for task conversation
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskConversationParams {
+ pub include_tool_calls: Option<bool>,
+ pub include_tool_results: Option<bool>,
+ pub limit: Option<i32>,
+}
+
+/// Query parameters for timeline
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimelineQueryFilters {
+ pub contract_id: Option<Uuid>,
+ pub task_id: Option<Uuid>,
+ pub include_subtasks: Option<bool>,
+ pub from: Option<chrono::DateTime<chrono::Utc>>,
+ pub to: Option<chrono::DateTime<chrono::Utc>>,
+ pub limit: Option<i32>,
+}
+
+/// GET /api/v1/contracts/{id}/history
+/// Returns contract history timeline with filtering and pagination
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/history",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID"),
+ ("phase" = Option<String>, Query, description = "Filter by phase"),
+ ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Contract history", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Contract not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_contract_history(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Query(filters): Query<HistoryQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Verify contract exists and user has access
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get history events
+ match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id,
+ entries: events,
+ total_count,
+ cursor: None, // TODO: implement cursor pagination
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract history: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}
+
+/// GET /api/v1/contracts/{id}/supervisor/conversation
+/// Returns full supervisor conversation with spawned task references
+#[utoipa::path(
+ get,
+ path = "/api/v1/contracts/{id}/supervisor/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Supervisor not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_supervisor_conversation(
+ State(state): State<SharedState>,
+ Path(contract_id): Path<Uuid>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get contract for phase info and ownership check
+ let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ Ok(Some(c)) => c,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Contract not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get contract {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get the supervisor state
+ let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
+ Ok(Some(s)) => s,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Parse conversation history from JSONB
+ let messages: Vec<ConversationMessage> = supervisor_state
+ .conversation_history
+ .as_array()
+ .map(|arr| {
+ arr.iter()
+ .enumerate()
+ .map(|(i, v)| ConversationMessage {
+ id: i.to_string(),
+ role: v
+ .get("role")
+ .and_then(|r| r.as_str())
+ .unwrap_or("user")
+ .to_string(),
+ content: v
+ .get("content")
+ .and_then(|c| c.as_str())
+ .unwrap_or("")
+ .to_string(),
+ timestamp: supervisor_state.last_activity,
+ tool_calls: None,
+ tool_name: None,
+ tool_input: None,
+ tool_result: None,
+ is_error: None,
+ token_count: None,
+ cost_usd: None,
+ })
+ .collect()
+ })
+ .unwrap_or_default();
+
+ // Get spawned tasks
+ let tasks = match repository::list_contract_tasks(pool, contract_id).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
+ Vec::new()
+ }
+ };
+
+ let spawned_tasks: Vec<TaskReference> = tasks
+ .into_iter()
+ .filter(|t| !t.is_supervisor)
+ .map(|t| TaskReference {
+ task_id: t.id,
+ task_name: t.name,
+ status: t.status,
+ created_at: t.created_at,
+ completed_at: t.completed_at,
+ })
+ .collect();
+
+ Json(SupervisorConversationResponse {
+ contract_id,
+ supervisor_task_id: supervisor_state.task_id,
+ phase: contract.phase,
+ last_activity: supervisor_state.last_activity,
+ pending_task_ids: supervisor_state.pending_task_ids,
+ messages,
+ spawned_tasks,
+ })
+ .into_response()
+}
+
+/// GET /api/v1/mesh/tasks/{id}/conversation
+/// Returns task conversation history
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/conversation",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"),
+ ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"),
+ ("limit" = Option<i32>, Query, description = "Limit messages"),
+ ),
+ responses(
+ (status = 200, description = "Task conversation", body = TaskConversationResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 403, description = "Forbidden", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_task_conversation(
+ State(state): State<SharedState>,
+ Path(task_id): Path<Uuid>,
+ Query(params): Query<TaskConversationParams>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get task and verify ownership
+ let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", task_id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get conversation messages
+ let messages = match repository::get_task_conversation(
+ pool,
+ task_id,
+ params.include_tool_calls.unwrap_or(true),
+ params.include_tool_results.unwrap_or(true),
+ params.limit,
+ )
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => {
+ tracing::error!("Failed to get task conversation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Calculate totals
+ let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum();
+
+ Json(TaskConversationResponse {
+ task_id,
+ task_name: task.name,
+ status: task.status,
+ messages,
+ total_tokens: None,
+ total_cost: if total_cost > 0.0 {
+ Some(total_cost)
+ } else {
+ None
+ },
+ })
+ .into_response()
+}
+
+/// GET /api/v1/timeline
+/// Returns unified timeline for authenticated user
+#[utoipa::path(
+ get,
+ path = "/api/v1/timeline",
+ params(
+ ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
+ ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
+ ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
+ ("from" = Option<String>, Query, description = "Start date filter"),
+ ("to" = Option<String>, Query, description = "End date filter"),
+ ("limit" = Option<i32>, Query, description = "Limit results"),
+ ),
+ responses(
+ (status = 200, description = "Timeline events", body = ContractHistoryResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "History"
+)]
+pub async fn get_timeline(
+ State(state): State<SharedState>,
+ Query(filters): Query<TimelineQueryFilters>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ let history_filters = HistoryQueryFilters {
+ phase: None,
+ event_types: None,
+ from: filters.from,
+ to: filters.to,
+ limit: filters.limit,
+ cursor: None,
+ };
+
+ let result = if let Some(contract_id) = filters.contract_id {
+ repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await
+ } else if let Some(task_id) = filters.task_id {
+ repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
+ } else {
+ repository::get_timeline(pool, auth.owner_id, &history_filters).await
+ };
+
+ match result {
+ Ok((events, total_count)) => {
+ Json(ContractHistoryResponse {
+ contract_id: filters.contract_id.unwrap_or_default(),
+ entries: events,
+ total_count,
+ cursor: None,
+ })
+ .into_response()
+ }
+ Err(e) => {
+ tracing::error!("Failed to get timeline: {}", e);
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response()
+ }
+ }
+}