use std::{convert::Infallible, env, net::SocketAddr, time::Duration}; use axum::{ extract::{ws::{Message, WebSocket, WebSocketUpgrade}, State}, http::Method, response::IntoResponse, routing::{get, post}, Json, Router, }; use axum::response::sse::{Event, KeepAlive, Sse}; use serde::{Deserialize, Serialize}; use tower_http::{cors::{Any, CorsLayer}, trace::TraceLayer}; use tracing::info; use tracing_subscriber::EnvFilter; use tokio_stream::{wrappers::IntervalStream, StreamExt as _}; use futures::StreamExt as _; // for websocket split/next use futures::stream::Stream; // for SSE return type #[derive(Clone, Default)] struct AppState {} #[derive(Serialize)] struct HealthResponse { status: &'static str, } #[derive(Serialize)] struct HelloResponse { message: &'static str, } #[derive(Deserialize, Serialize)] struct EchoPayload { #[serde(flatten)] rest: serde_json::Value, } #[tokio::main] async fn main() { // Logging setup let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,tower_http=info,axum::rejection=trace")); tracing_subscriber::fmt() .with_env_filter(filter) .with_target(false) .compact() .init(); // Shared app state (extend as needed) let state = AppState::default(); // CORS to allow local frontend dev at 5173 and others let cors = CorsLayer::new() .allow_origin(Any) .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) .allow_headers(Any); // Router let app = Router::new() .route("/", get(root)) .route("/health", get(health)) .route("/api/hello", get(hello)) .route("/api/echo", post(echo)) .route("/api/stream/transcript", get(stream_transcript_sse)) .route("/ws", get(ws_handler)) .with_state(state) .layer(cors) .layer(TraceLayer::new_for_http()); // Bind address let port = env::var("PORT").ok().and_then(|p| p.parse().ok()).unwrap_or(8080); let addr = SocketAddr::from(([0, 0, 0, 0], port)); info!(%addr, "starting soryu backend"); let listener = tokio::net::TcpListener::bind(addr).await.expect("bind port"); axum::serve(listener, app).await.expect("server error"); } async fn root() -> impl IntoResponse { Json(serde_json::json!({ "service": "soryu-backend", "endpoints": [ "/health", "/api/hello", "/api/echo", "/api/stream/transcript", "/ws" ], })) } async fn health() -> impl IntoResponse { Json(HealthResponse { status: "ok" }) } async fn hello(State(_state): State) -> impl IntoResponse { Json(HelloResponse { message: "Hello from Soryu backend" }) } async fn echo(Json(body): Json) -> impl IntoResponse { Json(EchoPayload { rest: body }) } // --- // Streaming transcript (SSE) skeleton // Provides a low-latency server-sent events stream that emits example // transcript chunks every ~250ms. async fn stream_transcript_sse(State(_state): State) -> Sse>> { const SAMPLE_LINES: &[&str] = &[ "speaker_a: hey there, can you hear me?", "speaker_b: loud and clear — let's begin.", "speaker_a: streaming transcript looks smooth so far.", "speaker_b: agreed, latency feels low.", "speaker_a: wrapping up the demo now.", ]; let mut idx = 0usize; let interval = tokio::time::interval(Duration::from_millis(250)); let stream = IntervalStream::new(interval).map(move |_| { let line = SAMPLE_LINES[idx % SAMPLE_LINES.len()]; idx += 1; let data = serde_json::json!({ "type": "transcript_chunk", "text": line, "ts_ms": chrono::Utc::now().timestamp_millis(), }); Ok(Event::default().json_data(&data).unwrap()) }); Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10))) } // --- // WebSocket transcript stream skeleton // Sends example transcript messages on connect; ignores inbound messages. async fn ws_handler(ws: WebSocketUpgrade, State(_state): State) -> impl IntoResponse { ws.on_upgrade(handle_socket) } async fn handle_socket(mut socket: WebSocket) { const SAMPLE_LINES: &[&str] = &[ "speaker_a: hey there, can you hear me?", "speaker_b: loud and clear — let's begin.", "speaker_a: streaming transcript looks smooth so far.", "speaker_b: agreed, latency feels low.", "speaker_a: wrapping up the demo now.", ]; // Spawn a task to drain inbound messages (optional for skeleton) let mut recv_socket = socket.split().1; tokio::spawn(async move { while let Some(Ok(_msg)) = recv_socket.next().await { // Ignore inbound for skeleton; handle pings/acks here if needed } }); // Send a small transcript stream for line in SAMPLE_LINES { let payload = serde_json::json!({ "type": "transcript_chunk", "text": line, "ts_ms": chrono::Utc::now().timestamp_millis(), }) .to_string(); if socket.send(Message::Text(payload)).await.is_err() { return; } tokio::time::sleep(Duration::from_millis(250)).await; } let done = serde_json::json!({"type":"done"}).to_string(); let _ = socket.send(Message::Text(done)).await; }