//! API handlers for directives. //! //! Provides REST endpoints for managing directives, chains, steps, //! evaluations, events, verifiers, and approvals. use axum::{ extract::{Path, Query, State}, http::StatusCode, response::{ sse::{Event, Sse}, IntoResponse, }, Json, }; use futures::stream; use serde::{Deserialize, Serialize}; use std::convert::Infallible; use std::time::Duration; use uuid::Uuid; use crate::db::models::{ AddStepRequest, CreateDirectiveRequest, CreateVerifierRequest, UpdateDirectiveRequest, UpdateStepRequest, UpdateVerifierRequest, }; use crate::db::repository; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; use crate::server::state::SharedState; /// Query parameters for listing directives #[derive(Debug, Deserialize)] pub struct ListDirectivesQuery { pub status: Option, } /// Query parameters for listing events #[derive(Debug, Deserialize)] pub struct ListEventsQuery { pub limit: Option, } /// Query parameters for listing evaluations #[derive(Debug, Deserialize)] pub struct ListEvaluationsQuery { pub limit: Option, #[serde(rename = "stepId")] pub step_id: Option, } /// Response for directive creation #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct CreateDirectiveResponse { pub id: Uuid, pub title: String, pub status: String, } /// Response for approval actions #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ApprovalActionRequest { pub response: Option, } // ============================================================================= // Directive CRUD // ============================================================================= /// Create a new directive /// POST /api/v1/directives pub async fn create_directive( State(state): State, Authenticated(auth): Authenticated, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::create_directive_for_owner(pool, auth.owner_id, req).await { Ok(directive) => Json(CreateDirectiveResponse { id: directive.id, title: directive.title, status: directive.status, }) .into_response(), Err(e) => { tracing::error!("Failed to create directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// List directives for the authenticated owner /// GET /api/v1/directives pub async fn list_directives( State(state): State, Authenticated(auth): Authenticated, Query(params): Query, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::list_directives_for_owner(pool, auth.owner_id, params.status.as_deref()).await { Ok(directives) => Json(directives).into_response(), Err(e) => { tracing::error!("Failed to list directives: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Get a directive with progress details /// GET /api/v1/directives/:id pub async fn get_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::get_directive_with_progress(pool, id, auth.owner_id).await { Ok(Some(directive)) => Json(directive).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to get directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Update a directive /// PUT /api/v1/directives/:id pub async fn update_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::update_directive_for_owner(pool, id, auth.owner_id, req).await { Ok(directive) => Json(directive).into_response(), Err(repository::RepositoryError::VersionConflict { expected, actual }) => ( StatusCode::CONFLICT, Json(ApiError::new( "VERSION_CONFLICT", &format!( "Version conflict: expected {}, got {}", expected, actual ), )), ) .into_response(), Err(e) => { tracing::error!("Failed to update directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Archive a directive /// DELETE /api/v1/directives/:id pub async fn archive_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::archive_directive_for_owner(pool, id, auth.owner_id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to archive directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } // ============================================================================= // Directive Lifecycle // ============================================================================= /// Start a directive (generate chain and begin execution) /// POST /api/v1/directives/:id/start pub async fn start_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } // Start directive via orchestration engine let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine.start_directive(id).await { Ok(()) => { // Return the updated directive with progress match repository::get_directive_with_progress(pool, id, auth.owner_id).await { Ok(Some(directive)) => Json(directive).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), } } Err(e) => { tracing::error!("Failed to start directive: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("START_FAILED", &e.to_string())), ) .into_response() } } } /// Pause a directive /// POST /api/v1/directives/:id/pause pub async fn pause_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine.pause_directive(id).await { Ok(()) => match repository::get_directive(pool, id).await { Ok(Some(directive)) => Json(directive).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), }, Err(e) => { tracing::error!("Failed to pause directive: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("PAUSE_FAILED", &e.to_string())), ) .into_response() } } } /// Resume a paused directive /// POST /api/v1/directives/:id/resume pub async fn resume_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine.resume_directive(id).await { Ok(()) => match repository::get_directive_with_progress(pool, id, auth.owner_id).await { Ok(Some(directive)) => Json(directive).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), }, Err(e) => { tracing::error!("Failed to resume directive: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("RESUME_FAILED", &e.to_string())), ) .into_response() } } } /// Stop a directive (cannot be resumed) /// POST /api/v1/directives/:id/stop pub async fn stop_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine.stop_directive(id).await { Ok(()) => match repository::get_directive(pool, id).await { Ok(Some(directive)) => Json(directive).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), }, Err(e) => { tracing::error!("Failed to stop directive: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("STOP_FAILED", &e.to_string())), ) .into_response() } } } // ============================================================================= // Chain Management // ============================================================================= /// Get current chain for a directive /// GET /api/v1/directives/:id/chain pub async fn get_chain( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::get_current_chain(pool, id).await { Ok(Some(chain)) => { match repository::list_chain_steps(pool, chain.id).await { Ok(steps) => Json(serde_json::json!({ "chain": chain, "steps": steps, })) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), } } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "No active chain")), ) .into_response(), Err(e) => { tracing::error!("Failed to get chain: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Get chain graph for DAG visualization /// GET /api/v1/directives/:id/chain/graph pub async fn get_chain_graph( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } // Get current chain let chain = match repository::get_current_chain(pool, id).await { Ok(Some(chain)) => chain, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "No active chain")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } }; match repository::get_chain_graph(pool, chain.id).await { Ok(graph) => Json(graph).into_response(), Err(e) => { tracing::error!("Failed to get chain graph: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Regenerate chain (force replan) /// POST /api/v1/directives/:id/chain/replan pub async fn replan_chain( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine.regenerate_chain(id, "Manual replan requested").await { Ok(new_chain_id) => Json(serde_json::json!({ "chainId": new_chain_id, "message": "Chain regenerated successfully", })) .into_response(), Err(e) => { tracing::error!("Failed to replan chain: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("REPLAN_FAILED", &e.to_string())), ) .into_response() } } } // ============================================================================= // Step Management // ============================================================================= /// Add a step to the current chain /// POST /api/v1/directives/:id/chain/steps pub async fn add_step( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } // Get current chain let chain = match repository::get_current_chain(pool, id).await { Ok(Some(chain)) => chain, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "No active chain")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } }; match repository::create_chain_step(pool, chain.id, req).await { Ok(step) => (StatusCode::CREATED, Json(step)).into_response(), Err(e) => { tracing::error!("Failed to add step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Get step details /// GET /api/v1/directives/:id/chain/steps/:step_id pub async fn get_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::get_chain_step(pool, step_id).await { Ok(Some(step)) => Json(step).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Step not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to get step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Update a step /// PUT /api/v1/directives/:id/chain/steps/:step_id pub async fn update_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::update_chain_step(pool, step_id, req).await { Ok(step) => Json(step).into_response(), Err(e) => { tracing::error!("Failed to update step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Delete a step /// DELETE /api/v1/directives/:id/chain/steps/:step_id pub async fn delete_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::delete_chain_step(pool, step_id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Step not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to delete step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Skip a step /// POST /api/v1/directives/:id/chain/steps/:step_id/skip pub async fn skip_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::update_step_status(pool, step_id, "skipped").await { Ok(step) => Json(step).into_response(), Err(e) => { tracing::error!("Failed to skip step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } // ============================================================================= // Evaluations // ============================================================================= /// List evaluations for a directive /// GET /api/v1/directives/:id/evaluations pub async fn list_evaluations( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Query(params): Query, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let result = if let Some(step_id) = params.step_id { repository::list_step_evaluations(pool, step_id).await } else { repository::list_directive_evaluations(pool, id, params.limit).await }; match result { Ok(evaluations) => Json(evaluations).into_response(), Err(e) => { tracing::error!("Failed to list evaluations: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } // ============================================================================= // Events // ============================================================================= /// List events for a directive /// GET /api/v1/directives/:id/events pub async fn list_events( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Query(params): Query, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::list_directive_events(pool, id, params.limit).await { Ok(events) => Json(events).into_response(), Err(e) => { tracing::error!("Failed to list events: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// SSE stream of events for a directive /// GET /api/v1/directives/:id/events/stream pub async fn stream_events( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } // Create SSE stream that polls for new events let pool_clone = pool.clone(); let stream = stream::unfold( (pool_clone, id, None::>), move |(pool, directive_id, last_seen)| async move { // Wait a bit before next poll tokio::time::sleep(Duration::from_secs(1)).await; // Get recent events let events = repository::list_directive_events(&pool, directive_id, Some(10)) .await .unwrap_or_default(); // Filter to only new events let new_events: Vec<_> = events .into_iter() .filter(|e| last_seen.map(|ls| e.created_at > ls).unwrap_or(true)) .collect(); let new_last_seen = new_events.first().map(|e| e.created_at).or(last_seen); // Convert to SSE events let sse_events: Vec> = new_events .into_iter() .map(|e| { Ok(Event::default() .event("directive_event") .data(serde_json::to_string(&e).unwrap_or_default())) }) .collect(); Some((stream::iter(sse_events), (pool, directive_id, new_last_seen))) }, ); use futures::StreamExt; Sse::new(stream.flatten()) .keep_alive( axum::response::sse::KeepAlive::new() .interval(Duration::from_secs(15)) .text("keepalive"), ) .into_response() } // ============================================================================= // Verifiers // ============================================================================= /// List verifiers for a directive /// GET /api/v1/directives/:id/verifiers pub async fn list_verifiers( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::list_directive_verifiers(pool, id).await { Ok(verifiers) => Json(verifiers).into_response(), Err(e) => { tracing::error!("Failed to list verifiers: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Add a verifier to a directive /// POST /api/v1/directives/:id/verifiers pub async fn add_verifier( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::create_directive_verifier( pool, id, &req.name, &req.verifier_type, req.command.as_deref(), req.working_directory.as_deref(), false, // auto_detect vec![], // detect_files req.weight.unwrap_or(1.0), req.required.unwrap_or(false), ) .await { Ok(verifier) => (StatusCode::CREATED, Json(verifier)).into_response(), Err(e) => { tracing::error!("Failed to add verifier: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Update a verifier /// PUT /api/v1/directives/:id/verifiers/:verifier_id pub async fn update_verifier( State(state): State, Authenticated(auth): Authenticated, Path((id, verifier_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::update_directive_verifier( pool, verifier_id, req.enabled, req.command.as_deref(), req.weight, req.required, ) .await { Ok(verifier) => Json(verifier).into_response(), Err(e) => { tracing::error!("Failed to update verifier: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } // ============================================================================= // Approvals // ============================================================================= /// List pending approvals for a directive /// GET /api/v1/directives/:id/approvals pub async fn list_approvals( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } match repository::list_pending_approvals(pool, id).await { Ok(approvals) => Json(approvals).into_response(), Err(e) => { tracing::error!("Failed to list approvals: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } } /// Approve a pending approval request /// POST /api/v1/directives/:id/approvals/:approval_id/approve pub async fn approve_request( State(state): State, Authenticated(auth): Authenticated, Path((id, approval_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine .on_approval_resolved(approval_id, true, auth.owner_id) .await { Ok(()) => { match repository::resolve_approval( pool, approval_id, "approved", req.response.as_deref(), auth.owner_id, ) .await { Ok(approval) => Json(approval).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), } } Err(e) => { tracing::error!("Failed to process approval: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("APPROVAL_FAILED", &e.to_string())), ) .into_response() } } } /// Deny a pending approval request /// POST /api/v1/directives/:id/approvals/:approval_id/deny pub async fn deny_request( State(state): State, Authenticated(auth): Authenticated, Path((id, approval_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response() } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response() } } let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); match engine .on_approval_resolved(approval_id, false, auth.owner_id) .await { Ok(()) => { match repository::resolve_approval( pool, approval_id, "denied", req.response.as_deref(), auth.owner_id, ) .await { Ok(approval) => Json(approval).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DB_ERROR", &e.to_string())), ) .into_response(), } } Err(e) => { tracing::error!("Failed to process denial: {}", e); ( StatusCode::BAD_REQUEST, Json(ApiError::new("DENIAL_FAILED", &e.to_string())), ) .into_response() } } }