//! Web server module for the makima audio API.
pub mod auth;
pub mod handlers;
pub mod messages;
pub mod openapi;
pub mod state;
use axum::{
http::StatusCode,
response::IntoResponse,
routing::{get, post, put},
Json, Router,
};
use serde::Serialize;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::server::handlers::{api_keys, chat, contract_chat, contract_daemon, contract_discuss, contracts, daemon_download, directives, file_ws, files, history, listen, mesh, mesh_chat, mesh_daemon, mesh_merge, mesh_supervisor, mesh_ws, orders, repository_history, speak, templates, transcript_analysis, users, versions};
use crate::server::openapi::ApiDoc;
use crate::server::state::SharedState;
/// JSON body returned by the health check endpoint.
#[derive(Serialize)]
struct HealthResponse {
/// Service status string; `health_check` always sets this to `"healthy"`.
status: &'static str,
/// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
version: &'static str,
}
/// Liveness probe for load balancers and orchestrators.
///
/// Always answers `200 OK` with a JSON [`HealthResponse`] carrying the fixed
/// status `"healthy"` and the crate version baked in at compile time.
async fn health_check() -> impl IntoResponse {
    let payload = HealthResponse {
        status: "healthy",
        version: env!("CARGO_PKG_VERSION"),
    };

    (StatusCode::OK, Json(payload))
}
/// Create the axum Router with all routes configured.
///
/// Builds the v1 route table and binds `state` to it with `with_state`, then
/// assembles the top-level router from:
/// - a health check at `/api/v1/healthcheck`,
/// - the v1 routes nested under `/api/v1`,
/// - Swagger UI at `/swagger-ui`, serving the OpenAPI document generated by
///   `ApiDoc::openapi()` at `/api-docs/openapi.json`,
/// - a CORS layer that allows any origin, method and header,
/// - an HTTP trace layer.
///
/// # Arguments
/// * `state` - Shared application state attached to the `/api/v1` routes
///
/// # Returns
/// The assembled `Router` (this is what `run_server` passes to `axum::serve`).
pub fn make_router(state: SharedState) -> Router {
// API v1 routes
// All paths below are relative; they are mounted under `/api/v1` at the end of this function.
let api_v1 = Router::new()
.route("/listen", get(listen::websocket_handler))
.route("/speak", get(speak::websocket_handler))
// Listen/transcript analysis endpoints
.route("/listen/analyze", post(transcript_analysis::analyze_transcript))
.route("/listen/create-contract", post(transcript_analysis::create_contract_from_analysis))
.route("/listen/update-contract", post(transcript_analysis::update_contract_from_analysis))
// File endpoints
.route("/files/subscribe", get(file_ws::file_subscription_handler))
.route("/files", get(files::list_files).post(files::create_file))
.route(
"/files/{id}",
get(files::get_file)
.put(files::update_file)
.delete(files::delete_file),
)
.route("/files/{id}/chat", post(chat::chat_handler))
.route("/files/{id}/sync-from-repo", post(files::sync_file_from_repo))
// Version history endpoints
.route("/files/{id}/versions", get(versions::list_versions))
.route("/files/{id}/versions/{version}", get(versions::get_version))
// NOTE(review): the static `restore` segment sits in the same position as the
// `{version}` capture above; presumably axum's router prefers the static
// segment — confirm against the axum routing docs.
.route("/files/{id}/versions/restore", post(versions::restore_version))
// Mesh/task orchestration endpoints
.route(
"/mesh/tasks",
get(mesh::list_tasks).post(mesh::create_task),
)
.route(
"/mesh/tasks/{id}",
get(mesh::get_task)
.put(mesh::update_task)
.delete(mesh::delete_task),
)
.route("/mesh/tasks/{id}/subtasks", get(mesh::list_subtasks))
.route("/mesh/tasks/{id}/events", get(mesh::list_task_events))
.route("/mesh/tasks/{id}/output", get(mesh::get_task_output))
.route("/mesh/tasks/{id}/start", post(mesh::start_task))
.route("/mesh/tasks/{id}/stop", post(mesh::stop_task))
.route("/mesh/tasks/{id}/message", post(mesh::send_message))
.route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action))
.route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree))
.route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info))
.route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches))
.route("/mesh/tasks/{id}/patch-data", get(mesh::get_task_patch_data))
.route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists))
.route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task))
.route("/mesh/tasks/{id}/continue", post(mesh::continue_task))
// Mesh chat endpoints (top-level and per-task)
.route("/mesh/chat", post(mesh_chat::mesh_toplevel_chat_handler))
.route(
"/mesh/chat/history",
get(mesh_chat::get_chat_history).delete(mesh_chat::clear_chat_history),
)
.route("/mesh/tasks/{id}/chat", post(mesh_chat::mesh_chat_handler))
// Daemon listing, restart and re-authentication endpoints
.route("/mesh/daemons", get(mesh::list_daemons))
.route("/mesh/daemons/directories", get(mesh::get_daemon_directories))
.route("/mesh/daemons/{id}", get(mesh::get_daemon))
.route("/mesh/daemons/{id}/restart", post(mesh::restart_daemon))
.route("/mesh/daemons/{id}/reauth", post(mesh::trigger_daemon_reauth))
.route("/mesh/daemons/{id}/reauth/code", post(mesh::submit_daemon_auth_code))
.route("/mesh/daemons/{id}/reauth/{request_id}/status", get(mesh::get_daemon_reauth_status))
// Merge endpoints for orchestrators
.route("/mesh/tasks/{id}/branches", get(mesh_merge::list_branches))
.route("/mesh/tasks/{id}/merge/start", post(mesh_merge::merge_start))
.route("/mesh/tasks/{id}/merge/status", get(mesh_merge::merge_status))
.route("/mesh/tasks/{id}/merge/resolve", post(mesh_merge::merge_resolve))
.route("/mesh/tasks/{id}/merge/commit", post(mesh_merge::merge_commit))
.route("/mesh/tasks/{id}/merge/abort", post(mesh_merge::merge_abort))
.route("/mesh/tasks/{id}/merge/skip", post(mesh_merge::merge_skip))
.route("/mesh/tasks/{id}/merge/check", get(mesh_merge::merge_check))
// Checkpoint endpoints
.route("/mesh/tasks/{id}/checkpoint", post(mesh_supervisor::create_checkpoint))
.route("/mesh/tasks/{id}/checkpoints", get(mesh_supervisor::list_checkpoints))
.route("/mesh/tasks/{id}/conversation", get(history::get_task_conversation))
// Resume and rewind endpoints
.route("/mesh/tasks/{id}/rewind", post(mesh::rewind_task))
.route("/mesh/tasks/{id}/fork", post(mesh::fork_task))
.route("/mesh/tasks/{id}/checkpoints/{cid}/resume", post(mesh::resume_from_checkpoint))
.route("/mesh/tasks/{id}/checkpoints/{cid}/branch", post(mesh::branch_from_checkpoint))
// Task branching endpoint
.route("/mesh/tasks/{id}/branch", post(mesh::branch_task))
// Supervisor endpoints (for supervisor.sh)
.route("/mesh/supervisor/contracts/{contract_id}/tasks", get(mesh_supervisor::list_contract_tasks))
.route("/mesh/supervisor/contracts/{contract_id}/tree", get(mesh_supervisor::get_contract_tree))
.route("/mesh/supervisor/tasks", post(mesh_supervisor::spawn_task))
.route("/mesh/supervisor/tasks/{task_id}/wait", post(mesh_supervisor::wait_for_task))
.route("/mesh/supervisor/tasks/{task_id}/read-file", post(mesh_supervisor::read_worktree_file))
// Supervisor git operations
.route("/mesh/supervisor/branches", post(mesh_supervisor::create_branch))
.route("/mesh/supervisor/tasks/{task_id}/merge", post(mesh_supervisor::merge_task))
.route("/mesh/supervisor/tasks/{task_id}/diff", get(mesh_supervisor::get_task_diff))
.route("/mesh/supervisor/pr", post(mesh_supervisor::create_pr))
// Supervisor order creation endpoint
.route("/mesh/supervisor/orders", post(mesh_supervisor::create_order_for_task))
// Supervisor question endpoints
.route("/mesh/supervisor/questions", post(mesh_supervisor::ask_question))
.route("/mesh/supervisor/questions/{question_id}/poll", get(mesh_supervisor::poll_question))
.route("/mesh/questions", get(mesh_supervisor::list_pending_questions))
.route("/mesh/questions/{question_id}/answer", post(mesh_supervisor::answer_question))
// Mesh WebSocket endpoints
// NOTE(review): `subscribe` and `connect` occupy the same path position as the
// `{id}` captures on `/mesh/tasks/{id}` and `/mesh/daemons/{id}`; presumably
// the static segments win — confirm.
.route("/mesh/tasks/subscribe", get(mesh_ws::task_subscription_handler))
.route("/mesh/daemons/connect", get(mesh_daemon::daemon_handler))
// Daemon binary download endpoints
.route("/daemon/download/platforms", get(daemon_download::list_daemon_platforms))
.route("/daemon/download/{platform}", get(daemon_download::download_daemon))
// API key management endpoints
.route(
"/auth/api-keys",
post(api_keys::create_api_key_handler)
.get(api_keys::get_api_key_handler)
.delete(api_keys::revoke_api_key_handler),
)
.route("/auth/api-keys/refresh", post(api_keys::refresh_api_key_handler))
// User account management endpoints
// `delete` is not among the imported routing helpers, so it is referenced by full path.
.route(
"/users/me",
axum::routing::delete(users::delete_account_handler),
)
.route("/users/me/password", axum::routing::put(users::change_password_handler))
.route("/users/me/email", axum::routing::put(users::change_email_handler))
// Contract endpoints
.route("/contracts/discuss", post(contract_discuss::discuss_contract_handler))
.route(
"/contracts",
get(contracts::list_contracts).post(contracts::create_contract),
)
.route(
"/contracts/{id}",
get(contracts::get_contract)
.put(contracts::update_contract)
.delete(contracts::delete_contract),
)
.route("/contracts/{id}/phase", post(contracts::change_phase))
.route("/contracts/{id}/deliverables/complete", post(contracts::mark_deliverable_complete))
.route("/contracts/{id}/events", get(contracts::get_events))
.route("/contracts/{id}/chat", post(contract_chat::contract_chat_handler))
.route(
"/contracts/{id}/chat/history",
get(contract_chat::get_contract_chat_history).delete(contract_chat::clear_contract_chat_history),
)
// Contract supervisor resume endpoints
.route("/contracts/{id}/supervisor/resume", post(mesh_supervisor::resume_supervisor))
.route("/contracts/{id}/supervisor/conversation/rewind", post(mesh_supervisor::rewind_conversation))
// Contract supervisor status endpoints
.route("/contracts/{id}/supervisor/status", get(contracts::get_supervisor_status))
.route("/contracts/{id}/supervisor/heartbeats", get(contracts::get_supervisor_heartbeats))
.route("/contracts/{id}/supervisor/sync", post(contracts::sync_supervisor))
// History endpoints
.route("/contracts/{id}/history", get(history::get_contract_history))
.route("/contracts/{id}/supervisor/conversation", get(history::get_supervisor_conversation))
// Contract daemon endpoints (for tasks to interact with contracts)
.route("/contracts/{id}/daemon/status", get(contract_daemon::get_contract_status))
.route("/contracts/{id}/daemon/checklist", get(contract_daemon::get_contract_checklist))
.route("/contracts/{id}/daemon/goals", get(contract_daemon::get_contract_goals))
.route("/contracts/{id}/daemon/report", post(contract_daemon::post_progress_report))
.route("/contracts/{id}/daemon/suggest-action", post(contract_daemon::get_suggest_action))
.route("/contracts/{id}/daemon/completion-action", post(contract_daemon::get_completion_action))
.route(
"/contracts/{id}/daemon/files",
get(contract_daemon::list_contract_files).post(contract_daemon::create_contract_file),
)
.route(
"/contracts/{id}/daemon/files/{file_id}",
get(contract_daemon::get_contract_file).put(contract_daemon::update_contract_file),
)
// Contract repository endpoints
.route("/contracts/{id}/repositories/remote", post(contracts::add_remote_repository))
.route("/contracts/{id}/repositories/local", post(contracts::add_local_repository))
.route("/contracts/{id}/repositories/managed", post(contracts::create_managed_repository))
.route(
"/contracts/{id}/repositories/{repo_id}",
axum::routing::delete(contracts::delete_repository),
)
.route(
"/contracts/{id}/repositories/{repo_id}/primary",
axum::routing::put(contracts::set_repository_primary),
)
// Contract task association endpoints
.route(
"/contracts/{id}/tasks/{task_id}",
post(contracts::add_task_to_contract).delete(contracts::remove_task_from_contract),
)
// Directive endpoints
.route(
"/directives",
get(directives::list_directives).post(directives::create_directive),
)
.route(
"/directives/{id}",
get(directives::get_directive)
.put(directives::update_directive)
.delete(directives::delete_directive),
)
.route("/directives/{id}/steps", post(directives::create_step))
.route("/directives/{id}/steps/batch", post(directives::batch_create_steps))
.route(
"/directives/{id}/steps/{step_id}",
put(directives::update_step).delete(directives::delete_step),
)
.route("/directives/{id}/start", post(directives::start_directive))
.route("/directives/{id}/pause", post(directives::pause_directive))
.route("/directives/{id}/advance", post(directives::advance_directive))
.route("/directives/{id}/steps/{step_id}/complete", post(directives::complete_step))
.route("/directives/{id}/steps/{step_id}/fail", post(directives::fail_step))
.route("/directives/{id}/steps/{step_id}/skip", post(directives::skip_step))
.route("/directives/{id}/goal", put(directives::update_goal))
.route("/directives/{id}/cleanup", post(directives::cleanup_directive))
.route("/directives/{id}/create-pr", post(directives::create_pr))
.route("/directives/{id}/pick-up-orders", post(directives::pick_up_orders))
// Order endpoints
.route(
"/orders",
get(orders::list_orders).post(orders::create_order),
)
.route(
"/orders/{id}",
get(orders::get_order)
.patch(orders::update_order)
.delete(orders::delete_order),
)
.route("/orders/{id}/link-directive", post(orders::link_to_directive))
.route("/orders/{id}/convert-to-step", post(orders::convert_to_step))
// Timeline endpoint (unified history for user)
.route("/timeline", get(history::get_timeline))
// Contract type templates (built-in only)
.route("/contract-types", get(templates::list_contract_types))
// Settings endpoints
.route(
"/settings/repository-history",
get(repository_history::list_repository_history),
)
.route(
"/settings/repository-history/suggestions",
get(repository_history::get_repository_suggestions),
)
.route(
"/settings/repository-history/{id}",
axum::routing::delete(repository_history::delete_repository_history),
)
// Bind the shared state to the v1 routes before nesting them below.
.with_state(state);
// Swagger UI, backed by the OpenAPI document generated from `ApiDoc`.
let swagger = SwaggerUi::new("/swagger-ui")
.url("/api-docs/openapi.json", ApiDoc::openapi());
// Top-level router: the health check is registered here directly rather than in `api_v1`.
Router::new()
.route("/api/v1/healthcheck", get(health_check))
.nest("/api/v1", api_v1)
.merge(swagger)
// NOTE(review): CORS is fully open (any origin, method and header) — confirm
// this is intended outside development.
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.layer(TraceLayer::new_for_http())
}
/// Interval in seconds between periodic stale-daemon cleanup runs.
const DAEMON_CLEANUP_INTERVAL_SECS: u64 = 60;
/// Daemon heartbeat timeout in seconds; the periodic cleanup passes this to
/// `delete_stale_daemons` (delete daemons older than this).
const DAEMON_HEARTBEAT_TIMEOUT_SECS: i64 = 120;
/// Interval in seconds between periodic anonymous-task cleanup runs (24 hours).
const ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS: u64 = 24 * 60 * 60;
/// Maximum age in days for anonymous tasks before cleanup.
const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
/// Interval in seconds between checkpoint patch cleanup runs (hourly).
const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600;
/// Interval in seconds at which the retry orchestrator checks for pending tasks.
const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30;
/// Run the HTTP server with graceful shutdown support.
///
/// When `state.db_pool` is set, this first runs one-off startup cleanups
/// (stale daemons, stale anonymous tasks, expired checkpoint patches) and
/// spawns detached background tasks that repeat those cleanups on their
/// configured intervals, plus the retry orchestrator and the directive
/// orchestrator. Failures in any of these jobs are logged as warnings and do
/// not stop the server. Without a pool, none of the background work is started.
///
/// The router from [`make_router`] is then served on `addr` until
/// [`shutdown_signal`] resolves.
///
/// # Arguments
/// * `state` - Shared application state; its `db_pool`, when present, drives the background jobs
/// * `addr` - Address to bind to (e.g., "0.0.0.0:8080")
///
/// # Errors
/// Returns an error if `addr` cannot be bound (the error carries the address
/// as context) or if the server fails while running.
pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
    /// Directive orchestrator tick interval in seconds.
    const DIRECTIVE_ORCHESTRATOR_INTERVAL_SECS: u64 = 15;

    // Start background cleanup tasks if database is available
    if let Some(pool) = state.db_pool.clone() {
        // Clone pool for each background task that needs it
        let daemon_cleanup_pool = pool.clone();
        let anonymous_task_cleanup_pool = pool.clone();

        // Initial cleanup of any stale daemons from previous server run.
        // A timeout of 0 is passed here; presumably that removes every daemon
        // row regardless of heartbeat age — confirm against `delete_stale_daemons`.
        match crate::db::repository::delete_stale_daemons(&pool, 0).await {
            Ok(deleted) if deleted > 0 => {
                tracing::info!(
                    deleted = deleted,
                    "Cleaned up stale daemons from previous server run"
                );
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to clean up stale daemons on startup");
            }
            _ => {}
        }

        // Initial cleanup of any stale anonymous tasks
        match crate::db::repository::cleanup_stale_anonymous_tasks(
            &pool,
            ANONYMOUS_TASK_MAX_AGE_DAYS,
        )
        .await
        {
            Ok(deleted) if deleted > 0 => {
                tracing::info!(
                    deleted = deleted,
                    max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS,
                    "Cleaned up stale anonymous tasks on startup"
                );
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks on startup");
            }
            _ => {}
        }

        // Spawn periodic daemon cleanup task.
        // Note: per the tokio docs, the first tick of `tokio::time::interval`
        // completes immediately, so each loop below also runs once right away.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
                DAEMON_CLEANUP_INTERVAL_SECS,
            ));
            loop {
                interval.tick().await;
                match crate::db::repository::delete_stale_daemons(
                    &daemon_cleanup_pool,
                    DAEMON_HEARTBEAT_TIMEOUT_SECS,
                )
                .await
                {
                    Ok(deleted) if deleted > 0 => {
                        tracing::info!(
                            deleted = deleted,
                            timeout_secs = DAEMON_HEARTBEAT_TIMEOUT_SECS,
                            "Deleted stale daemons"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to delete stale daemons");
                    }
                    _ => {}
                }
            }
        });

        // Spawn periodic anonymous task cleanup task (runs daily)
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
                ANONYMOUS_TASK_CLEANUP_INTERVAL_SECS,
            ));
            loop {
                interval.tick().await;
                match crate::db::repository::cleanup_stale_anonymous_tasks(
                    &anonymous_task_cleanup_pool,
                    ANONYMOUS_TASK_MAX_AGE_DAYS,
                )
                .await
                {
                    Ok(deleted) if deleted > 0 => {
                        tracing::info!(
                            deleted = deleted,
                            max_age_days = ANONYMOUS_TASK_MAX_AGE_DAYS,
                            "Cleaned up stale anonymous tasks"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to clean up stale anonymous tasks");
                    }
                    _ => {}
                }
            }
        });

        // Clone pool for checkpoint patch cleanup
        let checkpoint_patch_cleanup_pool = pool.clone();

        // Initial cleanup of any expired checkpoint patches
        match crate::db::repository::cleanup_expired_checkpoint_patches(&pool).await {
            Ok(deleted) if deleted > 0 => {
                tracing::info!(
                    deleted = deleted,
                    "Cleaned up expired checkpoint patches on startup"
                );
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches on startup");
            }
            _ => {}
        }

        // Spawn periodic checkpoint patch cleanup task (runs hourly)
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
                CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS,
            ));
            loop {
                interval.tick().await;
                match crate::db::repository::cleanup_expired_checkpoint_patches(
                    &checkpoint_patch_cleanup_pool,
                )
                .await
                {
                    Ok(deleted) if deleted > 0 => {
                        tracing::info!(
                            deleted = deleted,
                            "Cleaned up expired checkpoint patches"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to clean up expired checkpoint patches");
                    }
                    _ => {}
                }
            }
        });

        // Clone state and pool for retry orchestrator
        let retry_pool = pool.clone();
        let retry_state = state.clone();

        // Spawn retry orchestrator - periodically retries pending tasks on available daemons
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
                RETRY_ORCHESTRATOR_INTERVAL_SECS,
            ));
            loop {
                interval.tick().await;
                // Get all contracts with pending tasks awaiting retry
                match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await {
                    Ok(contract_owners) => {
                        for (contract_id, owner_id) in contract_owners {
                            // Try to start a pending task for this contract
                            match handlers::mesh_supervisor::try_start_pending_task(
                                &retry_state,
                                contract_id,
                                owner_id,
                            )
                            .await
                            {
                                Ok(Some(task)) => {
                                    tracing::info!(
                                        task_id = %task.id,
                                        contract_id = %contract_id,
                                        retry_count = task.retry_count,
                                        "Retry orchestrator started pending task"
                                    );
                                }
                                Ok(None) => {
                                    // No tasks could be started (no available daemons, etc.)
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        contract_id = %contract_id,
                                        error = %e,
                                        "Retry orchestrator failed to start pending task"
                                    );
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "Retry orchestrator failed to query pending task contracts"
                        );
                    }
                }
            }
        });
        tracing::info!(
            "Retry orchestrator started (interval: {}s)",
            RETRY_ORCHESTRATOR_INTERVAL_SECS
        );

        // Spawn directive orchestrator - automates directive lifecycle
        let directive_pool = pool.clone();
        let directive_state = state.clone();
        tokio::spawn(async move {
            let mut orch = crate::orchestration::directive::DirectiveOrchestrator::new(
                directive_pool,
                directive_state,
            );
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
                DIRECTIVE_ORCHESTRATOR_INTERVAL_SECS,
            ));
            loop {
                interval.tick().await;
                if let Err(e) = orch.tick().await {
                    tracing::warn!(error = %e, "Directive orchestrator tick failed");
                }
            }
        });
        // Built from the constant so the logged interval cannot drift from the real one.
        tracing::info!(
            "Directive orchestrator started (interval: {}s)",
            DIRECTIVE_ORCHESTRATOR_INTERVAL_SECS
        );
    }

    let app = make_router(state);

    // Attach the address to bind failures; the original `io::Error` stays as the source.
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .map_err(|e| anyhow::Error::new(e).context(format!("Failed to bind server to {}", addr)))?;

    tracing::info!("Server listening on {}", addr);
    tracing::info!("Swagger UI available at http://{}/swagger-ui", addr);

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    Ok(())
}
/// Resolve once the process has been asked to stop.
///
/// Completes when Ctrl+C is received or, on Unix, when SIGTERM is delivered.
/// On non-Unix targets only Ctrl+C can complete it. An info-level log line is
/// emitted just before returning.
///
/// # Panics
/// Panics if the Ctrl+C listener or (on Unix) the SIGTERM handler cannot be
/// installed.
async fn shutdown_signal() {
    /// Completes when Ctrl+C is received.
    async fn interrupt() {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    }

    /// Completes when SIGTERM is delivered.
    #[cfg(unix)]
    async fn terminate() {
        let mut sigterm =
            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                .expect("Failed to install signal handler");
        sigterm.recv().await;
    }

    /// No SIGTERM on this platform: never completes.
    #[cfg(not(unix))]
    async fn terminate() {
        std::future::pending::<()>().await
    }

    // Whichever signal arrives first ends the wait.
    tokio::select! {
        _ = interrupt() => {},
        _ = terminate() => {},
    }

    tracing::info!("Shutdown signal received, starting graceful shutdown");
}