summaryrefslogblamecommitdiff
path: root/makima/src/server/handlers/history.rs
blob: bee6b023ef16c6fe8a95782d01a9ec3a8ab024a9 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13












                                                    
                                                                                                 






















                                                                                    
                                                                          
                                                    
                                                                          


































































































































































































                                                                                                         
                                                                                                  











































































































































































































                                                                                                     
//! HTTP handlers for history and conversation APIs.

use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::IntoResponse,
    Json,
};
use uuid::Uuid;

use crate::{
    db::{
        models::{
            flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
            SupervisorConversationResponse, TaskConversationResponse, TaskReference,
        },
        repository,
    },
    server::{auth::Authenticated, messages::ApiError, state::SharedState},
};

/// Query parameters for task conversation
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskConversationParams {
    pub include_tool_calls: Option<bool>,
    pub include_tool_results: Option<bool>,
    pub limit: Option<i32>,
}

/// Query parameters for timeline
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimelineQueryFilters {
    pub contract_id: Option<Uuid>,
    pub task_id: Option<Uuid>,
    pub include_subtasks: Option<bool>,
    #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
    pub from: Option<chrono::DateTime<chrono::Utc>>,
    #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
    pub to: Option<chrono::DateTime<chrono::Utc>>,
    pub limit: Option<i32>,
}

