//! Web server module for the makima audio API. pub mod auth; pub mod handlers; pub mod messages; pub mod openapi; pub mod state; use axum::{ http::StatusCode, response::IntoResponse, routing::{get, post, put}, Json, Router, }; use serde::Serialize; use tower_http::cors::{Any, CorsLayer}; use tower_http::trace::TraceLayer; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; use crate::server::handlers::{api_keys, chat, contract_chat, contract_daemon, contract_discuss, contracts, daemon_download, directives, file_ws, files, history, listen, mesh, mesh_chat, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, templates, transcript_analysis, users, versions}; use crate::server::openapi::ApiDoc; use crate::server::state::SharedState; #[derive(Serialize)] struct HealthResponse { status: &'static str, version: &'static str, } /// Health check endpoint for load balancers and orchestrators. async fn health_check() -> impl IntoResponse { ( StatusCode::OK, Json(HealthResponse { status: "healthy", version: env!("CARGO_PKG_VERSION"), }), ) } /// Create the axum Router with all routes configured. 
pub fn make_router(state: SharedState) -> Router { // API v1 routes let api_v1 = Router::new() .route("/listen", get(listen::websocket_handler)) .route("/speak", get(speak::websocket_handler)) // Listen/transcript analysis endpoints .route("/listen/analyze", post(transcript_analysis::analyze_transcript)) .route("/listen/create-contract", post(transcript_analysis::create_contract_from_analysis)) .route("/listen/update-contract", post(transcript_analysis::update_contract_from_analysis)) .route("/files/subscribe", get(file_ws::file_subscription_handler)) .route("/files", get(files::list_files).post(files::create_file)) .route( "/files/{id}", get(files::get_file) .put(files::update_file) .delete(files::delete_file), ) .route("/files/{id}/chat", post(chat::chat_handler)) .route("/files/{id}/sync-from-repo", post(files::sync_file_from_repo)) // Version history endpoints .route("/files/{id}/versions", get(versions::list_versions)) .route("/files/{id}/versions/{version}", get(versions::get_version)) .route("/files/{id}/versions/restore", post(versions::restore_version)) // Mesh/task orchestration endpoints .route( "/mesh/tasks", get(mesh::list_tasks).post(mesh::create_task), ) .route( "/mesh/tasks/{id}", get(mesh::get_task) .put(mesh::update_task) .delete(mesh::delete_task), ) .route("/mesh/tasks/{id}/subtasks", get(mesh::list_subtasks)) .route("/mesh/tasks/{id}/events", get(mesh::list_task_events)) .route("/mesh/tasks/{id}/output", get(mesh::get_task_output)) .route("/mesh/tasks/{id}/start", post(mesh::start_task)) .route("/mesh/tasks/{id}/stop", post(mesh::stop_task)) .route("/mesh/tasks/{id}/message", post(mesh::send_message)) .route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action)) .route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree)) .route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info)) .route("/mesh/tasks/{id}/diff", get(mesh::get_task_diff)) .route("/mesh/tasks/{id}/worktree-commit", post(mesh::commit_worktree)) 
.route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches)) .route("/mesh/tasks/{id}/patch-data", get(mesh::get_task_patch_data)) .route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists)) .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task)) .route("/mesh/tasks/{id}/continue", post(mesh::continue_task)) .route("/mesh/chat", post(mesh_chat::mesh_toplevel_chat_handler)) .route( "/mesh/chat/history", get(mesh_chat::get_chat_history).delete(mesh_chat::clear_chat_history), ) .route("/mesh/tasks/{id}/chat", post(mesh_chat::mesh_chat_handler)) .route("/mesh/daemons", get(mesh::list_daemons)) .route("/mesh/daemons/directories", get(mesh::get_daemon_directories)) .route("/mesh/daemons/{id}", get(mesh::get_daemon)) .route("/mesh/daemons/{id}/restart", post(mesh::restart_daemon)) .route("/mesh/daemons/{id}/reauth", post(mesh::trigger_daemon_reauth)) .route("/mesh/daemons/{id}/reauth/code", post(mesh::submit_daemon_auth_code)) .route("/mesh/daemons/{id}/reauth/{request_id}/status", get(mesh::get_daemon_reauth_status)) // Merge endpoints for orchestrators .route("/mesh/tasks/{id}/branches", get(mesh_merge::list_branches)) .route("/mesh/tasks/{id}/merge/start", post(mesh_merge::merge_start)) .route("/mesh/tasks/{id}/merge/status", get(mesh_merge::merge_status)) .route("/mesh/tasks/{id}/merge/resolve", post(mesh_merge::merge_resolve)) .route("/mesh/tasks/{id}/merge/commit", post(mesh_merge::merge_commit)) .route("/mesh/tasks/{id}/merge/abort", post(mesh_merge::merge_abort)) .route("/mesh/tasks/{id}/merge/skip", post(mesh_merge::merge_skip)) .route("/mesh/tasks/{id}/merge/check", get(mesh_merge::merge_check)) // Checkpoint endpoints .route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint)) .route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints)) .route("/mesh/tasks/{id}/conversation", get(history::get_task_conversation)) // Resume and rewind endpoints .route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task)) 
.route("/mesh/tasks/{id}/fork", post(mesh::fork_task)) .route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint)) .route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint)) // Task branching endpoint .route("/mesh/tasks/{id}/branch", post(mesh::branch_task)) // Supervisor endpoints (for supervisor.sh) .route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks)) .route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree)) .route("/mesh/supervisor/tasks", post(mesh_supervisor::spawn_task)) .route("/mesh/supervisor/tasks/{task_id}/wait", post(mesh_supervisor::wait_for_task)) .route("/mesh/supervisor/tasks/{task_id}/read-file", post(mesh_supervisor::read_worktree_file)) // Supervisor git operations .route("/mesh/supervisor/branches", post(mesh_supervisor::create_branch)) .route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task)) .route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff)) .route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr)) // Supervisor order creation endpoint .route("/mesh/supervisor/orders", post(mesh_supervisor::create_order_for_task)) // Supervisor question endpoints .route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question)) .route("/mesh/supervisor/questions/{question_id}/poll", get(mesh_supervisor::poll_question)) .route("/mesh/questions", get(mesh_supervisor::list_pending_questions)) .route("/mesh/questions/{question_id}/answer", post(mesh_supervisor::answer_question)) // Mesh WebSocket endpoints .route("/mesh/tasks/subscribe", get(mesh_ws::task_subscription_handler)) .route("/mesh/daemons/connect", get(mesh_daemon::daemon_handler)) // Daemon binary download endpoints .route("/daemon/download/platforms", get(daemon_download::list_daemon_platforms)) .route("/daemon/download/{platform}", get(daemon_download::download_daemon)) // API key 
management endpoints .route( "/auth/api-keys", post(api_keys::create_api_key_handler) .get(api_keys::get_api_key_handler) .delete(api_keys::revoke_api_key_handler), ) .route("/auth/api-keys/refresh", post(api_keys::refresh_api_key_handler)) // User account management endpoints .route( "/users/me", axum::routing::delete(users::delete_account_handler), ) .route("/users/me/password", axum::routing::put(users::change_password_handler)) .route("/users/me/email", axum::routing::put(users::change_email_handler)) // Contract endpoints .route("/contracts/discuss", post(contract_discuss::discuss_contract_handler)) .route( "/contracts", get(contracts::list_contracts).post(contracts::create_contract), ) .route( "/contracts/{id}", get(contracts::get_contract) .put(contracts::update_contract) .delete(contracts::delete_contract), ) .route("/contracts/{id}/phase", post(contracts::change_phase)) .route("/contracts/{id}/deliverables/complete", post(contracts::mark_deliverable_complete)) .route("/contracts/{id}/events", get(contracts::get_events)) .route("/contracts/{id}/chat", post(contract_chat::contract_chat_handler)) .route( "/contracts/{id}/chat/history", get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history), ) // Contract supervisor resume endpoints .route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor)) .route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation)) // Contract supervisor status endpoints .route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status)) .route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats)) .route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor)) // History endpoints .route("/contracts/{id}/history", get(history::get_contract_history)) .route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation)) // Contract daemon endpoints (for 
tasks to interact with contracts) .route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status)) .route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist)) .route("/contracts/{id}/daemon/goals", get(contract_daemon::get_contract_goals)) .route("/contracts/{id}/daemon/report", post(contract_daemon::post_progress_report)) .route("/contracts/{id}/daemon/suggest-action", post(contract_daemon::get_suggest_action)) .route("/contracts/{id}/daemon/completion-action", post(contract_daemon::get_completion_action)) .route( "/contracts/{id}/daemon/files", get(contract_daemon::list_contract_files).post(contract_daemon::create_contract_file), ) .route( "/contracts/{id}/daemon/files/{file_id}", get(contract_daemon::get_contract_file).put(contract_daemon::update_contract_file), ) // Contract repository endpoints .route("/contracts/{id}/repositories/remote", post(contracts::add_remote_repository)) .route("/contracts/{id}/repositories/local", post(contracts::add_local_repository)) .route("/contracts/{id}/repositories/managed", post(contracts::create_managed_repository)) .route( "/contracts/{id}/repositories/{repo_id}", axum::routing::delete(contracts::delete_repository), ) .route( "/contracts/{id}/repositories/{repo_id}/primary", axum::routing::put(contracts::set_repository_primary), ) // Contract task association endpoints .route( "/contracts/{id}/tasks/{task_id}", post(contracts::add_task_to_contract).delete(contracts::remove_task_from_contract), ) // Directive endpoints .route( "/directives", get(directives::list_directives).post(directives::create_directive), ) .route( "/directives/{id}", get(directives::get_directive) .put(directives::update_directive) .delete(directives::delete_directive), ) .route("/directives/{id}/steps", post(directives::create_step)) .route("/directives/{id}/steps/batch", post(directives::batch_create_steps)) .route( "/directives/{id}/steps/{step_id}", 
put(directives::update_step).delete(directives::delete_step), ) .route("/directives/{id}/start", post(directives::start_directive)) .route("/directives/{id}/pause", post(directives::pause_directive)) .route("/directives/{id}/advance", post(directives::advance_directive)) .route("/directives/{id}/steps/{step_id}/complete", post(directives::complete_step)) .route("/directives/{id}/steps/{step_id}/fail", post(directives::fail_step)) .route("/directives/{id}/steps/{step_id}/skip", post(directives::skip_step)) .route("/directives/{id}/goal", put(directives::update_goal)) .route("/directives/{id}/cleanup", post(directives::cleanup_directive)) .route("/directives/{id}/create-pr", post(directives::create_pr)) .route("/directives/{id}/pick-up-orders", post(directives::pick_up_orders)) // Directive Order Group (DOG) endpoints .route( "/directives/{id}/dogs", get(directives::list_dogs).post(directives::create_dog), ) .route( "/directives/{id}/dogs/{dog_id}", get(directives::get_dog) .patch(directives::update_dog) .delete(directives::delete_dog), ) .route("/directives/{id}/dogs/{dog_id}/orders", get(directives::list_dog_orders)) .route("/directives/{id}/dogs/{dog_id}/pick-up-orders", post(directives::pick_up_dog_orders)) // Order endpoints .route( "/orders", get(orders::list_orders).post(orders::create_order), ) .route( "/orders/{id}", get(orders::get_order) .patch(orders::update_order) .delete(orders::delete_order), ) .route("/orders/{id}/link-directive", post(orders::link_to_directive)) .route("/orders/{id}/convert-to-step", post(orders::convert_to_step)) // Timeline endpoint (unified history for user) .route("/timeline", get(history::get_timeline)) // Contract type templates (built-in only) .route("/contract-types", get(templates::list_contract_types)) // Settings endpoints .route( "/settings/repository-history", get(repository_history::list_repository_history), ) .route( "/settings/repository-history/suggestions", get(repository_history::get_repository_suggestions), ) 
.route( "/settings/repository-history/{id}", axum::routing::delete(repository_history::delete_repository_history), ) .with_state(state); let swagger = SwaggerUi::new("/swagger-ui") .url("/api-docs/openapi.json", ApiDoc::openapi()); Router::new() .route("/api/v1/healthcheck", get(health_check)) .nest("/api/v1", api_v1) .merge(swagger) .layer( CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), ) .layer(TraceLayer::new_for_http()) } /// Stale daemon cleanup interval in seconds const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60; /// Daemon heartbeat timeout in seconds (delete daemons older than this) const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120; /// Anonymous task cleanup interval in seconds (24 hours) const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60; /// Maximum age in days for anonymous tasks before cleanup const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7; /// Interval for checkpoint patch cleanup (hourly) const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600; // Retry orchestrator checks for pending tasks every 30 seconds const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30; /// Run the HTTP server with graceful shutdown support. 
/// /// # Arguments /// * `state` - Shared application state containing ML models /// * `addr` - Address to bind to (e.g., "0.0.0.0:8080") pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> { // Start background cleanup tasks if database is available if let Some(pool) = state.db_pool.clone() { // Clone pool for each background task that needs it let daemon_cleanup_pool = pool.clone(); let anonymous_task_cleanup_pool = pool.clone(); // Initial cleanup of any stale daemons from previous server run match crate::db::repository::delete_stale_daemons(&pool, 0).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, "Cleaned up stale daemons from previous server run" ); } Err(e) => { tracing::warn!(error = %e, "Failed to clean up stale daemons on startup"); } _ => {} } // Initial cleanup of any stale anonymous tasks match crate::db::repository::cleanup_stale_anonymous_tasks( &pool, ANONYMOUS_TASK_MAX_AGE_DAYS, ).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, "Cleaned up stale anonymous tasks on startup" ); } Err(e) => { tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks on startup"); } _ => {} } // Spawn periodic daemon cleanup task tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(DAEMON_CLEANUP_INTERVAL_SECS) ); loop { interval.tick().await; match crate::db::repository::delete_stale_daemons( &daemon_cleanup_pool, DAEMON_HEARTBEAT_TIMEOUT_SECS, ).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, timeout_secs = DAEMON_HEARTBEAT_TIMEOUT_SECS, "Deleted stale daemons" ); } Err(e) => { tracing::warn!(error = %e, "Failed to delete stale daemons"); } _ => {} } } }); // Spawn periodic anonymous task cleanup task (runs daily) tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS) ); loop { 
interval.tick().await; match crate::db::repository::cleanup_stale_anonymous_tasks( &anonymous_task_cleanup_pool, ANONYMOUS_TASK_MAX_AGE_DAYS, ).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS, "Cleaned up stale anonymous tasks" ); } Err(e) => { tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks"); } _ => {} } } }); // Clone pool for checkpoint patch cleanup let checkpoint_patch_cleanup_pool = pool.clone(); // Initial cleanup of any expired checkpoint patches match crate::db::repository::cleanup_expired_checkpoint_patches(&pool).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, "Cleaned up expired checkpoint patches on startup" ); } Err(e) => { tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches on startup"); } _ => {} } // Spawn periodic checkpoint patch cleanup task (runs hourly) tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS) ); loop { interval.tick().await; match crate::db::repository::cleanup_expired_checkpoint_patches( &checkpoint_patch_cleanup_pool, ).await { Ok(deleted) if deleted > 0 => { tracing::info!( deleted = deleted, "Cleaned up expired checkpoint patches" ); } Err(e) => { tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches"); } _ => {} } } }); // Clone state and pool for retry orchestrator let retry_pool = pool.clone(); let retry_state = state.clone(); // Spawn retry orchestrator - periodically retries pending tasks on available daemons tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS) ); loop { interval.tick().await; // Get all contracts with pending tasks awaiting retry match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await { Ok(contract_owners) => { for (contract_id, owner_id) in contract_owners { 
// Try to start a pending task for this contract match handlers::mesh_supervisor::try_start_pending_task( &retry_state, contract_id, owner_id, ).await { Ok(Some(task)) => { tracing::info!( task_id = %task.id, contract_id = %contract_id, retry_count = task.retry_count, "Retry orchestrator started pending task" ); } Ok(None) => { // No tasks could be started (no available daemons, etc.) } Err(e) => { tracing::warn!( contract_id = %contract_id, error = %e, "Retry orchestrator failed to start pending task" ); } } } } Err(e) => { tracing::warn!( error = %e, "Retry orchestrator failed to query pending task contracts" ); } } } }); tracing::info!( "Retry orchestrator started (interval: {}s)", RETRY_ORCHESTRATOR_INTERVAL_SECS ); // Spawn directive orchestrator - automates directive lifecycle let directive_pool = pool.clone(); let directive_state = state.clone(); tokio::spawn(async move { let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new( directive_pool, directive_state, ); let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); loop { interval.tick().await; if let Err(e) = orch.tick().await { tracing::warn!(error = %e, "Directive orchestrator tick failed"); } } }); tracing::info!("Directive orchestrator started (interval: 15s)"); } let app = make_router(state); let listener = tokio::net::TcpListener::bind(addr).await?; tracing::info!("Server listening on {}", addr); tracing::info!("Swagger UI available at http://{}/swagger-ui", addr); axum::serve(listener, app) .with_graceful_shutdown(shutdown_signal()) .await?; Ok(()) } /// Wait for shutdown signals (Ctrl+C or SIGTERM). 
///
/// The future completes as soon as either signal is observed and then logs that
/// graceful shutdown is starting. On non-Unix targets only Ctrl+C is observed.
///
/// # Panics
/// Panics if the Ctrl+C handler or the Unix signal handler cannot be installed.
async fn shutdown_signal() {
    // Completes when Ctrl+C is received.
    let interrupt = async {
        let outcome = tokio::signal::ctrl_c().await;
        outcome.expect("Failed to install Ctrl+C handler");
    };

    // Completes when SIGTERM is received (Unix only).
    #[cfg(unix)]
    let sigterm = async {
        use tokio::signal::unix::{signal, SignalKind};

        let mut stream =
            signal(SignalKind::terminate()).expect("Failed to install signal handler");
        stream.recv().await;
    };

    // Other platforms: a future that never completes, so only Ctrl+C can win.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {},
        _ = sigterm => {},
    }

    tracing::info!("Shutdown signal received, starting graceful shutdown");
}