From a32dc56d2e5447ef8988cb98b8686476cc94e70c Mon Sep 17 00:00:00 2001 From: soryu Date: Tue, 23 Dec 2025 02:14:58 +0000 Subject: Add Postgres for persistence and File cabinet Migrations are local only currently, and must be run manually by setting POSTGRES_CONNECTION_URI --- makima/src/server/handlers/files.rs | 230 +++++++++++++++++++++++++++++++++++ makima/src/server/handlers/listen.rs | 82 ++++++++++++- makima/src/server/handlers/mod.rs | 1 + makima/src/server/mod.rs | 9 +- makima/src/server/openapi.rs | 22 +++- makima/src/server/state.rs | 14 ++- 6 files changed, 351 insertions(+), 7 deletions(-) create mode 100644 makima/src/server/handlers/files.rs (limited to 'makima/src/server') diff --git a/makima/src/server/handlers/files.rs b/makima/src/server/handlers/files.rs new file mode 100644 index 0000000..746d66b --- /dev/null +++ b/makima/src/server/handlers/files.rs @@ -0,0 +1,230 @@ +//! HTTP handlers for file CRUD operations. + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use uuid::Uuid; + +use crate::db::models::{CreateFileRequest, FileListResponse, FileSummary, UpdateFileRequest}; +use crate::db::repository; +use crate::server::messages::ApiError; +use crate::server::state::SharedState; + +/// List all files for the current owner. +#[utoipa::path( + get, + path = "/api/v1/files", + responses( + (status = 200, description = "List of files", body = FileListResponse), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + tag = "Files" +)] +pub async fn list_files(State(state): State) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_files(pool).await { + Ok(files) => { + let summaries: Vec = files.into_iter().map(FileSummary::from).collect(); + let total = summaries.len() as i64; + Json(FileListResponse { + files: summaries, + total, + }) + .into_response() + } + Err(e) => { + tracing::error!("Failed to list files: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Get a single file by ID. +#[utoipa::path( + get, + path = "/api/v1/files/{id}", + params( + ("id" = Uuid, Path, description = "File ID") + ), + responses( + (status = 200, description = "File details", body = crate::db::models::File), + (status = 404, description = "File not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + tag = "Files" +)] +pub async fn get_file( + State(state): State, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::get_file(pool, id).await { + Ok(Some(file)) => Json(file).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get file {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Create a new file. +#[utoipa::path( + post, + path = "/api/v1/files", + request_body = CreateFileRequest, + responses( + (status = 201, description = "File created", body = crate::db::models::File), + (status = 400, description = "Invalid request", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + tag = "Files" +)] +pub async fn create_file( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::create_file(pool, req).await { + Ok(file) => (StatusCode::CREATED, Json(file)).into_response(), + Err(e) => { + tracing::error!("Failed to create file: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Update an existing file. +#[utoipa::path( + put, + path = "/api/v1/files/{id}", + params( + ("id" = Uuid, Path, description = "File ID") + ), + request_body = UpdateFileRequest, + responses( + (status = 200, description = "File updated", body = crate::db::models::File), + (status = 404, description = "File not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + tag = "Files" +)] +pub async fn update_file( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::update_file(pool, id, req).await { + Ok(Some(file)) => Json(file).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update file {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} + +/// Delete a file. +#[utoipa::path( + delete, + path = "/api/v1/files/{id}", + params( + ("id" = Uuid, Path, description = "File ID") + ), + responses( + (status = 204, description = "File deleted"), + (status = 404, description = "File not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + tag = "Files" +)] +pub async fn delete_file( + State(state): State, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::delete_file(pool, id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "File not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to delete file {}: {}", id, e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response() + } + } +} diff --git a/makima/src/server/handlers/listen.rs b/makima/src/server/handlers/listen.rs index bf6746c..93062f3 100644 --- a/makima/src/server/handlers/listen.rs +++ b/makima/src/server/handlers/listen.rs @@ -9,6 +9,8 @@ use tokio::sync::mpsc; use uuid::Uuid; use crate::audio::{resample_and_mixdown, TARGET_CHANNELS, TARGET_SAMPLE_RATE}; +use crate::db::models::{CreateFileRequest, TranscriptEntry, UpdateFileRequest}; +use crate::db::repository; use crate::listen::{align_speakers, samples_per_chunk, DialogueSegment, TimestampMode}; use crate::server::messages::{ AudioEncoding, ClientMessage, ServerMessage, StartMessage, TranscriptMessage, @@ -99,6 +101,11 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { let mut audio_offset: f32 = 0.0; // Time offset from trimmed audio let mut finalized_segments: Vec = Vec::new(); + // File persistence state + let mut file_id: Option = None; + let mut transcript_entries: Vec = Vec::new(); + let mut transcript_counter: u32 = 0; + // Reset Sortformer state for new session { let mut sortformer = state.sortformer.lock().await; @@ -329,12 +336,52 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { // Send segments with adjusted timestamps for seg in &segments { + let adjusted_start = seg.start + audio_offset; let adjusted_end = seg.end + audio_offset; if adjusted_end > last_sent_end_time { + // Create file on first transcript if database is available + if file_id.is_none() { + if let Some(ref pool) = state.db_pool { + match repository::create_file(pool, CreateFileRequest { + name: None, // Auto-generated + description: None, + transcript: vec![], + location: None, + }).await { + Ok(file) => { + file_id = Some(file.id); + tracing::info!( + session_id = %session_id, + file_id = %file.id, + "Created file for session" + ); + } + Err(e) => { + tracing::warn!( + session_id = %session_id, + error = %e, + "Failed to create file for session" + ); + } + } + } + } + + // Track transcript entry + transcript_counter += 1; + transcript_entries.push(TranscriptEntry { + id: format!("{}-{}", session_id, transcript_counter), + speaker: seg.speaker.clone(), + start: adjusted_start, + end: adjusted_end, + text: seg.text.clone(), + is_final: false, + }); + let _ = response_tx .send(ServerMessage::Transcript(TranscriptMessage { speaker: seg.speaker.clone(), - start: seg.start + audio_offset, + start: adjusted_start, end: adjusted_end, text: seg.text.clone(), is_final: false, @@ -399,6 +446,39 @@ async fn handle_socket(socket: WebSocket, state: SharedState) { } } + // Save final transcript to file if we have one + if let Some(fid) = file_id { + if let Some(ref pool) = state.db_pool { + // Mark all entries as final + for entry in &mut transcript_entries { + entry.is_final = true; + } + + match repository::update_file(pool, fid, UpdateFileRequest { + name: None, + description: None, + transcript: Some(transcript_entries.clone()), + }).await { + Ok(_) => { + tracing::info!( + session_id = %session_id, + file_id = %fid, + transcript_count = transcript_entries.len(), + "Saved final transcript to file" + ); + } + Err(e) => { + tracing::error!( + session_id = %session_id, + file_id = %fid, + error = %e, + "Failed to save final transcript to file" + ); + } + } + } + } + // Cleanup drop(response_tx); let _ = sender_task.await; diff --git a/makima/src/server/handlers/mod.rs b/makima/src/server/handlers/mod.rs index 94b0384..f249234 100644 --- a/makima/src/server/handlers/mod.rs +++ b/makima/src/server/handlers/mod.rs @@ -1,3 +1,4 @@ //! HTTP and WebSocket request handlers. +pub mod files; pub mod listen; diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index c509afa..bc3e679 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -17,7 +17,7 @@ use tower_http::trace::TraceLayer; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use crate::server::handlers::listen; +use crate::server::handlers::{files, listen}; use crate::server::openapi::ApiDoc; use crate::server::state::SharedState; @@ -43,6 +43,13 @@ pub fn make_router(state: SharedState) -> Router { // API v1 routes let api_v1 = Router::new() .route("/listen", get(listen::websocket_handler)) + .route("/files", get(files::list_files).post(files::create_file)) + .route( + "/files/{id}", + get(files::get_file) + .put(files::update_file) + .delete(files::delete_file), + ) .with_state(state); let swagger = SwaggerUi::new("/swagger-ui") diff --git a/makima/src/server/openapi.rs b/makima/src/server/openapi.rs index 3e8c06c..b946ff3 100644 --- a/makima/src/server/openapi.rs +++ b/makima/src/server/openapi.rs @@ -2,19 +2,27 @@ use utoipa::OpenApi; -use crate::server::handlers::listen; +use crate::db::models::{ + CreateFileRequest, File, FileListResponse, FileSummary, TranscriptEntry, UpdateFileRequest, +}; +use crate::server::handlers::{files, listen}; use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage, TranscriptMessage}; #[derive(OpenApi)] #[openapi( info( - title = "Makima Listen API", + title = "Makima API", version = "1.0.0", - description = "Streaming audio APIs for speech-to-text.", + description = "Streaming audio APIs for speech-to-text with persistence.", license(name = "MIT"), ), paths( listen::websocket_handler, + files::list_files, + files::get_file, + files::create_file, + files::update_file, + files::delete_file, ), components( schemas( @@ -23,10 +31,18 @@ use crate::server::messages::{ApiError, AudioEncoding, StartMessage, StopMessage StartMessage, StopMessage, TranscriptMessage, + // File schemas + File, + FileSummary, + FileListResponse, + CreateFileRequest, + UpdateFileRequest, + TranscriptEntry, ) ), tags( (name = "Listen", description = "Speech-to-text streaming endpoints"), + (name = "Files", description = "Transcript file management"), ) )] pub struct ApiDoc; diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 31e1518..8cdc26c 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -1,11 +1,12 @@ -//! Application state holding shared ML models. +//! Application state holding shared ML models and database pool. use std::sync::Arc; +use sqlx::PgPool; use tokio::sync::Mutex; use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer}; -/// Shared application state containing ML models. +/// Shared application state containing ML models and database pool. /// /// Models are wrapped in `Mutex` for thread-safe mutable access during inference. pub struct AppState { @@ -15,6 +16,8 @@ pub struct AppState { pub parakeet_eou: Mutex, /// Speaker diarization model (Sortformer) pub sortformer: Mutex, + /// Optional database connection pool + pub db_pool: Option, } impl AppState { @@ -41,8 +44,15 @@ impl AppState { parakeet: Mutex::new(parakeet), parakeet_eou: Mutex::new(parakeet_eou), sortformer: Mutex::new(sortformer), + db_pool: None, }) } + + /// Set the database pool. + pub fn with_db_pool(mut self, pool: PgPool) -> Self { + self.db_pool = Some(pool); + self + } } /// Type alias for the shared application state. -- cgit v1.2.3