/// GET /api/v1/contracts/{id}/history
/// Returns contract history timeline with filtering and pagination
#[utoipa::path(
    get,
    path = "/api/v1/contracts/{id}/history",
    params(
        ("id" = Uuid, Path, description = "Contract ID"),
        ("phase" = Option<String>, Query, description = "Filter by phase"),
        ("event_types" = Option<String>, Query, description = "Filter by event types (comma-separated)"),
        ("from" = Option<String>, Query, description = "Start date filter"),
        ("to" = Option<String>, Query, description = "End date filter"),
        ("limit" = Option<i32>, Query, description = "Limit results"),
    ),
    responses(
        (status = 200, description = "Contract history", body = ContractHistoryResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Contract not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_contract_history(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    Query(filters): Query<HistoryQueryFilters>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Verify contract exists and user has access
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get history events
    match repository::get_contract_history(pool, contract.id, auth.owner_id, &filters).await {
        Ok((events, total_count)) => {
            Json(ContractHistoryResponse {
                contract_id,
                entries: events,
                total_count,
                cursor: None, // TODO: implement cursor pagination
            })
            .into_response()
        }
        Err(e) => {
            tracing::error!("Failed to get contract history: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response()
        }
    }
}

/// GET /api/v1/contracts/{id}/supervisor/conversation
/// Returns full supervisor conversation with spawned task references
#[utoipa::path(
    get,
    path = "/api/v1/contracts/{id}/supervisor/conversation",
    params(
        ("id" = Uuid, Path, description = "Contract ID")
    ),
    responses(
        (status = 200, description = "Supervisor conversation", body = SupervisorConversationResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Supervisor not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_supervisor_conversation(
    State(state): State<SharedState>,
    Path(contract_id): Path<Uuid>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get contract for phase info and ownership check
    let contract = match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Contract not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get contract {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get the supervisor state
    let supervisor_state = match repository::get_supervisor_state(pool, contract_id).await {
        Ok(Some(s)) => s,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Supervisor not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get supervisor state for {}: {}", contract_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Parse conversation history from JSONB
    let messages: Vec<ConversationMessage> = supervisor_state
        .conversation_history
        .as_array()
        .map(|arr| {
            arr.iter()
                .enumerate()
                .map(|(i, v)| ConversationMessage {
                    id: i.to_string(),
                    role: v
                        .get("role")
                        .and_then(|r| r.as_str())
                        .unwrap_or("user")
                        .to_string(),
                    content: v
                        .get("content")
                        .and_then(|c| c.as_str())
                        .unwrap_or("")
                        .to_string(),
                    timestamp: supervisor_state.last_activity,
                    tool_calls: None,
                    tool_name: None,
                    tool_input: None,
                    tool_result: None,
                    is_error: None,
                    token_count: None,
                    cost_usd: None,
                })
                .collect()
        })
        .unwrap_or_default();

    // Get spawned tasks
    let tasks = match repository::list_tasks_by_contract(pool, contract_id, auth.owner_id).await {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!("Failed to get tasks for contract {}: {}", contract_id, e);
            Vec::new()
        }
    };

    let spawned_tasks: Vec<TaskReference> = tasks
        .into_iter()
        .filter(|t| !t.is_supervisor)
        .map(|t| TaskReference {
            task_id: t.id,
            task_name: t.name,
            status: t.status,
            created_at: t.created_at,
            completed_at: t.completed_at,
        })
        .collect();

    Json(SupervisorConversationResponse {
        contract_id,
        supervisor_task_id: supervisor_state.task_id,
        phase: contract.phase,
        last_activity: supervisor_state.last_activity,
        pending_task_ids: supervisor_state.pending_task_ids,
        messages,
        spawned_tasks,
    })
    .into_response()
}

/// GET /api/v1/mesh/tasks/{id}/conversation
/// Returns task conversation history
#[utoipa::path(
    get,
    path = "/api/v1/mesh/tasks/{id}/conversation",
    params(
        ("id" = Uuid, Path, description = "Task ID"),
        ("include_tool_calls" = Option<bool>, Query, description = "Include tool call messages"),
        ("include_tool_results" = Option<bool>, Query, description = "Include tool result messages"),
        ("limit" = Option<i32>, Query, description = "Limit messages"),
    ),
    responses(
        (status = 200, description = "Task conversation", body = TaskConversationResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 403, description = "Forbidden", body = ApiError),
        (status = 404, description = "Task not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_task_conversation(
    State(state): State<SharedState>,
    Path(task_id): Path<Uuid>,
    Query(params): Query<TaskConversationParams>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get task and verify ownership
    let task = match repository::get_task_for_owner(pool, task_id, auth.owner_id).await {
        Ok(Some(t)) => t,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(ApiError::new("NOT_FOUND", "Task not found")),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get task {}: {}", task_id, e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Get conversation messages
    let messages = match repository::get_task_conversation(
        pool,
        task_id,
        params.include_tool_calls.unwrap_or(true),
        params.include_tool_results.unwrap_or(true),
        params.limit,
    )
    .await
    {
        Ok(m) => m,
        Err(e) => {
            tracing::error!("Failed to get task conversation: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response();
        }
    };

    // Calculate totals
    let total_cost: f64 = messages.iter().filter_map(|m| m.cost_usd).sum();

    Json(TaskConversationResponse {
        task_id,
        task_name: task.name,
        status: task.status,
        messages,
        total_tokens: None,
        total_cost: if total_cost > 0.0 {
            Some(total_cost)
        } else {
            None
        },
    })
    .into_response()
}

/// GET /api/v1/timeline
/// Returns unified timeline for authenticated user
#[utoipa::path(
    get,
    path = "/api/v1/timeline",
    params(
        ("contract_id" = Option<Uuid>, Query, description = "Filter by contract"),
        ("task_id" = Option<Uuid>, Query, description = "Filter by task"),
        ("include_subtasks" = Option<bool>, Query, description = "Include subtask events"),
        ("from" = Option<String>, Query, description = "Start date filter"),
        ("to" = Option<String>, Query, description = "End date filter"),
        ("limit" = Option<i32>, Query, description = "Limit results"),
    ),
    responses(
        (status = 200, description = "Timeline events", body = ContractHistoryResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
    ),
    security(
        ("bearer_auth" = []),
        ("api_key" = [])
    ),
    tag = "History"
)]
pub async fn get_timeline(
    State(state): State<SharedState>,
    Query(filters): Query<TimelineQueryFilters>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    let history_filters = HistoryQueryFilters {
        phase: None,
        event_types: None,
        from: filters.from,
        to: filters.to,
        limit: filters.limit,
        cursor: None,
    };

    let result = if let Some(contract_id) = filters.contract_id {
        repository::get_contract_history(pool, contract_id, auth.owner_id, &history_filters).await
    } else if let Some(task_id) = filters.task_id {
        repository::get_task_history(pool, task_id, auth.owner_id, &history_filters).await
    } else {
        repository::get_timeline(pool, auth.owner_id, &history_filters).await
    };

    match result {
        Ok((events, total_count)) => {
            Json(ContractHistoryResponse {
                contract_id: filters.contract_id.unwrap_or_default(),
                entries: events,
                total_count,
                cursor: None,
            })
            .into_response()
        }
        Err(e) => {
            tracing::error!("Failed to get timeline: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("DB_ERROR", e.to_string())),
            )
                .into_response()
        }
    }
}