diff options
| author | soryu <soryu@soryu.co> | 2026-02-05 23:42:48 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-05 23:42:48 +0000 |
| commit | 88a4f15ce1310f8ee8693835be14aa5280233f17 (patch) | |
| tree | 5c1a0417e02071d2198d13478ffa85533b19f891 /makima/src/server/handlers/directives.rs | |
| parent | f1a50b80f3969d150bd1c31edde0aff05369157e (diff) | |
| download | soryu-88a4f15ce1310f8ee8693835be14aa5280233f17.tar.gz soryu-88a4f15ce1310f8ee8693835be14aa5280233f17.zip | |
Add directive-first chain system redesign
Redesigns the chain system with a directive-first architecture where
Directive is the top-level entity (the "why/what") and Chains are
generated execution plans (the "how") that can be dynamically modified.
Backend:
- Add database migration for directive system tables
- Add Directive, DirectiveChain, ChainStep, DirectiveEvent models
- Add DirectiveVerifier and DirectiveApproval models
- Add orchestration module with engine, planner, and verifier
- Add comprehensive API handlers for directives
- Add daemon CLI commands for directive management
- Add directive skill documentation
- Integrate contract completion with directive engine
- Add SSE endpoint for real-time directive events
Frontend:
- Add directives route with split-view layout
- Add 6-tab detail view (Overview, Chain, Events, Evaluations, Approvals, Verifiers)
- Add React Flow DAG visualization for chain steps
- Add SSE subscription hook for real-time event updates
- Add useDirectives and useDirectiveEventSubscription hooks
- Add directive types and API functions
Fixes:
- Fix test failures in ws/protocol, task_output, completion_gate, patch
- Fix word boundary matching in looks_like_task()
- Fix parse_last() to find actual last completion gate
- Fix create_export_patch when merge-base equals HEAD
- Clean up clippy warnings in new code
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/directives.rs')
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 1488 |
1 files changed, 1488 insertions, 0 deletions
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs new file mode 100644 index 0000000..6f6c3f1 --- /dev/null +++ b/makima/src/server/handlers/directives.rs @@ -0,0 +1,1488 @@ +//! API handlers for directives. +//! +//! Provides REST endpoints for managing directives, chains, steps, +//! evaluations, events, verifiers, and approvals. + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::{ + sse::{Event, Sse}, + IntoResponse, + }, + Json, +}; +use futures::stream; +use serde::{Deserialize, Serialize}; +use std::convert::Infallible; +use std::time::Duration; +use uuid::Uuid; + +use crate::db::models::{ + AddStepRequest, CreateDirectiveRequest, CreateVerifierRequest, UpdateDirectiveRequest, + UpdateStepRequest, UpdateVerifierRequest, +}; +use crate::db::repository; +use crate::server::auth::Authenticated; +use crate::server::messages::ApiError; +use crate::server::state::SharedState; + +/// Query parameters for listing directives +#[derive(Debug, Deserialize)] +pub struct ListDirectivesQuery { + pub status: Option<String>, +} + +/// Query parameters for listing events +#[derive(Debug, Deserialize)] +pub struct ListEventsQuery { + pub limit: Option<i64>, +} + +/// Query parameters for listing evaluations +#[derive(Debug, Deserialize)] +pub struct ListEvaluationsQuery { + pub limit: Option<i64>, + #[serde(rename = "stepId")] + pub step_id: Option<Uuid>, +} + +/// Response for directive creation +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateDirectiveResponse { + pub id: Uuid, + pub title: String, + pub status: String, +} + +/// Response for approval actions +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApprovalActionRequest { + pub response: Option<String>, +} + +// ============================================================================= +// Directive CRUD +// ============================================================================= + +/// Create a new directive +/// POST /api/v1/directives +pub async fn create_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Json(req): Json<CreateDirectiveRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::create_directive_for_owner(pool, auth.owner_id, req).await { + Ok(directive) => Json(CreateDirectiveResponse { + id: directive.id, + title: directive.title, + status: directive.status, + }) + .into_response(), + Err(e) => { + tracing::error!("Failed to create directive: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// List directives for the authenticated owner +/// GET /api/v1/directives +pub async fn list_directives( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Query(params): Query<ListDirectivesQuery>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::list_directives_for_owner(pool, auth.owner_id, params.status.as_deref()).await + { + Ok(directives) => Json(directives).into_response(), + Err(e) => { + tracing::error!("Failed to list directives: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Get a directive with progress details +/// GET /api/v1/directives/:id +pub async fn get_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::get_directive_with_progress(pool, id, auth.owner_id).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get directive: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Update a directive +/// PUT /api/v1/directives/:id +pub async fn update_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(req): Json<UpdateDirectiveRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::update_directive_for_owner(pool, id, auth.owner_id, req).await { + Ok(directive) => Json(directive).into_response(), + Err(repository::RepositoryError::VersionConflict { expected, actual }) => ( + StatusCode::CONFLICT, + Json(ApiError::new( + "VERSION_CONFLICT", + &format!( + "Version conflict: expected {}, got {}", + expected, actual + ), + )), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to update directive: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Archive a directive +/// DELETE /api/v1/directives/:id +pub async fn archive_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + match repository::archive_directive_for_owner(pool, id, auth.owner_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to archive directive: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Directive Lifecycle +// ============================================================================= + +/// Start a directive (generate chain and begin execution) +/// POST /api/v1/directives/:id/start +pub async fn start_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + // Start directive via orchestration engine + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine.start_directive(id).await { + Ok(()) => { + // Return the updated directive with progress + match repository::get_directive_with_progress(pool, id, auth.owner_id).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + } + } + Err(e) => { + tracing::error!("Failed to start directive: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("START_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +/// Pause a directive +/// POST /api/v1/directives/:id/pause +pub async fn pause_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine.pause_directive(id).await { + Ok(()) => match repository::get_directive(pool, id).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + }, + Err(e) => { + tracing::error!("Failed to pause directive: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("PAUSE_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +/// Resume a paused directive +/// POST /api/v1/directives/:id/resume +pub async fn resume_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine.resume_directive(id).await { + Ok(()) => match repository::get_directive_with_progress(pool, id, auth.owner_id).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + }, + Err(e) => { + tracing::error!("Failed to resume directive: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("RESUME_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +/// Stop a directive (cannot be resumed) +/// POST /api/v1/directives/:id/stop +pub async fn stop_directive( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine.stop_directive(id).await { + Ok(()) => match repository::get_directive(pool, id).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + }, + Err(e) => { + tracing::error!("Failed to stop directive: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("STOP_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Chain Management +// ============================================================================= + +/// Get current chain for a directive +/// GET /api/v1/directives/:id/chain +pub async fn get_chain( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::get_current_chain(pool, id).await { + Ok(Some(chain)) => { + match repository::list_chain_steps(pool, chain.id).await { + Ok(steps) => Json(serde_json::json!({ + "chain": chain, + "steps": steps, + })) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + } + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "No active chain")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get chain: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Get chain graph for DAG visualization +/// GET /api/v1/directives/:id/chain/graph +pub async fn get_chain_graph( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + // Get current chain + let chain = match repository::get_current_chain(pool, id).await { + Ok(Some(chain)) => chain, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "No active chain")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + }; + + match repository::get_chain_graph(pool, chain.id).await { + Ok(graph) => Json(graph).into_response(), + Err(e) => { + tracing::error!("Failed to get chain graph: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Regenerate chain (force replan) +/// POST /api/v1/directives/:id/chain/replan +pub async fn replan_chain( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine.regenerate_chain(id, "Manual replan requested").await { + Ok(new_chain_id) => Json(serde_json::json!({ + "chainId": new_chain_id, + "message": "Chain regenerated successfully", + })) + .into_response(), + Err(e) => { + tracing::error!("Failed to replan chain: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("REPLAN_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Step Management +// ============================================================================= + +/// Add a step to the current chain +/// POST /api/v1/directives/:id/chain/steps +pub async fn add_step( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(req): Json<AddStepRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + // Get current chain + let chain = match repository::get_current_chain(pool, id).await { + Ok(Some(chain)) => chain, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "No active chain")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + }; + + match repository::create_chain_step(pool, chain.id, req).await { + Ok(step) => (StatusCode::CREATED, Json(step)).into_response(), + Err(e) => { + tracing::error!("Failed to add step: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Get step details +/// GET /api/v1/directives/:id/chain/steps/:step_id +pub async fn get_step( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::get_chain_step(pool, step_id).await { + Ok(Some(step)) => Json(step).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Step not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to get step: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Update a step +/// PUT /api/v1/directives/:id/chain/steps/:step_id +pub async fn update_step( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, + Json(req): Json<UpdateStepRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::update_chain_step(pool, step_id, req).await { + Ok(step) => Json(step).into_response(), + Err(e) => { + tracing::error!("Failed to update step: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Delete a step +/// DELETE /api/v1/directives/:id/chain/steps/:step_id +pub async fn delete_step( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::delete_chain_step(pool, step_id).await { + Ok(true) => StatusCode::NO_CONTENT.into_response(), + Ok(false) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Step not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to delete step: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Skip a step +/// POST /api/v1/directives/:id/chain/steps/:step_id/skip +pub async fn skip_step( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, step_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::update_step_status(pool, step_id, "skipped").await { + Ok(step) => Json(step).into_response(), + Err(e) => { + tracing::error!("Failed to skip step: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Evaluations +// ============================================================================= + +/// List evaluations for a directive +/// GET /api/v1/directives/:id/evaluations +pub async fn list_evaluations( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Query(params): Query<ListEvaluationsQuery>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let result = if let Some(step_id) = params.step_id { + repository::list_step_evaluations(pool, step_id).await + } else { + repository::list_directive_evaluations(pool, id, params.limit).await + }; + + match result { + Ok(evaluations) => Json(evaluations).into_response(), + Err(e) => { + tracing::error!("Failed to list evaluations: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Events +// ============================================================================= + +/// List events for a directive +/// GET /api/v1/directives/:id/events +pub async fn list_events( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Query(params): Query<ListEventsQuery>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::list_directive_events(pool, id, params.limit).await { + Ok(events) => Json(events).into_response(), + Err(e) => { + tracing::error!("Failed to list events: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// SSE stream of events for a directive +/// GET /api/v1/directives/:id/events/stream +pub async fn stream_events( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + // Create SSE stream that polls for new events + let pool_clone = pool.clone(); + let stream = stream::unfold( + (pool_clone, id, None::<chrono::DateTime<chrono::Utc>>), + move |(pool, directive_id, last_seen)| async move { + // Wait a bit before next poll + tokio::time::sleep(Duration::from_secs(1)).await; + + // Get recent events + let events = repository::list_directive_events(&pool, directive_id, Some(10)) + .await + .unwrap_or_default(); + + // Filter to only new events + let new_events: Vec<_> = events + .into_iter() + .filter(|e| last_seen.map(|ls| e.created_at > ls).unwrap_or(true)) + .collect(); + + let new_last_seen = new_events.first().map(|e| e.created_at).or(last_seen); + + // Convert to SSE events + let sse_events: Vec<Result<Event, Infallible>> = new_events + .into_iter() + .map(|e| { + Ok(Event::default() + .event("directive_event") + .data(serde_json::to_string(&e).unwrap_or_default())) + }) + .collect(); + + Some((stream::iter(sse_events), (pool, directive_id, new_last_seen))) + }, + ); + + use futures::StreamExt; + Sse::new(stream.flatten()) + .keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(15)) + .text("keepalive"), + ) + .into_response() +} + +// ============================================================================= +// Verifiers +// ============================================================================= + +/// List verifiers for a directive +/// GET /api/v1/directives/:id/verifiers +pub async fn list_verifiers( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::list_directive_verifiers(pool, id).await { + Ok(verifiers) => Json(verifiers).into_response(), + Err(e) => { + tracing::error!("Failed to list verifiers: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Add a verifier to a directive +/// POST /api/v1/directives/:id/verifiers +pub async fn add_verifier( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(req): Json<CreateVerifierRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::create_directive_verifier( + pool, + id, + &req.name, + &req.verifier_type, + req.command.as_deref(), + req.working_directory.as_deref(), + false, // auto_detect + vec![], // detect_files + req.weight.unwrap_or(1.0), + req.required.unwrap_or(false), + ) + .await + { + Ok(verifier) => (StatusCode::CREATED, Json(verifier)).into_response(), + Err(e) => { + tracing::error!("Failed to add verifier: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Update a verifier +/// PUT /api/v1/directives/:id/verifiers/:verifier_id +pub async fn update_verifier( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, verifier_id)): Path<(Uuid, Uuid)>, + Json(req): Json<UpdateVerifierRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::update_directive_verifier( + pool, + verifier_id, + req.enabled, + req.command.as_deref(), + req.weight, + req.required, + ) + .await + { + Ok(verifier) => Json(verifier).into_response(), + Err(e) => { + tracing::error!("Failed to update verifier: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Approvals +// ============================================================================= + +/// List pending approvals for a directive +/// GET /api/v1/directives/:id/approvals +pub async fn list_approvals( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + match repository::list_pending_approvals(pool, id).await { + Ok(approvals) => Json(approvals).into_response(), + Err(e) => { + tracing::error!("Failed to list approvals: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } +} + +/// Approve a pending approval request +/// POST /api/v1/directives/:id/approvals/:approval_id/approve +pub async fn approve_request( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, approval_id)): Path<(Uuid, Uuid)>, + Json(req): Json<ApprovalActionRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine + .on_approval_resolved(approval_id, true, auth.owner_id) + .await + { + Ok(()) => { + match repository::resolve_approval( + pool, + approval_id, + "approved", + req.response.as_deref(), + auth.owner_id, + ) + .await + { + Ok(approval) => Json(approval).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + } + } + Err(e) => { + tracing::error!("Failed to process approval: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("APPROVAL_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +/// Deny a pending approval request +/// POST /api/v1/directives/:id/approvals/:approval_id/deny +pub async fn deny_request( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path((id, approval_id)): Path<(Uuid, Uuid)>, + Json(req): Json<ApprovalActionRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify ownership + match repository::get_directive_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response() + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response() + } + } + + let engine = crate::orchestration::DirectiveEngine::new(pool.clone()); + match engine + .on_approval_resolved(approval_id, false, auth.owner_id) + .await + { + Ok(()) => { + match repository::resolve_approval( + pool, + approval_id, + "denied", + req.response.as_deref(), + auth.owner_id, + ) + .await + { + Ok(approval) => Json(approval).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", &e.to_string())), + ) + .into_response(), + } + } + Err(e) => { + tracing::error!("Failed to process denial: {}", e); + ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("DENIAL_FAILED", &e.to_string())), + ) + .into_response() + } + } +} |
