//! HTTP handlers for history and conversation APIs.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use uuid::Uuid;
use crate::{
db::{
models::{
flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
SupervisorConversationResponse, TaskConversationResponse, TaskReference,
},
repository,
},
server::{auth::Authenticated, messages::ApiError, state::SharedState},
};
/// Query parameters accepted by the task conversation endpoint.
///
/// Field names are camelCase on the wire (`includeToolCalls`,
/// `includeToolResults`). The snake_case spellings are accepted as aliases
/// because the endpoint's OpenAPI parameter list advertises them in that
/// form; unknown keys are not rejected by this struct, so without the aliases
/// a client following the spec would have its flags silently dropped.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskConversationParams {
    /// Whether tool call messages are included. The `get_task_conversation`
    /// handler treats a missing value as `true`.
    #[serde(alias = "include_tool_calls")]
    pub include_tool_calls: Option<bool>,
    /// Whether tool result messages are included. The `get_task_conversation`
    /// handler treats a missing value as `true`.
    #[serde(alias = "include_tool_results")]
    pub include_tool_results: Option<bool>,
    /// Optional cap on the number of messages; forwarded to the repository
    /// layer unchanged.
    pub limit: Option<i32>,
}
/// Query parameters accepted by the unified timeline endpoint.
///
/// Field names are camelCase on the wire (`contractId`, `taskId`,
/// `includeSubtasks`). The snake_case spellings are accepted as aliases
/// because the endpoint's OpenAPI parameter list advertises them in that
/// form; unknown keys are not rejected by this struct, so without the aliases
/// a client following the spec would have its filters silently dropped.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimelineQueryFilters {
    /// Restrict the timeline to a single contract.
    #[serde(alias = "contract_id")]
    pub contract_id: Option<Uuid>,
    /// Restrict the timeline to a single task.
    #[serde(alias = "task_id")]
    pub task_id: Option<Uuid>,
    /// Include subtask events.
    // NOTE(review): the `get_timeline` handler in this file never reads this
    // flag — confirm whether it is consumed elsewhere or still to be wired up.
    #[serde(alias = "include_subtasks")]
    pub include_subtasks: Option<bool>,
    /// Start of the time window; parsed by `flexible_datetime::deserialize`.
    #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
    pub from: Option<chrono::DateTime<chrono::Utc>>,
    /// End of the time window; parsed by `flexible_datetime::deserialize`.
    #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
    pub to: Option<chrono::DateTime<chrono::Utc>>,
    /// Optional cap on the number of returned entries; forwarded to the
    /// repository layer unchanged.
    pub limit: Option<i32>,
}
/// GET /api/v1/contracts/{id}/history
/// Returns the contract history timeline, filtered by the query parameters.
///
/// Responds with 503 when no database pool is configured, 404 when the
/// contract cannot be loaded for the authenticated owner, and 500 when a
/// database call fails.
#[utoipa::path(
    get,
    path = "/api/v1/contracts/{id}/history",
    params(
        ("id" = Uuid, Path, description = "Contract ID"),
        ("phase" = Option<String>, Query, description = "Filter by phase"),
        ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
        ("from" = Option<String>, Query, description = "Start date filter"),
        ("to" = Option<String>, Query, description = "End date filter"),
        ("limit" = Option<i32>, Query, description = "Limit results"),
    ),
    responses(
        (status = 200, description = "Contract history", body = ContractHistoryResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Contract not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_contract_history(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    Query(filters): Query<HistoryQueryFilters>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Generic text returned to clients for any database failure. The
    // underlying error is written to the server log only, so internal
    // database details are never disclosed in an API response.
    const DB_ERROR_MESSAGE: &str = "Internal database error";

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Verify the contract exists and the caller has access to it. A `None`
    // result is reported as 404 in either case.
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await
    {
        Ok(Some(contract)) => contract,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response();
        }
    };

    // Fetch the history events for the verified contract.
    match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
        Ok((events, total_count)) => Json(ContractHistoryResponse {
            contract_id,
            entries: events,
            total_count,
            cursor: None, // TODO: implement cursor pagination
        })
        .into_response(),
        Err(e) => {
            tracing::error!("Failed to get contract history: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response()
        }
    }
}
/// GET /api/v1/contracts/{id}/supervisor/conversation
/// Returns full supervisor conversation with spawned task references
///
/// Responds with 503 when no database pool is configured, 404 when either
/// the contract or its supervisor state cannot be found, and 500 when a
/// required database call fails.
#[utoipa::path(
    get,
    path = "/api/v1/contracts/{id}/supervisor/conversation",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Supervisor not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_supervisor_conversation(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Generic text returned to clients for any database failure. The
    // underlying error is written to the server log only, so internal
    // database details are never disclosed in an API response.
    const DB_ERROR_MESSAGE: &str = "Internal database error";

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Load the contract: this is the ownership check and also supplies the
    // phase reported in the response.
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await
    {
        Ok(Some(contract)) => contract,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response();
        }
    };

    // Load the supervisor state. It is looked up by contract id only; access
    // has already been checked through the contract lookup above.
    let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(supervisor_state)) => supervisor_state,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response();
        }
    };

    // The stored history carries no per-message timestamp, so every message
    // is stamped with the supervisor's last activity time.
    let last_activity = supervisor_state.last_activity;

    // Parse the conversation history from its JSON representation. Anything
    // that is not a JSON array yields an empty message list.
    let messages: Vec<ConversationMessage> = supervisor_state
        .conversation_history
        .as_array()
        .map(|entries| {
            entries
                .iter()
                .enumerate()
                .map(|(index, entry)| ConversationMessage {
                    // The position in the array doubles as the message id.
                    id: index.to_string(),
                    // A missing or non-string role falls back to "user".
                    role: entry
                        .get("role")
                        .and_then(|role| role.as_str())
                        .unwrap_or("user")
                        .to_string(),
                    // Missing or non-string content becomes an empty string.
                    content: entry
                        .get("content")
                        .and_then(|content| content.as_str())
                        .unwrap_or("")
                        .to_string(),
                    timestamp: last_activity,
                    tool_calls: None,
                    tool_name: None,
                    tool_input: None,
                    tool_result: None,
                    is_error: None,
                    token_count: None,
                    cost_usd: None,
                })
                .collect()
        })
        .unwrap_or_default();

    // Spawned tasks are best-effort: a failure here is logged and the
    // conversation is still returned, with an empty task list.
    let tasks = match repository::list_tasks_by_contract(pool, contract_id, auth.owner_id).await {
        Ok(tasks) => tasks,
        Err(e) => {
            tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
            Vec::new()
        }
    };

    // The supervisor's own task is excluded from the spawned task list.
    let spawned_tasks: Vec<TaskReference> = tasks
        .into_iter()
        .filter(|task| !task.is_supervisor)
        .map(|task| TaskReference {
            task_id: task.id,
            task_name: task.name,
            status: task.status,
            created_at: task.created_at,
            completed_at: task.completed_at,
        })
        .collect();

    Json(SupervisorConversationResponse {
        contract_id,
        supervisor_task_id: supervisor_state.task_id,
        phase: contract.phase,
        last_activity,
        pending_task_ids: supervisor_state.pending_task_ids,
        messages,
        spawned_tasks,
    })
    .into_response()
}
/// GET /api/v1/mesh/tasks/{id}/conversation
/// Returns task conversation history
///
/// Tool call and tool result messages are included unless the matching
/// query flag is set to false. Responds with 503 when no database pool is
/// configured, 404 when the task cannot be loaded for the authenticated
/// owner, and 500 when a database call fails.
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/conversation",
    params(
        ("id" = Uuid, Path, description = "Task ID"),
        ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"),
        ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"),
        ("limit" = Option<i32>, Query, description = "Limit messages"),
    ),
    responses(
        (status = 200, description = "Task conversation", body = TaskConversationResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_task_conversation(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    Query(params): Query<TaskConversationParams>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Generic text returned to clients for any database failure. The
    // underlying error is written to the server log only, so internal
    // database details are never disclosed in an API response.
    const DB_ERROR_MESSAGE: &str = "Internal database error";

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Load the task and verify ownership; a `None` result is reported as 404.
    let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
        Ok(Some(task)) => task,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", task_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response();
        }
    };

    // Both tool-related flags default to `true` when omitted by the client.
    let include_tool_calls = params.include_tool_calls.unwrap_or(true);
    let include_tool_results = params.include_tool_results.unwrap_or(true);

    let messages = match repository::get_task_conversation(
        pool,
        task_id,
        include_tool_calls,
        include_tool_results,
        params.limit,
    )
    .await
    {
        Ok(messages) => messages,
        Err(e) => {
            tracing::error!("Failed to get task conversation: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response();
        }
    };

    // Sum the cost over the returned messages that carry one. A total that is
    // not strictly positive is reported as absent rather than as zero.
    let total_cost: f64 = messages.iter().filter_map(|message| message.cost_usd).sum();

    Json(TaskConversationResponse {
        task_id,
        task_name: task.name,
        status: task.status,
        messages,
        total_tokens: None,
        total_cost: (total_cost > 0.0).then_some(total_cost),
    })
    .into_response()
}
/// GET /api/v1/timeline
/// Returns unified timeline for authenticated user
///
/// When a contract filter is supplied the contract's history is returned and
/// any task filter is ignored; otherwise a task filter selects that task's
/// history; with neither, the owner's whole timeline is returned. Responds
/// with 503 when no database pool is configured and 500 when the database
/// call fails.
#[utoipa::path(
    get,
    path = "/api/v1/timeline",
    params(
        ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
        ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
        ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
        ("from" = Option<String>, Query, description = "Start date filter"),
        ("to" = Option<String>, Query, description = "End date filter"),
        ("limit" = Option<i32>, Query, description = "Limit results"),
    ),
    responses(
        (status = 200, description = "Timeline events", body = ContractHistoryResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_timeline(
    State(state): State<SharedState>,
    Query(filters): Query<TimelineQueryFilters>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    // Generic text returned to clients for any database failure. The
    // underlying error is written to the server log only, so internal
    // database details are never disclosed in an API response.
    const DB_ERROR_MESSAGE: &str = "Internal database error";

    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Only the time window and the limit carry over to the repository
    // filters; phase, event type and cursor filtering are not exposed here.
    // NOTE(review): `filters.include_subtasks` is accepted but never read.
    let history_filters = HistoryQueryFilters {
        phase: None,
        event_types: None,
        from: filters.from,
        to: filters.to,
        limit: filters.limit,
        cursor: None,
    };

    // Pick the query by filter precedence: contract, then task, then the
    // owner's full timeline.
    let result = match (filters.contract_id, filters.task_id) {
        (Some(contract_id), _) => {
            repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters)
                .await
        }
        (None, Some(task_id)) => {
            repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
        }
        (None, None) => repository::get_timeline(pool, auth.owner_id, &history_filters).await,
    };

    match result {
        Ok((events, total_count)) => Json(ContractHistoryResponse {
            // The response type requires a contract id; without a contract
            // filter the default (nil) UUID is reported.
            contract_id: filters.contract_id.unwrap_or_default(),
            entries: events,
            total_count,
            cursor: None,
        })
        .into_response(),
        Err(e) => {
            tracing::error!("Failed to get timeline: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", DB_ERROR_MESSAGE)),
            )
                .into_response()
        }
    }
}